I was doing some work with a colleague earlier this week which involved connecting to an internal RabbitMQ broker and transforming some messages before forwarding them to our Kafka broker.
We’re using langohr to connect to RabbitMQ. Its consumer and queue documentation shows how to use the subscribe function to consume a queue and print messages as they arrive:
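The langohr example needs a running broker, so here is a minimal, broker-free sketch of the same push style. The fake-subscribe function is hypothetical. It stands in for a subscription by calling the handler once per payload, using the argument shape langohr handlers receive (a channel, a metadata map and a byte-array payload). The channel and metadata values it passes are made up.

```clojure
;; Hypothetical stand-in for a broker subscription: pushes each payload
;; to the handler as (ch metadata payload), like a langohr handler call.
(defn fake-subscribe
  [payloads handler]
  (doseq [[i p] (map-indexed vector payloads)]
    (handler :fake-channel
             {:delivery-tag (inc i)}          ; made-up metadata
             (.getBytes ^String p "UTF-8"))))

;; Push style: the subscriber calls this once per message.
(defn message-handler
  [ch metadata ^bytes payload]
  (println "Received:" (String. payload "UTF-8")
           "tag:" (:delivery-tag metadata)))

(fake-subscribe ["hello" "world"] message-handler)
;; prints:
;; Received: hello tag: 1
;; Received: world tag: 2
```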
That example is pretty close to what we started with. It’s also quite similar to a lot of other code I’ve written in the past: connect to a broker or service and provide a block or function to be called when something interesting happens.
Sequences, not handlers
Although there’s nothing wrong with this, I think there’s a nicer way: flip the responsibility so that, instead of the subscriber pushing messages to our handler function, we consume them through Clojure’s sequence abstraction.
This is the approach I took when I wrote clj-kafka, a Clojure library to interact with LinkedIn’s Kafka (as an aside, Kafka is really cool; I’m planning a blog post on how we’ve been building a new data platform for uSwitch.com, but in the meantime it’s well worth checking out).
Here’s a little example, taken from the clj-kafka README, of consuming messages through a sequence:
We create our consumer and access messages through a sequence abstraction by calling messages with the topic we wish to consume from.
The advantage of exposing the items through a sequence is that they become instantly composable with the many functions that already exist within Clojure: map, filter, remove and so on.
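To make the composability point concrete, here is a small sketch over an in-memory vector standing in for a consumer’s message sequence. The message keys (:topic, :value) and the order-values function are hypothetical and are not clj-kafka’s actual message format. Because map, filter and remove are lazy, the same pipeline works unchanged on an unbounded sequence.

```clojure
;; Hypothetical messages standing in for a consumer's message sequence.
(def messages
  [{:topic "orders" :value "1"}
   {:topic "orders" :value ""}
   {:topic "clicks" :value "2"}
   {:topic "orders" :value "3"}])

;; Ordinary sequence functions compose directly with the messages.
(defn order-values [msgs]
  (->> msgs
       (filter #(= "orders" (:topic %)))   ; keep one topic
       (map :value)                        ; pull out the payload
       (remove empty?)                     ; drop empty payloads
       (map #(Long/parseLong %))))         ; decode each payload

(order-values messages)                    ;; => (1 3)

;; Laziness means an unbounded source is fine as long as we only take
;; what we need.
(take 3 (order-values (cycle messages)))   ;; => (1 3 1)
```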
In my experience, when writing consumption code that uses handler functions or callbacks, I’ve ended up with code that looks like this:
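A sketch of the kind of handler I mean, with hypothetical names throughout: forward! stands in for sending to Kafka (it only appends to an atom), and the empty-check and upper-casing are placeholders for real filtering and transformation logic.

```clojure
;; Hypothetical sink standing in for "forward to Kafka"; it collects
;; messages in an atom so the behaviour is easy to inspect.
(def forwarded (atom []))

(defn forward! [msg]
  (swap! forwarded conj msg))

;; Callback style: decoding, filtering and transforming all live inside
;; the handler, and the result must be pushed onward by side effect.
(defn message-handler
  [ch metadata ^bytes payload]
  (let [body (String. payload "UTF-8")]    ; decoding lives here...
    (when-not (empty? body)                ; ...so does filtering...
      (forward! (.toUpperCase body)))))    ; ...and transformation

;; Simulate three deliveries.
(doseq [p ["a" "" "b"]]
  (message-handler nil {} (.getBytes ^String p "UTF-8")))

@forwarded ;; => ["A" "B"]
```

None of those steps can be reused or recombined on its own; changing the filter means editing the callback.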
Code like that makes consuming data more complicated and pulls more complexity into the handler function than necessary.
Push to Pull
This is all made possible by a lovely function, pipe, written by Christophe Grand:
The function returns a vector containing two important parts: the sequence, and a function that puts things into that sequence.
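Below is a sketch of pipe close to the version Christophe published; treat the details as my reconstruction rather than his exact code. It backs a lazy sequence with a java.util.concurrent.LinkedBlockingQueue, uses a private sentinel object to mark the end of the sequence, and uses a second sentinel for nil because the queue rejects nil elements. This sketch tests for nil with nil? so that false values pass through unchanged.

```clojure
(import '(java.util.concurrent LinkedBlockingQueue))

(defn pipe
  "Returns [s put!]. s is a lazy sequence whose reads block on a queue;
  (put! x) adds x to the sequence and (put!) ends it."
  []
  (let [q   (LinkedBlockingQueue.)
        EOQ (Object.)   ; sentinel: end of queue
        NIL (Object.)   ; sentinel: stands in for nil, which the queue rejects
        s   (fn s []
              (lazy-seq
                (let [x (.take q)]              ; blocks until an item arrives
                  (when-not (identical? EOQ x)  ; stop at the end sentinel
                    (cons (when-not (identical? NIL x) x) (s))))))]
    [(s) (fn
           ([] (.put q EOQ))                      ; close the sequence
           ([x] (.put q (if (nil? x) NIL x))))]))  ; enqueue an item
```

Typically put! is called from the broker’s delivery thread while another thread consumes the sequence: each read blocks until the next item arrives, and calling put! with no arguments ends the sequence.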
Returning to our original RabbitMQ example, we can change the subscriber code to use pipe and return a sequence backed by the queue of incoming messages:
We can then use map, filter and more.
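A runnable sketch of subscriber-seq built on pipe. To avoid needing a broker, it takes a hypothetical subscribe-fn (any function that registers a three-argument handler) rather than a langohr channel and queue name, and the hypothetical fake-subscribe simulates deliveries. A compact copy of pipe is included so the block runs on its own. The handler turns its arguments into a map of :payload, :ch and :msg-meta and puts it into the pipe; the consuming side is then ordinary sequence code.

```clojure
(import '(java.util.concurrent LinkedBlockingQueue))

;; Compact copy of pipe: returns [lazy-seq put!]; (put!) ends the seq.
(defn pipe []
  (let [q   (LinkedBlockingQueue.)
        EOQ (Object.)
        NIL (Object.)
        s   (fn s []
              (lazy-seq
                (let [x (.take q)]
                  (when-not (identical? EOQ x)
                    (cons (when-not (identical? NIL x) x) (s))))))]
    [(s) (fn ([] (.put q EOQ))
             ([x] (.put q (if (nil? x) NIL x))))]))

;; Returns a lazy, blocking sequence of message maps. subscribe-fn is a
;; hypothetical stand-in for a callback-based subscribe that receives a
;; handler taking (ch msg-meta payload). With langohr it might be
;; #(lc/subscribe ch queue-name % {:auto-ack true}), assuming a version
;; whose subscribe takes an options map.
(defn subscriber-seq [subscribe-fn]
  (let [[s put!] (pipe)]
    (subscribe-fn (fn [ch msg-meta payload]
                    (put! {:payload payload :ch ch :msg-meta msg-meta})))
    s))

;; Hypothetical broker simulation: delivers each string as UTF-8 bytes.
(defn fake-subscribe [payloads]
  (fn [handler]
    (doseq [p payloads]
      (handler :fake-channel {:content-type "text/plain"}
               (.getBytes ^String p "UTF-8")))))

;; The consuming side is plain sequence code.
(def payloads
  (->> (subscriber-seq (fake-subscribe ["a" "" "b"]))
       (map #(String. ^bytes (:payload %) "UTF-8"))
       (remove empty?)))

(take 2 payloads) ;; => ("a" "b")
```

Nothing here ever closes the pipe, so the sequence is unbounded: take from it rather than realizing it in full, or the read will block waiting for the next message.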
We pull responsibility out of the handler function and into the consumption of the sequence. This is really important, and it complements something else which I’ve recently noticed myself doing more often.
In the subscriber’s handler function I convert the function parameters to a map containing :payload, :ch and :msg-meta. In our actual application we’re only concerned with reading the message payload and converting it from a JSON string to a Clojure map.
Initially, we started writing something similar to this:
We have a function that exposes the messages through a sequence, but we pass a kind of transformation function as the last argument to subscriber-seq. This initially felt OK: subscriber-seq calls our handler and extracts the payload into our desired representation before putting it into the queue that backs the sequence.
But we’re pushing more responsibility into subscriber-seq than needs to be there.
We’re just extracting and transforming messages as they appear in the sequence, so we can and should build upon Clojure’s existing functions: map and the like. The code below feels much better:
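A sketch of that shape, assuming subscriber-seq yields maps of :payload, :ch and :msg-meta; a vector of such maps stands in for the live sequence here. The helpers bytes->string, parse-payload and decode are hypothetical, and parse-payload reads EDN only so the example needs no dependencies. The real application parses JSON, which would need a JSON library.

```clojure
(require '[clojure.edn :as edn])

;; Stand-in for what subscriber-seq yields.
(def incoming
  [{:payload (.getBytes "{:id 1}" "UTF-8") :ch nil :msg-meta {}}
   {:payload (.getBytes "{:id 2}" "UTF-8") :ch nil :msg-meta {}}])

(defn bytes->string [^bytes b]
  (String. b "UTF-8"))

;; Hypothetical stand-in for JSON parsing: reads EDN instead.
(def parse-payload edn/read-string)

;; comp applies right to left: :payload, then bytes->string, then parse.
(def decode (comp parse-payload bytes->string :payload))

;; subscriber-seq stays a plain source; transformation is just map.
(map decode incoming) ;; => ({:id 1} {:id 2})
```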
It feels better for a similar reason to moving the handler to a sequence: we’re making our function less complex and encouraging composition through the many functions that already exist. Mapping a composite function over the sequence to transform the incoming data, rather than adding more work into subscriber-seq, is a great example of this for me.
Pipe
I’ve probably used Christophe’s pipe function three or four times this year to take code that started out with handler functions and evolve it to work with sequences. I think it’s a really neat way of making callback-based APIs more elegant.