Analysing and predicting performance of our Neo4j Cascading Connector with linear regression in R

As I mentioned in an earlier article, Paul and I have produced a library to help connect Cascading and Neo4j, making it easy to sink data from Hadoop with Cascading flows into a Neo4j instance. Whilst we were waiting for our jobs to run, we had a little fun with some regression analysis to optimise the performance. This post covers how we did it with R.

I’m posting because it wasn’t something I’d done before and it turned out to be pretty good fun. We played with it for a day and haven’t done much else with it since, so I’m also publishing in case it’s useful for others.

We improved the write performance of our library by adding support for batching: collecting mutations into sets of transactions that are sent in batches through Neo4j’s REST API. Batching improved performance compared with making a separate request/response round trip for every mutation, but it also meant we needed to choose a chunk size, since writing all mutations in a single transaction would be impossible.
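The connector’s RecordWriter is Java, but the chunking idea itself is language-neutral. Purely as an illustration (this is not the library’s code), here is a minimal R sketch that splits a vector of hypothetical record ids into batches of a chosen size; each batch would correspond to one batched REST request.

```r
# Split a vector of records into consecutive batches of at most batch.size
# elements. The records are hypothetical ids standing in for mutations.
chunk <- function(records, batch.size) {
  # ceiling(i / batch.size) assigns elements 1..batch.size to batch 1, etc.
  split(records, ceiling(seq_along(records) / batch.size))
}

records <- 1:25
batches <- chunk(records, batch.size = 10)

length(batches)           # 3 round trips instead of 25
sapply(batches, length)   # size of each batch
```

A larger batch size means fewer round trips, but each request carries more work, which is why the chunk size is worth tuning rather than simply maximising.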

There are 2 independent variables that we could change to tweak performance: the batch size and the number of simultaneous connections making those batch calls. N.B. This assumes any other hidden factors remain constant.

For us, running this on a Hadoop cluster, the number of simultaneous connections is determined by the number of Hadoop reduce (or map) tasks executing concurrently, so in practice the 2 variables are the batch size and the number of concurrent tasks.

We took some measurements during a few runs of the connector across our production data to help understand whether we were making the library faster. We then produced a regression model from the data and used R’s optimize function to help identify the sweet spot for our job’s performance.

We had 7 runs on our production Hadoop cluster. For each run, we let the reduce tasks (where the Neo4j write operations were occurring) run across genuine data for 5 minutes and measured how many nodes were successfully added to our Neo4j server. Although the cluster was under capacity (so the time wouldn’t include any idling/waiting), our Neo4j server instance runs on internal virtualised infrastructure, so its performance could have varied independently of our changes.

The results for our 7 observations are in the table below:

Test No.   Number of Reducers   Batch Size   Nodes per minute
1          1                    10           5304.4
2          4                    10           13218.8
3          4                    20           13265.636
4          8                    5            11289.2
5          8                    10           17682.2
6          16                   10           20984.2
7          8                    20           20201.6

Regression in R

A regression lets us attempt to predict the value of a continuous variable based upon the value of one or more other independent variables. It also lets us quantify the strength of the relationship between the dependent variable and independent variables.

Given our experiment, we could determine whether batch size and the number of reducers (the independent variables) affected the number of Neo4j nodes we could create per minute (the dependent variable). If they did, we could use values for those 2 variables to predict performance.
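Written out, with y_i the nodes per minute in test i, r_i the number of reducers and b_i the batch size, the first model is an ordinary least-squares regression: lm chooses the coefficients that minimise the sum of squared residuals over the 7 observations.

```latex
y_i = \beta_0 + \beta_1 r_i + \beta_2 b_i + \varepsilon_i,
\qquad
\hat{\beta} = \arg\min_{\beta} \sum_{i=1}^{7} \left( y_i - \beta_0 - \beta_1 r_i - \beta_2 b_i \right)^2
```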

The first stage is to load the experiment data into R and get it into a data frame. Once we’ve loaded it we can use R’s lm function to fit a linear model and look at our data.
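A minimal reconstruction of this step follows, assuming the data frame is called results with columns reducers, batch.size and nodes.per.minute (the names that appear in the lm call in the summary output). It enters the seven observations from the table above by hand rather than reading them from a file.

```r
# The seven observations from the table above, entered by hand.
results <- data.frame(
  reducers         = c(1, 4, 4, 8, 8, 16, 8),
  batch.size       = c(10, 10, 20, 5, 10, 10, 20),
  nodes.per.minute = c(5304.4, 13218.8, 13265.636, 11289.2,
                       17682.2, 20984.2, 20201.6)
)

# Fit a linear model: nodes.per.minute is the dependent variable,
# reducers and batch.size are the predictors.
model <- lm(nodes.per.minute ~ reducers + batch.size, data = results)

summary(model)
```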

In the above, the formula parameter to lm lets us describe that nodes.per.minute is our dependent variable (our outcome), and reducers and batch.size are our independent variables (our predictors).

Much like other analysis in R, the first thing we can look at is a summary of this model, which produces the following:

Call:
lm(formula = nodes.per.minute ~ reducers + batch.size, data = results)

Residuals:
    1     2     3     4     5     6     7 
-2330  2591 -1756 -1135  3062 -1621  1188 

Coefficients:
            Estimate Std. Error t value Pr(>|t|)  
(Intercept)   2242.8     3296.7   0.680   0.5336  
reducers       998.1      235.6   4.236   0.0133 *
batch.size     439.3      199.3   2.204   0.0922 .
---
Signif. codes:  0 ‘***’ 0.001 ‘**’ 0.01 ‘*’ 0.05 ‘.’ 0.1 ‘ ’ 1 

Residual standard error: 2735 on 4 degrees of freedom
Multiple R-squared: 0.8362, Adjusted R-squared: 0.7543 
F-statistic: 10.21 on 2 and 4 DF,  p-value: 0.02683

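The summary tells us that the model fits the data reasonably well: the multiple R-squared is 0.8362 (adjusted R-squared 0.7543). The number of reducers is significant at the 5% level (p = 0.0133), but batch size is only marginally significant (p = 0.0922, i.e. significant only at the 10% level).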

But what if we tweak our model? We suspected that throughput was unlikely to grow linearly as we increased the number of reducers and the batch size. We can change the formula of our model, adding squared terms for both variables, and see whether that improves its fit:
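As a sketch, the refitted model can be written as below, using the formula from the summary output that follows. I() is needed so that ^ is treated as arithmetic squaring; inside a formula, a bare ^ means crossing of terms instead. The data frame is re-created so the snippet runs on its own.

```r
results <- data.frame(
  reducers         = c(1, 4, 4, 8, 8, 16, 8),
  batch.size       = c(10, 10, 20, 5, 10, 10, 20),
  nodes.per.minute = c(5304.4, 13218.8, 13265.636, 11289.2,
                       17682.2, 20984.2, 20201.6)
)

# I(x^2) adds a squared term for each predictor, letting the fitted
# throughput curve bend over rather than grow in a straight line.
model <- lm(nodes.per.minute ~ reducers + I(reducers^2) +
              batch.size + I(batch.size^2),
            data = results)

summary(model)
```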

And here are the results of calling summary(model):

Call:
lm(formula = nodes.per.minute ~ reducers + I(reducers^2) + batch.size + 
    I(batch.size^2), data = results)

Residuals:
         1          2          3          4          5          6          7 
-2.433e+02  9.318e+02 -3.995e+02  9.663e-13 -7.417e+02  5.323e+01  3.995e+02 

Coefficients:
                 Estimate Std. Error t value Pr(>|t|)  
(Intercept)     -15672.16    3821.48  -4.101   0.0546 .
reducers          2755.10     337.07   8.174   0.0146 *
I(reducers^2)     -101.74      18.95  -5.370   0.0330 *
batch.size        2716.07     540.07   5.029   0.0373 *
I(batch.size^2)    -85.94      19.91  -4.316   0.0497 *
---
Signif. codes:  0 ‘***’ 0.001 ‘**’ 0.01 ‘*’ 0.05 ‘.’ 0.1 ‘ ’ 1 

Residual standard error: 948.6 on 2 degrees of freedom
Multiple R-squared: 0.9901, Adjusted R-squared: 0.9704 
F-statistic: 50.25 on 4 and 2 DF,  p-value: 0.01961

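Our adjusted R-squared is now 0.9704 (multiple R-squared 0.9901), so our second model fits the data better than our first. With 5 coefficients estimated from only 7 observations (leaving 2 residual degrees of freedom), though, some of that improvement may be overfitting.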

Optimization

Given the above, we’d like to understand the values for batch size and number of reducers that will give us the highest throughput.

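R has an optimize function that searches an interval of values for a single function parameter and returns the argument that minimises the function (or maximises it, with maximum = TRUE), along with the function’s value at that point.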

We can create a function that calls predict.lm with our model to predict values. We can then use the optimize function to find our optimal solution:
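A sketch of that approach follows, assuming the quadratic model above and search intervals spanning the values we actually tested (1 to 16 reducers, batch sizes 5 to 20). The helper name throughput is illustrative. optimize minimises by default, so maximum = TRUE is needed to find the highest predicted throughput.

```r
results <- data.frame(
  reducers         = c(1, 4, 4, 8, 8, 16, 8),
  batch.size       = c(10, 10, 20, 5, 10, 10, 20),
  nodes.per.minute = c(5304.4, 13218.8, 13265.636, 11289.2,
                       17682.2, 20984.2, 20201.6)
)
model <- lm(nodes.per.minute ~ reducers + I(reducers^2) +
              batch.size + I(batch.size^2), data = results)

# Predicted nodes per minute for a given number of reducers and batch size.
throughput <- function(reducers, batch.size) {
  as.numeric(predict(model, newdata = data.frame(reducers = reducers,
                                                 batch.size = batch.size)))
}

# Best number of reducers with the batch size fixed at 20.
best.reducers <- optimize(function(r) throughput(r, 20),
                          interval = c(1, 16), maximum = TRUE)

# Best batch size with the number of reducers fixed at 13.
best.batch <- optimize(function(b) throughput(13, b),
                       interval = c(5, 20), maximum = TRUE)

best.reducers  # $maximum is the best argument, $objective the prediction
best.batch
```

optimize searches a continuous interval, so the arguments it returns are fractional and need rounding to a whole number of reducers and a whole batch size.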

We fix the batch size at 20 and use optimize to discover that the optimal number of reducers is 13, with a predicted throughput of 22,924 nodes/minute. The second command optimizes for batch size with the number of reducers fixed at 13; it suggests a batch size of 15, for an overall predicted throughput of 24,409 nodes/minute.
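Because the second model has no interaction term, each variable enters as its own quadratic, and both squared coefficients are negative, so each curve has a single maximum. That means the optimize results can be cross-checked by hand by setting each partial derivative of the fitted model (coefficients from the summary output above) to zero:

```latex
\hat{y} = -15672.16 + 2755.10\,r - 101.74\,r^2 + 2716.07\,b - 85.94\,b^2
\\
\frac{\partial \hat{y}}{\partial r} = 2755.10 - 2(101.74)\,r = 0
\;\Rightarrow\; r^{*} = \frac{2755.10}{203.48} \approx 13.54
\\
\frac{\partial \hat{y}}{\partial b} = 2716.07 - 2(85.94)\,b = 0
\;\Rightarrow\; b^{*} = \frac{2716.07}{171.88} \approx 15.80
```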

This supports what we observed earlier in the summary data: the number of reducers is more significant than batch size for predicting throughput.

I’m extremely new to most of R (and statistics too, if I’m honest: the last year is the most I’ve done since university), so if anyone could tell me whether there’s a way to perform an optimization over both variables at once, that would be awesome.
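One option in base R is optim, which optimises over a vector of parameters rather than a single value. The sketch below is a suggestion, not part of our original analysis. It assumes the quadratic model above and uses the L-BFGS-B method so the search can be bounded to the range of reducers (1 to 16) and batch sizes (5 to 20) we actually tested. control = list(fnscale = -1) turns optim’s default minimisation into maximisation. The helper name throughput is illustrative.

```r
results <- data.frame(
  reducers         = c(1, 4, 4, 8, 8, 16, 8),
  batch.size       = c(10, 10, 20, 5, 10, 10, 20),
  nodes.per.minute = c(5304.4, 13218.8, 13265.636, 11289.2,
                       17682.2, 20984.2, 20201.6)
)
model <- lm(nodes.per.minute ~ reducers + I(reducers^2) +
              batch.size + I(batch.size^2), data = results)

# optim passes one parameter vector: p[1] = reducers, p[2] = batch size.
throughput <- function(p) {
  as.numeric(predict(model, newdata = data.frame(reducers = p[1],
                                                 batch.size = p[2])))
}

# fnscale = -1 makes optim maximise; the bounds keep the search inside
# the range of values we observed, avoiding extrapolation.
best <- optim(par = c(8, 10), fn = throughput, method = "L-BFGS-B",
              lower = c(1, 5), upper = c(16, 20),
              control = list(fnscale = -1))

best$par    # reducers, batch size
best$value  # predicted nodes per minute at that point
```

Because this particular model has no interaction term, the joint optimum coincides with optimising each variable separately. If the formula included an interaction such as reducers:batch.size, that would no longer hold, and a joint search like this would be the more useful tool.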

Please note this post was more about our experimentation and the process: I suspect our data might be prone to systematic error and other problems because we only have a few observations. I’d love to run more experiments and get more measurements, but we moved on to a new problem :)

Sinking Data to Neo4j from Hadoop with Cascading

Recently, I worked with a colleague (Paul Lam, aka @Quantisan) on building a connector library to let Cascading interoperate with Neo4j: cascading.neo4j. Paul had been experimenting with Neo4j and Cypher to explore our data through graphs, and we wanted an easy way to flow our existing data on Hadoop into Neo4j.

The data processing pipeline we’ve been growing at uSwitch.com is built around Cascalog, Hive, Hadoop and Kafka.

Once the data has been aggregated and stored, a lot of our ETL is performed with Cascalog and, by extension, Cascading. Querying/analysis is a mix of Cascalog and Hive. All of this is built upon our long-term data storage system, Hadoop; combined, these let us store high-resolution data immutably at a much lower cost than uSwitch’s previous platform.

Cascading is:

application framework for Java developers to quickly and easily develop robust Data Analytics and Data Management applications on Apache Hadoop

Cascading provides a model on top of Hadoop’s very file-oriented, raw MapReduce API. It models the world around flows of data (Tuples), between Taps and according to Schemes.

For example, in Hadoop you might configure a job that reads data from a SequenceFile at a given location on HDFS. Cascading separates this into 2 concerns: the Tap represents the file storage on HDFS, and the Scheme is configured to read data according to the SequenceFile format.

Cascading provides a lot of classes to make it easier to build flows that join and aggregate data without you needing to write lots of boiler-plate MapReduce code.

Here’s an example of writing a Cascading flow (taken from the README of our cascading.neo4j library). It reads data from a delimited file (representing the Neo4j Nodes we want to create), and flows every record to our Neo4j Tap.

Although we use Cascading, most of that use is really through Cascalog, a Clojure library that provides a Datalog-like language for analysing our data.

We wrote a small test flow to use whilst testing our connector; a Cascalog example similar to the Cascading code above looks like this:

Our library, cascading.neo4j (hosted on Conjars.org), provides a Tap and Scheme suitable for sinking (writing data) to a Neo4j server using Neo4j’s REST connector. We extend and implement Cascading classes and interfaces, making it easy to integrate into our Cascalog processing.

cascading.neo4j allows you to create nodes, set properties on those nodes, add nodes to an exact match index, and create relationships (again with properties) between those nodes.

I should point out that cascading.neo4j may not be suitable for all batch processing purposes; most significantly, it’s pretty slow. Our production Neo4j server (on our internal virtualised infrastructure) lets us sink around 20,000 nodes per minute through the REST API. This is certainly a lot slower than Neo4j’s Batch Insert API and may make it unusable in some situations.

However, if the connector is fast enough for you it means you can sink data directly to Neo4j from your existing Cascading flows.

Performance

Whilst tweaking and tuning the connector we ran through Neo4j’s Linux Performance Guide (a great piece of technical documentation) that helped us boost performance a fair bit.

We also noticed that the REST library allows transactions to hold batch operations, so multiple mutations can be included in the same round trip. Our Neo4j RecordWriter chunks these batches: rather than writing all records in one go, you can specify the batch size.

We ran some tests and, on our infrastructure, batch sizes of around 15 with 13 reducers (that is, 13 ‘connections’ to our Neo4j REST API) yielded the best performance, around 20,000 nodes per minute. We collected some numbers and Paul suggested we could have some fun putting them through a regression, which will be the subject of my next post :)
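To put those figures in terms of requests, here is a back-of-the-envelope calculation. It assumes each node is one mutation within a batch and that load is spread evenly across the 13 connections, issuing requests back to back; both are simplifications, so the time per request is only a rough upper bound.

```r
nodes.per.minute <- 20000
batch.size <- 15
connections <- 13

# Batched REST requests needed per second to sustain the throughput.
requests.per.second <- nodes.per.minute / 60 / batch.size
# Requests each connection issues per second, assuming an even spread.
per.connection <- requests.per.second / connections
# Implied average time per batched request on one connection,
# assuming requests are issued back to back with no other overhead.
seconds.per.request <- 1 / per.connection

c(requests.per.second, per.connection, seconds.per.request)
```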

Next steps

The library is still evolving a little, and there’s some duplicate code between the Hadoop and Local sections of the code. The biggest restrictions are that it currently only supports sinking (writing) data, and its speed may make it unsuitable for flowing very large graphs.

Hopefully this will be useful for some people and Paul and I would love pull requests for our little project.