Clojure - From Callbacks to Sequences

I was doing some work with a colleague earlier this week which involved connecting to an internal RabbitMQ broker and transforming some messages before forwarding them to our Kafka broker.

We’re using langohr to connect to RabbitMQ. Its consumer and queue documentation shows how to use the subscribe function to connect to a broker and print messages that arrive:
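As a rough sketch of that callback style: the `subscribe` below is a made-up in-memory stand-in, not langohr's real API, so it runs without a broker. Its argument order and the shape of the `deliveries` data are assumptions for illustration only.

```clojure
;; Hypothetical stand-in for a callback-based client (NOT langohr's API).
;; Calls (handler ch msg-meta payload) for each delivery on a background
;; thread and returns that thread.
(defn subscribe
  [ch deliveries handler]
  (doto (Thread. (fn []
                   (doseq [{:keys [meta payload]} deliveries]
                     (handler ch meta payload))))
    (.start)))

;; The handler is where all of the interesting work has to happen.
(defn message-handler
  [ch {:keys [content-type]} ^bytes payload]
  (println (format "Received: %s (content type: %s)"
                   (String. payload "UTF-8") content-type)))

(def consumer-thread
  (subscribe :fake-channel
             [{:meta {:content-type "text/plain"} :payload (.getBytes "hello" "UTF-8")}
              {:meta {:content-type "text/plain"} :payload (.getBytes "world" "UTF-8")}]
             message-handler))

;; Wait for the background thread to deliver everything.
(.join ^Thread consumer-thread)
```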

The example above is pretty close to what we started working with earlier today. It’s also quite similar to a lot of other code I’ve written in the past: connect to a broker or service and provide a block/function to be called when something interesting happens.

Sequences, not handlers

Although there’s nothing wrong with this, I think there’s a nicer way: flip the responsibility so that, instead of the subscriber pushing messages to our handler function, we pull them through Clojure’s sequence abstraction.

This is the approach I took when I wrote clj-kafka, a Clojure library to interact with LinkedIn’s Kafka (as an aside, Kafka is really cool: I’m planning a blog post on how we’ve been building a new data platform for uSwitch.com, but in the meantime it’s well worth checking out).

Here’s a little example of consuming messages through a sequence that’s taken from the clj-kafka README:

We create our consumer and access messages through a sequence abstraction by calling messages with the topic we wish to consume from.

The advantage of exposing the items through a sequence is that it becomes instantly composable with the many functions that already exist within Clojure: map, filter, remove etc.
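To make the composability point concrete, here is a small sketch. The `incoming` vector is a stand-in for the sequence a consumer would hand back, and the message shape (`:topic`, `:value`) and the `signup:` prefix are made up for illustration.

```clojure
(require '[clojure.string :as str])

;; Stand-in for the (potentially endless) sequence a consumer would return.
(def incoming
  [{:topic "test" :value "signup:alice"}
   {:topic "test" :value "heartbeat"}
   {:topic "test" :value "signup:bob"}])

;; Ordinary sequence functions do the selection and transformation.
(def signups
  (->> incoming
       (map :value)
       (filter #(str/starts-with? % "signup:"))
       (map #(subs % (count "signup:")))))

(println (take 2 signups))
```

None of this code knows or cares where the messages came from, which is exactly the point.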

In my experience, when writing consumption code that uses handler functions/callbacks I’ve ended up with code that looks like this:
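Something along these lines, as a hypothetical sketch: `handle-message` and `forward!` are invented names, and the "ORDER" rule is only there to give the handler something to decide.

```clojure
(require '[clojure.string :as str])

(def forwarded (atom []))

;; Hypothetical side effect standing in for "send this somewhere else".
(defn forward! [msg]
  (swap! forwarded conj msg))

;; Decoding, filtering, transformation and the side effect are all tangled
;; together inside the callback.
(defn handle-message
  [ch msg-meta ^bytes payload]
  (let [body (String. payload "UTF-8")]
    (when-not (str/blank? body)
      (let [event (str/upper-case body)]
        (when (str/starts-with? event "ORDER")
          (forward! event))))))

;; Simulate the client invoking the callback for each delivery.
(doseq [s ["order:1" "" "ping" "order:2"]]
  (handle-message :fake-channel {} (.getBytes ^String s "UTF-8")))

(println @forwarded)
```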

It makes consuming data more complicated and pulls more complexity into the handler function than necessary.

Push to Pull

This is all made possible thanks to a lovely function written by Christophe Grand:

The function returns a vector containing 2 important parts: the sequence, and a function to put things into that sequence.
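A minimal sketch of such a function, written from the description above rather than copied from Christophe's post, so details may differ from his version. It assumes a `LinkedBlockingQueue` as the backing store and uses two private sentinel objects: one to mark the end of the sequence and one to represent `nil`, which the queue itself rejects.

```clojure
(import 'java.util.concurrent.LinkedBlockingQueue)

(defn pipe
  "Returns [s put!]: a lazy, blocking sequence and a function that feeds it.
   (put! x) appends x; (put!) with no arguments ends the sequence."
  []
  (let [q   (LinkedBlockingQueue.)
        EOQ (Object.)   ; sentinel: end of queue
        NIL (Object.)   ; sentinel: stands in for nil
        s   (fn queue-seq []
              (lazy-seq
                (let [x (.take q)]           ; blocks until an item arrives
                  (when-not (identical? EOQ x)
                    (cons (when-not (identical? NIL x) x)
                          (queue-seq))))))]
    [(s)
     (fn put!
       ([]  (.put q EOQ))
       ([x] (.put q (if (nil? x) NIL x))))]))

(let [[messages put!] (pipe)]
  (put! 1)
  (put! nil)
  (put! 3)
  (put!)                     ; close the pipe so the sequence terminates
  (println (vec messages)))
```

Reading from the sequence blocks while the queue is empty, so the producer and consumer would normally live on different threads.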

Returning to our original RabbitMQ example, we can change the subscriber code to use pipe to return the sequence that accesses the queue of messages:
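Roughly, the shape is as follows. This is a sketch under assumptions: `subscribe` is again a made-up in-memory stand-in rather than langohr's function, `subscriber-seq` takes the fake deliveries directly instead of a queue name, and `pipe` is my reconstruction of the function described above.

```clojure
(import 'java.util.concurrent.LinkedBlockingQueue)

;; Reconstruction of pipe: a blocking lazy seq plus a function that feeds it.
(defn pipe []
  (let [q   (LinkedBlockingQueue.)
        EOQ (Object.)
        NIL (Object.)
        s   (fn queue-seq []
              (lazy-seq
                (let [x (.take q)]
                  (when-not (identical? EOQ x)
                    (cons (when-not (identical? NIL x) x) (queue-seq))))))]
    [(s)
     (fn put!
       ([]  (.put q EOQ))
       ([x] (.put q (if (nil? x) NIL x))))]))

;; Hypothetical stand-in for a callback-based subscribe call.
(defn subscribe [ch deliveries handler]
  (doto (Thread. (fn []
                   (doseq [{:keys [meta payload]} deliveries]
                     (handler ch meta payload))))
    (.start)))

;; The handler does one thing only: put what it was given into the pipe.
(defn subscriber-seq [ch deliveries]
  (let [[s put!] (pipe)]
    (subscribe ch deliveries
               (fn [ch msg-meta payload]
                 (put! {:payload payload :ch ch :msg-meta msg-meta})))
    s))

(def messages
  (subscriber-seq :fake-channel
                  [{:meta {:type "greeting"} :payload "hello"}
                   {:meta {:type "greeting"} :payload "world"}]))

;; The pipe is never closed here, so take only what we know has been sent.
(println (map :payload (take 2 messages)))
```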

We can then map, filter and more.

We pull responsibility out of the handler function and into the consumption of the sequence. This is really important, and it complements something else which I’ve recently noticed myself doing more often.

In the handler function above I convert the function parameters to a map containing :payload, :ch and :msg-meta. In our actual application we’re only concerned with reading the message payload and converting it from a JSON string to a Clojure map.

Initially, we started writing something similar to this:

We have a function that exposes the messages through a sequence, but we pass a kind of transformation function as the last argument to subscriber-seq. This initially felt ok: subscriber-seq calls our handler and extracts the payload into our desired representation before putting it into the queue that backs the sequence.
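The shape described above looks something like this sketch. To keep it short, a bare `LinkedBlockingQueue` read through `repeatedly` stands in for `pipe`, `subscribe` is a made-up in-memory stand-in, and `edn/read-string` stands in for a JSON parser so that no extra dependencies are needed.

```clojure
(require '[clojure.edn :as edn])
(import 'java.util.concurrent.LinkedBlockingQueue)

;; Hypothetical stand-in for a callback-based subscribe call.
(defn subscribe [ch deliveries handler]
  (doto (Thread. (fn []
                   (doseq [{:keys [meta payload]} deliveries]
                     (handler ch meta payload))))
    (.start)))

;; The transformation f is applied inside the handler, before the message
;; reaches the queue that backs the sequence.
(defn subscriber-seq [ch deliveries f]
  (let [q (LinkedBlockingQueue.)]
    (subscribe ch deliveries
               (fn [_ch _msg-meta payload]
                 (.put q (f payload))))
    (repeatedly #(.take q))))

(def users
  (subscriber-seq :fake-channel
                  [{:meta {} :payload "{:user \"alice\"}"}
                   {:meta {} :payload "{:user \"bob\"}"}]
                  edn/read-string))

;; The sequence is endless, so take only what we know has been sent.
(println (take 2 users))
```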

But we’re pushing more responsibility into subscriber-seq than needs to be there.

We’re just extracting and transforming messages as they appear in the sequence so we can and should be building upon Clojure's existing functions: map and the like. The code below feels much better:
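A sketch of that version, with assumptions flagged: the `messages` vector stands in for what `(subscriber-seq ...)` would return, `bytes->string` and `payload->map` are names invented here, and `edn/read-string` stands in for a JSON parser so the example has no dependencies.

```clojure
(require '[clojure.edn :as edn])

;; Stand-in for the sequence returned by subscriber-seq: each item is a map
;; containing :payload, :ch and :msg-meta.
(def messages
  [{:payload (.getBytes "{:user \"alice\"}" "UTF-8") :ch :fake-channel :msg-meta {}}
   {:payload (.getBytes "{:user \"bob\"}" "UTF-8")   :ch :fake-channel :msg-meta {}}])

(defn bytes->string [^bytes b]
  (String. b "UTF-8"))

;; Compose small functions: pull out the payload, decode it, parse it.
(def payload->map
  (comp edn/read-string bytes->string :payload))

;; subscriber-seq stays simple; the transformation lives with the consumer.
(println (map payload->map messages))
```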

It feels better for much the same reason that moving from a handler to a sequence did: we’re making our function less complex and encouraging composition through the many functions that already exist. The call to map is a great example of this for me: mapping a composed function over the incoming data to transform it, rather than adding more work to subscriber-seq.

Pipe

I’ve probably used Christophe’s pipe function 3 or 4 times this year to take code that started with handler functions and evolve it to deal with sequences. I think it’s a really neat way of making callback-based APIs more elegant.

6 responses
Thank you so much for this post! This is a problem that I have been spending hammock time on over the past while. Is there an equivalent function to 'pipe' in clojurescript? Thanks again.
Hi Matt,

I'm not particularly familiar with ClojureScript but Christophe did have an earlier post that implemented the same thing but using atoms- that may work? He posted it here: http://clj-me.cgrand.net/2009/11/18/are-pipe-dreams-made-of-promises/

I'd be interested to hear how it goes! Glad you found the post useful.

How does this handle error handling? What happens if:

- we call something that builds up a lazy sequence (for a while)
- before realizing the seq, the connection closes

I'd guess that the code that realises the seq would throw a connection error :(

Be careful as you might run into troubles with what I consider a bug in Clojure: http://dev.clojure.org/jira/browse/CLJ-1119. If you have two consumer threads on your sequence and interrupt one of them at an unfortunate moment, it'll take down the other one, too. Similar considerations with interrupting a consumer and restarting it where it left off. Email me for details.
@tcrayford good question! the sequence is backed by Java's LinkedBlockingQueue which should allow you to isolate production and consumption of the sequence. I've not looked into it with the Rabbit client but the Kafka consumer will handle most connection/transport errors- the consumer will block while there are no messages in the queue, the production thread(s) continue to retry and then start pushing messages back into the queue when they can.

@hank- thanks for that, wasn't something i'd come across- good to know
