Pipeline execution in HSC

Tags: #<Tag:0x00007fb37d3f4598>

By “single process”, do you mean “posix process” (I’m not familiar enough with how Spark works to know whether it’ll use multiple threads or processes)? If so, we need to worry about the GIL, and a disturbing report from @rearmstr that just trying to run our code after compiling with support for multiple threads causes mysterious segfaults at program exit. Obviously that’s not a good state of affairs, and we need to fix it - but I’m concerned that any solution involving multithreading may require a lot of work.

That aside, I discussed our needs for parallelization a few weeks ago with Rob Kooper at NCSA, and I think he’s the best person to weigh in on this (I don’t think he has an account here yet, but perhaps @mgelman2 can make sure he sees this thread?). I’m hoping a choice at the level of whether to use Spark would be abstracted away from most of the Science Pipelines developers, but of course that does matter to the extent we also need to set up the infrastructure that would use it for HSC processing.

In the shorter term, I wonder if it’d be possible use some of the dagman work that I know almost nothing about but associate (possibly inaccurately) with @srp and @daues to handle higher-level, large-scale control flow for ctrl_pool tasks?

I expect PySpark uses multiple processes on a single machine to avoid the GIL, but I don’t know the details, only that the GIL is apparently not a problem.

By “single process”, I mean bringing reduceFrames, mosaic, stack and multiband together into a single monolith. I want to only have to run a single command to start the pipeline running all the way through. I have been running multiple discrete commands (one for each stage+data combination), and having to time the submission to slurm so they don’t all fire at once, and (should be) checking the logs for errors after each stage completes; but all that is terribly inconvenient and heavy on personnel.

@jbosch, @rearmstr – do we have a ticket for the “disturbing report” of segfaults? If not, and even if it might be nothing, can we make one so that future readers of this discussion can easily find out what the resolution was?

Beyond that, I certainly agree with the broad thrust of Jim’s message: NCSA is the home of expertise within LSST on this, and we should certainly solicit their input. Formally, we are not budgeted to expend Science Pipelines effort on this beyond the level of establishing requirements and communicating them to the middleware developers.

Of course, it’s likely in LSST’s interest to support HSC. @price, can you provide an indication of the extent to which this is a significant issue from the HSC side? Of course, I’m all in favour of doing things better where we can, but I did not get the impression that struggling with middleware issues was the major blocker for the recent HSC data release. Was I wrong? Do you anticipate it being different next time?

For the next production run we will have even more HSC data to process than for the last run (by a factor of two?), and I am very concerned about the burden on personnel and machines required in running the production. The problem with hammering the GPFS disk is very serious and is already impacting our ability to process data now (the tech gurus told me to back off just the other day). This is, of course, an HSC issue, but given that LSST has similar problems I was hoping that this is something we might be able to address in a way that both projects will gain. The whole point of this thread is to solicit the advice of those within LSST with expertise in this kind of thing in the hope that what we produce for HSC will be useful for LSST, and I hope the experts will weigh in soon.

@jdswinbank, I have not filed a ticket on the segfault issue. I will try to reproduce it using the LSST software and file one.

The dagman work was pretty straight-forward. In ctrl_execute we filled in templates for jobs and ultimately ended up in a short, very wide DAG. It’s been a while since I looked at how we called the DAG generation script but it should be relatively straightforward to substitute a custom script to generate the DAG you’re interested in, and then get it submitted to HTCondor. However, that custom script is the tricky part. I’d have to understand a little more about the requirements to be able to give advice on how to proceed.

I’m grateful to @daues for taking the time to chat with me about HTCondor and DAGman, and helping me get the requirements clearer in my head.

I see that we are missing in our current workflows some recording of what operations failed and which succeeded; others (e.g., @nidever et al.) have noticed the same. If we could record the bounding box for each CCD using the same mechanism, that would allow us to deal with the big problem of hammering the storage while stacking. I think that we can deal with this in one of two ways:

  1. A database to track the processing state. I am familiar with this solution because it’s what’s used by the Pan-STARRS IPP that I helped write. Each job that runs records its success or failure in the database, along with metadata that might help indicate the quality of the data. Pan-STARRS uses a job scheduler that regularly queries the database to figure out what needs to run. We could use DAGman for that, but in order to know what failed we need something that pulls results from DAGman/HTCondor and populates our own database.

  2. A single process that runs the entire production. Here, the state database is replaced by variables in memory (potentially shared across nodes) that we can persist. The process knows the state, and can manage what gets run when, essentially following a DAG.

My interest in Apache Spark is due to the observation that it could be used for the second option, while also allowing us to address the inefficiencies in ctrl_pool. I don’t think that we should make ctrl_pool (or, at least, ctrl_pool in the manner in which we have been using it to parallelise over CCDs in a exposure or patches in a tract at something near 50% efficiency) part of the plan for moving forward. Using DAGman (with or without the Pegasus to write DAGs in python) and HTCondor might help with doing things how they are being done now, but I think we need to get beyond where we’re at now.

I hope @ktl or @gpdf might have some high-level view on these kind of problems and share how they envisage solving them?

Would DM-3472 also address this problem? I think that may be landing in the not-too-distant future.

Yes, although I’m also waiting for Rob Kooper to advise, so I’m not quite ready to share.

Some considerations, though:

The problem with a single process is that it’s often difficult to do partial restarts on failure; the baseline design as a result presumes a database (including a ScienceCcdExposure table with coordinates for each successfully processed CCD).

Apache Spark could be a major change in thinking in a couple of ways: it could potentially descend deeper into the algorithms, rather than being just a top-level control solution, and in the other direction, by assembling pipelines and productions as Python scripts, it could make more difficult some of the things that SuperTask and SQuaSH are aiming for.

@price Thanks for sharing this information, that I find particularly interesting for understanding what infrastructure we need to build for LSST data reduction.

While doing some tests here at CC-IN2P3, we have encountered some issues using GPFS that I have reported elsewhere. I don’t know if what you observed is similar, but it in our particular case the limiting factor seems to be not only GPFS but the also file format the data is recorded in (FITS in our case).

Regarding Apache Spark, we have done some very preliminary exercises mainly targeted at understanding this framework. We developed a small application using Astropy to scan the HDUs of 9 million FITS files from the CFHT. The collected FITS metadata was then stored in both a relational database and in a MongoDB store.

In our particular case, Spark was used as a scheduler of tasks: each task was responsible for scanning one FITS file. The list of FITS files to scan was an input to the application. The main strength of Spark is that it transparently caches the data files to be processed by the applications in memory (and possibly disk local to the compute node where the application runs) and the scheduler uses that information to execute the application in the same compute node to exploit data locality. In order to benefit of Spark, you would need to inject the input data your application needs to process (i.e. the FITS files) in a format that Spark can understand (so that it knows how to partition ink which may imply some additional work and given the scale of LSST this does not seem practical.

Thanks again for sharing your experience using LSST software for a data processing campaign.

Thanks Fabio.

My understanding is that GPFS has problems with small reads and writes, and FITS headers qualify as “small”. I’m therefore a little worried about the plans for creating a database from the processed data using an afterburner (DM-3472); I think it needs to be done as part of processing, while the information is already in memory.

I don’t think there’s a need to inject the FITS files into Spark, just the dataIds. I think the I/O can be done in the same way we have been doing with ctrl_pool. I would try to keep the datasets seen by Spark small to minimise inefficiencies from transfer.

@ktl expressed concern about “partial restarts on failure”. Spark appears to address this up-front: it calls its principal abstraction “resiliant distributed datasets”, because they can be cached in memory or on disk, and recreated in the event of failure.

It would be great if you could share this somewhere, since I’m sure it would be similarly helpful for the rest of us trying to understand Spark.

Hi Colin,

thanks for your interest. You can find the slides of two short internal presentations made by Osman Aidel, the author of this work, in chronological order:

  • [2015-10-13] Using Spark to drive the metadata extraction from the HDUs of 9M FITS files and populating a MySQL database [link]
  • [2016-01-12] The same exercise, but this time populating a MongoDB database [link]

This was done in an evaluation deployment of Spark using virtual machines.

Osman would be happy to give you more details. He does not have an account in this site but can be reached by e-mail: oaidel at cc.in2p3.fr.

Hope this helps.

So we are ramping op on the Batch system now that we have L1 understood.
Here is an update of items that seem relevant.

We’ve used shared nothing in DES and have some issues with some use cases that are not discussed here. We want to look at Jim B’s recently produced flows for APR in light of what we learned in DES. We discussed this today when beginning to work on a concept of operations (and need to being CCIN2P3 into the loop on this)

We are aware that local file systems are likely to to have a performance improvements that are greater than global file systems in the next few years.

We’ve know about GPFS and picking data from headers for several years,
that’s old hat. We do not understand the role of headers in in the LSST production system, and will make an assumption about its importance if no further information is coming.

Current, but not committed, thinking is that the archival part of the data backbone will NOT be used for direct program access, but that thinking is tentative.

Thinking how well the stack will run at places like NERSC – for DESC is on the agenda. I do not see shared nothing on their roadmap, and we are seeing presentations where HPC machines will have levels of caching between global file systems and compute nodes.

That is all.

I had a lunch conversation today with Michael Schneider, who mentioned that someone in his group (Karen Ng at UC Davis) spent a summer trying to use PySpark and essentially learned that the Python bindings were not nearly as complete as the Scala bindings, and to do what she wanted (which may be quite different from what we want to do) the only option was to us Scala. And it was also inconvenient that using Spark required a cluster actually configured to use it - it’s not something you can run on any generic batch system.

The question of which external tools to use is really up to NCSA, of course, but I thought this anecdote might be useful.

Looking at the docs, I get the sense that the python support has improved with recent versions. The current docs put Scala and Python examples side-by-side (you can display the language of your choice), so I get the impression that Python support is important to them.

Provided the batch system gives you the ability to do adequate networking, I think you can run Spark “under” the batch system. Using it to control the batch system is not so likely, in my initial poking around.

Right. I think there would still be a place for HTCondor Glidein if we were running Spark.

The principal requirements are to enable parallelisation at a higher level than what we’ve been doing up until now (e.g., over exposures instead of over CCDs in an exposure; or over tracts instead of over patches within a tract), while preserving our ability to do python scatter/gather operations at the same level (or even lower levels) as currently. That will allow us to overcome the current inefficiencies in the pipeline, and by having the data available from a master process we can pass the right data where it needs to go which overcomes the problem with reading small files.

I’ve spent a couple of days looking at Apache Spark, and got to the point of being able to run an example script (pi from Monte Carlo). @ctslater pointed out that Google has just open-sourced a python parallelisation framework for their cloud service, Dataflow, and I spent a day or so looking at the code to get a feel for that. These two frameworks appear to be different means of doing the same thing, which is to go beyond Hadoop MapReduce, achieving optimisations up to a factor of 100 or so compared to MapReduce because of the ability to hold data in memory. Both frameworks support “streaming” mode, where data is being delivered to the nodes in real time (e.g., Twitter posts and other web-based inputs), which seems to be the latest big trend. Another similar framework is Apache Flink, but I’ve not looked at that since it only supports Scala or Java. There are also other python pipeline frameworks, but they are typically only glorified Make clones (solve the DAG, run external commands), while I think it’s essential that we run python functions and manipulate python data.

Spark looks like it will do what we need, but I haven’t gotten to the point of doing anything serious with it yet. I have a few concerns with it, however. Spark is written in Scala, run in a Java Virtual Machine, with a Python front-end. This is an unfamiliar environment which makes it difficult to debug, and you have to worry about details that you wouldn’t otherwise (e.g., JVM memory usage). It also requires Java be installed and available on the cluster. I’m not as worried about Maven as Mineo-san (we’re not installing as root), but it certainly downloads an awful lot of packages as part of the install, so the whole feels rather heavy, having far more than we need or care about, which I think must make for maintenance problems down the track. There’s no facility for launching workers on the cluster; for Slurm, we can use srun (though the official script doesn’t seem to work, and I can’t figure out why), but for PBS we would have to use ssh or pull out the PBS remote execution code from mpich/hydra. I asked a question on the users’ forum, but got no response, which is discouraging. But apart from these concerns, I think Spark is something we could use.

The Google DataflowPythonSDK has the attraction of being written purely in python, so it’s code we can read and follow. The big problem here is that there is no facility for running on a cluster — there are “runners” only for a single thread on a single machine, and for the Google’s cloud service. I’m told that other people are developing runners for other scenarios (including running Dataflow under Spark). I’ve been in touch with one of the DataflowPythonSDK developers to try to get a sense of what it would take to adapt our MPI process pool to do the job, and while I think it’s possible and it would be a lot of fun, I fear it would take some work (the hard part is the communication between nodes, which I think would require using multiprocessing to do the heavy lifting, while keeping a process open to communicate). This was generally confirmed by my contact. I quoted an e-mail by @RHL to the HSC list (“I really don’t want to be inventing our own batch system, or modifying someone else’s if we can possibly help it”), and he replied, “Basically I think your boss is a smart person :-)”.

Here’s my proposal for moving forward. At the present time, the scatter/gather capability of our MPI process pool isn’t really used anywhere (the exception on the HSC side is reduceFrames, where we do a global astrometric solution, but that will be going away). While we know that we’ll need this capability for future features (e.g., background matching, maybe multifit, maybe inter-CCD crosstalk), we’re not really making use of it at the moment. We should therefore be able to change the parallelisation level without affecting our current capabilities. And the changes that we need to make in order to support that (e.g., refactoring the warping and stacking to use a database of CCD corners, and writing that database from reduceFrames) are changes that we would have to make in order to use Spark or Dataflow, so there’s a stepping stone mid-stream that allows us to push forward in the direction we need to go, while waiting to see how these external tools develop before we make the Big Decision. In the mean time, I hope the Middleware team can evaluate these promising tools and perhaps consider adopting something.

Some more details on the plan moving forward:

Here’s how things currently work on the HSC side. The following stages are run in series:

  1. reduceFrames: for each exposure, we process each CCD through the MPI process pool, gather the match lists and use this to construct an exposure-wide (“global”) astrometric solution and focus solution which are then sent to the worker nodes to be attached to the CCDs before writing out. Note that this is inefficient because some CCDs are processed faster than others (e.g., the number of operations for focus CCDs is smaller than for science CCDs), but all processes block on receiving the results of the gathered operation. Parallelisation beyond 112 cores is done by launching multiple jobs with the cluster’s scheduler (e.g., dividing the data up by filter and/or survey region).
  2. mosaic: for each tract, we calculate the mosaic solution. We use the python ‘multiprocessing’ module (for parallelisation on a single node) to read the inputs, and the mosaic solution is calculated with multi-threaded code (e.g., OMP). Parallelisation beyond single nodes is done by launching multiple jobs with the cluster’s scheduler (dividing the data by tract and filter).
  3. stack: for each tract, we process each patch through the MPI process pool which involves warping all inputs, coadding the warps and then detecting sources. This is inefficient because patches within a tract are independent (there is no useful gathering of results across patches), so cores for patches that process quickly twiddle their thumbs until the other patches are done. Also, there are synchronisation points (fairly needlessly, I think) between warping and coadding that compounds the inefficiency. Oh, I forgot to mention: before the processing of each patch, there is a scatter/gather for reading the WCS of each input exposure; I believe this is what causes us to trash GPFS.
  4. multiband: for each tract, we merge detections for each patch, measure detections for each patch+filter, merge measurements for each patch, and do forced measurement for each patch+filter. This is a bit more efficient in some respects than for stack because of the use of different parallelisation levels (patch vs patch+filter), but it’s still inefficient because there are usually some patch+filters for which measurement (and forced photometry) takes longer than the others, forcing everything else to wait. And occasionally we run out of memory in a single patch+filter, and Slurm brings down the job for the entire tract. So last time after I ran all the tracts, I figured out which tracts failed and ran the patches for those independently.

On the LSST side, everything is mostly the same, except with different names:

  1. reduceFrames.py --> singleFrameDriver.py
  2. mosaic: port in progress
  3. stack.py --> coaddDriver.py
  4. multiband.py --> multiBandDriver.py

The singleFrameDriver.py work hasn’t quite landed on master yet, but we’ve agreed to remove the global astrometric solution (we didn’t discuss the focus solution, but we can move all of that out into its own job). With that change, we won’t have any scatter/gather of substance (in the current pipeline; this must change), so we can change the parallelisation level:

  1. singleFrameDriver: process each CCD independently. Because we’re not running single exposures in series, we can make this as wide as we like, e.g., a thousand cores. I also propose to gather all the Wcs results, and store the bounding box for each CCD in a sqlite database, for use in the stacking to reduce the GPFS hammering.
  2. mosaic: I think this must continue to run as it is. I have no idea how meas_jointcal will operate, but I don’t expect it will be very different.
  3. coaddDriver: parallelise over patch+exposures to create warps, and then over patches to coadd. Because we’re not running tracts in series or separately, we can again make this as wide as we like. This was the big take-home lesson for me from Yasuda-san’s friends at Tsukuba.
  4. multibandDriver: same parallelisation scheme, but removing the need to do a tract at a time; then we can again make this as wide as we like.

I think that will greatly help address the inefficiencies, but it does so through the removal of our ability to do scatter-gather-scatter workflows. But we know that we’ll need that capability again in the future, but we don’t want to go back to inefficiencies of the current state. I think Spark- or Dataflow-like framework would help with that, or we can somehow subdivide our MPI processing Pool. The key requirement is that we’re going to need to be able to do scatter/gather (of python code on python variables returning python variables) over subsets of what we’re processing.