Introducing Bifrost: Archive Kafka data to Amazon S3

We're happy to announce the public release of a tool we've been using in production for a while now: Bifrost.

We use Bifrost to incrementally archive all our Kafka data into Amazon S3; these transaction logs can then be ingested into our streaming data pipeline (we only need to use the archived files occasionally when we radically change our computation).

Rationale

There are a few other projects that scratch the same itch: notably Secor from Pinterest and Kafka's old hadoop-consumer. Although Secor doesn't rely on running Hadoop jobs, it still uses Hadoop's SequenceFile format; sequence files allow compression and distributed splitting operations, as well as letting you access the data in a record-oriented way. We'd been using code similar to Kafka's hadoop-consumer for a long time, but it ran slowly and didn't do a great job of watching for new topics and partitions.

We wanted something that didn't introduce the cascade of Hadoop dependencies and would be able to run a little more hands-off.

Bifrost

Bifrost is the tool we wanted. It has a handful of configuration options that need to be set (e.g. S3 credentials, Kafka ZooKeeper consumer properties) and will continually monitor Kafka for new topics/partitions and create consumers to archive the data to S3. 

Here's an example configuration file:
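(The property names below are illustrative placeholders rather than Bifrost's real configuration keys; broadly, you supply the Kafka ZooKeeper consumer properties plus your S3 credentials and target bucket. Check the Bifrost README for the exact keys.)

```
# Hypothetical property names, shown only to illustrate the shape of the
# configuration; see the Bifrost README for the real keys.
zookeeper.connect=zookeeper.example.com:2181
group.id=bifrost-archiver
s3.bucket=our-kafka-archive
s3.access-key=YOUR_ACCESS_KEY
s3.secret-key=YOUR_SECRET_KEY
```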

Bifrost is written in Clojure and can be built using Leiningen. If you want to try it locally you can just `lein run`; for production you can build an uberjar and run that directly.
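For example (the jar name and the flag for passing the configuration file are assumptions; check the project README for the exact invocation):

```
lein uberjar
# run the standalone jar, pointing it at your configuration file
java -jar target/bifrost-standalone.jar --config bifrost.properties
```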


Data is stored in "directories" in S3: `s3://<bucket>/<kafka-consumer-group-id>/<topic>/partition=<partition-id>/0000000.baldr.gz` (the filename is the starting offset for that file).

We use our Baldr file format: the first 8 bytes indicate the length of the record, the following n bytes are the record itself, then another 8-byte record length follows, and so on. It provides almost all of what we need from SequenceFiles but without the Hadoop dependencies. We have a Clojure implementation but it should be trivial to write in any language. We also compress the whole file output stream with Gzip to speed up uploads and reduce the amount we store on S3.
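To make the framing concrete, here's a minimal sketch of reading and writing that record structure in Clojure; it uses plain java.io classes for illustration rather than the Baldr library's own API:

```clojure
(import '[java.io DataInputStream DataOutputStream EOFException])

;; Write a single record: an 8-byte big-endian length followed by the record bytes.
(defn write-record [^DataOutputStream out ^bytes record]
  (.writeLong out (alength record))
  (.write out record 0 (alength record)))

;; Read a single record, returning nil at the end of the stream.
(defn read-record [^DataInputStream in]
  (try
    (let [len (.readLong in)
          buf (byte-array len)]
      (.readFully in buf)
      buf)
    (catch EOFException _ nil)))
```

Wrapping the underlying streams in `java.util.zip` GZIP streams gives the compressed `.baldr.gz` files described above.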

Happy Kafka archiving!

Kafka for uSwitch's Event Pipeline

Kafka is a high-throughput, persistent, distributed messaging system that was originally developed at LinkedIn. It forms the backbone of uSwitch.com’s new data analytics pipeline and this post will cover a little about Kafka and how we’re using it.

Kafka is both performant and durable. To make it easier to achieve high throughput on a single node it also does away with much of what message brokers ordinarily provide, making it a simpler distributed messaging system.

Messaging

Over the past 2 years we’ve migrated from a monolithic environment based around Microsoft .NET and SQL Server to a mix of databases, applications and services. These change over time: applications and servers will come and go.

This diversity is great for productivity but has made data analytics as a whole more difficult.

We use Kafka to make it easier for the assortment of micro-applications and services that compose uSwitch.com to exchange and publish data.

Messaging helps us decouple the parts of the infrastructure, letting consumers and producers evolve and grow over time with less centralised coordination or control; I’ve referred to this as building a Data Ecosystem before.

Kafka lets us consume data in realtime (so we can build reactive tools and products) and provides a unified way of getting data into long-term storage (HDFS).

Consumers and producers

Kafka’s model is pretty general; messages are published onto topics by producers, stored on disk and made available to consumers. It’s important to note that messages are pulled by consumers to avoid needing any complex throttling in the event of slow consumption.

Kafka doesn’t dictate any serialisation; it just expects a payload of byte[]. We’re using Protocol Buffers for most of our topics to make it easier to evolve schemas over time. Having a repository of definitions has also made it slightly easier for teams to see what events they can publish and what they can consume.

This is what it looks like in Clojure using clj-kafka:
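(This sketch follows the producer API of later clj-kafka releases; namespaces, function names and configuration keys changed between versions, so treat it as approximate rather than the exact code from the original post. The topic name and the stand-in serialiser are hypothetical.)

```clojure
(require '[clj-kafka.producer :as kafka])

;; Stand-in for the real Protocol Buffers serialisation: the broker only ever
;; sees a byte[] payload.
(defn serialise-event [event]
  (.getBytes (pr-str event) "UTF-8"))

;; Configuration follows the standard Kafka producer properties.
(def p
  (kafka/producer {"metadata.broker.list" "kafka-broker:9092"
                   "serializer.class"     "kafka.serializer.DefaultEncoder"}))

;; "search-events" is a hypothetical topic name.
(kafka/send-message p (kafka/message "search-events" (serialise-event {:query "broadband"})))
```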

We use messages to record the products that are shown across our site, the searches that people perform, emails that are sent (and bounced), web requests and more. In total it’s probably a few million messages a day.

Metadata and State

Kafka uses ZooKeeper for various bits of meta-information, including tracking which messages have already been retrieved by a consumer. To that end, it is the consumer’s responsibility to track consumption, not the broker’s. Kafka’s client library already contains a ZooKeeper consumer that will track the message offsets that have been consumed.

As an aside, the broker keeps no state about any of the consumers directly. This keeps it simple and means there’s no need for complex structures to be kept in memory, reducing the need for garbage collections.

When messages are received they are written to a log file (well, handed off to the OS to write) named after the topic; these are serial append files so individual writes don’t need to block or interfere with each other.

When reading messages consumers simply access the file and read data from it. It’s possible to perform parallel consumption through partitioned topics although this isn’t something we’ve needed yet.

Topic and message storage

Messages are tracked by their offset, letting consumers read from a given point in the topic. A consumer can connect and ask for all messages that Kafka currently has stored, or for everything from a specified offset onwards. This relatively long retention (compared to other messaging systems) makes Kafka extremely useful for supporting both real-time and batch reads. Further, because it takes advantage of disk throughput, it is a cost-effective system too.

The broker can be configured to keep messages up to a specified quantity or for a set period of time. Our broker is configured to keep messages for up to 20 days; after that you’ll need to go elsewhere (most topics are stored on HDFS afterwards). This is the characteristic that has made it so useful for us: it makes getting data out of applications and servers and into other systems much easier, and more reliable, than periodically aggregating log files.

Performance

Kafka’s performance (and the design that achieves it) derives from the observation that sequential disk throughput has improved much faster than seek latency; it writes and reads sequentially and uses the operating system’s file system caches rather than trying to maintain its own, minimising the JVM working set and, again, avoiding garbage collections.

The plot below shows results published within an ACM article; their experiment was to measure how quickly they could read 4-byte values sequentially and randomly from different storage.

[Figure: read throughput for sequential vs. random 4-byte reads from memory, SSD and spinning disk, plotted on a logarithmic scale]

Please note the scale is logarithmic because the difference between random and sequential is so large for both SSD and spinning disks.

Interestingly, it shows that sequential disk access, spinning or SSD, is faster than random memory access. It also shows that, in their tests, sequential spinning disk performance was higher than SSD.

In short, using sequential reads lets Kafka get performance close to random memory access. And, by keeping very little in the way of metadata, the broker can be extremely lightweight.

If anyone is interested, the Kafka design document is thorough and very accessible.

Batch Load into HDFS

As I mentioned earlier, most topics are stored on HDFS so that we can maximise the amount of analysis we can perform over time.

We use a Hadoop job that is derived from the code included within the Kafka distribution.

The process looks a little like this:

[Diagram: the batch-load process from Kafka into HDFS]

Each topic has a directory on HDFS that contains two further subtrees: one for offset token files and one for data files. The input to the Hadoop job is an offset token file, which contains the details of the broker to consume from, the message offset to read from, and the name of the topic. Although it’s a SequenceFile, the value bytes contain a string that looks like this:

broker.host.com topic-name  102991

The job uses a RecordReader that connects to the Kafka broker and passes the message payload directly through to the mapper. Most of the time the mapper just writes the whole message bytes straight out; the output is written using Hadoop’s SequenceFileOutputFormat (so we can compress and split the data for higher-volume topics) and Hadoop’s MultipleOutputs, which lets us write out two files: the data file and a newly updated offset token file.

For example, if we run the job and consume from offset 102991 to offset 918280, this will be written to the offset token file:

broker.host.com topic-name  918280

Note that the contents of the file are exactly the same as before, just with the offset updated. All the state necessary to perform incremental loads is managed by the offset token files.

This ensures that the next time the job runs we can incrementally load only the new messages. If we introduce a bug into the Hadoop load job we can just delete one or more of the token files to cause the job to load from further back in time.

Again, Kafka’s inherent persistence makes dealing with these kinds of HDFS loads much easier than polling for logs. Previously we’d used other databases to store metadata about the daily rotated logs we’d pulled, but there was a lot of additional computation in splitting apart files that spanned days; incremental loads with Kafka are far cleaner and more efficient.

Kafka has helped us both simplify our data collection infrastructure, letting us evolve and grow it more flexibly, and provided the basis for building real-time systems. It’s extremely simple and very easy to set up and configure; I’d highly recommend it for anyone playing in a similar space.

Related Stuff

As I publish this, LinkedIn have just announced the release of Camus, their Kafka-to-HDFS pipeline. The pipeline I’ve described above was inspired by the early Hadoop support within Kafka but has since evolved into something specific for use at uSwitch.

Twitter have also just published a post about their use of Kafka and Storm to provide real-time search.

I can also recommend reading “The Unified Logging Infrastructure for Data Analytics at Twitter” paper that was published late last year.

Finally, this post was based on a brief presentation I gave internally in May last year: Kafka, a Little Introduction.


Clojure - From Callbacks to Sequences

I was doing some work with a colleague earlier this week which involved connecting to an internal RabbitMQ broker and transforming some messages before forwarding them to our Kafka broker.

We’re using langohr to connect to RabbitMQ. Its consumer and queue documentation shows how to use the subscribe function to connect to a broker and print messages that arrive:
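(Reconstructed in the spirit of the langohr documentation; the option-passing style differs between langohr versions, and the queue name here is just an example.)

```clojure
(require '[langohr.core      :as rmq]
         '[langohr.channel   :as lch]
         '[langohr.queue     :as lq]
         '[langohr.consumers :as lc])

(defn message-handler
  [ch {:keys [content-type delivery-tag] :as msg-meta} ^bytes payload]
  (println (format "Received: %s (content type: %s, delivery tag: %d)"
                   (String. payload "UTF-8") content-type delivery-tag)))

(let [conn  (rmq/connect)
      ch    (lch/open conn)
      queue "langohr.examples.hello-world"]
  (lq/declare ch queue {:exclusive false :auto-delete true})
  (lc/subscribe ch queue message-handler {:auto-ack true}))
```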

The example above is pretty close to what we started working with earlier today. It’s also quite similar to a lot of other code I’ve written in the past: connect to a broker or service and provide a block/function to be called when something interesting happens.

Sequences, not handlers

Although there’s nothing wrong with this, I think there’s a nicer way: flip the responsibility so that instead of the subscriber pushing to our handler function, we consume messages through Clojure’s sequence abstraction.

This is the approach I took when I wrote clj-kafka, a Clojure library to interact with LinkedIn’s Kafka (as an aside, Kafka is really cool; I’m planning a blog post on how we’ve been building a new data platform for uSwitch.com, but it’s well worth checking out).

Here’s a little example of consuming messages through a sequence that’s taken from the clj-kafka README:
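(Reconstructed from memory of the README; the configuration keys and exact signatures varied between clj-kafka releases.)

```clojure
(require '[clj-kafka.core        :refer [with-resource]]
         '[clj-kafka.consumer.zk :refer [consumer messages shutdown]])

(def config {"zookeeper.connect"  "localhost:2181"
             "group.id"           "clj-kafka.consumer"
             "auto.offset.reset"  "smallest"
             "auto.commit.enable" "false"})

;; Consume the "test" topic as an ordinary (lazy) sequence of messages.
(with-resource [c (consumer config)]
  shutdown
  (take 2 (messages c "test")))
```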

We create our consumer and access messages through a sequence abstraction by calling messages with the topic we wish to consume from.

The advantage of exposing the items through a sequence is that it becomes instantly composable with the many functions that already exist within Clojure: map, filter, remove etc.

In my experience, when writing consumption code that uses handler functions/callbacks I’ve ended up with code that looks like this:
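(An illustrative sketch rather than the original code; the helper functions are hypothetical stand-ins for real parsing, filtering and forwarding logic.)

```clojure
;; Hypothetical stand-ins for real parsing, filtering and forwarding code.
(defn parse-payload [^bytes payload]
  (read-string (String. payload "UTF-8")))

(defn interesting? [msg]
  (= :product-impression (:type msg)))

(defn forward-to-kafka! [msg]
  (println "forwarding" msg))

;; The kind of handler callback-based consumption tends to grow into: parsing,
;; filtering and side-effects all tangled up inside the callback itself.
(defn message-handler
  [ch msg-meta ^bytes payload]
  (let [msg (parse-payload payload)]
    (when (interesting? msg)
      (forward-to-kafka! (assoc msg :received-at (System/currentTimeMillis))))))
```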

It makes consuming data more complicated and pulls more complexity into the handler function than necessary.

Push to Pull

This is all made possible thanks to a lovely function written by Christophe Grande:
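(Reconstructed here from memory of Christophe's post, so treat the details as approximate: it backs a lazy sequence with a LinkedBlockingQueue.)

```clojure
(defn pipe
  "Returns a vector of two parts: a (blocking, lazy) sequence that reads from
   an internal queue, and a function that puts items onto that queue. Calling
   the function with no arguments closes the pipe."
  [size]
  (let [q   (java.util.concurrent.LinkedBlockingQueue. (int size))
        EOQ (Object.)
        NIL (Object.)
        s   (fn queue-seq []
              (lazy-seq
               (let [x (.take q)]
                 (when-not (= EOQ x)
                   (cons (when-not (= NIL x) x)
                         (queue-seq))))))]
    [(s) (fn queue-put
           ([] (.put q EOQ))
           ([x] (.put q (or x NIL))))]))
```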

The function returns a vector containing two important parts: the sequence, and a function to put things onto that sequence.

Returning to our original RabbitMQ example, we can change the subscriber code to use pipe to return the sequence that accesses the queue of messages:
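(A sketch of the reworked subscriber: subscriber-seq is the name used later in the post, while the pipe size and the langohr option handling are assumptions.)

```clojure
(require '[langohr.consumers :as lc])

(defn subscriber-seq
  [ch queue]
  (let [[msg-seq put] (pipe 512)]
    (lc/subscribe ch queue
                  ;; The handler does nothing but push a map describing each
                  ;; delivery onto the pipe; callers work with msg-seq instead.
                  (fn [ch msg-meta ^bytes payload]
                    (put {:payload payload :ch ch :msg-meta msg-meta}))
                  {:auto-ack true})
    msg-seq))
```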

We can then map, filter and more.

We pull responsibility out of the handler function and into the consumption of the sequence. This is really important, and it complements something else which I’ve recently noticed myself doing more often.

In the handler function above I convert the function parameters to a map containing :payload, :ch and :msg-meta. In our actual application we’re only concerned with reading the message payload and converting it from a JSON string to a Clojure map.

Initially, we started writing something similar to this:
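(A sketch of that first attempt; the JSON parsing with cheshire and the names other than subscriber-seq are assumptions.)

```clojure
(require '[cheshire.core :as json]
         '[langohr.consumers :as lc])

(defn payload->map
  "Parse a JSON payload into a Clojure map with keyword keys."
  [^bytes payload]
  (json/parse-string (String. payload "UTF-8") true))

(defn subscriber-seq
  [ch queue transform]
  (let [[msg-seq put] (pipe 512)]
    (lc/subscribe ch queue
                  ;; The transformation happens inside the subscription machinery.
                  (fn [_ch _msg-meta ^bytes payload]
                    (put (transform payload)))
                  {:auto-ack true})
    msg-seq))

;; usage: (subscriber-seq ch "our-queue" payload->map)
```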

We have a function that exposes the messages through a sequence, but we pass a kind of transformation function as the last argument to subscriber-seq. This initially felt ok: subscriber-seq calls our handler and extracts the payload into our desired representation before putting it into the queue that backs the sequence.

But we’re pushing more responsibility into subscriber-seq than needs to be there.

We’re just extracting and transforming messages as they appear in the sequence, so we can and should be building upon Clojure's existing functions: map and the like. The code below feels much better:
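(A sketch of the preferred shape, building on the plain two-argument subscriber-seq and payload->map defined above.)

```clojure
;; subscriber-seq is the plain version that just returns a sequence of
;; {:payload ... :ch ... :msg-meta ...} maps; the parsing is now an ordinary
;; composite function map'd over that sequence.
(defn parsed-messages [ch queue]
  (map (comp payload->map :payload)
       (subscriber-seq ch queue)))
```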

It feels better for a similar reason as moving the handler to a sequence: we’re making our function less complex and encouraging composition through the many functions that already exist. The map of a composite function over the sequence is a great example of this for me: transforming the incoming data with ordinary sequence operations rather than adding more work into subscriber-seq.

Pipe

I’ve probably used Christophe’s pipe function 3 or 4 times this year to take code that started with handler functions and evolve it to deal with sequences. I think it’s a really neat way of making callback-based APIs more elegant.