Listening to generic JSON notifications from PostgreSQL in Go

September 15, 2015

It’s already widely known that PostgreSQL is the leading open source relational database when it comes to features. One of those features is great JSON support; another is LISTEN/NOTIFY, a nifty pub-sub system exclusive to PostgreSQL. Combine the two and you get a good basis for a real-time push notification system. In this post, I will explain how to create a generic trigger function that generates JSON notifications for any table change, and how to listen to them in Go.

The trigger function

First, we will create the trigger function. The function sends a notification on the events channel containing the table name, the action, and the old or new row data, depending on the action. As there are no table-specific references, you can use the same function for triggers on all tables you want notifications from.

CREATE OR REPLACE FUNCTION notify_event() RETURNS TRIGGER AS $$

    DECLARE 
        data json;
        notification json;
    
    BEGIN
    
        -- Convert the old or new row to JSON, based on the kind of action.
        -- Action = DELETE?             -> OLD row
        -- Action = INSERT or UPDATE?   -> NEW row
        IF (TG_OP = 'DELETE') THEN
            data = row_to_json(OLD);
        ELSE
            data = row_to_json(NEW);
        END IF;
        
        -- Construct the notification as a JSON object.
        notification = json_build_object(
                          'table',TG_TABLE_NAME,
                          'action', TG_OP,
                          'data', data);
        
                        
        -- Execute pg_notify(channel, notification)
        PERFORM pg_notify('events',notification::text);
        
        -- Result is ignored since this is an AFTER trigger
        RETURN NULL; 
    END;
    
$$ LANGUAGE plpgsql;

Now that the trigger function is created, let’s create a sample table…

CREATE TABLE products (
  id SERIAL,
  name TEXT,
  quantity FLOAT
);

… and add a trigger to it. As you can see, we add the trigger for all three actions in only one statement:

CREATE TRIGGER products_notify_event
AFTER INSERT OR UPDATE OR DELETE ON products
    FOR EACH ROW EXECUTE PROCEDURE notify_event();
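
Because notify_event() has no table-specific references, attaching it to more tables only means repeating the CREATE TRIGGER statement with a different table name. As a rough sketch, a small Go helper could generate those statements. The helper names quoteIdent and notifyTriggerSQL and the orders and customers tables are made up for illustration, and the trigger naming simply copies the products_notify_event pattern above.

```go
package main

import (
	"fmt"
	"strings"
)

// quoteIdent quotes a PostgreSQL identifier by wrapping it in double quotes
// and doubling any embedded double quote.
func quoteIdent(name string) string {
	return `"` + strings.ReplaceAll(name, `"`, `""`) + `"`
}

// notifyTriggerSQL returns a CREATE TRIGGER statement that attaches
// notify_event() to the given table, using the <table>_notify_event
// naming pattern (an assumption copied from the products example).
func notifyTriggerSQL(table string) string {
	return fmt.Sprintf(
		"CREATE TRIGGER %s AFTER INSERT OR UPDATE OR DELETE ON %s FOR EACH ROW EXECUTE PROCEDURE notify_event();",
		quoteIdent(table+"_notify_event"), quoteIdent(table))
}

func main() {
	// "orders" and "customers" are hypothetical tables used for illustration.
	for _, table := range []string{"products", "orders", "customers"} {
		fmt.Println(notifyTriggerSQL(table))
	}
}
```

Each generated statement could then be executed once per table, for example with db.Exec from database/sql.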

Result

Fire up psql or any other PostgreSQL client and execute the following:

exampledb=# LISTEN events;
LISTEN
exampledb=# INSERT INTO products (name, quantity)
exampledb-# VALUES ('pen', 10200);
INSERT 0 1
Asynchronous notification "events" with payload "{"table" : "products", 
  "action" : "INSERT", "data" : {"id":1,"name":"pen","quantity":10200}}" 
  received from server process with PID 799.
exampledb=#

Et voilà, you now get a notification for every insert, update or delete on the table.

Listening for the notifications in Go

Receiving notifications in a terminal doesn’t have a lot of real-life value, so the next step would be to capture these events in a server application. In Go, this is fairly easy using the pq package, which already includes functionality for listening to PostgreSQL notifications.

The app below is based on the sample app in the pq docs.

package main

import (
	"bytes"
	"database/sql"
	"encoding/json"
	"fmt"
	"time"
	"github.com/lib/pq"
)

func waitForNotification(l *pq.Listener) {
	select {
	case n := <-l.Notify:
		// pq sends a nil notification after the connection has been
		// re-established; there is no payload to print in that case.
		if n == nil {
			fmt.Println("Connection re-established, notifications may have been missed")
			return
		}
		fmt.Println("Received data from channel [", n.Channel, "] :")
		// Prepare notification payload for pretty print
		var prettyJSON bytes.Buffer
		err := json.Indent(&prettyJSON, []byte(n.Extra), "", "\t")
		if err != nil {
			fmt.Println("Error processing JSON: ", err)
			return
		}
		fmt.Println(prettyJSON.String())
	case <-time.After(90 * time.Second):
		fmt.Println("Received no events for 90 seconds, checking connection")
		// Ping in the background so the wait loop is not blocked.
		go l.Ping()
	}
}

func main() {
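	conninfo := "dbname=exampledb user=webapp password=webapp"

	// sql.Open may only validate the connection string without connecting.
	// The listener below opens its own connection, so the handle is unused.
	_, err := sql.Open("postgres", conninfo)
	if err != nil {
		panic(err)
	}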

	reportProblem := func(ev pq.ListenerEventType, err error) {
		if err != nil {
			fmt.Println(err.Error())
		}
	}

	listener := pq.NewListener(conninfo, 10*time.Second, time.Minute, reportProblem)
	err = listener.Listen("events")
	if err != nil {
		panic(err)
	}

	fmt.Println("Start monitoring PostgreSQL...")
	for {
		waitForNotification(listener)
	}
}

Now, run the app and make a change to the products table. You will see the notification being handled by the app:

Received data from channel [ events ] :
{
        "table": "products",
        "action": "INSERT",
        "data": {
                "id": 1,
                "name": "pen",
                "quantity": 10200
        }
}  

What’s next?

Of course, this example app isn’t going to be of much use in real life. A better example would be a WebSocket HTTP handler that notifies subscribed clients of the changes, so you can forward database updates to them in real time. I’ll take this up in a future post.
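
As a hedged sketch of the fan-out part of that idea, the payloads received from the listener could be broadcast to subscribed clients through channels. The Hub type and its methods are hypothetical names, and the WebSocket handling itself is left out: each connection would call Subscribe and write whatever arrives on its channel.

```go
package main

import (
	"fmt"
	"sync"
)

// Hub fans each payload out to every subscribed client channel.
type Hub struct {
	mu      sync.Mutex
	clients map[chan string]struct{}
}

// NewHub returns an empty hub.
func NewHub() *Hub {
	return &Hub{clients: make(map[chan string]struct{})}
}

// Subscribe registers a new client and returns its buffered channel.
func (h *Hub) Subscribe() chan string {
	ch := make(chan string, 16)
	h.mu.Lock()
	h.clients[ch] = struct{}{}
	h.mu.Unlock()
	return ch
}

// Unsubscribe removes the client and closes its channel.
func (h *Hub) Unsubscribe(ch chan string) {
	h.mu.Lock()
	if _, ok := h.clients[ch]; ok {
		delete(h.clients, ch)
		close(ch)
	}
	h.mu.Unlock()
}

// Broadcast sends payload to every client without blocking. A client whose
// buffer is full misses the message. It returns the number of deliveries.
func (h *Hub) Broadcast(payload string) int {
	h.mu.Lock()
	defer h.mu.Unlock()
	delivered := 0
	for ch := range h.clients {
		select {
		case ch <- payload:
			delivered++
		default: // slow client: drop rather than stall the listener loop
		}
	}
	return delivered
}

func main() {
	hub := NewHub()
	a, b := hub.Subscribe(), hub.Subscribe()

	// In the app above, this call would sit where the payload is printed.
	hub.Broadcast(`{"table":"products","action":"INSERT"}`)

	fmt.Println(<-a)
	fmt.Println(<-b)
	hub.Unsubscribe(a)
	hub.Unsubscribe(b)
}
```

Dropping messages for slow clients is a design choice made here to keep the listener loop responsive; an application that cannot lose events would need a different policy.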
