Multiplexing work reliably and efficiently with state machines and core.async

This all started with a tweet and brief exchange with a friend and ex-colleague:

This post describes some work we did recently that I’m pretty happy with: we model the execution of independent pieces of work as state machines which are executed concurrently by multiple core.async processes with state communicated over channels. 

Modeling with state machines helps flatten call chain complexity and makes retrying/recovering from error states trivial: we just try to apply the same transition to the same state again.

In short, we've improved both throughput and reliability.

Context

Specifically our problem was:

  1. Connect to a reporting API and download a report
  2. Transform the report, converting some values between units, currencies etc.
  3. Write the report out to S3 (ultimately to be ingested into our Redshift cluster)

It’s a pretty simple problem but when we’re downloading thousands of reports every day its likely that we’ll come across intermittent problems; mostly network errors connecting to the APIs or we’ll be rate throttled etc.

We started simple, making the most of the environment our system ran in.

Our original code was similar to this:

The reporting API lets us download aggregate information for a day and client.

Ensuring processes completed was the responsibility of a supervisor process. Although this was beautifully simple for the incremental work it was incredibly inefficient when running large imports:

  1. Our unit of work was all steps needed to download, process and upload a report. If any step failed we could only retry the whole.
  2. Even worse, if we were processing hundreds or thousands of reports together any failure would terminate and prevent all subsequent reports. We could unpick progress and change some command-line options to avoid doing too much again but it's painful.
  3. Handling errors was slow and painful. If our download request was rate limited we'd have to back-off; operations were globally serial though so any delay sleeps everything.

State machines, core.async and concurrent execution

Instead of weaving function invocations together we can model the problem as a set of independent state machines- each moving independently. 

Our transitions are pretty similar to the list we mentioned at the beginning: :downloadable -> :uploadable -> :completed. Each report (a combination of client and day) will progress from :downloadable to :completed

The local order of these operations is important (we can’t upload the report before we’ve downloaded it) but the global order isn’t important- it doesn’t matter whose report we process first. It's also important to note that our operations are idempotent- it also doesn't matter if we download/try to download the same report multiple times, likewise with the upload.

At each step our code will use the machine’s :state to determine which transition to apply. If we encounter an error whilst performing a transition we attach :error to the state with the exception, letting the machine retry the operation.

Our final code looks pretty close to the following:

  1. We create the states-ch channel to communicate each state of each machine (i.e. our unit of work).
  2. Processes are started to progress each of the state machines.
  3. Once a machine’s :state is :completed the final state is put on the completed channel (this helps us know when all work is finished).
  4. States are read from the states-ch channel and the appropriate operation performed. The result of each operation is the new state. We use thread to perform the operation and return a channel we can read the result from. 
  5. If an operation causes an exception to be raised it’s caught and we associate the exception value to the state map. After a small delay we put the state map back into the states channel for the operation to be attempted again.

Modeling the problem with state machines and implementing it with core.async gives a few nice properties:

  1. Operations are transparent. It’s easy to see what’s going on at any point in time.
  2. Failure is isolated and easily retryable: all values needed to perform an operation are held in the state maps so its really just a case of applying (step state) again.
  3. We use core.async’s timeout channel to defer the retry operation, letting us switch to a different op first.
  4. Overall throughput is increased. If we need to defer an operation we proceed with something else. Even on my 4-core laptop it results in ~6x greater throughput.

In short, we’re able to process many more reports concurrently than we were able to with our initial ‘dumb’ implementation and the code, I think, is vastly more readable than the nested error handling we used to have.