A scalable particle filter in Scala

Introduction

Many modern algorithms in computational Bayesian statistics have at their heart a particle filter or some other sequential Monte Carlo (SMC) procedure. In this blog I’ve discussed particle MCMC algorithms which use a particle filter in the inner loop in order to compute a (noisy, unbiased) estimate of the marginal likelihood of the data. These algorithms are often very computationally intensive, either because the forward model used to propagate the particles is expensive, or because the likelihood associated with each particle/observation is expensive (or both). In this case it is desirable to parallelise the particle filter to run on all available cores of a machine, or in some cases even to distribute the particle filter computation across a cluster of machines.

Parallelisation is difficult when using the conventional imperative programming languages typically used in scientific and statistical computing, but is much easier using modern functional languages such as Scala. In fact, in languages such as Scala it is possible to describe algorithms at a higher level of abstraction, so that exactly the same algorithm can run in serial, run in parallel across all available cores on a single machine, or run in parallel across a cluster of machines, all without changing any code. Doing so renders parallelisation a non-issue. In this post I’ll talk through how to do this for a simple bootstrap particle filter, but the same principle applies to a wide range of statistical computing algorithms.

Typeclasses and monadic collections

In the previous post I gave a quick introduction to the monad concept, and to monadic collections in particular. Many computational tasks in statistics can be accomplished using a sequence of operations on monadic collections. We would like to write code that is independent of any particular implementation of a monadic collection, so that we can switch to a different implementation without changing the code of our algorithm (for example, switching from a serial to a parallel collection). But in strongly typed languages we need to know at compile time that the collection we use has the methods that we require. Typeclasses provide a nice solution to this problem. I don’t want to get bogged down in a big discussion about Scala typeclasses here, but suffice it to say that they describe a family of types conforming to a particular interface in an ad hoc, loosely coupled way (they are said to provide ad hoc polymorphism). They are not the same as classes in traditional O-O languages, but they do solve a similar problem to the adaptor design pattern, in a much cleaner way. We can describe a simple typeclass for our monadic collection as follows:

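import scala.language.higherKinds
import scala.collection.GenTraversable

// Operations required of any collection type C[_] used by the filter
trait GenericColl[C[_]] {
  def map[A, B](ca: C[A])(f: A => B): C[B]
  def reduce[A](ca: C[A])(f: (A, A) => A): A
  // f may return any traversable D[B]; the results are flattened into a C[B]
  def flatMap[A, B, D[B] <: GenTraversable[B]](ca: C[A])(f: A => D[B]): C[B]
  def zip[A, B](ca: C[A])(cb: C[B]): C[(A, B)]
  def length[A](ca: C[A]): Int
}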

In the typeclass we just list the methods that we expect our generic collection to provide, but do not say anything about how they are implemented. For example, we know that operations such as map and reduce can be executed in parallel, but this is a separate concern. We can now write code that can be used for any collection conforming to the requirements of this typeclass. The full code for this example is provided in the associated github repo for this blog, and includes the obvious syntax for this typeclass, and typeclass instances for the Scala collections Vector and ParVector, that we will exploit later in the example.
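To give a rough idea of what a typeclass instance and its syntax might look like, here is a minimal self-contained sketch. It is not the code from the repo: the trait is repeated with flatMap narrowed to accept functions returning an Iterable, so that the sketch does not depend on GenTraversable, and the names vectorInstance, GenericCollOps and sumOfSquares are made up for this illustration.

```scala
// Simplified copy of the typeclass (flatMap narrowed to Iterable for this sketch)
trait GenericColl[C[_]] {
  def map[A, B](ca: C[A])(f: A => B): C[B]
  def reduce[A](ca: C[A])(f: (A, A) => A): A
  def flatMap[A, B](ca: C[A])(f: A => Iterable[B]): C[B]
  def zip[A, B](ca: C[A])(cb: C[B]): C[(A, B)]
  def length[A](ca: C[A]): Int
}

object GenericColl {
  // Instance for the serial Vector: every method delegates to the built-in one
  implicit val vectorInstance: GenericColl[Vector] = new GenericColl[Vector] {
    def map[A, B](ca: Vector[A])(f: A => B): Vector[B] = ca.map(f)
    def reduce[A](ca: Vector[A])(f: (A, A) => A): A = ca.reduce(f)
    def flatMap[A, B](ca: Vector[A])(f: A => Iterable[B]): Vector[B] = ca.flatMap(f)
    def zip[A, B](ca: Vector[A])(cb: Vector[B]): Vector[(A, B)] = ca.zip(cb)
    def length[A](ca: Vector[A]): Int = ca.length
  }

  // Syntax: lets generic code write `xs.map(f)` for any C that has an instance
  implicit class GenericCollOps[C[_], A](ca: C[A])(implicit ev: GenericColl[C]) {
    def map[B](f: A => B): C[B] = ev.map(ca)(f)
    def reduce(f: (A, A) => A): A = ev.reduce(ca)(f)
    def flatMap[B](f: A => Iterable[B]): C[B] = ev.flatMap(ca)(f)
    def zip[B](cb: C[B]): C[(A, B)] = ev.zip(ca)(cb)
    def length: Int = ev.length(ca)
  }
}

object GenericCollDemo {
  import GenericColl._

  // Written once against the typeclass; works for any collection with an instance
  def sumOfSquares[C[_]: GenericColl](xs: C[Double]): Double =
    xs.map(x => x * x).reduce(_ + _)
}
```

Calling GenericCollDemo.sumOfSquares(Vector(1.0, 2.0, 3.0)) picks up the Vector instance automatically. An instance for a parallel collection would have the same shape, with each method delegating to the parallel implementation.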

SIR step for a bootstrap filter

We can now write some code for a single observation update of a bootstrap particle filter.

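import breeze.stats.distributions.Poisson
// State, Observation and LogLik are not defined in this post; see the full code in the repo

def update[S: State, O: Observation, C[_]: GenericColl](
  dataLik: (S, O) => LogLik, stepFun: S => S
)(x: C[S], o: O): (LogLik, C[S]) = {
  val xp = x map (stepFun(_))                  // propagate each particle forward
  val lw = xp map (dataLik(_, o))              // log-weight of each particle
  val max = lw reduce (math.max(_, _))         // largest log-weight
  val rw = lw map (lwi => math.exp(lwi - max)) // raw weights, rescaled so the largest is 1
  val srw = rw reduce (_ + _)                  // sum of the rescaled weights
  val l = rw.length                            // number of particles
  val z = rw zip xp                            // pair each weight with its particle
  // Poisson resampling: particle i gets Poisson(l * rw_i / srw) offspring
  val rx = z flatMap (p => Vector.fill(Poisson(p._1 * l / srw).draw)(p._2))
  (max + math.log(srw / l), rx)                // (log-likelihood estimate, resampled particles)
}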

This is a very simple bootstrap filter, using Poisson resampling for simplicity and data locality, but does include use of the log-sum-exp trick to prevent over/underflow of raw weight calculations, and tracks the marginal (log-)likelihood of the observation. With this function we can now pass in a “prior” particle distribution in any collection conforming to our typeclass, together with a propagator function, an observation (log-)likelihood, and an observation, and it will return a new collection of particles of exactly the same type that was provided for input (though, because of the Poisson resampling, not necessarily of the same size). Note that all of the operations we require can be accomplished with the standard monadic collection operations declared in our typeclass.
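To see why the log-sum-exp trick matters, here is a small standalone sketch comparing a naive calculation of the log of the mean weight with the stable version used in update. The object name, the function names and the log-weights are all made up for illustration.

```scala
object LogSumExpDemo {
  // Naive version: exponentiate the log-weights first, then average
  def naiveLogMean(lw: Vector[Double]): Double =
    math.log(lw.map(lwi => math.exp(lwi)).sum / lw.length)

  // Stable version, mirroring the max / rw / srw steps in update
  def stableLogMean(lw: Vector[Double]): Double = {
    val max = lw.reduce(math.max(_, _))
    val srw = lw.map(lwi => math.exp(lwi - max)).sum
    max + math.log(srw / lw.length)
  }

  // Hypothetical log-weights, small enough that exp() underflows to zero
  val lw: Vector[Double] = Vector(-1000.0, -1001.0, -1002.0)
}
```

With these log-weights the naive version underflows and returns negative infinity, whereas the stable version returns roughly -1000.69.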

Filtering as a functional fold

Once we have a function for executing one step of a particle filter, we can produce a function for particle filtering as a functional fold over a sequence of observations:

def pFilter[S: State, O: Observation, C[_]: GenericColl, D[O] <: GenTraversable[O]](
  x0: C[S], data: D[O], dataLik: (S, O) => LogLik, stepFun: S => S
): (LogLik, C[S]) = {
  val updater = update[S, O, C](dataLik, stepFun) _
  data.foldLeft((0.0, x0))((prev, o) => {
    val next = updater(prev._2, o)
    (prev._1 + next._1, next._2)
  })
}

Folding data structures is a fundamental concept in functional programming, and is exactly what is required for any kind of filtering problem. Note that Brian Beckman has recently written a series of articles on Kalman filtering as a functional fold.
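To illustrate the same fold pattern in a setting where everything is available in closed form, here is a minimal sketch of a scalar Kalman filter written as a foldLeft. It is my own illustration under stated assumptions, not code from the articles mentioned above: the model is x_t = th * x_{t-1} + N(0, q), y_t = x_t + N(0, r), with q and r variances, and the names KState and kalmanLogLik are made up.

```scala
object KalmanFoldDemo {
  // State carried through the fold: log-likelihood so far, plus the mean
  // and variance of the current Gaussian filtering distribution
  final case class KState(ll: Double, m: Double, v: Double)

  // Exact marginal log-likelihood of the data, starting from x_0 ~ N(m0, v0)
  def kalmanLogLik(th: Double, q: Double, r: Double, m0: Double, v0: Double)(
    data: List[Double]
  ): Double = {
    val last = data.foldLeft(KState(0.0, m0, v0)) { (s, y) =>
      val mp = th * s.m          // predicted state mean
      val vp = th * th * s.v + q // predicted state variance
      val sv = vp + r            // variance of the one-step forecast of y
      val e = y - mp             // forecast error
      val k = vp / sv            // Kalman gain
      val llInc = -0.5 * (math.log(2 * math.Pi * sv) + e * e / sv)
      KState(s.ll + llInc, mp + k * e, (1 - k) * vp)
    }
    last.ll
  }
}
```

The structure is the same as pFilter: an initial state, a sequence of observations, and a function that combines the previous state with one observation to give the next state and a log-likelihood increment.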

Marginal likelihoods and parameter estimation

So far we haven’t said anything about parameters or parameter estimation, but this is appropriate, since parametrisation is a separate concern from filtering. However, once we have a function for particle filtering, we can produce a function concerned with evaluating marginal likelihoods trivially:

def pfMll[S: State, P: Parameter, O: Observation,
          C[_]: GenericColl, D[O] <: GenTraversable[O]](
  simX0: P => C[S], stepFun: P => S => S,
  dataLik: P => (S, O) => LogLik, data: D[O]
): (P => LogLik) = (th: P) =>
  pFilter(simX0(th), data, dataLik(th), stepFun(th))._1

Note that this higher-order function does not return a number, but instead returns a function which will accept a parameter as input and return a (log-)likelihood as output. This can then be used for parameter estimation purposes, perhaps being used in a PMMH pMCMC algorithm, or something else. Again, this is a separate concern.
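The pattern of a function that takes data and returns a log-likelihood function can be shown in miniature. The sketch below uses an i.i.d. N(mu, 1) model as a cheap stand-in for the particle filter; the names gaussLogLik, ll, grid and best, and the data values, are all made up for this illustration.

```scala
object LogLikFactoryDemo {
  // Takes data and returns a function from the parameter mu to a
  // log-likelihood, here for i.i.d. N(mu, 1) observations
  def gaussLogLik(data: List[Double]): Double => Double =
    (mu: Double) =>
      data.map(x => -0.5 * math.log(2 * math.Pi) - 0.5 * (x - mu) * (x - mu)).sum

  // Fix the data once; the result is an ordinary function of the parameter
  val ll: Double => Double = gaussLogLik(List(1.0, 2.0, 3.0))

  // The returned function can then be evaluated repeatedly, e.g. over a grid
  val grid: Vector[Double] = (0 to 8).map(_ * 0.5).toVector
  val best: Double =
    grid.map(mu => (mu, ll(mu))).reduce((a, b) => if (b._2 > a._2) b else a)._1
}
```

Here best is the grid point with the largest log-likelihood, which for these data is the sample mean, 2.0. An MCMC algorithm would use the returned function in just the same way, calling it once per proposed parameter value.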

Example

Here I’ll just give a completely trivial toy example, purely to show how the functions work. For avoidance of doubt, I know that there are many better/simpler/easier ways to tackle this problem! Here we will just look at inferring the auto-regression parameter of a linear Gaussian AR(1)-plus-noise model using the functions we have developed.

First we can simulate some synthetic data from this model, using a value of 0.8 for the auto-regression parameter:

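import breeze.linalg.DenseVector
import breeze.stats.distributions.Gaussian
// Latent AR(1) state: 100 values starting from 0.0, with N(0, 1) innovations
val inNoise = Gaussian(0.0, 1.0).sample(99)
val state = DenseVector(inNoise.scanLeft(0.0)((s, i) => 0.8 * s + i).toArray)
// Observation noise with standard deviation 2.0
val noise = DenseVector(Gaussian(0.0, 2.0).sample(100).toArray)
val data = (state + noise).toArray.toList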

Now assuming that we don’t know the auto-regression parameter, we can construct a function to evaluate the likelihood of different parameter values as follows:

val mll = pfMll(
  (th: Double) => Gaussian(0.0, 10.0).sample(10000).toVector.par,
  (th: Double) => (s: Double) => Gaussian(th * s, 1.0).draw,
  (th: Double) => (s: Double, o: Double) => Gaussian(s, 2.0).logPdf(o),
  data
)

Note that the 4 characters “.par” at the end of line 2 of the code above are the only difference between running this code serially or in parallel! Now we can run this code by calling the returned function with different values. So, hopefully mll(0.8) will return a larger log-likelihood than (say) mll(0.6) or mll(0.9). The example code in the github repo plots the results of calling mll() for a range of values (note that if that was the genuine use-case, then it would be much better to parallelise over the parameter range than within the particle filter, since that provides better parallelisation granularity, but many other examples require parallelisation of the particle filter itself). In this particular example, both the forward model and the likelihood are very cheap operations, so there is little to be gained from parallelisation. Nevertheless, I still get a speedup of more than a factor of two using the parallel version on my laptop.
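For the case where parallelising over the parameter range is the better option, one possible approach is sketched below using Futures from the Scala standard library. This is not how the code in the repo works: evalParallel and toyMll are hypothetical names, and toyMll is just a cheap deterministic stand-in for an expensive function such as mll.

```scala
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

object ParamGridDemo {
  // Hypothetical stand-in for an expensive log-likelihood such as mll
  val toyMll: Double => Double = th => -(th - 0.8) * (th - 0.8)

  // Parameter values at which to evaluate the function
  val grid: Vector[Double] = (0 to 10).map(_ * 0.1).toVector

  // Evaluate f at every grid point, one Future per parameter value,
  // returning (parameter, value) pairs in the original grid order
  def evalParallel(f: Double => Double, grid: Vector[Double])(
    implicit ec: ExecutionContext
  ): Vector[(Double, Double)] = {
    val futures = Future.traverse(grid)(th => Future((th, f(th))))
    Await.result(futures, 10.minutes)
  }
}
```

With an implicit ExecutionContext in scope (for example scala.concurrent.ExecutionContext.global), ParamGridDemo.evalParallel(ParamGridDemo.toyMll, ParamGridDemo.grid) gives the same pairs as a serial map over the grid. Each task here is a whole likelihood evaluation, which is the coarser granularity referred to above.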

Conclusion

In this post we have shown how typeclasses can be used in Scala to write code that is parallelisation-agnostic. Code written in this way can be run on one or many cores as desired. We’ve illustrated the concept with a scalable particle filter, but nothing about the approach is specific to that application. It would be easy to build up a library of statistical routines this way, all of which can effectively exploit available parallel hardware. Further, although we haven’t demonstrated it here, it is trivial to extend this idea to allow code to be distributed over a cluster of parallel machines if necessary. For example, if an Apache Spark cluster is available, it is easy to make a Spark RDD instance for our generic collection typeclass, which will then allow us to run our (unmodified) particle filter code over a Spark cluster. This emphasises the fact that Spark can be useful for distributing computation as well as just processing “big data”. I’ll say more about Spark in subsequent posts.

Published by

darrenjw

I am Professor of Statistics within the Department of Mathematical Sciences at Durham University, UK. I am a Bayesian statistician interested in computation and applications, especially to engineering and the life sciences.
