A quick introduction to Apache Spark for statisticians

Introduction

Apache Spark is a Scala library for analysing "big data". It can be used for analysing huge (internet-scale) datasets distributed across large clusters of machines. The analysis can be anything from the computation of simple descriptive statistics associated with the datasets, through to rather sophisticated machine learning pipelines involving data pre-processing, transformation, nonlinear model fitting and regularisation parameter tuning (via methods such as cross-validation). A relatively impartial overview can be found in the Apache Spark Wikipedia page.

Although Spark is really aimed at data that can’t easily be analysed on a laptop, it is actually very easy to install and use (in standalone mode) on a laptop, and a good laptop with a fast multicore processor and plenty of RAM is fine for datasets up to a few gigabytes in size. This post will walk through getting started with Spark, installing it locally (not requiring admin/root access) doing some simple descriptive analysis, and moving on to fit a simple linear regression model to some simulated data. After this walk-through it should be relatively easy to take things further by reading the Spark documentation, which is generally pretty good.

Anyone who is interested in learning more about setting up and using Spark clusters may want to have a quick look over on my personal blog (mainly concerned with the Raspberry Pi), where I have previously considered installing Spark on a Raspberry Pi 2, setting up a small Spark cluster, and setting up a larger Spark cluster. Although these posts are based around the Raspberry Pi, most of the material there is quite generic, since the Raspberry Pi is just a small (Debian-based) Linux server.

Getting started – installing Spark

The only pre-requisite for installing Spark is a recent Java installation. On Debian-based Linux systems (such as Ubuntu), Java can be installed with:

sudo apt-get update
sudo apt-get install openjdk-8-jdk

For other systems you should Google for the best way to install Java. If you aren’t sure whether you have Java or not, type java -version into a terminal window. If you get a version number of the form 1.7.x or 1.8.x you should be fine.

Once you have Java installed, you can download and install Spark in any appropriate place in your file-system. If you are running Linux, or a Unix-alike, just cd to an appropriate place and enter the following commands:

wget http://www.eu.apache.org/dist/spark/spark-2.1.0/spark-2.1.0-bin-hadoop2.7.tgz
tar xvfz spark-2.1.0-bin-hadoop2.7.tgz 
cd spark-2.1.0-bin-hadoop2.7
bin/run-example SparkPi 10

If all goes well, the last command should run an example. Don’t worry if there are lots of INFO and WARN messages – we will sort that out shortly. On other systems it should simply be a matter of downloading and unpacking Spark somewhere appropriate, then running the example from the top-level Spark directory. Get Spark from the downloads page. You should get version 2.1.0 built for Hadoop 2.7. It doesn’t matter if you don’t have Hadoop installed – it is not required for single-machine use.

The INFO messages are useful for debugging cluster installations, but are too verbose for general use. On a Linux system you can turn down the verbosity with:

sed 's/rootCategory=INFO/rootCategory=WARN/g' < conf/log4j.properties.template > conf/log4j.properties

On other systems, copy the file log4j.properties.template in the conf sub-directory to log4j.properties and edit the file, replacing INFO with WARN on the relevant line. Check it has worked by re-running the SparkPi example – it should be much less verbose this time. You can also try some other examples:

bin/run-example SparkLR
ls examples/src/main/scala/org/apache/spark/examples/

There are several different ways to use Spark. For this walk-through we are just going to use it interactively from the "Spark shell". We can pop up a shell with:

bin/spark-shell --master local[4]

The "4" refers to the number of worker threads to use. Four is probably fine for most decent laptops. Ctrl-D or :quit will exit the Spark shell and take you back to your OS shell. It is more convenient to have the Spark bin directory in your path. If you are using bash or a similar OS shell, you can temporarily add the Spark bin to your path with the OS shell command:

export PATH=$PATH:`pwd`/bin

You can make this permanent by adding a line like this (but with the full path hard-coded) to your .profile or similar start-up dot-file. I prefer not to do this, as I typically have several different Spark versions on my laptop and want to be able to select exactly the version I need. If you are not running bash, Google how to add a directory to your path. Check the path update has worked by starting up a shell with:

spark-shell --master local[4]

Note that if you want to run a script containing Spark commands to be run in "batch mode", you could do it with a command like:

spark-shell --driver-memory 25g --master local[4] < spark-script.scala | tee script-out.txt

There are much better ways to develop and submit batch jobs to Spark clusters, but I won’t discuss those in this post. Note that while Spark is running, diagnostic information about the "cluster" can be obtained by pointing a web browser at port 4040 on the master, which here is just http://localhost:4040/ – this is extremely useful for debugging purposes.

First Spark shell commands

Counting lines in a file

We are now ready to start using Spark. From a Spark shell in the top-level directory, enter:

sc.textFile("README.md").count

If all goes well, you should get a count of the number of lines in the file README.md. The value sc is the "Spark context", containing information about the Spark cluster (here it is just a laptop, but in general it could be a large cluster of machines, each with many processors and each processor with many cores). The textFile method loads up the file into an RDD (Resilient Distributed Dataset). The RDD is the fundamental abstraction provided by Spark. It is a lazy distributed parallel monadic collection. After loading a text file like this, each element of the collection represents one line of the file. I’ve talked about monadic collections in previous posts, so if this isn’t a familiar concept, it might be worth having a quick skim through at least the post on first steps with monads in Scala. The point is that although RDDs are potentially huge and distributed over a large cluster, using them is very similar to using any other monadic collection in Scala. We can unpack the previous command slightly as follows:

val rdd1 = sc.textFile("README.md")
rdd1
rdd1.count

Note that RDDs are "lazy", and this is important for optimising complex pipelines. So here, after assigning the value rdd1, no data is actually loaded into memory. All of the actual computation is deferred until an "action" is called – count is an example of such an action, and therefore triggers the loading of data into memory and the counting of elements.

Counting words in a file

We can now look at a very slightly more complex pipeline – counting the number of words in a text file rather than the number of lines. This can be done as follows:

sc.textFile("README.md").
  map(_.trim).
  flatMap(_.split(' ')).
  count

Note that map and flatMap are both lazy ("transformations" in Spark terminology), and so no computation is triggered until the final action, count is called. The call to map will just trim any redundant white-space from the line ends. So after the call to map the RDD will still have one element for each line of the file. However, the call to flatMap splits each line on white-space, so after this call each element of the RDD will correspond to a word, and not a line. So, the final count will again count the number of elements in the RDD, but here this corresponds to the number of words in the file.

Counting character frequencies in a file

A final example before moving on to look at quantitative data analysis: counting the frequency with which each character occurs in a file. This can be done as follows:

sc.textFile("README.md").
  map(_.toLowerCase).
  flatMap(_.toCharArray).
  map{(_,1)}.
  reduceByKey(_+_).
  collect

The first call to map converts upper case characters to lower case, as we don’t want separate counts for upper and lower case characters. The call to flatMap then makes each element of the RDD correspond to a single character in the file. The second call to map transforms each element of the RDD to a key-value pair, where the key is the character and the value is the integer 1. RDDs have special methods for key-value pairs in this form – the method reduceByKey is one such – it applies the reduction operation (here just "+") to all values corresponding to a particular value of the key. Since each character has the value 1, the sum of the values will be a character count. Note that the reduction will be done in parallel, and for this to work it is vital that the reduction operation is associative. Simple addition of integers is clearly associative, so here we are fine. Note that reduceByKey is a (lazy) transformation, and so the computation needs to be triggered by a call to the action collect.

On most Unix-like systems there is a file called words that is used for spell-checking. The example below applies the character count to this file. Note the calls to filter, which filter out any elements of the RDD not matching the predicate. Here it is used to filter out special characters.

sc.textFile("/usr/share/dict/words").
  map(_.trim).
  map(_.toLowerCase).
  flatMap(_.toCharArray).
  filter(_ > '/').
  filter(_ < '}').
  map{(_,1)}.
  reduceByKey(_+_).
  collect

Analysis of quantitative data

Descriptive statistics

We first need some quantitative data, so let’s simulate some. Breeze is the standard Scala library for scientific and statistical computing. I’ve given a quick introduction to Breeze in a previous post. Spark has a dependence on Breeze, and therefore can be used from inside the Spark shell – this is very useful. So, we start by using Breeze to simulate a vector of normal random quantities:

import breeze.stats.distributions._
val x = Gaussian(1.0,2.0).sample(10000)

Note, though, that x is just a regular Breeze Vector, a simple serial collection all stored in RAM on the master thread. To use it as a Spark RDD, we must convert it to one, using the parallelize function:

val xRdd = sc.parallelize(x)

Now xRdd is an RDD, and so we can do Spark transformations and actions on it. There are some special methods for RDDs containing numeric values:

xRdd.mean
xRdd.sampleVariance

Each summary statistic is computed with a single pass through the data, but if several summary statistics are required, it is inefficient to make a separate pass through the data for each summary, so the stats method makes a single pass through the data returning a StatsCounter object that can be used to compute various summary statistics.

val xStats = xRdd.stats
xStats.mean
xStats.sampleVariance
xStats.sum

The StatsCounter methods are: count, mean, sum, max, min, variance, sampleVariance, stdev, sampleStdev.

Linear regression

Moving beyond very simple descriptive statistics, we will look at a simple linear regression model, which will also allow us to introduce Spark DataFrames – a high level abstraction layered on top of RDDs which makes working with tabular data much more convenient, especially in the context of statistical modelling.

We start with some standard (non-Spark) Scala Breeze code to simulate some data from a simple linear regression model. We use the x already simulated as our first covariate. Then we simulate a second covariate, x2. Then, using some residual noise, eps, we simulate a regression model scenario, where we know that the "true" intercept is 1.5 and the "true" covariate regression coefficients are 2.0 and 1.0.

val x2 = Gaussian(0.0,1.0).sample(10000)
val xx = x zip x2
val lp = xx map {p => 2.0*p._1 + 1.0*p._2 + 1.5}
val eps = Gaussian(0.0,1.0).sample(10000)
val y = (lp zip eps) map (p => p._1 + p._2)
val yx = (y zip xx) map (p => (p._1,p._2._1,p._2._2))

val rddLR = sc.parallelize(yx)

Note that the last line converts the regular Scala Breeze collection into a Spark RDD using parallelize. We could, in principle, do regression modelling using raw RDDs, and early versions of Spark required this. However, statisticians used to statistical languages such as R know that data frames are useful for working with tabular data. I gave a brief overview of Scala data frame libraries in a previous post. We can convert an RDD of tuples to a Spark DataFrame as follows:

val dfLR = rddLR.toDF("y","x1","x2")
dfLR.show
dfLR.show(5)

Note that show shows the first few rows of a DataFrame, and giving it a numeric argument specifies the number to show. This is very useful for quick sanity-checking of DataFrame contents.

Note that there are other ways of getting data into a Spark DataFrame. One of the simplest ways to get data into Spark from other systems is via a CSV file. A properly formatted CSV file with a header row can be read into Spark with a command like:

// Don't run unless you have an appropriate CSV file...
val df = spark.read.
  option("header","true").
  option("inferSchema","true").
  csv("myCsvFile.csv")

This requires two passes over the data – one to infer the schema and one to actually read the data. For very large datasets it is better to declare the schema and not use automatic schema inference. However, for very large datasets, CSV probably isn’t a great choice of format anyway. Spark supports many more efficient data storage formats. Note that Spark also has functions for querying SQL (and other) databases, and reading query results directly into DataFrame objects. For people familiar with databases, this is often the most convenient way of ingesting data into Spark. See the Spark DataFrames guide and the API docs for DataFrameReader for further information.

Spark has an extensive library of tools for the development of sophisticated machine learning pipelines. Included in this are functions for fitting linear regression models, regularised regression models (Lasso, ridge, elastic net), generalised linear models, including logistic regression models, etc., and tools for optimising regularisation parameters, for example, using cross-validation. For this post I’m just going to show how to fit a simple OLS linear regression model: see the ML pipeline documentation for further information, especially the docs on classification and regression.

We start by creating an object for fitting linear regression models:

import org.apache.spark.ml.regression.LinearRegression
import org.apache.spark.ml.linalg._

val lm = new LinearRegression
lm.explainParams
lm.getStandardization
lm.setStandardization(false)
lm.getStandardization
lm.explainParams

Note that there are many parameters associated with the fitting algorithm, including regularisation parameters. These are set to defaults corresponding to no regularisation (simple OLS). Note, however, that the algorithm defaults to standardising covariates to be mean zero variance one. We can turn that off before fitting the model if desired.

Also note that the model fitting algorithm assumes that the DataFrame to be fit has (at least) two columns, one called label containing the response variable, and one called features, where each element is actually a Vectors of covariates. So we first need to transform our DataFrame into the required format.

// Transform data frame to required format
val dflr = (dfLR map {row => (row.getDouble(0), 
           Vectors.dense(row.getDouble(1),row.getDouble(2)))}).
           toDF("label","features")
dflr.show(5)

Now we have the data in the correct format, it is simple to fit the model and look at the estimated parameters.

// Fit model
val fit = lm.fit(dflr)
fit.intercept
fit.coefficients

You should see that the estimated parameters are close to the "true" parameters that were used to simulate from the model. More detailed diagnostics can be obtained from the fitted summary object.

val summ = fit.summary
summ.r2
summ.rootMeanSquaredError
summ.coefficientStandardErrors
summ.pValues
summ.tValues
summ.predictions
summ.residuals

So, that’s how to fit a simple OLS linear regression model. Fitting GLMs (including logistic regression) is very similar, and setting up routines to tune regularisation parameters via cross-validation is not much more difficult.

Further reading

As previously mentioned, once you are up and running with a Spark shell, the official Spark documentation is reasonably good. First go through the quick start guide, then the programming guide, then the ML guide, and finally, consult the API docs. I discussed books on scala for data science in the previous post – many of these cover Spark to a greater or lesser extent.

I recently gave a talk on some of the general principles behind the use of functional programming for scalable statistical computing, and how concepts from category theory, such as monads, can help. The PDF slides are available. I’m not sure how comprehensible they will be without my explanations and white-board diagrams, but come to think of it, I’m not sure how comprehensible they were with my explanations and white-board diagrams… Also note that I occasionally run a three-day short-course on Scala for statistical computing, and much of the final day is concerned with using Apache Spark.

Advertisements

Books on Scala for statistical computing and data science

Introduction

People regularly ask me about books and other resources for getting started with Scala for statistical computing and data science. This post will focus on books, but it’s worth briefly noting that there are a number of other resources available, on-line and otherwise, that are also worth considering. I particularly like the Coursera course Functional Programming Principles in Scala – I still think this is probably the best way to get started with Scala and functional programming for most people. In fact, there is an entire Functional Programming in Scala Specialization that is worth considering – I’ll probably discuss that more in another post. I’ve got a draft page of Scala links which has a bias towards scientific and statistical computing, and I’m currently putting together a short course in that area, which I’ll also discuss further in future posts. But this post will concentrate on books.

Reading list

Getting started with Scala

Before one can dive into statistical computing and data science using Scala, it’s a good idea to understand a bit about the language and about functional programming. There are by now many books on Scala, and I haven’t carefully reviewed all of them, but I’ve looked at enough to have an idea about good ways of getting started.

  • Programming in Scala: Third edition, Odersky et al, Artima.
    • This is the Scala book, often referred to on-line as PinS. It is a weighty tome, and works through the Scala language in detail, starting from the basics. Every serious Scala programmer should own this book. However, it isn’t the easiest introduction to the language.
  • Scala for the Impatient, Horstmann, Addison-Wesley.
    • As the name suggests, this is a much quicker and easier introduction to Scala than PinS, but assumes reasonable familiarity with programming in general, and sort-of assumes that the reader has a basic knowledge of Java and the JVM ecosystem. That said, it does not assume that the reader is a Java expert. My feeling is that for someone who has a reasonable programming background and a passing familiarity with Java, then this book is probably the best introduction to the language. Note that there is a second edition in the works.
  • Functional Programming in Scala Chiusano and Bjarnason, Manning.
    • It is possible to write Scala code in the style of "Java-without-the-semi-colons", but really the whole point of Scala is to move beyond that kind of Object-Oriented programming style. How much you venture down the path towards pure Functional Programming is very much a matter of taste, but many of the best Scala programmers are pretty hard-core FP, and there’s probably a reason for that. But many people coming to Scala don’t have a strong FP background, and getting up to speed with strongly-typed FP isn’t easy for people who only know an imperative (Object-Oriented) style of programming. This is the book that will help you to make the jump to FP. Sometimes referred to online as FPiS, or more often even just as the red book, this is also a book that every serious Scala programmer should own (and read!). Note that is isn’t really a book about Scala – it is a book about strongly typed FP that just "happens" to use Scala for illustrating the ideas. Consequently, you will probably want to augment this book with a book that really is about Scala, such as one of the books above. Since this is the first book on the list published by Manning, I should also mention how much I like computing books from this publisher. They are typically well-produced, and their paper books (pBooks) come with complimentary access to well-produced DRM-free eBook versions, however you purchase them.
  • Functional and Reactive Domain Modeling, Ghosh, Manning.
    • This is another book that isn’t really about Scala, but about software engineering using a strongly typed FP language. But again, it uses Scala to illustrate the ideas, and is an excellent read. You can think of it as a more practical "hands-on" follow-up to the red book, which shows how the ideas from the red book translate into effective solutions to real-world problems.
  • Structure and Interpretation of Computer Programs, second edition Abelson et al, MIT Press.
    • This is not a Scala book! This is the only book in this list which doesn’t use Scala at all. I’ve included it on the list because it is one of the best books on programming that I’ve read, and is the book that I wish someone had told me about 20 years ago! In fact the book uses Scheme (a Lisp derivative) as the language to illustrate the ideas. There are obviously important differences between Scala and Scheme – eg. Scala is strongly statically typed and compiled, whereas Scheme is dynamically typed and interpreted. However, there are also similarities – eg. both languages support and encourage a functional style of programming but are not pure FP languages. Referred to on-line as SICP this book is a classic. Note that there is no need to buy a paper copy if you like eBooks, since electronic versions are available free on-line.

Scala for statistical computing and data science

  • Scala for Data Science, Bugnion, Packt.
    • Not to be confused with the (terrible) book, Scala for machine learning by the same publisher. Scala for Data Science is my top recommendation for getting started with statistical computing and data science applications using Scala. I have reviewed this book in another post, so I won’t say more about it here (but I like it).
  • Scala Data Analysis Cookbook, Manivannan, Packt.
    • I’m not a huge fan of the cookbook format, but this book is really mis-named, as it isn’t really a cookbook and isn’t really about data analysis in Scala! It is really a book about Apache Spark, and proceeds fairly sequentially in the form of a tutorial introduction to Spark. Spark is an impressive piece of technology, and it is obviously one of the factors driving interest in Scala, but it’s important to understand that Spark isn’t Scala, and that many typical data science applications will be better tackled using Scala without Spark. I’ve not read this book cover-to-cover as it offers little over Scala for Data Science, but its coverage of Spark is a bit more up-to-date than the Spark books I mention below, so it could be of interest to those who are mainly interested in Scala for Spark.
  • Scala High Performance Programming, Theron and Diamant, Packt.
    • This is an interesting book, fundamentally about developing high performance streaming data processing algorithm pipelines in Scala. It makes no reference to Spark. The running application is an on-line financial trading system. It takes a deep dive into understanding performance in Scala and on the JVM, and looks at how to benchmark and profile performance, diagnose bottlenecks and optimise code. This is likely to be of more interest to those interested in developing efficient algorithms for scientific and statistical computing rather than applied data scientists, but it covers some interesting material not covered by any of the other books in this list.
  • Learning Spark, Karau et al, O’Reilly.
    • This book provides an introduction to Apache Spark, written by some of the people who developed it. Spark is a big data analytics framework built on top of Scala. It is arguably the best available framework for big data analytics on computing clusters in the cloud, and hence there is a lot of interest in it. The book is a perfectly good introduction to Spark, and shows most examples implemented using the Java and Python APIs in addition to the canonical Scala (Spark Shell) implementation. This is useful for people working with multiple languages, but can be mildly irritating to anyone who is only interested in Scala. However, the big problem with this (and every other) book on Spark is that Spark is evolving very quickly, and so by the time any book on Spark is written and published it is inevitably very out of date. It’s not clear that it is worth buying a book specifically about Spark at this stage, or whether it would be better to go for a book like Scala for Data Science, which has a couple of chapters of introduction to Spark, which can then provide a starting point for engaging with Spark’s on-line documentation (which is reasonably good).
  • Advanced Analytics with Spark, Ryza et al, O’Reilly.
    • This book has a bit of a "cookbook" feel to it, which some people like and some don’t. It’s really more like an "edited volume" with different chapters authored by different people. Unlike Learning Spark it focuses exclusively on the Scala API. The book basically covers the development of a bunch of different machine learning pipelines for a variety of applications. My main problem with this book is that it has aged particularly badly, as all of the pipelines are developed with raw RDDs, which isn’t how ML pipelines in Spark are constructed any more. So again, it’s difficult for me to recommend. The message here is that if you are thinking of buying a book about Spark, check very carefully when it was published and what version of Spark it covers and whether that is sufficiently recent to be of relevance to you.

Summary

There are lots of books to get started with Scala for statistical computing and data science applications. My "bare minimum" recommendation would be some generic Scala book (doesn’t really matter which one), the red book, and Scala for data science. After reading those, you will be very well placed to top-up your knowledge as required with on-line resources.

Scala for Data Science [book review]

This post will review the book:

Disclaimer: This book review has not been solicited by the publisher (or anyone else) in any way. I purchased the review copy of this book myself. I have not received any benefit from the writing of this review.

Introduction

On this blog I previously reviewed the (terrible) book, Scala for machine learning by the same publisher. I was therefore rather wary of buying this book. But the topic coverage looked good, so I decided to buy it, and wasn’t disappointed. Scala for Data Science is my top recommendation for getting started with statistical computing and data science applications using Scala.

Overview

The book assumes a basic familiarity with programming in Scala, at around the level of someone who has completed the Functional Programming Principles in Scala Coursera course. That is, it (quite sensibly) doesn’t attempt to teach the reader how to program in Scala, but rather how to approach the development of data science applications using Scala. It introduces more advanced Scala idioms gradually (eg. typeclasses don’t appear until Chapter 5), so it is relatively approachable for those who aren’t yet Scala experts. The book does cover Apache Spark, but Spark isn’t introduced until Chapter 10, so it isn’t “just another Spark book”. Most of the book is about developing data science applications in Scala, completely independently of Spark. That said, it also provides one of the better introductions to Spark, so doubles up as a pretty good introductory Spark book, in addition to being a good introduction to the development of data science applications with Scala. It should probably be emphasised that the book is very much focused on data science, rather than statistical computing, but there is plenty of material of relevance to those who are more interested in statistical computing than applied data science.

Chapter by chapter

  1. Scala and Data Science – motivation for using Scala in preference to certain other languages I could mention…
  2. Manipulating data with BreezeBreeze is the standard Scala library for scientific and statistical computing. It’s pretty good, but documentation is rather lacking. This Chapter provides a good tutorial introduction to Breeze, which should be enough to get people going sufficiently to be able to make some sense of the available on-line documentation.
  3. Plotting with breeze-viz – Breeze has some support for plotting and visualisation of data. It’s somewhat limited when compared to what is available in R, but is fine for interactive exploratory analysis. However, the available on-line documentation for breeze-viz is almost non-existent. This Chapter is the best introduction to breeze-viz that I have seen.
  4. Parallel collections and futures – the Scala standard library has built-in support for parallel and concurrent programming based on functional programming concepts such as parallel (monadic) collections and Futures. Again, this Chapter provides an excellent introduction to these powerful concepts, allowing the reader to start developing parallel algorithms for multi-core hardware with minimal fuss.
  5. Scala and SQL through JDBC – this Chapter looks at connecting to databases using standard JVM mechanisms such as JDBC. However, it gradually introduces more functional ways of interfacing with databases using typeclasses, motivating:
  6. Slick – a functional interface for SQL – an introduction to the Slick library for a more Scala-esque way of database interfacing.
  7. Web APIs – the practicalities of talking to web APIs. eg. authenticated HTTP requests and parsing of JSON responses.
  8. Scala and MongoDB – working with a NoSQL database from Scala
  9. Concurrency with Akka – Akka is the canonical implementation of the actor model in Scala, for building large concurrent applications. It is the foundation on which Spark is built.
  10. Distributed batch processing with Spark – a tutorial introduction to Apache Spark. Spark is a big data analytics framework built on top of Scala and Akka. It is arguably the best available framework for big data analytics on computing clusters in the cloud, and hence there is a lot of interest in it. Indeed, Spark is driving some of the interest in Scala.
  11. Spark SQL and DataFrames – interfacing with databases using Spark, and more importantly, an introduction to Spark’s DataFrame abstraction, which is now fundamental to developing machine learning pipelines in Spark.
  12. Distributed machine learning with MLLib – MLLib is the machine learning library for Spark. It is worth emphasising that unlike many early books on Spark, this chapter covers the newer DataFrame-based pipeline API, in addition to the original RDD-based API. Together, Chapters 10, 11 and 12 provide a pretty good tutorial introduction to Spark. After working through these, it should be easy to engage with the official on-line Spark documentation.
  13. Web APIs with Play – is concerned with developing a web API at the end of a data science pipeline.
  14. Visualisation with D3 and the Play framework – is concerned with integrating visualisation into a data science web application.

Summary

This book provides a good tutorial introduction to a large number of topics relevant to statisticians and data scientists interested in developing data science applications using Scala. After working through this book, readers should be well-placed to augment their knowledge with readily searchable on-line documentation.

In a follow-up post I will give a quick overview of some other books relevant to getting started with Scala for statistical computing and data science.

A scalable particle filter in Scala

Introduction

Many modern algorithms in computational Bayesian statistics have at their heart a particle filter or some other sequential Monte Carlo (SMC) procedure. In this blog I’ve discussed particle MCMC algorithms which use a particle filter in the inner-loop in order to compute a (noisy, unbiased) estimate of the marginal likelihood of the data. These algorithms are often very computationally intensive, either because the forward model used to propagate the particles is expensive, or because the likelihood associated with each particle/observation is expensive (or both). In this case it is desirable to parallelise the particle filter to run on all available cores of a machine, or in some cases, it would even be desirable to distribute the the particle filter computation across a cluster of machines.

Parallelisation is difficult when using the conventional imperative programming languages typically used in scientific and statistical computing, but is much easier using modern functional languages such as Scala. In fact, in languages such as Scala it is possible to describe algorithms at a higher level of abstraction, so that exactly the same algorithm can run in serial, run in parallel across all available cores on a single machine, or run in parallel across a cluster of machines, all without changing any code. Doing so renders parallelisation a non-issue. In this post I’ll talk through how to do this for a simple bootstrap particle filter, but the same principle applies for a large range of statistical computing algorithms.

Typeclasses and monadic collections

In the previous post I gave a quick introduction to the monad concept, and to monadic collections in particular. Many computational tasks in statistics can be accomplished using a sequence of operations on monadic collections. We would like to write code that is independent of any particular implementation of a monadic collection, so that we can switch to a different implementation without changing the code of our algorithm (for example, switching from a serial to a parallel collection). But in strongly typed languages we need to know at compile time that the collection we use has the methods that we require. Typeclasses provide a nice solution to this problem. I don’t want to get bogged down in a big discussion about Scala typeclasses here, but suffice to say that they describe a family of types conforming to a particular interface in an ad hoc loosely coupled way (they are said to provide ad hoc polymorphism). They are not the same as classes in traditional O-O languages, but they do solve a similar problem to the adaptor design pattern, in a much cleaner way. We can describe a simple typeclass for our monadic collection as follows:

trait GenericColl[C[_]] {
  def map[A, B](ca: C[A])(f: A => B): C[B]
  def reduce[A](ca: C[A])(f: (A, A) => A): A
  def flatMap[A, B, D[B] <: GenTraversable[B]](ca: C[A])(f: A => D[B]): C[B]
  def zip[A, B](ca: C[A])(cb: C[B]): C[(A, B)]
  def length[A](ca: C[A]): Int
}

In the typeclass we just list the methods that we expect our generic collection to provide, but do not say anything about how they are implemented. For example, we know that operations such as map and reduce can be executed in parallel, but this is a separate concern. We can now write code that can be used for any collection conforming to the requirements of this typeclass. The full code for this example is provided in the associated github repo for this blog, and includes the obvious syntax for this typeclass, and typeclass instances for the Scala collections Vector and ParVector, that we will exploit later in the example.

SIR step for a bootstrap filter

We can now write some code for a single observation update of a bootstrap particle filter.

def update[S: State, O: Observation, C[_]: GenericColl](
  dataLik: (S, O) => LogLik, stepFun: S => S
)(x: C[S], o: O): (LogLik, C[S]) = {
  val xp = x map (stepFun(_))
  val lw = xp map (dataLik(_, o))
  val max = lw reduce (math.max(_, _))
  val rw = lw map (lwi => math.exp(lwi - max))
  val srw = rw reduce (_ + _)
  val l = rw.length
  val z = rw zip xp
  val rx = z flatMap (p => Vector.fill(Poisson(p._1 * l / srw).draw)(p._2))
  (max + math.log(srw / l), rx)
}

This is a very simple bootstrap filter, using Poisson resampling for simplicity and data locality, but does include use of the log-sum-exp trick to prevent over/underflow of raw weight calculations, and tracks the marginal (log-)likelihood of the observation. With this function we can now pass in a “prior” particle distribution in any collection conforming to our typeclass, together with a propagator function, an observation (log-)likelihood, and an observation, and it will return back a new collection of particles of exactly the same type that was provided for input. Note that all of the operations we require can be accomplished with the standard monadic collection operations declared in our typeclass.

Filtering as a functional fold

Once we have a function for executing one step of a particle filter, we can produce a function for particle filtering as a functional fold over a sequence of observations:

def pFilter[S: State, O: Observation, C[_]: GenericColl, D[O] <: GenTraversable[O]](
  x0: C[S], data: D[O], dataLik: (S, O) => LogLik, stepFun: S => S
): (LogLik, C[S]) = {
  val updater = update[S, O, C](dataLik, stepFun) _
  data.foldLeft((0.0, x0))((prev, o) => {
    val next = updater(prev._2, o)
    (prev._1 + next._1, next._2)
  })
}

Folding data structures is a fundamental concept in functional programming, and is exactly what is required for any kind of filtering problem. Note that Brian Beckman has recently written a series of articles on Kalman filtering as a functional fold.

Marginal likelihoods and parameter estimation

So far we haven’t said anything about parameters or parameter estimation, but this is appropriate, since parametrisation is a separate concern from filtering. However, once we have a function for particle filtering, we can produce a function concerned with evaluating marginal likelihoods trivially:

def pfMll[S: State, P: Parameter, O: Observation, 
            C[_]: GenericColl, D[O] <: GenTraversable[O]](
  simX0: P => C[S], stepFun: P => S => S, 
  dataLik: P => (S, O) => LogLik, data: D[O]
): (P => LogLik) = (th: P) => 
       pFilter(simX0(th), data, dataLik(th), stepFun(th))._1

Note that this higher-order function does not return a value, but instead a function which will accept a parameter as input and return a (log-)likelihood as output. This can then be used for parameter estimation purposes, perhaps being used in a PMMH pMCMC algorithm, or something else. Again, this is a separate concern.

Example

Here I’ll just give a completely trivial toy example, purely to show how the functions work. For avoidance of doubt, I know that there are many better/simpler/easier ways to tackle this problem! Here we will just look at inferring the auto-regression parameter of a linear Gaussian AR(1)-plus-noise model using the functions we have developed.

First we can simulate some synthetic data from this model, using a value of 0.8 for the auto-regression parameter:

val inNoise = Gaussian(0.0, 1.0).sample(99)
val state = DenseVector(inNoise.scanLeft(0.0)((s, i) => 0.8 * s + i).toArray)
val noise = DenseVector(Gaussian(0.0, 2.0).sample(100).toArray)
val data = (state + noise).toArray.toList

Now assuming that we don’t know the auto-regression parameter, we can construct a function to evaluate the likelihood of different parameter values as follows:

val mll = pfMll(
  (th: Double) => Gaussian(0.0, 10.0).sample(10000).toVector.par,
  (th: Double) => (s: Double) => Gaussian(th * s, 1.0).draw,
  (th: Double) => (s: Double, o: Double) => Gaussian(s, 2.0).logPdf(o),
  data
)

Note that the 4 characters “.par” at the end of line 2 are the only difference between running this code serially or in parallel! Now we can run this code by calling the returned function with different values. So, hopefully mll(0.8) will return a larger log-likelihood than (say) mll(0.6) or mll(0.9). The example code in the github repo plots the results of calling mll() for a range of values (note that if that was the genuine use-case, then it would be much better to parallellise the parameter range than the particle filter, due to providing better parallelisation granularity, but many other examples require parallelisation of the particle filter itself). In this particular example, both the forward model and the likelihood are very cheap operations, so there is little to be gained from parallelisation. Nevertheless, I still get a speedup of more than a factor of two using the parallel version on my laptop.

Conclusion

In this post we have shown how typeclasses can be used in Scala to write code that is parallelisation-agnostic. Code written in this way can be run on one or many cores as desired. We’ve illustrated the concept with a scalable particle filter, but nothing about the approach is specific to that application. It would be easy to build up a library of statistical routines this way, all of which can effectively exploit available parallel hardware. Further, although we haven’t demonstrated it here, it is trivial to extend this idea to allow code to be distribution over a cluster of parallel machines if necessary. For example, if an Apache Spark cluster is available, it is easy to make a Spark RDD instance for our generic collection typeclass, that will then allow us to run our (unmodified) particle filter code over a Spark cluster. This emphasises the fact that Spark can be useful for distributing computation as well as just processing “big data”. I’ll say more about Spark in subsequent posts.

Java math libraries and Monte Carlo simulation codes

Java libraries for (non-uniform) random number simulation

Anyone writing serious Monte Carlo (and MCMC) codes relies on having a very good and fast (uniform) random number generator and associated functions for generation of non-uniform random quantities, such as Gaussian, Poisson, Gamma, etc. In a previous post I showed how to write a simple Gibbs sampler in four different languages. In C (and C++) random number generation is easy for most scientists, as the (excellent) GNU Scientific Library (GSL) provides exactly what most people need. But it wasn’t always that way… I remember the days before the GSL, when it was necessary to hunt around on the net for bits of C code to implement different algorithms. Worse, it was often necessary to hunt around for a bit of free FORTRAN code, and compile that with an F77 compiler and figure out how to call it from C. Even in the early Alpha days of the GSL, coverage was patchy, and the API changed often. Bad old days… But those days are long gone, and C programmers no longer have to worry about the problem of random variate generation – they can safely concentrate on developing their interesting new algorithm, and leave the rest to the GSL. Unfortunately for Java programmers, there isn’t yet anything quite comparable to the GSL in Java world.

I pretty much ignored Java until Java 5. Before then, the language was too limited, and the compilers and JVMs were too primitive to really take seriously for numerical work. But since the launch of Java 5 I’ve been starting to pay more interest. The language is now a perfectly reasonable O-O language, and the compilers and JVMs are pretty good. On a lot of benchmarks, Java is really quite comparable to C/C++, and Java is nicer to code, and has a lot of impressive associated technology. So if there was a math library comparable to the GSL, I’d be quite tempted to jump ship to the Java world and start writing all of my Monte Carlo codes in Java. But there isn’t. At least not yet.

When I first started to take Java seriously, the only good math library with good support for non-uniform random number generation was COLT. COLT was, and still is, pretty good. The code is generally well-written, and fast, and the documentation for it is reasonable. However, the structure of the library is very idiosyncratic, the coverage is a bit patchy, and there doesn’t ever seem to have been a proper development community behind it. It seems very much to have been a one-man project, which has long since stagnated. Unsurprisingly then, COLT has been forked. There is now a Parallel COLT project. This project is continuing the development of COLT, adding new features that were missing from COLT, and, as the name suggests, adding concurrency support. Parallel COLT is also good, and is the main library I currently use for random number generation in Java. However, it has obviously inherited all of the idiosyncrasies that COLT had, and still doesn’t seem to have a large and active development community associated with it. There is no doubt that it is an incredibly useful software library, but it still doesn’t really compare to the GSL.

I have watched the emergence of the Apache Commons Math project with great interest (not to be confused with Uncommons Math – another one-man project). I think this project probably has the greatest potential for providing the Java community with their own GSL equivalent. The Commons project has a lot of momentum, the Commons Math project seems to have an active development community, and the structure of the library is more intuitive than that of (Parallel) COLT. However, it is early days, and the library still has patchy coverage and is a bit rough around the edges. It reminds me a lot of the GSL back in its Alpha days. I’d not bothered to even download it until recently, as the random number generation component didn’t include the generation of gamma random quantities – an absolutely essential requirement for me. However, I noticed recently that the latest release (2.2) did include gamma generation, so I decided to download it and try it out. It works, but the generation of gamma random quantities is very slow (around 50 times slower than Parallel COLT). This isn’t a fundamental design flaw of the whole library – generating Gaussian random quantities is quite comparable with other libraries. It’s just that an inversion method has been used for gamma generation. All efficient gamma generators use a neat rejection scheme. In case anyone would like to investigate for themselves, here is a complete program for gamma generation designed to be linked against Parallel COLT:

import java.util.*;
import cern.jet.random.tdouble.*;
import cern.jet.random.tdouble.engine.*;

class GammaPC
{

    public static void main(String[] arg)
    {
	DoubleRandomEngine rngEngine=new DoubleMersenneTwister();
	Gamma rngG=new Gamma(1.0,1.0,rngEngine);
	long N=10000;
	double x=0.0;
	for (int i=0;i<N;i++) {
	    for (int j=0;j<1000;j++) {
		x=rngG.nextDouble(3.0,25.0);
	    }
	    System.out.println(x);
	}
    }
    
}

and here is a complete program designed to be linked against Commons Math:

import java.util.*;
import org.apache.commons.math.*;
import org.apache.commons.math.random.*;

class GammaACM
{

    public static void main(String[] arg) throws MathException
    {
	RandomDataImpl rng=new RandomDataImpl();
	long N=10000;
	double x=0.0;
	for (int i=0;i<N;i++) {
	    for (int j=0;j<1000;j++) {
		x=rng.nextGamma(3.0,1.0/25.0);
	    }
	    System.out.println(x);
	}
    }
    
}

The two codes do the same thing (note that they parameterise the gamma distribution differently). Both programs work (they generate variates from the same, correct, distribution), and the Commons Math interface is slightly nicer, but the code is much slower to execute. I’m still optimistic that Commons Math will one day be Java’s GSL, but I’m not giving up on Parallel COLT (or C, for that matter!) just yet…