Sunday, 24 February 2019

Type Safe Channels in Go

I've recently been working on a backend system in Go. The design is highly parallelised, with many small go-routines each handling a set of messages delivered to a channel, processing them, and possibly sending results on to the next channel(s) in the process (or to the frontend via a websocket). One of the first things I realized is that, due to the complexity of the system, it is easy to mistakenly write code that sends a specific message type to the wrong channel.

To prevent this I developed a method of making the channels type-safe which I have not seen discussed before.  This saved a lot of time building and debugging compared to other solutions (discussed below).

There were a few hurdles with this approach.  Another approach with the same type-safety is to select from multiple input channels, each with its own specific type of message, but my approach was the best in the circumstances.

In this and subsequent posts I will explore some of the hurdles I encountered, and the trade-offs between various solutions. To demonstrate the ideas I will create a demo using the example of a currency exchange server. This example is similar to, though simpler than, the above-mentioned backend project which I cannot discuss here due to confidentiality (though I can say that it had nothing to do with currencies :).

Type Safe Channels

As I mentioned, the first hurdle was ensuring that the right messages are sent to the correct type of channel.  In my backend project there were seven different channel types each of which could receive one or more different message types.  (One channel type had more than 20 message types.)

Channels of Anything

A simple approach would have been to allow the channels to receive any type of value by using the empty interface type.

  var ch chan interface{}

Then, in the go-routine for the channel, I would detect an unexpected message type like this:

  for message := range ch {
    switch m := message.(type) {
    case ExpectedType:
      ...
    default:
      panic("Unexpected message type for channel ch")
    }
  }
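To make the weakness concrete, here is a minimal sketch. The names ExpectedType and classify are made up for illustration, and classify returns an error rather than panicking so that the program runs to completion. Note that the wrong message compiles without complaint and is only noticed when it is received:

```go
package main

import "fmt"

// ExpectedType is a hypothetical message type used only for this sketch.
type ExpectedType struct{ id int }

// classify reports what the receiving go-routine would do with a message.
// It returns an error instead of panicking so the sketch runs to completion.
func classify(message interface{}) (string, error) {
  switch m := message.(type) {
  case ExpectedType:
    return fmt.Sprintf("handled message %d", m.id), nil
  default:
    return "", fmt.Errorf("unexpected message type %T", m)
  }
}

func main() {
  ch := make(chan interface{}, 2)
  ch <- ExpectedType{id: 1}
  ch <- "oops" // wrong type, but the compiler accepts it
  close(ch)

  for message := range ch {
    result, err := classify(message)
    fmt.Println(result, err)
  }
}
```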

Of course, this means that bugs aren't caught until run-time.  In the worst case they are found by some unsuspecting user, because the particular circumstance of their occurrence was not covered by your tests. I was not happy at all with this approach and if you think it's a good idea I recommend you go back to Javascript or Python.

Multiple Channels

A safer approach that I have seen elsewhere is to allow each go-routine to select from multiple channels with each channel having a different type.

  var chInt      chan int
  var chExpected chan ExpectedType

  for {
    select {
    case m := <-chExpected:
      // handle ExpectedType message m
    case m := <-chInt:
      // handle int message m
    }
  }


This ensures, at compile time, that only messages of the correct type are sent since only channels of the correct type are connected.

However, this solution is a bit cumbersome to maintain - e.g. when adding new message types. It is also wasteful of memory. For example, my backend could have more than 100,000 go-routines of a certain type, each of which would require more than 20 (buffered) channels.  This would require several gigabytes more memory than a solution with only one channel per go-routine.

I needed a way to restrict, at compile time, a single channel to receive a limited set of message types.

Inheritance?

In a more "OO" language I would just create a "base" class for each type of go-routine and all message types for that go-routine would inherit from this base class. In Go, if you could do this, it would look something like:

  // WARNING this is not real Go code
  type (
    baseObj1 struct{}
    obj1Mess1: baseObj1 struct{ /* fields for the message */ }
    obj1Mess2: baseObj1 struct{ /* fields for the message */ }

    baseObj2 struct{}
    obj2Mess1: baseObj2 struct{ /* fields for the message */ }
    ...
 )

  var chType1  chan baseObj1  // only obj1Mess1, obj1Mess2
  var chType2  chan baseObj2  // obj2Mess1 but not obj1Mess1, etc

You can't do this in Go. But, in a way, Go does support "inheritance" - using interfaces. (In fact, I have found that Go and interfaces are sufficient to implement any useful design pattern.)

And one problem with the above is that some message types are destined for more than one object. So some message types must be derived from more than one base class. This is not possible in languages, like Java and C#, that do not support multiple inheritance. However, these languages do allow you to "inherit" from more than one interface - which leads to something similar to what I came up with for Go...

Interfaces

It's pretty clear that the best solution was to use channels of different interface types. I searched on the Internet for Go examples that used this technique but found none. Hence I am writing this post to describe the solution I came up with.

In detail: I used one input channel per go-routine which could receive several different message types. This channel's type is an interface type specific to that channel. To ensure that only the correct messages can be sent to the channel, only the allowed message types "implement" that interface.  This was amazingly effective in allowing me to write correct code in what was a complicated jumble of different channels. In fact, using the auto-completion provided by my editor, the code almost wrote itself.  Moreover, the maintainability of the code is improved.

The only downside is that the implementation looks a bit unusual. This is due to the way Go implements interfaces.  You don't explicitly say a type implements an interface; the compiler determines that a type implements an interface because it has all the methods of the interface (with the correct parameters and return values). To allow the compiler to enforce our type-safe channels we need to create an interface type (e.g. IProcessor below) with a dummy method (e.g. ImplementsProcessor) that all messages for that channel must implement.

In order to show this in action I include a simple application for conversion between any two different currencies.

Exchange Rates Example
WARNING: This code makes several assumptions about exchange rates that are unrealistic, as anyone who understands exchange rates would attest.

Here is a simple example using this technique.  It is a server that receives currency rates from external sources, stores them and can send notifications when the rate changes. This would allow, for example, a subscriber to track the exchange rate between any two currencies as demonstrated below.

EDIT: Here is a link to the full example on the Go Playground, so you can run and play around with it: Exchange Rates Example

Channels

The sample code uses three different types of channels:
  • Currency channel - receives updates to a currency + requests for a feed of rate updates
  • Main processor - tracks currencies & hooks up requests to the correct currency channel
  • Response channels where currency feeds are sent
As I said, each kind of channel has its own interface type:

1. type ICurrency interface { ImplementsCurrency() }
2. type IProcessor interface { ImplementsProcessor() }
3. type IResponse interface { ImplementsResponse() }

Note that the functions (ImplementsCurrency() etc) do nothing other than indicate that a type is intended as a message to a particular type of channel. For example, the NewCurrency type is only intended to be sent to the main processor.

To indicate this it must have a dummy function called ImplementsProcessor.

type NewCurrency struct {
  ch   chan ICurrency // where rate changes are sent
  name string         // name of the currency
}

func (NewCurrency) ImplementsProcessor() {}
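Here is a minimal sketch of what the compiler then enforces. The describe helper and the blank-identifier assertion are my additions for illustration and are not part of the example program:

```go
package main

import "fmt"

type (
  ICurrency  interface{ ImplementsCurrency() }
  IProcessor interface{ ImplementsProcessor() }

  NewCurrency struct {
    ch   chan ICurrency
    name string
  }
  NewRate struct{ rate float64 }
)

func (NewCurrency) ImplementsProcessor() {}
func (NewRate) ImplementsCurrency()      {}

// Compile-time check (optional): the build fails if NewCurrency
// ever stops being a valid IProcessor message.
var _ IProcessor = NewCurrency{}

// describe is a hypothetical helper showing what a receiver on an
// IProcessor channel sees.
func describe(m IProcessor) string {
  switch message := m.(type) {
  case NewCurrency:
    return "new currency " + message.name
  default:
    return fmt.Sprintf("unhandled %T", message)
  }
}

func main() {
  processorCh := make(chan IProcessor, 1)
  processorCh <- NewCurrency{name: "AUD"}
  // processorCh <- NewRate{rate: 0.72}
  // The line above does not compile: NewRate has no ImplementsProcessor method.
  fmt.Println(describe(<-processorCh))
}
```

Uncommenting the NewRate line turns the mistake into a build error rather than a run-time surprise.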

Messages

In this simple example there are only four different types of messages (structs): NewCurrency, NewRate, CurrencyRequest, Update.

As mentioned above, NewCurrency is sent to the main processor so it can track all the active currencies.  It just contains the name of the currency and the channel where the currency update messages (NewRate) are sent.

A NewRate message is sent to a currency process. This just says that the exchange rate of the currency has changed relative to some reference currency.  The reference currency could be anything - e.g. current USD, USD at a certain date, a weighted index, etc - as long as all rates use the same one.  Note that the demo below just generates fictitious rate values but in a real application the values could be scraped from some web site or be sent directly from some authority.

A CurrencyRequest message is sent by a party who wants to subscribe to rate changes for a currency.  It supplies the name of the currency it wants to know about and a channel of IResponse where the changes to the currency are fed. Typically, the party would request at least 2 currencies so it knows the current exchange rate between the two when either changes.

Finally, an Update message is sent to any party who has subscribed to the currency.

Here are all the channel and message types:

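package main

// Imports needed by the processing and test harness code later in this post
import (
  "fmt"
  "math/rand"
  "time"
)

type (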
  IProcessor interface { ImplementsProcessor() }
  ICurrency interface { ImplementsCurrency() }
  IResponse interface { ImplementsResponse() }

  // NewCurrency - new currency available
  NewCurrency struct {
    ch   chan ICurrency // where rate changes are sent
    name string         // name of the currency
  }

  // CurrencyRequest asks for updates on a currency's changes
  CurrencyRequest struct {
    name string         // name of currency we want rates for
    ch   chan IResponse // where updates are sent
  }

  // NewRate - currency rate has changed
  NewRate struct {
    rate float64 // new rate (relative to ref. currency)
  }

  // Update - response sent whenever the currency rate changes
  Update struct {
    name string
    rate float64
  }
)

// ====== IProcessor messages ======
func (NewCurrency) ImplementsProcessor()     {}
func (CurrencyRequest) ImplementsProcessor() {}

// ====== ICurrency messages ======
func (NewRate) ImplementsCurrency()          {}
func (CurrencyRequest) ImplementsCurrency()  {}

// ====== IResponse messages ======
func (Update) ImplementsResponse()           {}
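Notice that CurrencyRequest has both dummy methods, so the same value can travel on both kinds of channel. The following is a cut-down sketch of that forwarding step; the forward function is hypothetical and only mirrors what the main processor does with a request:

```go
package main

import "fmt"

type (
  IProcessor interface{ ImplementsProcessor() }
  ICurrency  interface{ ImplementsCurrency() }
  IResponse  interface{ ImplementsResponse() }

  CurrencyRequest struct {
    name string
    ch   chan IResponse
  }
)

func (CurrencyRequest) ImplementsProcessor() {}
func (CurrencyRequest) ImplementsCurrency()  {}

// forward (hypothetical) takes one message from the processor channel and,
// if it is a CurrencyRequest, passes it on to a currency channel.
// It returns the currency name, or "" for any other message type.
func forward(in <-chan IProcessor, out chan<- ICurrency) string {
  if request, ok := (<-in).(CurrencyRequest); ok {
    out <- request // allowed: CurrencyRequest is also an ICurrency message
    return request.name
  }
  return ""
}

func main() {
  processorCh := make(chan IProcessor, 1)
  currencyCh := make(chan ICurrency, 1)
  processorCh <- CurrencyRequest{name: "USD"}
  fmt.Println(forward(processorCh, currencyCh), len(currencyCh))
}
```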

Processing

Processing is easy to understand in this simple example but I will briefly explain.  There is one "main processor" that starts up a go-routine to keep track of all currencies by receiving NewCurrency messages.  It also forwards CurrencyRequest messages to the relevant currency processor.

Each currency has its own currency processor which receives messages through its ICurrency channel.  The NewRate message simply informs the processor that its rate has changed. The CurrencyRequest message asks for an update to be sent whenever the currency's rate changes - it supplies an IResponse channel where the exchange rate updates are sent.

// processorCh (singleton) where processor requests are sent
var processorCh chan IProcessor

func GetMainProcessor() chan<- IProcessor {
  if processorCh == nil {
    processorCh = make(chan IProcessor, 100)

    // Start main processing
    go func() {
      currencies := make(map[string]*NewCurrency)

      for m := range processorCh {
        switch message := m.(type) {
        case CurrencyRequest:
          // Forward the request to the specific currency handler
          if pCurrency, ok := currencies[message.name]; ok {
            pCurrency.ch <- message
          }

        case NewCurrency:
          currencies[message.name] = &message

          // Start processing of this currency
          go func() {
            c := &message
            feeds := make(map[chan IResponse]struct{})
            rate := 0.0
            for message := range c.ch {
              switch message := message.(type) {
              case NewRate: // new rate against ref. currency
                rate = message.rate
                for feed := range feeds {
                  feed <- Update{name: c.name, rate: rate}
                }

              case CurrencyRequest:
                feeds[message.ch] = struct{}{}
                if rate > 0 {
                  message.ch <- Update{name: c.name, rate: rate}
                }
              }
            }
          }()
        }
      }
    }()
  }
  return processorCh
}
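One caveat: the nil check in GetMainProcessor is fine here because only main (directly, or via startRateFeed) calls it, but it would be a data race if several go-routines called it at the same time. If that matters, a possible variant uses sync.Once. This is only a sketch: GetMainProcessorOnce and processorOnce are hypothetical names, and the message handling is elided.

```go
package main

import (
  "fmt"
  "sync"
)

type IProcessor interface{ ImplementsProcessor() }

var (
  processorCh   chan IProcessor
  processorOnce sync.Once // hypothetical addition, not in the original code
)

// GetMainProcessorOnce is a hypothetical variant of GetMainProcessor that
// creates the channel exactly once, even with concurrent callers.
func GetMainProcessorOnce() chan<- IProcessor {
  processorOnce.Do(func() {
    processorCh = make(chan IProcessor, 100)
    go func() {
      for range processorCh {
        // message handling as in the original go-routine
      }
    }()
  })
  return processorCh
}

func main() {
  var wg sync.WaitGroup
  results := make([]chan<- IProcessor, 10)
  for i := range results {
    wg.Add(1)
    go func(i int) {
      defer wg.Done()
      results[i] = GetMainProcessorOnce()
    }(i)
  }
  wg.Wait()

  same := true
  for _, ch := range results {
    if ch != results[0] {
      same = false
    }
  }
  fmt.Println("all callers got the same channel:", same)
}
```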

Test Harness

Finally, we have some code to demonstrate the system.  This uses the startRateFeed() function to create a new currency channel and continually send random rate changes to it.  The main function creates some currencies and an IResponse channel and asks the main processor to send updates for USD and AUD.  It then displays the current exchange rate when either of the currencies moves.

// startRateFeed simulates rate updates from an external source
func startRateFeed(name string, initial float64, 
                   delay time.Duration) {
  // Set up fake currency
  ch := make(chan ICurrency, 10)
  GetMainProcessor() <- NewCurrency{
    ch:   ch,
    name: name,
  }
  go func() {
    rate := initial
    for {
      // Randomly vary the rate by up to ±0.001 currency units
      rate += float64(rand.Intn(21)-10) / 10000
      if rate <= 0 {
        rate = 0.01 // ensure we stay above zero
      }
      ch <- NewRate{rate: rate}
      time.Sleep(delay)
    }
  }()
}

func main() {
  mp := GetMainProcessor()

  // Set up fake inputs for a few currencies
  startRateFeed("AUD", 0.72, 7*time.Second)
  startRateFeed("USD", 1.0, 3*time.Second)
  startRateFeed("NZD", 0.65, 20*time.Second)

  // Set up conversion for AUD to USD
  currency1 := "AUD"
  currency2 := "USD"
  chReply := make(chan IResponse) // where updates are sent
  mp <- CurrencyRequest{name: currency1, ch: chReply}
  mp <- CurrencyRequest{name: currency2, ch: chReply}

  // Loop getting updates and showing the conversion 
  // rate when either currency changes
  var rate1, rate2 float64

  for m := range chReply {
    update := m.(Update)
    switch update.name {
    case currency1:
      rate1 = update.rate
    case currency2:
      rate2 = update.rate
    }

    // Only print once both rates are known (avoids dividing by zero)
    if rate1 > 0 && rate2 > 0 {
      fmt.Printf("1 %s is equal to %.3f %s\n",
                 currency1, rate1/rate2, currency2)
    }
  }
}


Conclusion

Type safety is one of the great things about Go but I have seen a lot of code which loses that benefit by using the empty interface (interface{}). I hope I have shown that you can often use more specific interfaces (as the element types of channels in this case) for improved type safety.

In my next post I will enhance the above example to demonstrate some further problems and techniques.

One problem I did not have time to address is that a method declared on a type is automatically also available on a pointer to that type.  I discovered this when I accidentally passed a pointer to a message rather than the message itself, which caused a confusing bug.
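A minimal sketch of how this can go wrong, assuming the receiver's type switch lists only the value type (the handle function is hypothetical and stands in for a currency go-routine):

```go
package main

import "fmt"

type ICurrency interface{ ImplementsCurrency() }

type NewRate struct{ rate float64 }

func (NewRate) ImplementsCurrency() {}

// handle (hypothetical) mimics a receiving go-routine whose type switch
// only lists the value type NewRate.
func handle(m ICurrency) string {
  switch message := m.(type) {
  case NewRate:
    return fmt.Sprintf("rate %.2f", message.rate)
  default:
    return fmt.Sprintf("ignored %T", message)
  }
}

func main() {
  ch := make(chan ICurrency, 2)
  ch <- NewRate{rate: 0.72}
  ch <- &NewRate{rate: 0.73} // compiles: *NewRate also has ImplementsCurrency
  close(ch)

  for m := range ch {
    fmt.Println(handle(m))
  }
}
```

The pointer is accepted by the channel but falls through to the default case, so the message is silently dropped unless the default case reports it.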

Another problem, not specific to my technique, is to do with shutting down channels with multiple writers while avoiding writing to a closed channel.