Sinking Data to Neo4j from Hadoop with Cascading

Recently, I worked with a colleague (Paul Lam, aka @Quantisan) on building a connector library to let Cascading interoperate with Neo4j: cascading.neo4j. Paul had been experimenting with Neo4j and Cypher to explore our data through graphs, and we wanted an easy way to flow our existing data on Hadoop into Neo4j.

The data processing pipeline we’ve been growing at uSwitch.com is built around Cascalog, Hive, Hadoop and Kafka.

Once the data has been aggregated and stored, a lot of our ETL is performed with Cascalog and, by extension, Cascading. Querying and analysis are a mix of Cascalog and Hive. This layer is built on our long-term data store, Hadoop; all combined, it lets us store high-resolution data immutably at a much lower cost than uSwitch’s previous platform.

Cascading is:

an application framework for Java developers to quickly and easily develop robust Data Analytics and Data Management applications on Apache Hadoop

Cascading provides a model on top of Hadoop’s very file-oriented, raw MapReduce API. It models the world as flows of data (Tuples) moving between Taps and interpreted according to Schemes.

For example, in Hadoop you might configure a job that reads data from a SequenceFile at a given location on HDFS. Cascading separates these into two distinct concerns: the Tap represents file storage on HDFS, while the Scheme is configured to read data according to the SequenceFile format.
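As a rough illustration (a sketch via Clojure interop, assuming Cascading 2.x class names, with a made-up HDFS path and field names rather than anything from our pipeline):

```clojure
;; sketch only: Cascading 2.x class names; path and fields are made up
(import '[cascading.tap.hadoop Hfs]
        '[cascading.scheme.hadoop SequenceFile]
        '[cascading.tuple Fields])

(def events-tap
  ;; the Scheme says *how* to read the data (SequenceFile records with two
  ;; named fields); the Tap says *where* it lives (a path on HDFS)
  (Hfs. (SequenceFile. (Fields. (into-array Comparable ["user-id" "event"])))
        "hdfs:///data/events"))
```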

Cascading provides a lot of classes that make it easier to build flows that join and aggregate data without writing lots of boilerplate MapReduce code.

The README of our cascading.neo4j library includes an example of writing a Cascading flow: it reads data from a delimited file (representing the Neo4j Nodes we want to create) and flows every record to our Neo4j Tap.

Although we use Cascading, the majority of that is through Cascalog: a Clojure library that provides a Datalog-like language for analysing our data.

We wrote a small test flow to use whilst testing our connector; a Cascalog version of the kind of flow described above looks roughly like this:
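```clojure
;; a rough sketch of the kind of test flow we mean, not the exact code:
;; `neo4j-nodes-tap` stands in for the sink Tap that cascading.neo4j
;; provides; its construction is omitted because the exact call isn't shown
(ns example.neo4j-sink
  (:use cascalog.api))

(defn sink-names
  "Flow every record from a delimited file of node names into a Neo4j sink tap."
  [neo4j-nodes-tap]
  (?<- neo4j-nodes-tap
       [?name]
       ((hfs-textline "hdfs:///tmp/names.tsv") ?name)))
```

The real connector supplies the Tap; here it’s passed in as a parameter simply to show the shape of the query.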

Our library, cascading.neo4j (hosted on Conjars.org), provides a Tap and Scheme suitable for sinking (writing) data to a Neo4j server using its REST connector. We extend and implement Cascading classes and interfaces, making it easy to integrate into our Cascalog processing.

cascading.neo4j allows you to create nodes, set properties on those nodes, add nodes to an exact match index, and create relationships (again with properties) between those nodes.

I should point out that cascading.neo4j may not be suitable for all batch processing purposes: most significantly, it’s pretty slow. Our production Neo4j server (on our internal virtualised infrastructure) lets us sink around 20,000 nodes per minute through the REST API. This is certainly a lot slower than Neo4j’s Batch Insert API and may make it unusable in some situations.

However, if the connector is fast enough for you, it means you can sink data directly to Neo4j from your existing Cascading flows.

Performance

Whilst tweaking and tuning the connector we ran through Neo4j’s Linux Performance Guide (a great piece of technical documentation) that helped us boost performance a fair bit.

We also noticed the REST library allows a transaction to hold a batch of operations, so multiple mutations can be included in the same round trip. Our Neo4j RecordWriter will chunk writes into batches: rather than writing all records in one go, you can specify the batch size.
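The chunking itself is nothing exotic; in miniature (an illustration of the idea only, not the actual RecordWriter code) it amounts to:

```clojure
;; illustration of the chunking idea only, not the actual RecordWriter:
;; group records into fixed-size batches so each REST round trip carries
;; several mutations instead of one
(defn write-in-batches!
  [write-batch! batch-size records]
  (doseq [batch (partition-all batch-size records)]
    (write-batch! batch)))

;; e.g. (write-in-batches! post-batch-to-neo4j! 15 nodes)
;; where post-batch-to-neo4j! is whatever function performs the REST call
```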

We ran some tests and, on our infrastructure, batch sizes of around 15 and 13 reducers (that is, 13 ‘connections’ to our Neo4j REST API) yielded the best performance: around 20,000 nodes per minute. We collected some numbers, and Paul suggested we could have some fun putting them through a regression, which will be the subject of my next post :)

Next steps

The library is still evolving a little and there’s a bit of duplicate code between the Hadoop and Local sections of the codebase. The biggest restrictions are that it currently only supports sinking (writing) data and that its speed may make it unsuitable for flowing very large graphs.

Hopefully this will be useful for some people; Paul and I would love pull requests for our little project.

Using Clojure Protocols for Java Interop

Clojure's protocols were a feature I'd not used much of until I started working on a little pet project: clj-hector (a library for accessing the Cassandra datastore).

Protocols provide a means for attaching type-based polymorphic functions to types. In that regard they're somewhat similar to Java's interfaces (indeed Java can interoperate with Clojure through the protocol-generated types although it's not something I've tried). Here's the example of their use from the Clojure website:
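```clojure
;; the defprotocol/deftype/reify example from clojure.org's protocols reference
(defprotocol P
  (foo [x])
  (bar-me [x] [x y]))

(deftype Foo [a b c]
  P
  (foo [x] a)
  (bar-me [x] b)
  (bar-me [x y] (+ c y)))

(bar-me (Foo. 1 2 3) 42)
;; => 45

(foo
  (let [x 42]
    (reify P
      (foo [this] 17)
      (bar-me [this y] x))))
;; => 17
```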

Protocols go deeper though: they can be used with already defined types (think attaching behaviour through mixing modules into objects with Ruby's include). It's for this reason that they've been a particularly nice abstraction for building adapters upon: glue that translates Java objects to Clojure structures when doing interop.
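For example (a throwaway protocol invented for illustration, nothing from clj-hector), behaviour can be attached to classes that already exist:

```clojure
(defprotocol Describable
  (describe [x]))

;; attach the protocol to pre-existing Java classes: no wrappers needed
(extend-protocol Describable
  String
  (describe [s] (str "a string of length " (count s)))

  java.util.Date
  (describe [d] (str "a date: " d)))

(describe "hello")           ;; => "a string of length 5"
(describe (java.util.Date.)) ;; => "a date: ..."
```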

Here's roughly what the code that converts Hector query results into maps looks like (simplified from the real thing):
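```clojure
;; a simplified sketch rather than the exact clj-hector source; the protocol
;; and function names (ToClojure / to-clojure) and the Hector package names
;; are from memory and may differ slightly
(ns example.serialize
  (:import [me.prettyprint.cassandra.model QueryResultImpl RowsImpl]))

(defprotocol ToClojure
  (to-clojure [x] "Convert a Hector result object into Clojure data."))

(extend-protocol ToClojure
  RowsImpl
  (to-clojure [rows]
    ;; each Row is converted by another clause of this extend-protocol
    (map to-clojure (iterator-seq (.iterator rows))))

  QueryResultImpl
  (to-clojure [result]
    (to-clojure (.get result))))
```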

QueryResultImpl contains results for queries. In one instance, this is RowsImpl, which contains Row results (which in turn are converted by other parts of the extend-protocol statement in the real code). The really cool part is that dispatch happens polymorphically: there's no need to write code that checks types and dispatches by hand, calling recursively down the tree. Instead, you declare, type by type, how each object should be converted, and Clojure does the rest.

Pretty neat.

Processing null-character-terminated text files in Hadoop with a NullByteTextInputFormat

It was Forward's first hackday last Friday. Tom Hall and I worked on some collaborative filtering using Mahout and Hadoop to process some data collected from Twitter.

I'd been collecting statuses through Twitter's streaming API (using some code from a project called Twidoop). After wrestling with formats for a little while, we realised that each JSON record was separated by a null character (all bits zeroed). There didn't seem to be an easy way to change the terminating character for the TextInputFormat we'd normally use, so we wrote our own InputFormat, and I'm posting it here for future reference (and for anyone else looking for a Hadoop InputFormat to process similar files).

We added it to my fork of cascading-clojure but it will work in isolation.
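For reference, the record-splitting logic amounts to something like the following (a plain-Clojure sketch over an InputStream, not the actual InputFormat class):

```clojure
;; a plain-Clojure sketch of the splitting logic only -- the real
;; NullByteTextInputFormat does this inside Hadoop's RecordReader machinery
(defn null-terminated-records
  "Lazily read UTF-8 records from `in`, treating a NUL byte (0x00) as the
  record terminator."
  [^java.io.InputStream in]
  (->> (repeatedly #(.read in))
       (take-while #(not= -1 %))          ; stop at end of stream
       (partition-by zero?)               ; split runs of bytes on NULs
       (remove #(zero? (first %)))        ; drop the NUL separators themselves
       (map #(String. (byte-array (map unchecked-byte %)) "UTF-8"))))
```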

Processing XML in Hadoop

This is something that I’ve been battling with for a while and tonight, whilst not able to sleep, I finally found a way to crack it! The answer: use an XmlInputFormat from a related project. Now for the story.

We process lots of XML documents every day and some of them are pretty large: over 8 gigabytes uncompressed. Each. Yikes!

We’d made significant performance gains by switching from the old REXML SAX parser to libxml. But we suffered from random segfaults on our production servers (seemingly caused by garbage collection of bad objects), and this was still taking nearly an hour to run for the largest reports.

Hadoop did seem to offer XML processing: the general advice was to use Hadoop’s StreamXmlRecordReader, which can be accessed through the StreamInputFormat.

This seemed to have weird behaviour with our reports (which often don’t have line endings): lines would be duplicated and processing would jump to over 100% complete. All a little fishy.

Hadoop Input Formats and Record Readers

The InputFormat in Hadoop does a couple of things. Most significantly, it provides the Splits that form the chunks that are sent to discrete Mappers.

Splits form the rough boundary of the data to be processed by an individual Mapper. The FileInputFormat (and its subclasses) generates splits based on overall file size. Of course, it’s unlikely that all individual Records (the Key and Value passed to each Map invocation) lie neatly within these splits: records will often cross split boundaries. The RecordReader, therefore, must handle this

… on whom lies the responsibility to respect record-boundaries and present a record-oriented view of the logical InputSplit to the individual task.

In short, a RecordReader could scan from the start of a split for the start of its record. It may then continue to read past the end of its split to find that record’s end. The InputSplit only contains details about the offsets into the underlying file: data is still accessed through the streams.
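To make that concrete, here’s a toy model (nothing to do with Hadoop’s actual classes): a record belongs to the split in which it starts, even if its bytes run past that split’s end.

```clojure
;; toy model: records are identified by their starting byte offset and are
;; assigned to the split in which that offset falls
(defn assign-records-to-splits [record-offsets split-size]
  (group-by #(quot % split-size) record-offsets))

;; with 100-byte splits, the record starting at byte 95 is handled by the
;; mapper for split 0 even though most of its bytes sit inside split 1
(assign-records-to-splits [0 40 95 130 210] 100)
;; => {0 [0 40 95], 1 [130], 2 [210]}
```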

It seemed like the StreamXmlRecordReader was skipping around the underlying InputStream too much, reading records it wasn’t entitled to read. I tried my best to understand the code, but it was written a long while ago and is pretty cryptic to my limited brain.

I started trying to rewrite the code from scratch but it became pretty hairy very quickly. Take a look at the implementation of next() in LineRecordReader to see what I mean.

Mahout to the Rescue

After a little searching around on GitHub I found another XmlInputFormat courtesy of the Lucene sub-project: Mahout.
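Wiring it into a job looks something like this (a sketch via Clojure interop; the package name and the xmlinput.start / xmlinput.end configuration keys are as I remember them for that Mahout release, so double-check against your version):

```clojure
;; sketch only: the package name and configuration keys below are from
;; memory and may differ between Mahout releases
(import '[org.apache.hadoop.mapreduce Job]
        '[org.apache.mahout.classifier.bayes XmlInputFormat])

(defn configure-xml-job
  "Point a Hadoop job at Mahout's XmlInputFormat, with records delimited
  by the given start and end tags (e.g. \"<report>\" and \"</report>\")."
  [^Job job start-tag end-tag]
  (doto (.getConfiguration job)
    (.set "xmlinput.start" start-tag)
    (.set "xmlinput.end" end-tag))
  (doto job
    (.setInputFormatClass XmlInputFormat)))
```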

I’m happy to say it appears to work. I’ve just run a quick test on our 30-node cluster (via my VPN) and it processed the 8 gig file in about 10 minutes. Not bad.

For anyone trying to process XML with Hadoop: try Mahout’s XmlInputFormat.