Non-blocking Channels in Go

Posted by Nathan Osman on September 13, 2015

One of the latest projects I’ve been working on is a simple SMTP client written in Go. I’ve really enjoyed developing applications in Go since it provides concurrency primitives out-of-the-box. In this article, I’m going to describe a technique for sending any number of values on a channel without having the send block.

Before doing that, I’ll take a brief moment to explain the problem that this technique attempts to solve. Let’s pretend we have a string channel and we want to send a single value. We would write a line that resembles the following:

strChan <- "Hello, world!"

Without any further information about the context of this line or how the channel was created, it is impossible to know whether the send will block or not. If the channel is unbuffered (the default), then it will block until another goroutine receives the value. Let’s suppose the following line was used to create the channel:

strChan := make(chan string, 1)

We now know the channel has a capacity of 1, but that still doesn’t guarantee sending a value on the channel won’t block. If a value is already in the channel’s buffer, the send will block until another goroutine receives a value from the channel. Increasing the buffer size only delays the problem: as soon as the buffer is full, the next send will block.
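To make the buffering behaviour concrete, here is a minimal sketch. The helper bufferFull is hypothetical (it is not part of the original code) and only gives a snapshot: it assumes no other goroutine is using the channel at the same time.

```go
package main

import "fmt"

// bufferFull (hypothetical helper) reports whether the channel's buffer is
// currently full, in which case a send would have to wait for a receiver.
// The answer is only a snapshot and assumes no concurrent senders or receivers.
func bufferFull(ch chan string) bool {
	return len(ch) == cap(ch)
}

func main() {
	strChan := make(chan string, 1)

	fmt.Println(bufferFull(strChan)) // false: the buffer has room for one value
	strChan <- "Hello, world!"       // returns immediately
	fmt.Println(bufferFull(strChan)) // true: a second send would now block
}
```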

One of the ways you can work around this problem is by using a select{} block:

select {
case strChan <- "value":
case otherChan <- "other value":
}

The example above blocks until either of the two sends can proceed, rather than waiting on a single channel. A select{} block can also mix send and receive operations, which will come in handy later when I describe my technique. For example, pairing a send with a receive from a timer channel implements a timeout for sending a value:

select {
case strChan <- "value":
case <-time.After(5 * time.Second):
}

The example above will abort the send operation if the send blocks for more than 5 seconds.

Sometimes, however, this isn’t enough. You may need to unconditionally send a list of values on a channel and be completely certain that nothing will block. For that specific scenario, I have developed NonBlockingChan. I will quickly run through the code and explain how it all works. First, the struct itself:

type NonBlockingChan struct {
    Send chan<- interface{}
    Recv <-chan interface{}
}

The goal is to let users of this type send values on the Send channel and never worry about the send blocking. Note that the values sent and received on the channel are of type interface{}. This means that although you can send a value of any type on the Send channel, you will need to use a type assertion to convert it back to the appropriate type when you receive the value from the Recv channel.
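As a small illustration of what that means on the receiving side, the sketch below uses an ordinary buffered chan interface{} in place of the Recv channel. The helper asString is hypothetical.

```go
package main

import "fmt"

// asString (hypothetical helper) converts a received value back to a string.
// The comma-ok form of the type assertion reports failure instead of panicking.
func asString(v interface{}) (string, bool) {
	s, ok := v.(string)
	return s, ok
}

func main() {
	ch := make(chan interface{}, 2)
	ch <- "Hello, world!"
	ch <- 42

	for i := 0; i < 2; i++ {
		v := <-ch
		if s, ok := asString(v); ok {
			fmt.Println("string:", s)
		} else {
			fmt.Printf("not a string: %v (%T)\n", v, v)
		}
	}
}
```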

All of the code that makes this work resides in the NewNonBlockingChan() function, so we’ll take a look at that next.

func NewNonBlockingChan() *NonBlockingChan {
    var (
        send = make(chan interface{})
        recv = make(chan interface{})
        n    = &NonBlockingChan{
            Send: send,
            Recv: recv,
        }
        values     = list.New() // from container/list, used as a FIFO queue
        sendClosed = false
    )

The channels are created first and then assigned to the instance that will eventually be returned to the caller. This lets the function use the channels in either direction while preventing the caller from using them in the wrong direction. A list is also created for buffering the values.

Next, a goroutine is started to facilitate the sending, buffering, and receiving of values. On each iteration, the goroutine waits for whichever of the following operations can proceed first:

  • send a value from the list if it contains one or more values
  • receive a new value if the Send channel was not closed

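Since each of these operations should only be attempted when its condition is met, the set of active cases changes from one iteration to the next. A plain select{} block can handle this by setting a channel variable to nil (a nil channel is never ready), but here the Select() function from the reflect package is used instead, which lets the cases be built at run time. The function expects a slice of reflect.SelectCase. To make things easier, constants can be used for the slice indices: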

go func() {
    const (
        incomingCase = iota
        outgoingCase
        numCases
    )

The main part of the goroutine consists of a loop that runs until there are no values remaining in the list and the Send channel is closed:

for !sendClosed || values.Len() > 0 {

Next, the reflect.SelectCase slice is created and populated:

cases := make([]reflect.SelectCase, numCases)
cases[incomingCase].Dir = reflect.SelectRecv
cases[outgoingCase].Dir = reflect.SelectSend

The incomingCase is only populated if the Send channel hasn’t already been closed:

if !sendClosed {
    cases[incomingCase].Chan = reflect.ValueOf(send)
}

The outgoingCase is only populated if there is at least one value in the list (in which case the first value is sent):

if values.Len() > 0 {
    cases[outgoingCase].Chan = reflect.ValueOf(recv)
    cases[outgoingCase].Send = reflect.ValueOf(values.Front().Value)
}

The slice of reflect.SelectCase can now be passed to reflect.Select(). Cases whose Chan field was left unset are ignored:

i, val, ok := reflect.Select(cases)

Three values are returned:

  • the index of the case that was chosen
  • the value received from the channel (if the case was a receive)
  • a bool indicating whether a value was actually received (false for a send, or when the channel was closed)
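The behaviour of reflect.Select() can be tried in isolation. The sketch below is not part of NonBlockingChan; the helper recvOrSend and its parameters are made up for illustration. It builds the same two cases and leaves Chan unset for any case that should be skipped.

```go
package main

import (
	"fmt"
	"reflect"
)

// recvOrSend (hypothetical helper) performs one reflect.Select over two
// optional cases: a receive from in (index 0) and a send of toSend on out
// (index 1). A disabled case keeps a zero Chan and is ignored by Select.
// At least one case must be enabled, otherwise the call blocks forever.
func recvOrSend(in, out chan interface{}, recvEnabled, sendEnabled bool, toSend interface{}) (int, reflect.Value, bool) {
	cases := make([]reflect.SelectCase, 2)
	cases[0].Dir = reflect.SelectRecv
	cases[1].Dir = reflect.SelectSend
	if recvEnabled {
		cases[0].Chan = reflect.ValueOf(in)
	}
	if sendEnabled {
		cases[1].Chan = reflect.ValueOf(out)
		cases[1].Send = reflect.ValueOf(toSend)
	}
	return reflect.Select(cases)
}

func main() {
	in := make(chan interface{}, 1)
	out := make(chan interface{}, 1)

	// A value is waiting on in, so the receive case (index 0) is chosen.
	in <- "queued"
	i, val, ok := recvOrSend(in, out, true, false, nil)
	fmt.Println(i, val.Interface(), ok) // 0 queued true

	// Receiving from a closed channel succeeds immediately with ok == false.
	close(in)
	i, _, ok = recvOrSend(in, out, true, false, nil)
	fmt.Println(i, ok) // 0 false

	// With only the send case enabled, index 1 is chosen; ok is false for sends.
	i, _, ok = recvOrSend(in, out, false, true, "hello")
	fmt.Println(i, ok, <-out) // 1 false hello
}
```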

A switch{} block can be used to process the result of Select() since constants were defined for the slice indices:

switch i {

For the incomingCase, either one of two things happened:

  • a value was received (ok will be true)
  • the Send channel was closed (ok will be false)

If a value was received, it is immediately added to the list. Otherwise, sendClosed is set to true.

case incomingCase:
    if ok {
        values.PushBack(val.Interface())
    } else {
        sendClosed = true
    }

For the outgoingCase, the value that was sent on the Recv channel needs to be removed from the list:

case outgoingCase:
    values.Remove(values.Front())

That concludes the body of the loop. The only thing that remains to be done in the goroutine is to close the Recv channel to indicate that all values have been received:

        } // end of switch
    } // end of for loop
    close(recv)
}() // the goroutine is started here

The NewNonBlockingChan() function concludes with the new instance being returned to the caller:

    return n
}
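For reference, here are the fragments above assembled into a single file. The imports (container/list, reflect, fmt) and the closing braces are filled in, and the main function is an illustrative addition that was not part of the walkthrough.

```go
package main

import (
	"container/list"
	"fmt"
	"reflect"
)

// NonBlockingChan pairs a send-only and a receive-only channel that are
// connected by a goroutine with an unbounded buffer.
type NonBlockingChan struct {
	Send chan<- interface{}
	Recv <-chan interface{}
}

func NewNonBlockingChan() *NonBlockingChan {
	var (
		send = make(chan interface{})
		recv = make(chan interface{})
		n    = &NonBlockingChan{
			Send: send,
			Recv: recv,
		}
		values     = list.New()
		sendClosed = false
	)
	go func() {
		const (
			incomingCase = iota
			outgoingCase
			numCases
		)
		// Run until Send is closed and every buffered value is delivered.
		for !sendClosed || values.Len() > 0 {
			cases := make([]reflect.SelectCase, numCases)
			cases[incomingCase].Dir = reflect.SelectRecv
			cases[outgoingCase].Dir = reflect.SelectSend
			if !sendClosed {
				cases[incomingCase].Chan = reflect.ValueOf(send)
			}
			if values.Len() > 0 {
				cases[outgoingCase].Chan = reflect.ValueOf(recv)
				cases[outgoingCase].Send = reflect.ValueOf(values.Front().Value)
			}
			i, val, ok := reflect.Select(cases)
			switch i {
			case incomingCase:
				if ok {
					values.PushBack(val.Interface())
				} else {
					sendClosed = true
				}
			case outgoingCase:
				values.Remove(values.Front())
			}
		}
		close(recv)
	}()
	return n
}

// main is only a usage illustration.
func main() {
	n := NewNonBlockingChan()
	for i := 0; i < 1000; i++ {
		n.Send <- i // nobody is receiving from Recv yet
	}
	close(n.Send)

	sum := 0
	for v := range n.Recv {
		sum += v.(int)
	}
	fmt.Println(sum) // 499500
}
```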

And that’s it! Using this new type is fairly straightforward:

n := NewNonBlockingChan()
n.Send <- true  // it doesn't block!

No matter how many values you send on the channel, a send never has to wait for a receiver: the goroutine accepts each value as soon as it is scheduled and keeps it in its list until it is received. Because the list is unbounded, memory use grows with the number of values that have not yet been received. Closing the Send channel will cause the Recv channel to also be closed after the last value is received.
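For comparison, the same buffering loop can be sketched with an ordinary select{} block and no reflection. This is a different technique from the one described in this article: it relies on the fact that a nil channel is never ready, so a case can be switched off by leaving its channel variable nil. The function newUnbounded and the choice of string as the element type are assumptions made for this example.

```go
package main

import "fmt"

// newUnbounded (hypothetical name) returns a send side and a receive side
// joined by a goroutine that buffers strings in a slice.
func newUnbounded() (chan<- string, <-chan string) {
	in := make(chan string)
	out := make(chan string)

	go func(recvCh <-chan string) {
		var queue []string
		// Run until the send side is closed and the queue has been drained.
		for recvCh != nil || len(queue) > 0 {
			// sendCh stays nil (never ready) unless there is a value to send.
			var sendCh chan<- string
			var next string
			if len(queue) > 0 {
				sendCh, next = out, queue[0]
			}
			select {
			case v, ok := <-recvCh:
				if ok {
					queue = append(queue, v)
				} else {
					recvCh = nil // closed: stop selecting on it
				}
			case sendCh <- next:
				queue = queue[1:]
			}
		}
		close(out)
	}(in)

	return in, out
}

func main() {
	in, out := newUnbounded()
	for _, w := range []string{"a", "b", "c"} {
		in <- w // nobody is receiving from out yet
	}
	close(in)

	for w := range out {
		fmt.Println(w) // prints a, b, c on separate lines
	}
}
```

Using a concrete element type also removes the need for a type assertion on the receiving side, at the cost of writing the loop once per type.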