The principal requirements are to enable parallelisation at a higher level than we have been using until now (e.g., over exposures instead of over CCDs within an exposure, or over tracts instead of over patches within a tract), while preserving our ability to do python scatter/gather operations at the same (or even lower) levels as we can currently. That will allow us to overcome the current inefficiencies in the pipeline, and because the data are available to a master process we can send the right data where it needs to go, which avoids the problem of reading many small files.
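For concreteness, here is a minimal sketch of the two levels of parallelism I mean, written against mpi4py directly rather than our actual process-pool API (the task and data-unit names are made up for illustration):

```python
# Minimal sketch (not the ctrl_pool/process-pool API): exposures are scattered
# to ranks, each rank iterates over the CCDs of its exposures, and the results
# are gathered back on the master so it can route data to the next stage.
# Run under MPI, e.g.: mpirun -n 4 python sketch.py
from mpi4py import MPI

comm = MPI.COMM_WORLD
rank, size = comm.Get_rank(), comm.Get_size()

def process_ccd(exposure, ccd):
    """Placeholder for the per-CCD work (ISR, calibration, ...)."""
    return {"exposure": exposure, "ccd": ccd}

# Hypothetical inputs: 8 exposures of 104 CCDs each.
exposures = list(range(8)) if rank == 0 else None
chunks = [exposures[i::size] for i in range(size)] if rank == 0 else None
my_exposures = comm.scatter(chunks, root=0)

# Higher-level parallelism: each rank owns whole exposures...
results = []
for exposure in my_exposures:
    # ...but can still iterate (or parallelise further) over CCDs within them.
    results.append([process_ccd(exposure, ccd) for ccd in range(104)])

# Gather everything on the master, which can then hand the right data to the
# next stage directly instead of going through many small files.
everything = comm.gather(results, root=0)
if rank == 0:
    print(sum(len(r) for r in everything), "exposures gathered")
```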
I’ve spent a couple of days looking at Apache Spark, and got to the point of being able to run an example script (calculating pi by Monte Carlo). @ctslater pointed out that Google has just open-sourced a python parallelisation framework for their cloud service, Dataflow, and I spent a day or so reading the code to get a feel for that. These two frameworks appear to be different means to the same end, which is to go beyond Hadoop MapReduce, achieving speed-ups of up to a factor of 100 or so over MapReduce because they can hold data in memory. Both frameworks support a “streaming” mode, where data are delivered to the nodes in real time (e.g., Twitter posts and other web-based inputs), which seems to be the latest big trend. Another similar framework is Apache Flink, but I’ve not looked at that since it only supports Scala and Java. There are also other python pipeline frameworks, but they are typically just glorified Make clones (solve the DAG, run external commands), whereas I think it’s essential that we run python functions and manipulate python data.
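For reference, the example I ran is essentially the pi.py script shipped with Spark, paraphrased here from memory (details may differ between Spark versions):

```python
# Monte-Carlo estimate of pi with PySpark: count random points that fall
# inside the unit quarter-circle and scale by 4.
import random
from operator import add
from pyspark import SparkContext

sc = SparkContext(appName="MonteCarloPi")

def inside(_):
    x, y = random.random(), random.random()
    return 1 if x * x + y * y < 1.0 else 0

n = 1000000
partitions = 8
count = sc.parallelize(range(n), partitions).map(inside).reduce(add)
print("Pi is roughly", 4.0 * count / n)
sc.stop()
```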
Spark looks like it will do what we need, but I haven’t yet done anything serious with it. I have a few concerns about it, however. Spark is written in Scala, runs in a Java Virtual Machine, and has a Python front-end. This is an unfamiliar environment, which makes debugging difficult, and it forces you to worry about details you otherwise wouldn’t (e.g., JVM memory usage). It also requires that Java be installed and available on the cluster. I’m not as worried about Maven as Mineo-san is (we’re not installing as root), but it certainly downloads an awful lot of packages as part of the install, so the whole thing feels rather heavy, with far more than we need or care about, which I think must make for maintenance problems down the track. There’s no facility for launching workers on the cluster: for Slurm we can use srun (though the official script doesn’t seem to work, and I can’t figure out why), but for PBS we would have to use ssh or pull out the PBS remote-execution code from mpich/hydra. I asked a question on the users’ forum but got no response, which is discouraging. Apart from these concerns, though, I think Spark is something we could use.
The Google DataflowPythonSDK has the attraction of being written purely in python, so it’s code we can read and follow. The big problem is that there is no facility for running on a cluster: there are “runners” only for a single thread on a single machine, and for Google’s cloud service. I’m told that other people are developing runners for other scenarios (including running Dataflow on top of Spark). I’ve been in touch with one of the DataflowPythonSDK developers to get a sense of what it would take to adapt our MPI process pool to do the job; while I think it’s possible, and it would be a lot of fun, I fear it would take some real work (the hard part is the communication between nodes, which I think would require multiprocessing to do the heavy lifting while keeping a process open for communication). My contact generally confirmed this. I quoted an e-mail from @RHL to the HSC list (“I really don’t want to be inventing our own batch system, or modifying someone else’s if we can possibly help it”), and he replied, “Basically I think your boss is a smart person :-)”.
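To give a flavour of the programming model, here is a tiny pipeline sketch. The spellings below follow the Apache Beam incarnation of the SDK and may not match the DataflowPythonSDK exactly; the visit/CCD content is made up for illustration. Note that the default runner executes in a single local process, which is exactly the limitation described above.

```python
# A Dataflow/Beam-style pipeline: create per-CCD records, process them, and
# group them back together per visit (the "gather" step).
import apache_beam as beam

with beam.Pipeline() as p:  # default DirectRunner: one local process
    (p
     | "ccds" >> beam.Create([(visit, ccd)
                              for visit in range(4) for ccd in range(104)])
     | "process" >> beam.Map(lambda vc: (vc[0], {"visit": vc[0], "ccd": vc[1]}))
     | "perVisit" >> beam.GroupByKey()   # all CCD results for a visit together
     | "report" >> beam.Map(print))
```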
Here’s my proposal for moving forward. At present, the scatter/gather capability of our MPI process pool isn’t really used anywhere (the exception on the HSC side is reduceFrames, where we do a global astrometric solution, but that will be going away). We know we’ll need this capability for future features (e.g., background matching, maybe multifit, maybe inter-CCD crosstalk), but we’re not making use of it now, so we should be able to change the parallelisation level without affecting our current capabilities. Moreover, the changes we need to make in order to support that (e.g., refactoring the warping and stacking to use a database of CCD corners, and writing that database from reduceFrames) are changes we would have to make anyway in order to use Spark or Dataflow. That gives us a stepping stone mid-stream: we can push forward in the direction we need to go while waiting to see how these external tools develop, before we make the Big Decision. In the meantime, I hope the Middleware team can evaluate these promising tools and perhaps consider adopting one of them.
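To make the refactoring concrete, here is a minimal sketch of what I mean by the database of CCD corners; the schema and names are hypothetical (not an existing butler dataset), and real code would need to handle RA wrap-around and non-axis-aligned CCDs properly:

```python
# Sketch: reduceFrames records each CCD's sky bounding box; warping/stacking
# later queries for the (visit, ccd) combinations overlapping a patch.
import sqlite3

conn = sqlite3.connect("ccdCorners.sqlite3")
conn.execute("""CREATE TABLE IF NOT EXISTS corners
                (visit INT, ccd INT,
                 raMin REAL, raMax REAL, decMin REAL, decMax REAL)""")

def record_ccd(visit, ccd, raMin, raMax, decMin, decMax):
    """Called at the end of single-frame processing for each CCD."""
    conn.execute("INSERT INTO corners VALUES (?,?,?,?,?,?)",
                 (visit, ccd, raMin, raMax, decMin, decMax))
    conn.commit()

def inputs_for_patch(raMin, raMax, decMin, decMax):
    """Return the (visit, ccd) pairs whose bounding boxes overlap the patch."""
    cursor = conn.execute(
        "SELECT visit, ccd FROM corners "
        "WHERE raMax > ? AND raMin < ? AND decMax > ? AND decMin < ?",
        (raMin, raMax, decMin, decMax))
    return cursor.fetchall()
```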