Sinking Data to Neo4j from Hadoop with Cascading

Recently, I worked with a colleague (Paul Lam, aka @Quantisan on building a connector library to let Cascading interoperate with Neo4j: cascading.neo4j. Paul had been experimenting with Neo4j and Cypher to explore our data through graphs and we wanted an easy way to flow our existing data on Hadoop into Neo4j.

The data processing pipeline we’ve been growing at uSwitch.com is built around Cascalog, Hive, Hadoop and Kafka.

Once the data has been aggregated and stored a lot of our ETL is performed upon Cascalog and, by extension, Cascading. Querying/analysis is a mix of Cascalog and Hive. This layer is built upon our long-term data storage system: Hadoop; this, all combined, lets us store high-resolution data immutably at a much lower cost than uSwitch’s previous platform.

Cascading is:

application framework for Java developers to quickly and easily develop robust Data Analytics and Data Management applications on Apache Hadoop

Cascading provides a model on top of Hadoop’s very file-oriented, raw MapReduce API. It models the world around flows of data (Tuples), between Taps and according to Schemes.

For example, in Hadoop you might configure a job that reads data from a SequenceFile at a given location on HDFS. Cascading separates the 2 into slightly different concerns- the Tap would represent file storage on HDFS, the Scheme would be configured to read data according to the SequenceFile format.

Cascading provides a lot of classes to make it easier to build flows that join and aggregate data without you needing to write lots of boiler-plate MapReduce code.

Here’s an example of writing a Cascading flow (taken from the README of our cascading.neo4j library). It reads data from a delimited file (representing the Neo4j Nodes we want to create), and flows every record to our Neo4j Tap.

Although we use Cascading, really the majority of that is through Cascalog- a Clojure library that provides a datalog like language for analysing our data.

We wrote a small test flow we could use whilst testing our connector, a similar Cascalog example for the Cascading code above looks like this:

Our library, cascading.neo4j (hosted on Conjars.org) provides a Tap and Scheme suitable for sinking (writing data) to a Neo4j server using their REST connector. We extend and implement Cascading classes and interfaces making it easy to then integrate into our Cascalog processing.

cascading.neo4j allows you to create nodes, set properties on those node, add nodes to an exact match index, and create relationships (again with properties) between those nodes.

I should point out that cascading.neo4j may not be suitable for all batch processing purposes: most significantly, its pretty slow. Our production Neo4j server (on our internal virtualised infrastructure) lets us sink around 20,000 nodes per minute through the REST api. This is certainly a lot slower than Neo4j’s Batch Insert API and may make it unusable in some situations.

However, if the connector is fast enough for you it means you can sink data directly to Neo4j from your existing Cascading flows.

Performance

Whilst tweaking and tuning the connector we ran through Neo4j’s Linux Performance Guide (a great piece of technical documentation) that helped us boost performance a fair bit.

We also noticed the REST library allows for transactions to hold batch operations- to include multiple mutations in the same roundtrip. Our Neo4j RecordWriter will chunk batches- rather than writing all records in one go, you can specify the size.

We ran some tests and, on our infrastructure, using batch sizes of around 15 and 13 reducers (that is 13 ‘connections’ to our Neo4j REST api) yield the best performance of around 20,000 nodes per minute. We collected some numbers and Paul suggested we could have some fun putting those through a regression which will be the subject of my next post :)

Next steps

It’s currently still evolving a little and there’s a bit of duplicate code between the Hadoop and Local sections of the code. The biggest restrictions are it currently only supports sinking (writing) data and it’s speed may make it unsuitable for flowing very large graphs.

Hopefully this will be useful for some people and Paul and I would love pull requests for our little project.