Async counter updates in a global rate limiter

For a while, I've known that my tool Sangrenel (used for load testing Kafka) had some inefficiencies, particularly the global counter. Sangrenel effectively fires up many workers that generate random messages. A global counter is used to periodically dump the rate of message generation, in addition to controlling a global rate limiter (to impose a limit on the aggregate throughput of all workers). Since this single resource receives a significant amount of concurrent updates, it's obviously not hard to make it crumble, except that I never really experienced it with binaries built on Go 1.3 and 32-thread machines. I was able to generate more messages/sec. than I needed for my use cases.

This appears to have changed, however. Between a binary built on Go 1.4 (I haven't investigated exactly what changed) and 36 workers running on the new AWS c4.8xl, things grind to a halt around 400K ops/sec., which is a bit lower than the numbers I used to see:

[image: CPU utilization showing cores not fully utilized at ~400K ops/sec.]

You can see that the cores aren't fully utilized, even though the work they're tasked with (producing random message strings from a pre-defined list of characters) could easily saturate them. I was able to skip profiling because I already knew where the bottleneck was; disabling the counter update calls quickly confirmed it as the source anyway.

Let's talk about what kind of counter I'm using: it's a channel. Admittedly, this is weird, but Sangrenel is also the first thing I wrote in Go. At some point, someone suggested using a channel as a counter, and it just so happened to fit very easily into the design. It handles both message rate reporting and global rate limiting across many Goroutines.

Essentially, we have a buffered channel with a capacity of 1:

var sentCntr = make(chan int64, 1)

Pre-load a zero value in init:

func init() {  
    sentCntr <- 0
}

And provide some functions to update / fetch the value in the channel buffer:

func incrSent() {
    i := <-sentCntr
    sentCntr <- i + 1
}

func fetchSent() int64 {  
    i := <-sentCntr
    sentCntr <- i
    return i
}
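
For comparison (this isn't from Sangrenel, and it assumes the sync package is imported), the same semantics can be written as a plain int64 guarded by a sync.Mutex; receiving from the one-slot channel is the moral equivalent of taking the lock along with the value, and sending puts both back:

var (
    mu   sync.Mutex
    sent int64
)

func incrSentMutex() {
    mu.Lock()
    sent++
    mu.Unlock()
}

func fetchSentMutex() int64 {
    mu.Lock()
    defer mu.Unlock()
    return sent
}

Either way, every update serializes all workers on a single shared resource, which is where the cost shows up below.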

In a simplified code example, we have many parallel Goroutines, each running a loop that generates a message, sends it to Kafka, then updates the counter:

for {  
    randMsg(msg, *generator)
    err = producer.SendMessage(topic, nil, kafka.ByteEncoder(msg))
    if err == nil {
        incrSent()
    }
}

We periodically call fetchSent() to report deltas over time (as message/sec. rates), in addition to performing a per-worker, per-iteration lookup for rate limiting (more on that later). The problem starts when we're doing millions of operations a second across many concurrent tasks; the basic mechanics of a shared resource guarded by a mutex start showing their cost.
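
As a rough sketch of the reporting side (the interval and output format here are illustrative, not Sangrenel's actual code, and "fmt" / "time" are assumed to be imported), a single Goroutine can sample the counter on a ticker and print the delta as a rate:

go func() {
    interval := 5 * time.Second
    tick := time.Tick(interval)
    last := fetchSent()
    for {
        <-tick
        now := fetchSent()
        // Delta over the interval, reported as an approximate messages/sec. rate.
        fmt.Printf("%d messages/sec\n", (now-last)/int64(interval/time.Second))
        last = now
    }
}()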

But what can we do about it?

Batches, everywhere

The app is capping at less than 400K ops/sec. with lots of unutilized CPU capacity. The immediate answer I came up with was to simply reduce channel ops. I did this by slapping a local accumulator into each Goroutine and flushing its value to the global counter at a fixed interval, rather than on every iteration.

We modify incrSent() to receive an increment value:

func incrSent(n int64) {  
    i := <-sentCntr
    sentCntr <- i + n
}

Then use a select statement on a ticker that triggers a call to incrSent() with the locally accumulated value every 50ms:

tick := time.Tick(50 * time.Millisecond)
var n int64

for {
    randMsg(msg, *generator)
    err = producer.SendMessage(topic, nil, kafka.ByteEncoder(msg))
    if err == nil {
        n++
        select {
        case <-tick:
            // Flush the locally accumulated count roughly every 50ms.
            incrSent(n)
            n = 0
        default:
            // No tick yet; keep accumulating locally.
        }
    }
}

The results of this simple change are immediately apparent and tremendous: we go from generating 400K messages a second to 2.8 million a second, in addition to more entertaining core utilization:

[image: CPU utilization with batched counter updates, ~2.8 million messages/sec.]
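
The same effect can be reproduced in isolation with a micro-benchmark of just the counter. This is a rough sketch (the package, names, and the flush size of 1,000 are made up; it isn't Sangrenel's code) contrasting per-operation channel updates against locally batched ones:

package counterbench

import "testing"

var cntr = make(chan int64, 1)

func init() {
    cntr <- 0
}

func add(n int64) {
    i := <-cntr
    cntr <- i + n
}

// Every iteration of every parallel Goroutine hits the shared channel.
func BenchmarkUnbatched(b *testing.B) {
    b.RunParallel(func(pb *testing.PB) {
        for pb.Next() {
            add(1)
        }
    })
}

// Each Goroutine accumulates locally and flushes every 1,000 iterations.
func BenchmarkBatched(b *testing.B) {
    b.RunParallel(func(pb *testing.PB) {
        var local int64
        for pb.Next() {
            local++
            if local == 1000 {
                add(local)
                local = 0
            }
        }
        if local > 0 {
            add(local) // flush whatever is left over
        }
    })
}

Running it with something like go test -bench . -cpu 32 should show the contention gap between the two approaches on a multi-core box.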

A new problem appears

Easy enough, right? This introduces a subtle problem that I anticipated, and I was able to confirm it by testing the rate limiting feature built into my app. The problem is that we have many parallel workers generating an aggregate message rate on the order of millions a second, but we only write updates to the global counter every 50ms. If every task were running in perfect lockstep, a call to fetchSent() 49ms after the last counter update could be off by as much as 140,000.
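
To make the bound explicit (a hypothetical helper, not part of Sangrenel, assuming "time" is imported): the counter can lag the true total by up to the aggregate rate multiplied by the flush interval, which at roughly 2.8 million messages/sec. and 50ms is where the 140,000 figure comes from.

func maxStale(aggregateRate int64, flush time.Duration) int64 {
    // Every worker may be holding up to one flush interval's worth of
    // locally accumulated counts, so the aggregate lag is rate * interval.
    return aggregateRate * int64(flush) / int64(time.Second)
}

// maxStale(2800000, 50*time.Millisecond) == 140000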

Since we're using fetchSent() in the global rate limiter, this means rate limiting will be off. Let's expand beyond the worker loop code and look at the rate limiter logic.

The message generator / sender loop is nested inside an outer control loop. The outer loop captures a start time and kicks off the inner loop. The inner loop generates and sends messages as fast as possible, checking the global message delta since the start time on each iteration. If the delta meets (or exceeds) a configured threshold (in this mock code, 10,000), the inner loop breaks and the outer loop sleeps for the remainder of that second. This effectively limits all workers to an aggregate, per-second rate (the method is prone to jitter, but it works out quite smoothly after everything is fired over the network and traverses several buffers / layers outside the code anyway).

tick := time.Tick(50 * time.Millisecond) // declared once; time.Tick's ticker is never stopped
var n int64

for {
    rateEnd := time.Now().Add(time.Second)
    countStart := fetchSent()
    rateLimit := int64(10000)

    for fetchSent()-countStart < rateLimit {
        randMsg(msg, *generator)
        err = producer.SendMessage(topic, nil, kafka.ByteEncoder(msg))
        if err == nil {
            n++
            select {
            case <-tick:
                incrSent(n)
                n = 0
            default:
                // No tick yet; keep accumulating locally.
            }
        }
    }

    // Sleep off the remainder of the one-second window.
    time.Sleep(rateEnd.Sub(time.Now()))
}

In the actual Sangrenel service, a -rate flag allows a user to pass in a custom rate value. When Sangrenel was tested with 50ms counter updates, the rate limiter functionality was noticeably off:

$ ./sangrenel -workers=1 -rate=8000

::: Sangrenel :::
Starting 1 workers
Message size 300 bytes

client_1 connected
2015-02-24T03:20:28Z Generating 19Mb/sec @ 8197 messages/sec | topic: sangrenel | 0.87ms avg latency
2015-02-24T03:20:33Z Generating 18Mb/sec @ 7987 messages/sec | topic: sangrenel | 1.10ms avg latency
2015-02-24T03:20:38Z Generating 19Mb/sec @ 8463 messages/sec | topic: sangrenel | 1.06ms avg latency
2015-02-24T03:20:43Z Generating 19Mb/sec @ 8230 messages/sec | topic: sangrenel | 0.83ms avg latency

While not massively inaccurate, this discrepancy can easily be amplified in higher concurrency / throughput configurations (unfortunately, I didn't have a spare / non-production Kafka cluster lying around for testing at higher throughput).

After simply dropping the update interval to 10ms, the rate limiting accuracy improved (at 8,000 messages/sec., the worst-case staleness falls from roughly 400 uncounted messages to about 80):

...
tick := time.Tick(10 * time.Millisecond)  
...

Re-running the test:

$ ./sangrenel -workers=1 -rate=8000

::: Sangrenel :::
Starting 1 workers
Message size 300 bytes

client_1 connected
2015-02-24T03:19:22Z Generating 18Mb/sec @ 8032 messages/sec | topic: sangrenel | 1.13ms avg latency
2015-02-24T03:19:27Z Generating 18Mb/sec @ 8052 messages/sec | topic: sangrenel | 0.81ms avg latency
2015-02-24T03:19:32Z Generating 18Mb/sec @ 8044 messages/sec | topic: sangrenel | 0.91ms avg latency
2015-02-24T03:19:37Z Generating 18Mb/sec @ 8065 messages/sec | topic: sangrenel | 0.83ms avg latency

By simply batching counter updates and tweaking the flush interval a bit, the tool is able to maintain both peak message production and accurate rate control.