Introducing clj-esper: simple Esper wrapper for Clojure

clj-esper is a very early-stage Clojure library that wraps Esper, a complex event processing engine.

I’ve been working on building a simple systems monitoring tool over the past few weeks. The aim of this system is to capture events from various streams, provide a visualisation of how things are ‘now’ and (going forward) provide more intelligent error reporting. We’ve been using it to track success/error request rates and will be looking to fold in more event streams from inside our systems.

There’s a little info in the project’s README.md and some more in the tests, but here’s a high-level example from our monitoring app.

Esper events are modeled in clj-esper as regular maps (albeit with some metadata) and can be created with the defevent macro. The new-event function constructs a map with all fields defined and their values coerced; although these coercions are a little incomplete currently, they do enough for me :).
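The example gist isn’t reproduced here, but usage looks roughly like the sketch below. defevent and new-event are the library’s real entry points; the event name, fields and values are illustrative, and the exact field syntax is an assumption based on the description above.

```clojure
;; Sketch only: the field syntax and names are illustrative.
(defevent HttpRequestEvent [status :int
                            uri    :string])

;; new-event builds a map containing all declared fields, coercing
;; values to their declared types (e.g. "500" -> 500).
(def event (new-event HttpRequestEvent :status "500" :uri "/prices"))
```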

I had originally started an implementation based around defrecord and building classes, but this became increasingly complex, so plain maps seemed like a reasonable compromise. It’s probably not the cleanest approach but works well enough for now.

TODO

I’m writing this (and publishing the code) because I’d really like to do a couple of things:

  • Esper allows events to contain other maps and complex types. It’d be great to add support for this.
  • Try an alternate modeling of the output event stream as a Clojure sequence (rather than the current callback function).

I’m sure there’s also a fair bit of the code that could be tidied and improved. I’d welcome any forks and pull requests of the clj-esper GitHub repository.

Hiring

We (Forward) are hiring great developers; if you’re interested then please feel free to check out our jobs page or ping me an email for more info.

Clojure, Abstraction and ZeroMQ

For me, the stand-out talk when I attended clojure-conj last year was Mark McGranaghan’s talk on Ring: “One Ring to Bind Them”. I’ve been working with ZeroMQ and Clojure these past few weeks and it’s been all the better because of Mark’s talk.

Core Abstractions

Mark’s talk deftly highlights the beauty of some of Clojure’s core abstractions, sequences and functions, and how Ring modeled its world around them. The following quote is from Alan Perlis and is often used when talking about Clojure:

It’s better to have 100 functions operate on one data structure than to have 10 functions operate on 10 data structures

Building upon Clojure’s core abstractions makes it easy to compose new functionality from existing libraries. For example, Ring uses the concept of middleware: optional building blocks that decorate your application with additional behaviour. Because the core request handler is just a function, it’s possible to re-use Clojure’s threading macro to build up more complex behaviour:
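The original snippet isn’t shown here, but it would have looked something like the sketch below. wrap-request-logging is a stand-in for whatever logging middleware the app actually used; the others are standard Ring middleware.

```clojure
(ns example.handler
  (:use [compojure.core :only [defroutes GET]]
        [ring.middleware.params :only [wrap-params]]
        [ring.middleware.nested-params :only [wrap-nested-params]]
        [ring.middleware.stacktrace :only [wrap-stacktrace]]))

(defroutes main-routes
  (GET "/" [] "Hello"))

(defn wrap-request-logging
  "Minimal stand-in for logging middleware: prints method and uri."
  [handler]
  (fn [req]
    (println (:request-method req) (:uri req))
    (handler req)))

(def app
  (-> main-routes
      wrap-request-logging
      wrap-stacktrace
      wrap-nested-params
      wrap-params))
```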

The main-routes Var is built using Compojure; this is then wrapped by middleware to handle logging, exception handling and nested query parameters.

Of course, many other languages and libraries solve the problem in a similar way: the Java web frameworks I used to work with would most likely wire this kind of thing in via an AOP framework coupled with an IoC container like Spring.

What makes Clojure different, apart from not needing any more frameworks, is that it’s using such a core, language-level abstraction: there’s no need to write middleware adapters to join disparate things together.

ZeroMQ

I mentioned that it was working with ZeroMQ that reminded me of all of this. I’m working on wiring Esper into our production environment so that we can monitor the behaviour of our system better.

Much like Luca described in his talk Node.js and ZeroMQ, I’m using Node.js and ZeroMQ as a kind of glue: tailing logs and other bits of information and re-publishing them. These are then aggregated and consumed in one place.

Core Abstractions

There are two things we need to do with our ZeroMQ sockets:

  • Consume data (from an ‘infinite’ stream)
  • Publish data

Of course this is trivialised a little, but it’s largely all we need to do.

For consuming data in Clojure, my first cut was fairly traditional: start a (while true) infinite loop that consumed messages and called out to a handler function. It looked a little like this:
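Something like the following, assuming the jzmq Java bindings; the endpoint and socket options are illustrative.

```clojure
(import '(org.zeromq ZMQ))

(defn consume
  "Connects a SUB socket and loops forever, passing each received
   message (a byte array) to handler."
  [handler]
  (let [ctx    (ZMQ/context 1)
        socket (.socket ctx ZMQ/SUB)]
    (.connect socket "tcp://127.0.0.1:5555")
    (.subscribe socket (byte-array 0)) ; subscribe to everything
    (while true
      (handler (.recv socket 0)))))
```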

It worked, but it’s pretty ugly and certainly not simple. Given it just calls a handler, it’s also hard to connect to much of the rest of Clojure’s libraries without extra work. Time to re-think.

Going back to our original statement of what we need to do with our sockets, it’s possible to see there are some basic building blocks we could use:

Utility                                     Clojure Type
Consume data (from an ‘infinite’ stream)    Sequence
Publish data                                Function

Our compound consumer above can be re-written and turned into a sequence with repeatedly as follows:
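A sketch, re-using the socket setup from above (parse-event is hypothetical):

```clojure
(defn message-seq
  "Returns an infinite lazy sequence of messages from socket."
  [socket]
  (repeatedly #(.recv socket 0)))

;; Any sequence function now works against the stream, e.g.:
;; (take 5 (message-seq socket))
;; (map parse-event (message-seq socket))
```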

Note how much cleaner the consumer code now is, and how we can re-use any of Clojure’s sequence functions to operate on the stream.

For our publisher we can write a function which returns a function:
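Again a sketch with jzmq; the endpoint is illustrative.

```clojure
(defn publisher
  "Binds a PUB socket to endpoint and returns a function that
   publishes a byte array when called."
  [ctx endpoint]
  (let [socket (.socket ctx ZMQ/PUB)]
    (.bind socket endpoint)
    (fn [msg]
      (.send socket msg 0))))

;; Usage:
;; (def publish (publisher (ZMQ/context 1) "tcp://*:5556"))
;; (publish (.getBytes "hello"))
```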

The generated function closes over the initialised socket, so any consumer that wants to publish data can do so by just calling the function, passing the bytes to be sent. It provides a clean interface and has the added advantage that we can use it with many higher-order functions.

For me, this has really highlighted the power of Clojure over other JVM-based languages: clean, simple core abstractions that can be composed in powerful ways.

Incidentally, I’m working on tidying up the code that interacts with both ZeroMQ and Esper. When they’re a little tidier I’ll open-source them and publish on my GitHub account, and most likely post an article or two here too. Stay tuned!

Introducing Clojure to an Organisation

A thread came up on the london-clojurians mailing list about people using Clojure in London. I replied about our use at Forward and below are the answers I gave to some questions Bruce asked.

How did we introduce Clojure?

I guess much like anything we use, we didn’t really introduce it. We experimented with it, used it to build some internal tools and systems. These would be rolled into production and we’d start getting a feel for where it might/might not be useful.

We’ve favoured building systems in small, independent pieces, which makes swapping out an implementation easier and encourages us to learn and re-learn everything about what each piece does. We’d take the knowledge but feel confident about dropping any or all of the code that ran before.

I used it originally at the beginning of last year for doing some work with Google’s APIs and to build a system to run the ETL flow for one of our businesses. A part of that runs on Hadoop (we have some tens-of-gigabyte XML files) and is still used today, although we dropped the flow system for a JRuby implementation.

More recently, we’ve been working at uSwitch to replace a monolithic .NET system (it’s actually lots of WCF services, but it feels like you can’t manoeuvre easily :) with a mix of Ruby and Clojure. A colleague, Mike Jones, was on paternity leave and felt Clojure might be a good fit for the core pricing/comparison of the utilities business. As we went, we uncovered all kinds of nuances that had been missed in both the original .NET system and our Ruby implementation.

How did we get other people involved?

Anyone that worked on the team would need to do some work on the code at some point. Mike has kind of been the lead for it, but all of us have been working with it.

We place a big emphasis on learning. We started studying SICP and organising weekly meetings to go through examples. Others started working through the Project Euler questions and meeting to go through those.

What have been the biggest problems?

I think it’s been getting the functional feel, the Clojure feel. After a while you get a sense for what’s good and bad with OO languages, forced out through many years of TDD and applying OO principles to older systems. I don’t think any of us had experience with a functional language, so you have to rely on more fundamental instincts to know whether it’s going well. I’d say more recently I take a lot of inspiration from Stuart Halloway’s emphasis on Simple over Compound to know where a solution lies.

I’d say it’s taken us a long time and lots of extra learning to get to being average Clojure programmers. But I definitely think it’s made us significantly better programmers overall (a side-effect of learning SICP and some other old-school CompSci books has been making better decisions). It sets the programmer bar pretty high.

What have been the biggest successes?

I actually think one of Clojure’s greatest strengths is its Java interop. If I’m looking at using some Java library, I’ll almost always ‘lein new’ a new project and play around. I find the flow in Emacs/SLIME with Clojure very productive now. I also think the interop code you write ends up being cleaner than with other JVM languages (think JRuby).

The bit of code I wrote at the beginning of last year to do some of our big-data ETL is still being used (although given its triviality, that’s more to do with it just working than a specific Clojure #win :). The utilities pricing has probably been in production for around 9 months and runs really well. We hired Antonio Garrote, who slated our Clojure code during his interview :), and he’s been using Incanter to help analyse and visualise some of the work he’s been doing with Mahout.

Aside from anything Clojure-specific, I think the biggest success has been the emphasis it’s placed on us to be more thoughtful and analytical programmers.

Any surprises?

This isn’t really related to Clojure per se, but Mike and I both attended clojure-conj last year. During the flight we paired for about 7 hours and rewrote whole parts of the utilities code. Protocols had just been introduced to Clojure and we thought it might be possible to write a tidier implementation; we added them and then deleted them, as our implementation didn’t really work out very well. But what was surprising was how productive being locked away without the Internet turned out to be. We felt it was one of the most productive 7 hours’ work we’ve ever had.

Cocoa-ified Emacs Fullscreen in OSX Lion

I’ve got a mostly-working patch for Homebrew’s Emacs recipe that will build Emacs with a Cocoa interface supporting OSX Lion’s new full-screen mode. I’m posting it here so someone else can beat me to finishing it. I know Emacs already has full-screen; this was definitely an exercise in yak-shaving.

It doesn’t quite fill the screen (there are still some weird height/width calculation bits in EmacsWindow I’ve not gotten my head around yet). It also messes up the screen size when you move back out of full-screen mode.

The following gist includes the full emacs.rb recipe (which adds a command to patch with the above diff).

Dynamic error capture with Clojure

I’ve spent the last few days trying to improve parts of uSwitch.com’s Clojure pricing code. Mostly this has been adding error handling that provides a form of graceful degradation.

The majority of the application deals with pricing the various consumer energy plans that people can pick from. Although we try to make sure everything validates before it hits the site, we’ve found a few weird problems that weren’t anticipated. This can be bad when an error in one plan prevents all the other results from being returned successfully.

Ideally, we’d like to be able to capture both the errors and the plans we couldn’t price; clients can use the data we have and know what’s missing.

I was sure I’d read an interview in which Rich Hickey said how Clojure’s dynamism helped deal with error handling in a clean and safe way. I can’t find a reference (so it’s possible I made that up), but I was then looking through The Joy of Clojure and found this:

“there are two directions for handling errors. The first … refers to the passive handling of exceptions bubbling outward from inner functions. But built on Clojure’s dynamic Var binding is a more active mode of error handling, where handlers are pushed into inner functions.”

The code below shows a trivialised version of the problem: a calculation function that might raise an error part-way through the collection.
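The original gist isn’t shown here; this sketch captures the shape of the problem with hypothetical plan data.

```clojure
(defn calculate-price
  "Prices a single plan; blows up when data is missing."
  [plan]
  (if (:rate plan)
    (* (:rate plan) (:units plan))
    (throw (Exception. (str "No rate for plan " (:name plan))))))

(def plans [{:name "A" :rate 10 :units 3}
            {:name "B" :units 5}           ; invalid: no :rate
            {:name "C" :rate 7 :units 2}])

;; (map calculate-price plans) throws on plan B, so we lose A and C too.
```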

Instead, we can use binding and an error-handling function to dynamically handle the problem and push this down the stack.
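Roughly like this, continuing the sketch from above:

```clojure
(declare ^:dynamic handle-error)

(defn calculate-price
  [plan]
  (if (:rate plan)
    (* (:rate plan) (:units plan))
    (handle-error plan)))  ; defer to whatever handler is bound

(binding [handle-error (fn [plan] nil)]  ; callers decide what failure means
  (doall (map calculate-price plans)))
;; => (30 nil 14)
```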

Note that we’re using declare to define our error-handler Var, and that we have to mark it as :dynamic as we’re going to be dynamically binding it.

The next stage is to progress from just returning nil to capturing the errors whilst the records are being processed. We can use an Atom to hold the state and append more information as we flow through.
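Sticking with the sketch:

```clojure
(defn price-plans
  "Prices plans, returning both the prices and the plans that
   couldn't be priced."
  [plans]
  (let [errors (atom [])]
    (binding [handle-error (fn [plan]
                             (do (swap! errors conj plan)
                                 nil))]
      {:prices (doall (map calculate-price plans))
       :errors @errors})))

;; (price-plans plans)
;; => {:prices (30 nil 14), :errors [{:name "B" :units 5}]}
```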

The above example introduces an errors atom that captures the failing records as we map across the sequence. But it definitely now feels a little icky:

  1. We use do to both add the error to our list of errors, and return nil.
  2. Because we’ve now introduced side-effects we must use doall to realise the sequence whilst the handle-error function is rebound.

I have to say, I’m not too sure whether I prefer this more dynamic approach or a more traditional try...catch form.

The dynamic behaviour is definitely very cool, and the authors of The Joy of Clojure say that “using dynamic scope via binding is the preferred way to handle recoverable errors in a context-sensitive manner”, so I’m very inclined to stick with it. However, I think I’ve managed to make it less nice: what originally looked like a neat use of closures to capture errors in a safe way now seems less good. I’d love to hear what more experienced Clojurists make of it.

Other references:

  1. http://kotka.de/blog/2010/05/Didyouknow_IV.html
  2. http://kotka.de/blog/2009/11/TamingtheBound_Seq.html
  3. http://cemerick.com/2009/11/03/be-mindful-of-clojures-binding/

Using partial to tidy Clojure tests

I was reading through getwoven’s infer code on GitHub and noticed a nice use of partial to tidy tests.

Frequently there’ll be a function under test that’s given multiple inputs, with perhaps only a single parameter value changing per example. With partial you can build a function that closes over the fixed arguments, letting you tidy each call down to just the single varying parameter.

The example I found was in infer’s sparse_classification_test.clj, but I’ve written a quick gist to demonstrate the point:
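The gist itself isn’t reproduced here; this sketch uses a hypothetical classify function to illustrate the pattern.

```clojure
(ns example.core-test
  (:use clojure.test))

;; Hypothetical function under test: most arguments stay the same
;; across examples, only the document varies.
(defn classify [model threshold doc]
  (if (>= (count doc) threshold) :long :short))

(deftest classify-examples
  ;; Close over the invariant arguments once...
  (let [classify' (partial classify {:some :model} 10)]
    ;; ...so each assertion supplies only the varying parameter.
    (is (= :short (classify' "hi")))
    (is (= :long  (classify' "a much longer document")))))
```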

Recognising SSL Requests in Rack with Custom Headers

For a while now I’ve been working on replacing bits of uSwitch.com’s .NET heritage with some new Clojure and Ruby blood. A problem came up with one of the new applications when we needed it to recognise (and behave differently) when an SSL-secured request was made: the request carried non-standard headers (although I’m yet to successfully Google the RFC where HTTP_X_FORWARDED_PROTO is defined :).

Polyglot Deployment

New applications are deployed on different machines but proxied by the old application servers, letting us mount new applications inside the current website. Although the servers run IIS and ASP.NET, they’re configured much like nginx’s proxy_pass module (for example): you set the matching URL and the URL you’d like to proxy to. uSwitch.com also uses hardware load balancers in front of the application and web servers.

Rack Scheme Detection

Recent releases of Rack include an ssl? method on the Request that returns true if the request is secure. Internally, this calls scheme to check whether any of the HTTP headers that specify an HTTPS scheme are set (the snippet from Rack’s current code is below).

Unfortunately, this didn’t recognise our requests as being secure. The hardware load-balancer was setting a custom header that wasn’t anything Rack would expect. The quickest (and perhaps dirtiest?) solution was a gem, rack-scheme-detect, that allows additional mappings to be provided; the rest of Rack continues to work (as do all of the other middleware and apps above it).

The following snippet shows how it can be used inside a classic Sinatra application to configure the additional mapping.

Of course we’ll be making sure we also set one of the standard headers. But, for anyone who needs a quick solution when that’s not immediately possible, hopefully rack-scheme-detect might save some time.

Multiple connections with Hive

We’ve been using Hadoop and Hive at Forward for just over a year now. We have lots of batch jobs written in Ruby (using our Mandy library) and a couple in Clojure (using my fork of cascading-clojure). This post covers a little of how things hang together, following a discussion on the Hive mailing list about opening multiple connections to the Hive service.

Hive has some internal thread-safety problems (see HIVE-80) that meant we had to do something a little boutique with HAProxy, given our usage. This article covers a little about how Hive fits into some of our batch and ad-hoc processing, and how we put it behind HAProxy.

Background

Hive is used both for programmatic jobs and interactively by analysts. Interactive queries come via a web-based product, called CardWall, that was built for our search agency. In both cases, behind the scenes, we have another application that handles scheduling jobs and retrying when things fail (and alerting if it’s unlikely to work thereafter).

Batch Scheduling

Internally, this is how things (roughly) look:

[Figure: batch processing overview]

The Scheduler is responsible for knowing which jobs need to run and for queuing work items. Within the batch processing application, an Agent will pick up an item and check whether its dependencies are available (we use this to integrate with all kinds of other reporting systems, FTP sites etc.). If things aren’t ready, it will place the item back on the queue with a delay (we use Beanstalkd, which makes this all very easy).

Enter Hive

You’ll notice from the first screenshot that some Hive jobs are queued. When we first started using Hive we ran a single instance of the HiveServer service, started as follows: $HIVE_HOME/bin/hive --service hiveserver. Mostly this was used via a Ruby gem called RBHive.

This worked fine while we were using it with a single connection, but we ran into trouble when writing apps that would connect to Hive as part of our batch processing system.

Originally there was no job affinity; workers would pick up any piece of work and attempt it, and with nearly 20 workers this could heavily load the Hive server. Over time we introduced different queues to handle different jobs, allowing us to allocate a chunk of agents to sensitive resources (like Hive connections).

Multiple Hive Servers

We have a couple of machines that run multiple HiveServer instances. We use Upstart to manage these processes (so we have hive1.conf, hive2.conf… in our /etc/init directory). Each config file specifies a port to open the service on (10001 in the example below).
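An Upstart config along these lines; this is a sketch, the Hive path is illustrative, and passing the port via the HIVE_PORT environment variable is an assumption.

```
# /etc/init/hive1.conf
description "HiveServer instance 1"

start on runlevel [2345]
stop on runlevel [!2345]
respawn

env HIVE_PORT=10001
exec /usr/lib/hive/bin/hive --service hiveserver
```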

These are then load-balanced with HAProxy; the config file we use for that is below.
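Something like the following sketch (not our exact config). The maxconn 1 on each server is the ‘boutique’ part: given HIVE-80, each HiveServer instance only safely handles one connection at a time, so HAProxy queues any further connections rather than letting them share a server.

```
listen hive 0.0.0.0:10000
    mode tcp
    balance leastconn
    server hive1 127.0.0.1:10001 check maxconn 1
    server hive2 127.0.0.1:10002 check maxconn 1
```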

Using Clojure Protocols for Java Interop

Clojure's protocols were a feature I'd not used much of until I started working on a little pet project: clj-hector (a library for accessing the Cassandra datastore).

Protocols provide a means of attaching type-based polymorphic functions to types. In that regard they're somewhat similar to Java's interfaces (indeed, Java can interoperate with Clojure through the protocol-generated types, although it's not something I've tried). Here's the example of their use from the Clojure website:
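From memory it's along these lines (so it may differ slightly from the site's current wording):

```clojure
(defprotocol AProtocol
  "A doc string for AProtocol abstraction"
  (bar [a b] "bar docs")
  (baz [a] [a b] [a b c] "baz docs"))
```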

Protocols go deeper though: they can be used with already-defined types (think attaching behaviour by mixing modules into objects with Ruby's include). It's for this reason that they've been a particularly nice abstraction for building adapters upon: glue that translates Java objects to Clojure structures when doing interop.

Here's a snippet from a bit of the code that converts Hector query results into maps:

QueryResultImpl contains results for queries. In one instance this is RowsImpl, which contains Row results (which in turn are converted by other parts of the extend-protocol statement in the real code). The really cool part is that dispatch happens polymorphically: there's no need to write code that checks types and dispatches (calling recursively down the tree). Instead, you declare, type by type, how to convert, and Clojure does the rest.

Pretty neat.

Processing null character terminated text files in Hadoop with a NullByteTextInputFormat

It was Forward's first hackday last Friday. Tom Hall and I worked on some collaborative filtering using Mahout and Hadoop to process some data collected from Twitter.

I'd been collecting statuses through their streaming API (using some code from a project called Twidoop). After wrestling with formats for a little while, we realised that each JSON record was separated by a null character (all bits zeroed). There didn't seem to be an easy way to change the terminating character for the TextInputFormat we'd normally use, so we wrote our own InputFormat, and I'm posting it here for future reference (and for anyone else looking for a Hadoop InputFormat to process similar files).

We added it to my fork of cascading-clojure but it will work in isolation.