MapReduce with Hadoop and Ruby

The quantity of data we analyse at Forward every day has grown nearly ten-fold in just over 18 months. Today we run almost all of our automated daily processing on Hadoop. The vast majority of that analysis is automated using Ruby and our open-source gem: Mandy.

In this post I’d like to cover a little background to MapReduce and Hadoop and a light introduction to writing a word count in Ruby with Mandy that can run on Hadoop.

Mandy’s aim is to make MapReduce with Hadoop as easy as possible: providing a simple structure for writing code, and commands that make it easier to run Hadoop jobs.

Since I left ThoughtWorks and joined Forward in 2008 I’ve spent most of my time working on the internal systems we use to work with our clients, track our ads, and analyse data. Data has become truly fundamental to our business.

I think it’s worth putting the volume of our processing in numbers: we track around 9 million clicks a day across our campaigns- averaged out across a day that’s over 100 every second (it actually peaks up to around 1400 a second).

 

We automate the analysis, storage, and processing of around 80GB (and growing) of data every day. In addition, further ad-hoc analysis is performed through the use of a related project: Hive (more on this in a later blog post).

We’d be hard pressed to work with this volume of data in a reasonable fashion, or be able to change what analysis we run, so quickly if it wasn’t for Hadoop and it’s associated projects.

Hadoop is an open-source Apache project that provides “open-source software for reliable, scalable, distributed computing”. It’s written in Java and, although relatively easy to get into, still falls foul of Java development cycle problems: they’re just too long. My colleague (Andy Kent) started writing something we could prototype our jobs in Ruby quickly, and then re-implement in Java once we understood things better. However, this quickly turned into our platform of choice.

To give a taster of where we’re going, here’s the code needed to run a distributed word count:

And how to run it?

mandy-hadoop wordcount.rb hdfs-dir/war-and-peace.txt hdfs-dir/wordcount-output

The above example comes from a lab Andy and I ran a few months ago at our office. All code (and some readmes) are available on GitHub.

What is Hadoop?

The Apache Hadoop project actually contains a number of sub-projects. MapReduce probably represents the core- a framework that provides a simple distributed processing model, which when combined with HDFS provides a great way to distribute work across a cluster of hardware. Of course, it goes without saying that it is very closely based upon the Google paper.

Parallelising Problems

Google’s MapReduce paper explains how, by adopting a functional style, programs can be automatically parallelised across a large cluster of machines. The programming model is simple to grasp and, although it takes a little getting used to, can be used to express problems that are seemingly difficult to parallelise.

Significantly, working with larger data sets (both storage and processing) can be solved by growing the cluster’s capacity. In the last 6 months we’ve grown our cluster from 3 ex-development machines to 25 nodes and taken our HDFS storage from just over 1TB to nearly 40TB.

An Example

Consider an example: perform a subtraction across two sets of items. That is

{1,2,3} - {1,2} = {3}

. A naive implementation might compare every item for the first set against those in the second set.

numbers = [1,2,3]
[1,2].each { |n| numbers.delete(n) }

This is likely to be problematic when we get to large sets as the time taken will grow very fast (m*n). Given an initial set of 10,000,000 items, finding the distinct items from a second site of even 1,000 items would result in 10 billion operations.

However, because of the way the data flows through a MapReduce process, the data can be partitioned across a number of machines such that each machine performs a smaller subset of operations with the result then combined to produce a final output.

It’s this flow that makes it possible to ‘re-phrase’ our solution to the problem above and have it operate across a distributed cluster of machines.

Overall Data Flow

The Google paper and a Wikipedia article provide more detail on how things fit together, but here’s the rough flow.

Map and reduce functions are combined into one or more ‘Jobs’ (almost all of our analysis is performed by jobs that pass their output to further jobs). The general flow can be seen to be made of the following steps:

  1. Input data read
  2. Data is split
  3. Map function called for each record
  4. Shuffle/sort
  5. Sorted data is merged
  6. Reduce function called
  7. Output (for each reducer) written to a split
  8. Your application works on a join of the splits

First, the input data is read and partitioned into splits that are processed by the Map function. Each map function then receives a whole ‘record’. For text files, these records will be a whole line (marked by a carriage return/line feed at the end).

Output from the Map function is written behind-the-scenes and a shuffle/sort performed. This prepares the data for the reduce phase (which needs all values for the same key to be processed together). Once the data has been sorted and distributed to all the reducers it is then merged to produce the final input to the reduce.

Map

The first function that must be implemented is the map function. This takes a series of key/value pairs, and can emit zero or more pairs of key/value outputs. Ruby already provides a method that works similarly: Enumerable#map.

For example:

names = %w(Paul George Andy Mike)
names.map {|name| {name.size => name}} # => [{4=>"Paul"}, {6=>"George"}, {4=>"Andy"}, {4=>"Mike"}]

The map above calculates the length of each of the names, and emits a list of key/value pairs (the length and names). C# has a similar method in ConvertAll that I mentioned in a previous blog post.

Reduce

The reduce function is called once for each unique key across the output from the map phase. It also produces zero or more key value pairs.

For example, let’s say we wanted to find the frequency of all word lengths, our reduce function would look a little like this:

def reduce(key, values)
  {key => values.size}
end

Behind the scenes, the output from each invocation would be folded together to produce a final output.

To explore more about how the whole process combines to produce an output let’s now turn to writing some code.

Code: Word Count in Mandy

As I’ve mentioned previously we use Ruby and Mandy to run most of our automated analysis. This is an open-source Gem we’ve published which wraps some of the common Hadoop command-line tools, and provides some infrastructure to make it easier to write MapReduce jobs.

Installing Mandy

The code is hosted on GitHub, but you can just as easily install it from Gem Cutter using the following:

$ sudo gem install gemcutter
$ gem tumble
$ sudo gem install mandy

… or

$ sudo gem install mandy --source http://gemcutter.org

If you run the mandy command you may now see:

You need to set the HADOOP_HOME environment variable to point to your hadoop install    :(
Try setting 'export HADOOP_HOME=/my/hadoop/path' in your ~/.profile maybe?

You’ll need to make sure you have a 0.20.x install of Hadoop, with HADOOP_HOME set and $HADOOP_HOME/bin in your path. Once that’s done, run the command again and you should see:

You are running Mandy!
========================

Using Hadoop 0.20.1 located at /Users/pingles/Tools/hadoop-0.20.1

Available Mandy Commands
------------------------
mandy-map       Run a map task reading on STDIN and writing to STDOUT
mandy-local     Run a Map/Reduce task locally without requiring hadoop
mandy-rm        remove a file or directory from HDFS
mandy-hadoop    Run a Map/Reduce task on hadoop using the provided cluster config
mandy-install   Installs the Mandy Rubygem on several hosts via ssh.
mandy-reduce    Run a reduce task reading on STDIN and writing to STDOUT
mandy-put       upload a file into HDFS

If you do, you’re all set.

Writing a MapReduce Job

As you may have seen earlier to write a Mandy job you can start off with as little as:

Of course, you may be interested in writing some tests for your functions too. Well, you can do that quite easily with the Mandy::TestRunner. Our first test for the Word Count mapper might be as follows:

Our implementation for this might look as follows:

Of course, our real mapper needs to do more than just tokenise the string. It also needs to make sure we downcase and remove any other characters that we want to ignore (numbers, grammatical marks etc.). The extra specs for these are in the mandy-lab GitHub repository but are relatively straightforward to understand.

Remember that each map works on a single line of text. We emit a value of 1 for each word we find which will then be shuffled/sorted and partitioned by the Hadoop machinery so that a subset of data is sent to each reducer. Each reducer will, however, receive all the values for that key. All we need to do in our reducer is sum all the values (all the 1’s) and we’ll have a count. First, our spec:

And our implementation:

In fact, Mandy has a number of built-in reducers that can be used (so you don’t need to re-implement this). To re-use the Sum-ming reducer replace the reduce statement above to reduce(Mandy::Reducers::SumReducer).

Our mandy-lab repository includes some sample text files you can run the examples on. Assuming you’re in the root of the repo you can then run the following to perform the wordcount across Alice in Wonderland locally:

$ mandy-local word_count.rb input/alice.txt output
Running Word Count...
/Users/pingles/.../output/1-word-count

If you open the /Users/pingles/.../output/1-word-count file you should see a list of words and their frequency across the document set.

Running the Job on a Cluster

The mandy-local command just uses some shell commands to imitate the MapReduce process. If you try running the same command over a larger dataset you’ll come unstuck. So let’s see how we run the same example on a real Hadoop cluster. Note, that for this to work you’ll need to either have a Hadoop cluster or Hadoop running locally in pseudo-distributed mode.

If you have a cluster up and running, you’ll need to write a small XML configuration file to provide the HDFS and JobTracker connection info. By default the mandy-hadoop command will look for a file called cluster.xml in your working directory. It should look a little like this:

<?xml version="1.0" encoding="UTF-8"?>
<configuration>
  <property>
    <name>fs.default.name</name>
    <value>hdfs://datanode:9000/</value>
  </property>
  <property>
    <name>mapred.job.tracker</name>
    <value>jobtracker:9001</value>
  </property>
  <property>
    <name>hadoop.job.ugi</name>
    <value>user,supergroup</value>
  </property>
</configuration>

Once that’s saved, you need to copy your text file to HDFS so that it can be distributed across the cluster ready for processing. As an aside: HDFS shards data in 64MB blocks across the nodes. This provides redundancy to help mitigate in the case of machine failure. It also means the job tracker (the node which orchestrates the processing) can move the computation close to the data and avoid copying large amounts of data unnecessarily.

To copy a file up to HDFS, you can run the following command

$ mandy-put my-local-file.txt word-count-example/remote-file.txt

You’re now ready to run your processing on the cluster. To re-run the same word count code on a real Hadoop cluster, you change the mandy-local command to mandy-hadoop as follows:

$ mandy-hadoop word_count.rb word-count-example/remote-file.txt word-count-example/output

Once that’s run you should see some output from Hadoop telling you the job has started, and where you can track it’s progress (via the HTTP web console):

Packaging code for distribution...
Loading Mandy scripts...

Submitting Job: [1] Word Count...
Job ID: job_200912291552_2336
Kill Command: mandy-kill job_200912291552_2336 -c /Users/pingles/.../cluster.xml
Tracking URL: http://jobtracker:50030/jobdetails.jsp?jobid=job_200912291552_2336

word-count-example/output/1-word-count

Cleaning up...
Completed Successfully!

If you now take a look inside ./word-count-example/output/1-word-count you should see a few part-xxx files. These contain the output from each reducer. Here’s the output from a reducer that ran during my job (running across War and Peace):

abandoned   54
abandoning  26
abandonment 14
abandons    1
abate   2
abbes   1
abdomens    2
abduction   3
able    107

Wrap-up

That’s about as much as I can cover in an introductory post for Mandy and Hadoop. I’ll try and follow this up with a few more to show how you can pass parameters from the command-line to Mandy jobs, how to serialise data between jobs, and how to tackle jobs in a map and reduce stylee.

As always, please feel free to email me or comment if you have any questions, or (even better) anything you’d like me to cover in a future post.