May 17, 2022

Let's build a distributed Postgres proof of concept

What is CockroachDB under the hood? Take a look at its go.mod and you'll notice a number of dependencies that do a lot of the heavy lifting: a PostgreSQL wire protocol implementation, a storage layer, a Raft implementation for distributed consensus. And, not part of go.mod but still third-party code, PostgreSQL's grammar definition.

To be absurdly reductionist, CockroachDB is just the glue around these libraries. With that reductionist mindset, let's try building a distributed Postgres proof of concept ourselves! We'll use only four major external libraries: for parsing SQL, handling Postgres's wire protocol, handling Raft, and handling the storage of table metadata and rows themselves.

For a not-reductionist understanding of the CockroachDB internals, I recommend following the excellent Cockroach Engineering blog and Jordan Lewis's Hacking CockroachDB Twitch stream.

By the end of this post, in around 600 lines of code, we'll have a distributed "Postgres implementation" that will accept writes (CREATE TABLE, INSERT) on the leader and accept reads (SELECT) on any node. All nodes will contain the same data.

Here is a sample interaction against the leader:

$ psql -h localhost -p 6000
psql (13.4, server 0.0.0)
Type "help" for help.

phil=> create table x (age int, name text);
CREATE ok
phil=> insert into x values(14, 'garry'), (20, 'ted');
could not interpret result from server: INSERT ok
INSERT ok
phil=> select name, age from x;
  name   | age 
---------+-----
 "garry" |  14
 "ted"   |  20
(2 rows)

And against a follower (note the different port):

$ psql -h 127.0.0.1 -p 6001
psql (13.4, server 0.0.0)
Type "help" for help.

phil=> select age, name from x;
 age |  name
-----+---------
  20 | "ted"
  14 | "garry"
(2 rows)

All code for this post is available on GitHub in the fondly named WaterbugDB repo.

Plan of attack

Influenced by Philip O'Toole's talk on rqlite at Hacker Nights, we'll put a Postgres wire protocol server in front. As it receives queries, it will respond to SELECTs immediately. For CREATE TABLEs and INSERTs, it will instead send the entire query string to the Raft cluster. Each process in the Raft cluster will implement the functions needed to handle Raft messages; in this case, the messages will just be to create a table or insert data.

So every running process will run a Postgres wire protocol server, a Raft server, and an HTTP server that, as you'll see, is an implementation detail for getting processes to join the same Raft cluster.

Every running process will have its own directory for storing data.
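
To make that routing rule concrete, here is a tiny, self-contained sketch. It is not the real code (the real server, later in the post, inspects the parsed AST rather than the raw query string), but the decision it makes is the same.

package main

import (
    "fmt"
    "strings"
)

// raftApply stands in for the real raft.Apply call we'll wire up later. It
// would append the raw query string to the Raft log so every node applies
// it in the same order.
func raftApply(query string) {
    fmt.Println("replicating via raft:", query)
}

// routeQuery is the routing rule described above.
func routeQuery(query string) {
    if strings.HasPrefix(strings.ToUpper(strings.TrimSpace(query)), "SELECT") {
        // Reads are answered by any node from its local copy of the data.
        fmt.Println("executing locally:", query)
        return
    }
    // Writes (CREATE TABLE, INSERT) go through consensus.
    raftApply(query)
}

func main() {
    routeQuery("select name, age from x")
    routeQuery("insert into x values(14, 'garry')")
}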

Raft

There is likely a difference between Raft, the paper, and Raft, the implementations. When I refer to Raft in the rest of this post I'm going to be referring to an implementation.

And although CockroachDB uses etcd's Raft implementation, I didn't realize that when I started building this project, so I used Hashicorp's Raft implementation.

Raft allows us to reliably keep multiple nodes in sync with a log of messages. Each node in the Raft cluster implements a finite state machine (FSM) with three operations: apply, snapshot, and restore. Our finite state machine will embed a Postgres engine, which we'll build out later, to handle query execution.

package main

import (
    "bytes"
    "encoding/json"
    "fmt"
    "io"
    "log"
    "net"
    "net/http"
    "os"
    "path"
    "strings"
    "time"

    "github.com/google/uuid"
    "github.com/hashicorp/raft"
    "github.com/hashicorp/raft-boltdb"
    "github.com/jackc/pgproto3/v2"
    pgquery "github.com/pganalyze/pg_query_go/v2"
    bolt "go.etcd.io/bbolt"
)

type pgFsm struct {
    pe *pgEngine
}

From what I understand, the snapshot operation allows Raft to truncate logs. It is used in conjunction with restore: on startup, if there is a snapshot, restore is called so you can load the snapshot. Afterwards, all logs not yet snapshotted are replayed through the apply operation.

To keep this implementation simple we'll just fail all snapshots so restore will never be called and all logs will be replayed every time on startup through the apply operation. This is of course inefficient but it keeps the code simpler.

When we write the startup code we'll need to delete the database so that these apply calls happen fresh.

type snapshotNoop struct{}

func (sn snapshotNoop) Persist(sink raft.SnapshotSink) error {
    return sink.Cancel()
}

func (sn snapshotNoop) Release() {}

func (pf *pgFsm) Snapshot() (raft.FSMSnapshot, error) {
    return snapshotNoop{}, nil
}

func (pf *pgFsm) Restore(rc io.ReadCloser) error {
    return fmt.Errorf("Nothing to restore")
}

Finally, apply receives a single message and applies it on the node. In this project the message will be a CREATE TABLE or INSERT query, so we'll parse the query and pass it to the Postgres engine for execution.

func (pf *pgFsm) Apply(log *raft.Log) interface{} {
    switch log.Type {
    case raft.LogCommand:
        ast, err := pgquery.Parse(string(log.Data))
        if err != nil {
            panic(fmt.Errorf("Could not parse payload: %s", err))
        }

        err = pf.pe.execute(ast)
        if err != nil {
            panic(err)
        }
    default:
        panic(fmt.Errorf("Unknown raft log type: %#v", log.Type))
    }

    return nil
}

Panicking here is actually the advised behavior.

Raft server

Now we can set up the actual Raft server and pass it an instance of this FSM. This is a bunch of boilerplate that would matter more in a production install; for us, it basically amounts to telling Raft where to run and how to store its own internal data, including its all-important message log.

func setupRaft(dir, nodeId, raftAddress string, pf *pgFsm) (*raft.Raft, error) {
    err := os.MkdirAll(dir, os.ModePerm)
    if err != nil {
        return nil, fmt.Errorf("Could not create data directory: %s", err)
    }

    store, err := raftboltdb.NewBoltStore(path.Join(dir, "bolt"))
    if err != nil {
        return nil, fmt.Errorf("Could not create bolt store: %s", err)
    }

    snapshots, err := raft.NewFileSnapshotStore(path.Join(dir, "snapshot"), 2, os.Stderr)
    if err != nil {
        return nil, fmt.Errorf("Could not create snapshot store: %s", err)
    }

    tcpAddr, err := net.ResolveTCPAddr("tcp", raftAddress)
    if err != nil {
        return nil, fmt.Errorf("Could not resolve address: %s", err)
    }

    transport, err := raft.NewTCPTransport(raftAddress, tcpAddr, 10, time.Second*10, os.Stderr)
    if err != nil {
        return nil, fmt.Errorf("Could not create tcp transport: %s", err)
    }

    raftCfg := raft.DefaultConfig()
    raftCfg.LocalID = raft.ServerID(nodeId)

    r, err := raft.NewRaft(raftCfg, pf, store, store, snapshots, transport)
    if err != nil {
        return nil, fmt.Errorf("Could not create raft instance: %s", err)
    }

    // Cluster consists of unjoined leaders. Picking a leader and
    // creating a real cluster is done manually after startup.
    r.BootstrapCluster(raft.Configuration{
        Servers: []raft.Server{
            {
                ID:      raft.ServerID(nodeId),
                Address: transport.LocalAddr(),
            },
        },
    })

    return r, nil
}

Every instance of this process will run this code and start off as the leader of its own new cluster. We'll expose an HTTP server that allows one leader to tell other leaders to stop leading and follow it instead. This HTTP endpoint is how we'll get from N processes with N leaders and N clusters to N processes with 1 leader and 1 cluster.

That's basically it for the core Raft bits. So let's build out that HTTP server and follow endpoint.

HTTP follow endpoint

Our HTTP server will have just one endpoint that tells one process (a) to contact another process (b) so that process (b) joins process (a)'s cluster.

The HTTP server needs process (a)'s Raft instance to start this join action. And for Raft to know how to contact process (b), we'll need to pass along both process (b)'s unique Raft node id (we assign the id ourselves when we start the process) and process (b)'s Raft server address.

type httpServer struct {
    r *raft.Raft
}

func (hs httpServer) addFollowerHandler(w http.ResponseWriter, r *http.Request) {
    followerId := r.URL.Query().Get("id")
    followerAddr := r.URL.Query().Get("addr")

    if hs.r.State() != raft.Leader {
        w.WriteHeader(http.StatusBadRequest)
        json.NewEncoder(w).Encode(struct {
            Error string `json:"error"`
        }{
            "Not the leader",
        })
        return
    }

    err := hs.r.AddVoter(raft.ServerID(followerId), raft.ServerAddress(followerAddr), 0, 0).Error()
    if err != nil {
        log.Printf("Failed to add follower: %s", err)
        http.Error(w, http.StatusText(http.StatusBadRequest), http.StatusBadRequest)
        return
    }

    w.WriteHeader(http.StatusOK)
}
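
Once two of these processes are running, asking the first to add the second as a follower is a single request to this endpoint. This is exactly the curl call we'll use when we try everything out at the end:

$ curl 'localhost:8222/add-follower?addr=localhost:2223&id=node2'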

That's it! Let's move on to the query engine.

Query engine

The query engine is a wrapper around a storage layer. We'll bring in bbolt.

I originally built this with Cockroach's pebble, but pebble has a transitive dependency on a C library whose function names conflict with those in the C library that pg_query_go wraps.

type pgEngine struct {
    db         *bolt.DB
    bucketName []byte
}

func newPgEngine(db *bolt.DB) *pgEngine {
    return &pgEngine{db, []byte("data")}
}

bbolt organizes data into buckets. Buckets might be a natural way to store table rows (one bucket per table) but to keep the implementation simple we'll put all table metadata and row data into a single `data` bucket.
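
To make the key scheme concrete, here is roughly what will end up in the data bucket after the example session at the top of the post. The UUIDs are shortened and made up; the exact layout follows from the CREATE TABLE and INSERT code below.

tables_x       -> {"Name":"x","ColumnNames":["age","name"],"ColumnTypes":["pg_catalog.int4","text"]}
rows_x_9f8c... -> [14,"garry"]
rows_x_1a2b... -> [20,"ted"]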

The entrypoint we called in the Raft apply implementation above was execute. It takes a parsed list of statements. We'll iterate over the statements, figure out the kind of each one, and call a dedicated helper for that kind.

func (pe *pgEngine) execute(tree *pgquery.ParseResult) error {
    for _, stmt := range tree.GetStmts() {
        n := stmt.GetStmt()
        if c := n.GetCreateStmt(); c != nil {
            return pe.executeCreate(c)
        }

        if c := n.GetInsertStmt(); c != nil {
            return pe.executeInsert(c)
        }

        if c := n.GetSelectStmt(); c != nil {
            _, err := pe.executeSelect(c)
            return err
        }

        return fmt.Errorf("Unknown statement type: %s", stmt)
    }

    return nil
}

The pg_query_go docs are not super helpful. I had to build a separate AST explorer program to make it easier to understand this parser.
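
That explorer isn't included in this post's code, but a minimal version is only a few lines. This is a sketch, assuming pg_query_go's ParseToJSON helper, which dumps the parse tree as JSON:

// ast.go: print the parse tree of a query as JSON to see which AST
// getters (GetCreateStmt, GetInsertStmt, GetSelectStmt, ...) apply.
package main

import (
    "fmt"
    "log"
    "os"

    pgquery "github.com/pganalyze/pg_query_go/v2"
)

func main() {
    tree, err := pgquery.ParseToJSON(os.Args[1])
    if err != nil {
        log.Fatal(err)
    }
    fmt.Println(tree)
}

Run it against a query, for example go run ast.go 'create table x (age int, name text)', and read off the field names you need.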

Let's start with creating a table.

Create table

When a table is created, we'll need to store its metadata.

type tableDefinition struct {
    Name        string
    ColumnNames []string
    ColumnTypes []string
}

First we pull that metadata out of the AST.

func (pe *pgEngine) executeCreate(stmt *pgquery.CreateStmt) error {
    tbl := tableDefinition{}
    tbl.Name = stmt.Relation.Relname

    for _, c := range stmt.TableElts {
        cd := c.GetColumnDef()

        tbl.ColumnNames = append(tbl.ColumnNames, cd.Colname)

        // Names is namespaced. So `INT` is pg_catalog.int4. `BIGINT` is pg_catalog.int8.
        var columnType string
        for _, n := range cd.TypeName.Names {
            if columnType != "" {
                columnType += "."
            }
            columnType += n.GetString_().Str
        }
        tbl.ColumnTypes = append(tbl.ColumnTypes, columnType)
    }

Now we need to store this in the storage layer. The easiest/dumbest way to do this is to serialize the metadata to JSON and store it with key: tables_${tableName}.

    tableBytes, err := json.Marshal(tbl)
    if err != nil {
        return fmt.Errorf("Could not marshal table: %s", err)
    }

    err = pe.db.Update(func(tx *bolt.Tx) error {
        bkt, err := tx.CreateBucketIfNotExists(pe.bucketName)
        if err != nil {
            return err
        }

        return bkt.Put([]byte("tables_"+tbl.Name), tableBytes)
    })

    if err != nil {
        return fmt.Errorf("Could not set key-value: %s", err)
    }

    return nil
}

Next we'll build a helper to reverse that operation, pulling out table metadata from the storage layer by the table name:

func (pe *pgEngine) getTableDefinition(name string) (*tableDefinition, error) {
    var tbl tableDefinition

    err := pe.db.View(func(tx *bolt.Tx) error {
        bkt := tx.Bucket(pe.bucketName)
        if bkt == nil {
            return fmt.Errorf("Table does not exist")
        }

        valBytes := bkt.Get([]byte("tables_" + name))
        err := json.Unmarshal(valBytes, &tbl)
        if err != nil {
            return fmt.Errorf("Could not unmarshal table: %s", err)
        }

        return nil
    })

    return &tbl, err
}

That's it for our basic CREATE TABLE support! Let's do INSERT next.

Insert row

Our INSERT implementation will only handle literal/constant VALUES.

func (pe *pgEngine) executeInsert(stmt *pgquery.InsertStmt) error {
    tblName := stmt.Relation.Relname

    slct := stmt.GetSelectStmt().GetSelectStmt()
    for _, values := range slct.ValuesLists {
        var rowData []any
        for _, value := range values.GetList().Items {
            if c := value.GetAConst(); c != nil {
                if s := c.Val.GetString_(); s != nil {
                    rowData = append(rowData, s.Str)
                    continue
                }

                if i := c.Val.GetInteger(); i != nil {
                    rowData = append(rowData, i.Ival)
                    continue
                }
            }

            return fmt.Errorf("Unknown value type: %s", value)
        }

It would be better to abstract this VALUES code into a helper so it could be used by SELECTs too but out of laziness we'll just keep this here.

Next we need to write the row to the storage layer. We'll serialize the row data to JSON (inefficient since we know the row structure, but JSON is easy) and store it under a key made of a table-name prefix plus a unique UUID. Later, when iterating over a table's rows, we can do a prefix scan that recovers just the rows in that table.

        rowBytes, err := json.Marshal(rowData)
        if err != nil {
            return fmt.Errorf("Could not marshal row: %s", err)
        }

        id := uuid.New().String()
        err = pe.db.Update(func(tx *bolt.Tx) error {
            bkt, err := tx.CreateBucketIfNotExists(pe.bucketName)
            if err != nil {
                return err
            }

            return bkt.Put([]byte("rows_"+tblName+"_"+id), rowBytes)
        })
        if err != nil {
            return fmt.Errorf("Could not store row: %s", err)
        }
    }

    return nil
}

Finally we can move on to support SELECT!

Select rows

Unlike CREATE TABLE and INSERT, SELECT will need to return rows, column names, and, because the Postgres wire protocol wants them, column types.

type pgResult struct {
    fieldNames []string
    fieldTypes []string
    rows       [][]any
}

First we pull out the table name and the fields selected, looking up field types in the table metadata.

func (pe *pgEngine) executeSelect(stmt *pgquery.SelectStmt) (*pgResult, error) {
    tblName := stmt.FromClause[0].GetRangeVar().Relname
    tbl, err := pe.getTableDefinition(tblName)
    if err != nil {
        return nil, err
    }

    results := &pgResult{}
    for _, c := range stmt.TargetList {
        fieldName := c.GetResTarget().Val.GetColumnRef().Fields[0].GetString_().Str
        results.fieldNames = append(results.fieldNames, fieldName)

        fieldType := ""
        for i, cn := range tbl.ColumnNames {
            if cn == fieldName {
                fieldType = tbl.ColumnTypes[i]
            }
        }

        if fieldType == "" {
            return nil, fmt.Errorf("Unknown field: %s", fieldName)
        }

        results.fieldTypes = append(results.fieldTypes, fieldType)
    }

Finally, we do a prefix scan to grab all rows in the table from the storage layer.

    prefix := []byte("rows_" + tblName + "_")
    err = pe.db.View(func(tx *bolt.Tx) error {
        c := tx.Bucket(pe.bucketName).Cursor()

        for k, v := c.Seek(prefix); k != nil && bytes.HasPrefix(k, prefix); k, v = c.Next() {
            var row []any
            err = json.Unmarshal(v, &row)
            if err != nil {
                return fmt.Errorf("Unable to unmarshal row: %s", err)
            }

            var targetRow []any
            for _, target := range results.fieldNames {
                for i, field := range tbl.ColumnNames {
                    if target == field {
                        targetRow = append(targetRow, row[i])
                    }
                }
            }

            results.rows = append(results.rows, targetRow)
        }

        return nil
    })
    if err != nil {
        return nil, err
    }

    return results, nil
}

That's it for SELECT! The last function we'll implement is a helper for deleting all data in the storage layer. This will be called on startup before Raft logs are applied so the database always ends up in a consistent state.

func (pe *pgEngine) delete() error {
    return pe.db.Update(func(tx *bolt.Tx) error {
        bkt := tx.Bucket(pe.bucketName)
        if bkt != nil {
            return tx.DeleteBucket(pe.bucketName)
        }

        return nil
    })
}

And we're ready to move on to the final layer, the Postgres wire protocol.

Postgres wire protocol server

jackc/pgproto3 is an implementation of the Postgres wire protocol for Go. It allows us to implement a server that can respond to requests from Postgres clients like psql.

It works by wrapping a TCP connection. So we'll start by building a function that does the TCP serving loop.

func runPgServer(port string, db *bolt.DB, r *raft.Raft) {
    ln, err := net.Listen("tcp", "localhost:"+port)
    if err != nil {
        log.Fatal(err)
    }

    for {
        conn, err := ln.Accept()
        if err != nil {
            log.Fatal(err)
        }

        pc := pgConn{conn, db, r}
        go pc.handle()
    }
}

The pgConn instance needs access to the database directly so it can respond to SELECTs. And it needs the Raft instance for all other queries.

type pgConn struct {
    conn net.Conn
    db   *bolt.DB
    r    *raft.Raft
}

The handle function we called above will grab the current message via the pgproto3 package and handle startup messages and regular messages.

func (pc pgConn) handle() {
    pgc := pgproto3.NewBackend(pgproto3.NewChunkReader(pc.conn), pc.conn)
    defer pc.conn.Close()

    err := pc.handleStartupMessage(pgc)
    if err != nil {
        log.Println(err)
        return
    }

    for {
        err := pc.handleMessage(pgc)
        if err != nil {
            log.Println(err)
            return
        }
    }
}

Startup messages include authorization and SSL checks. We'll allow anything in the former and respond "no" to the latter.

func (pc pgConn) handleStartupMessage(pgconn *pgproto3.Backend) error {
    startupMessage, err := pgconn.ReceiveStartupMessage()
    if err != nil {
        return fmt.Errorf("Error receiving startup message: %s", err)
    }

    switch startupMessage.(type) {
    case *pgproto3.StartupMessage:
        buf := (&pgproto3.AuthenticationOk{}).Encode(nil)
        buf = (&pgproto3.ReadyForQuery{TxStatus: 'I'}).Encode(buf)
        _, err = pc.conn.Write(buf)
        if err != nil {
            return fmt.Errorf("Error sending ready for query: %s", err)
        }

        return nil
    case *pgproto3.SSLRequest:
        _, err = pc.conn.Write([]byte("N"))
        if err != nil {
            return fmt.Errorf("Error sending deny SSL request: %s", err)
        }

        return pc.handleStartupMessage(pgconn)
    default:
        return fmt.Errorf("Unknown startup message: %#v", startupMessage)
    }
}

Within the main handleMessage logic we'll check the type of message.

func (pc pgConn) handleMessage(pgc *pgproto3.Backend) error {
    msg, err := pgc.Receive()
    if err != nil {
        return fmt.Errorf("Error receiving message: %s", err)
    }

    switch t := msg.(type) {
    case *pgproto3.Query:
        // TODO
    case *pgproto3.Terminate:
        return nil
    default:
        return fmt.Errorf("Received message other than Query from client: %s", msg)
    }

    return nil
}

If the message is a query we'll parse it and respond immediately to SELECTs.

    switch t := msg.(type) {
    case *pgproto3.Query:
        stmts, err := pgquery.Parse(t.String)
        if err != nil {
            return fmt.Errorf("Error parsing query: %s", err)
        }

        if len(stmts.GetStmts()) > 1 {
            return fmt.Errorf("Only make one request at a time.")
        }

        stmt := stmts.GetStmts()[0]

        // Handle SELECTs here
        s := stmt.GetStmt().GetSelectStmt()
        if s != nil {
            pe := newPgEngine(pc.db)
            res, err := pe.executeSelect(s)
            if err != nil {
                return err
            }

            pc.writePgResult(res)
            return nil
        }

(We'll implement that writePgResult helper shortly below.) Otherwise we'll add the query to the Raft log and return a basic response.

        // Otherwise it's DDL/DML, raftify
        future := pc.r.Apply([]byte(t.String), 500*time.Millisecond)
        if err := future.Error(); err != nil {
            return fmt.Errorf("Could not apply: %s", err)
        }

        e := future.Response()
        if e != nil {
            return fmt.Errorf("Could not apply (internal): %s", e)
        }

        pc.done(nil, strings.ToUpper(strings.Split(t.String, " ")[0])+" ok")
    case *pgproto3.Terminate:
        return nil
    default:
        return fmt.Errorf("Received message other than Query from client: %s", msg)
    }

    return nil
}

done is an important helper that tells the Postgres connection that the query is complete and the server is ready to receive another query. Without this response psql just hangs.

func (pc pgConn) done(buf []byte, msg string) {
    buf = (&pgproto3.CommandComplete{CommandTag: []byte(msg)}).Encode(buf)
    buf = (&pgproto3.ReadyForQuery{TxStatus: 'I'}).Encode(buf)
    _, err := pc.conn.Write(buf)
    if err != nil {
        log.Printf("Failed to write query response: %s", err)
    }
}

And now let's implement the writePgResult helper. This function needs to translate from our pgResult struct to the format required by pgproto3.

var dataTypeOIDMap = map[string]uint32{
    "text":            25,
    "pg_catalog.int4": 23,
}

func (pc pgConn) writePgResult(res *pgResult) {
    rd := &pgproto3.RowDescription{}
    for i, field := range res.fieldNames {
        rd.Fields = append(rd.Fields, pgproto3.FieldDescription{
            Name:        []byte(field),
            DataTypeOID: dataTypeOIDMap[res.fieldTypes[i]],
        })
    }
    buf := rd.Encode(nil)
    for _, row := range res.rows {
        dr := &pgproto3.DataRow{}
        for _, value := range row {
            bs, err := json.Marshal(value)
            if err != nil {
                log.Printf("Failed to marshal cell: %s\n", err)
                return
            }

            dr.Values = append(dr.Values, bs)
        }

        buf = dr.Encode(buf)
    }

    pc.done(buf, fmt.Sprintf("SELECT %d", len(res.rows)))
}

And we're done with everything but func main()!

Main

On startup, each process must be assigned (by the parent process) a unique node id (any unique string is ok) and ports for the Raft server, Postgres server, and HTTP server. We'll build a short getConfig helper to grab these from arguments.

type config struct {
    id       string
    httpPort string
    raftPort string
    pgPort   string
}

func getConfig() config {
    cfg := config{}
    // Flag values are read from os.Args directly: the value sits one
    // position after its flag, so skip it on the next iteration.
    for i := 1; i < len(os.Args)-1; i++ {
        arg := os.Args[i]
        if arg == "--node-id" {
            cfg.id = os.Args[i+1]
            i++
            continue
        }

        if arg == "--http-port" {
            cfg.httpPort = os.Args[i+1]
            i++
            continue
        }

        if arg == "--raft-port" {
            cfg.raftPort = os.Args[i+1]
            i++
            continue
        }

        if arg == "--pg-port" {
            cfg.pgPort = os.Args[i+1]
            i++
            continue
        }
    }

    if cfg.id == "" {
        log.Fatal("Missing required parameter: --node-id")
    }

    if cfg.raftPort == "" {
        log.Fatal("Missing required parameter: --raft-port")
    }

    if cfg.httpPort == "" {
        log.Fatal("Missing required parameter: --http-port")
    }

    if cfg.pgPort == "" {
        log.Fatal("Missing required parameter: --pg-port")
    }

    return cfg
}

Now in main we'll grab the config and set up this process's database. All processes will put their data in a top-level data directory to make managing the directories easier. But within that directory each process will have its own unique directory for data storage, based on its node id.

func main() {
    cfg := getConfig()

    dataDir := "data"
    err := os.MkdirAll(dataDir, os.ModePerm)
    if err != nil {
        log.Fatalf("Could not create data directory: %s", err)
    }

    db, err := bolt.Open(path.Join(dataDir, "/data"+cfg.id), 0600, nil)
    if err != nil {
        log.Fatalf("Could not open bolt db: %s", err)
    }
    defer db.Close()

We need to clean up the database.

    pe := newPgEngine(db)
    // Start off in clean state
    pe.delete()

Set up the Raft server.

    pf := &pgFsm{pe}
    r, err := setupRaft(path.Join(dataDir, "raft"+cfg.id), cfg.id, "localhost:"+cfg.raftPort, pf)
    if err != nil {
        log.Fatal(err)
    }

Set up the HTTP server.

    hs := httpServer{r}
    http.HandleFunc("/add-follower", hs.addFollowerHandler)
    go func() {
        err := http.ListenAndServe(":"+cfg.httpPort, nil)
        if err != nil {
            log.Fatal(err)
        }
    }()

And finally, kick off the Postgres server.

    runPgServer(cfg.pgPort, db, r)
}

Finally. Finally. Finally done. Let's give it a go. :)

What hath god wrought

First, initialize the go module and then build the app.

$ go mod init waterbugdb
$ go mod tidy
$ go build

Now in terminal 1 start an instance of the database,

$ ./waterbugdb --node-id node1 --raft-port 2222 --http-port 8222 --pg-port 6000

Then in terminal 2 start another instance.

$ ./waterbugdb --node-id node2 --raft-port 2223 --http-port 8223 --pg-port 6001

And in terminal 3, tell node1 to have node2 follow it.

$ curl 'localhost:8222/add-follower?addr=localhost:2223&id=node2'

And then open psql against port 6000, the leader.

$ psql -h localhost -p 6000
psql (13.4, server 0.0.0)
Type "help" for help.

phil=> create table x (age int, name text);
CREATE ok
phil=> insert into x values(14, 'garry'), (20, 'ted');
could not interpret result from server: INSERT ok
INSERT ok
phil=> select name, age from x;
  name   | age 
---------+-----
 "garry" |  14
 "ted"   |  20
(2 rows)

Now kill node1 in terminal 1. Then start it up again. node2 will now be the leader. So exit psql in terminal 3 and enter it again pointed at node2, port 6001. Add new data.

$ psql -h 127.0.0.1 -p 6001
psql (13.4, server 0.0.0)
Type "help" for help.

phil=> insert into x values(19, 'ava'), (18, 'ming');
could not interpret result from server: INSERT ok
phil=> select age, name from x;
 age |  name
-----+---------
  20 | "ted"
  14 | "garry"
  18 | "ming"
  19 | "ava"
(4 rows)

Exit psql in terminal 3 and start it up once more against node1, port 6000.

$ psql -h 127.0.0.1 -p 6000
psql (13.4, server 0.0.0)
Type "help" for help.

phil=> select age, name from x;
 age |  name
-----+---------
  20 | "ted"
  14 | "garry"
  18 | "ming"
  19 | "ava"
(4 rows)

Nifty stuff.

Summary

So on the one hand this was a more complex post than my usual. Each process needed three servers running. Two of those servers we managed directly and the Raft server was managed by the Raft library.

On the other hand, we did this all in a really small amount of code. Yes, many edge cases were left unhandled and a massive amount of SQL was unsupported. And yes, there are tons of inefficiencies, like using JSON, an unstructured format, when every table has a fixed structure. But hopefully now you have an idea of how a project like this could be structured. And there's the beginnings of a framework for filling in syntax and edge cases over time.

Additionally, the only problem we solved with consensus was replication, not sharding. This, and its more complicated cousin (cross-shard transactions), is truly the special sauce Cockroach brings.

Read more about building an intuition for sharding, replication, and distributed consensus here: https://notes.eatonphil.com/2024-02-08-an-intuition-for-distributed-consensus-in-oltp-systems.html