Pipeline execution in HSC

Some more details on the plan moving forward:

Here’s how things currently work on the HSC side. The following stages are run in series:

  1. reduceFrames: for each exposure, we process each CCD through the MPI process pool, gather the match lists and use these to construct an exposure-wide (“global”) astrometric solution and focus solution, which are then sent back to the worker nodes to be attached to the CCDs before writing out. Note that this is inefficient because some CCDs are processed faster than others (e.g., the number of operations for focus CCDs is smaller than for science CCDs), but all processes block until they receive the results of the gather. Parallelisation beyond 112 cores is done by launching multiple jobs with the cluster’s scheduler (e.g., dividing the data up by filter and/or survey region).
  2. mosaic: for each tract, we calculate the mosaic solution. We use the Python ‘multiprocessing’ module (for parallelisation on a single node) to read the inputs, and the mosaic solution is calculated with multi-threaded code (e.g., OpenMP). Parallelisation beyond single nodes is done by launching multiple jobs with the cluster’s scheduler (dividing the data by tract and filter).
  3. stack: for each tract, we process each patch through the MPI process pool, which involves warping all inputs, coadding the warps and then detecting sources. This is inefficient because patches within a tract are independent (there is no useful gathering of results across patches), so cores for patches that process quickly twiddle their thumbs until the other patches are done. Also, there are synchronisation points (fairly needlessly, I think) between warping and coadding that compound the inefficiency. Oh, I forgot to mention: before the processing of each patch, there is a scatter/gather for reading the WCS of each input exposure; I believe this is what causes us to thrash GPFS.
  4. multiband: for each tract, we merge detections for each patch, measure detections for each patch+filter, merge measurements for each patch, and do forced measurement for each patch+filter. This is a bit more efficient in some respects than for stack because of the use of different parallelisation levels (patch vs patch+filter), but it’s still inefficient because there are usually some patch+filters for which measurement (and forced photometry) takes longer than the others, forcing everything else to wait. And occasionally we run out of memory in a single patch+filter, and Slurm brings down the job for the entire tract. So last time after I ran all the tracts, I figured out which tracts failed and ran the patches for those independently.
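
To make the cost of those blocking points concrete, here is a toy sketch comparing "one group at a time, everyone waits for the slowest" against a single flat pool of work. The durations are made up for illustration (one slow item and three fast ones per group), not measurements from any HSC run, and `makespan` is a hypothetical helper that just does greedy scheduling.

```python
import heapq


def makespan(durations, n_cores):
    """Wall-clock time to run the tasks on n_cores, handing each task
    to whichever core becomes free first (greedy list scheduling)."""
    cores = [0.0] * n_cores
    heapq.heapify(cores)
    for duration in durations:
        free_at = heapq.heappop(cores)
        heapq.heappush(cores, free_at + duration)
    return max(cores)


# Illustrative only: two groups (think exposures or tracts), each with
# one slow item and three fast ones, in arbitrary time units.
groups = [[4, 1, 1, 1], [4, 1, 1, 1]]
n_cores = 4

# Current scheme: groups run in series, and every core waits for the
# slowest item in a group before the next group starts.
in_series = sum(makespan(group, n_cores) for group in groups)

# Flat scheme: all items go into one pool with no barrier between groups.
flat = makespan([d for group in groups for d in group], n_cores)

print(in_series, flat)
```

With these toy numbers the in-series scheme takes 8 time units and the flat pool takes 5, because the fast cores pick up work from the next group instead of idling.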

On the LSST side, everything is mostly the same, except with different names:

  1. reduceFrames.py --> singleFrameDriver.py
  2. mosaic: port in progress
  3. stack.py --> coaddDriver.py
  4. multiband.py --> multiBandDriver.py

The singleFrameDriver.py work hasn’t quite landed on master yet, but we’ve agreed to remove the global astrometric solution (we didn’t discuss the focus solution, but we can move all of that out into its own job). With that change, we won’t have any scatter/gather of substance (in the current pipeline, that is; this will have to change in the future), so we can change the parallelisation level:

  1. singleFrameDriver: process each CCD independently. Because we’re not running single exposures in series, we can make this as wide as we like, e.g., a thousand cores. I also propose to gather all the WCS results and store the bounding box for each CCD in an SQLite database, for use in the stacking to reduce the GPFS hammering.
  2. mosaic: I think this must continue to run as it is. I have no idea how meas_jointcal will operate, but I don’t expect it will be very different.
  3. coaddDriver: parallelise over patch+exposures to create warps, and then over patches to coadd. Because we’re not running tracts in series or separately, we can again make this as wide as we like. This was the big take-home lesson for me from Yasuda-san’s friends at Tsukuba.
  4. multiBandDriver: same parallelisation scheme, but removing the need to do a tract at a time; then we can again make this as wide as we like.
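
As a rough idea of what the bounding-box database could look like, here is a minimal sketch using Python's standard `sqlite3` module. The table name, the column names and the use of plain RA/Dec limits are all my assumptions for illustration (nothing here is an existing pipeline schema), and it ignores RA wrap-around at 0/360 degrees.

```python
import sqlite3


def create_bbox_table(conn):
    """Create the (hypothetical) table holding one sky bounding box per CCD."""
    conn.execute(
        """CREATE TABLE IF NOT EXISTS ccd_bbox (
               visit INTEGER, ccd INTEGER,
               ra_min REAL, ra_max REAL, dec_min REAL, dec_max REAL,
               PRIMARY KEY (visit, ccd))"""
    )


def add_bbox(conn, visit, ccd, ra_min, ra_max, dec_min, dec_max):
    """Record the bounding box derived from one CCD's WCS."""
    conn.execute(
        "INSERT OR REPLACE INTO ccd_bbox VALUES (?, ?, ?, ?, ?, ?)",
        (visit, ccd, ra_min, ra_max, dec_min, dec_max),
    )


def overlapping_ccds(conn, ra_min, ra_max, dec_min, dec_max):
    """Return (visit, ccd) pairs whose box overlaps the given patch box."""
    cursor = conn.execute(
        """SELECT visit, ccd FROM ccd_bbox
           WHERE ra_max >= ? AND ra_min <= ?
             AND dec_max >= ? AND dec_min <= ?
           ORDER BY visit, ccd""",
        (ra_min, ra_max, dec_min, dec_max),
    )
    return cursor.fetchall()


# Usage with made-up numbers; a real run would use a file on disk.
conn = sqlite3.connect(":memory:")
create_bbox_table(conn)
add_bbox(conn, 1000, 49, 150.0, 150.2, 2.0, 2.1)
add_bbox(conn, 1000, 50, 150.2, 150.4, 2.0, 2.1)
add_bbox(conn, 1002, 12, 210.0, 210.2, -1.0, -0.9)
conn.commit()
selected = overlapping_ccds(conn, 150.1, 150.15, 2.0, 2.05)
print(selected)
```

The point is that the stacking stage would make one cheap query per patch rather than opening every input file to read its WCS.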

I think that will greatly help address the inefficiencies, but it does so by removing our ability to do scatter-gather-scatter workflows. We know that we’ll need that capability again in the future, and we don’t want to go back to the inefficiencies of the current state. I think a Spark- or Dataflow-like framework would help with that, or we could somehow subdivide our MPI processing Pool. The key requirement is that we’re going to need to be able to do scatter/gather (of Python code on Python variables returning Python variables) over subsets of what we’re processing.
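
To illustrate that requirement, here is a minimal sketch of scatter-gather-scatter over subsets using only the standard library. It uses threads from `concurrent.futures` as a stand-in for whatever we actually end up with (a subdivided MPI pool, Spark, etc.); the function names and the toy "measure, average, subtract" workload are all hypothetical. Each group gathers its own results as soon as its own members are done, without waiting for any other group.

```python
from concurrent.futures import ThreadPoolExecutor


def run_group(workers, items, scatter_func, gather_func, finish_func):
    """Scatter over one group's items, gather within the group, scatter again."""
    futures = [workers.submit(scatter_func, item) for item in items]
    partial = [f.result() for f in futures]        # barrier for this group only
    summary = gather_func(partial)                 # e.g. a group-wide solution
    futures = [workers.submit(finish_func, p, summary) for p in partial]
    return [f.result() for f in futures]


def run_all(groups, scatter_func, gather_func, finish_func, n_workers=4):
    """Run every group concurrently on one shared pool of workers."""
    # Coordinators live in their own pool so that a coordinator blocked on
    # its group's results never occupies a slot needed by the workers.
    with ThreadPoolExecutor(n_workers) as workers, \
            ThreadPoolExecutor(max(1, len(groups))) as coordinators:
        pending = {
            name: coordinators.submit(
                run_group, workers, items, scatter_func, gather_func, finish_func
            )
            for name, items in groups.items()
        }
        return {name: future.result() for name, future in pending.items()}


# Toy workload: "measure" each item, average within the group, then
# subtract the group average from each measurement.
def measure(x):
    return 2 * x


def average(values):
    return sum(values) / len(values)


def subtract(value, mean):
    return value - mean


groups = {"visit1": [1, 2, 3], "visit2": [10, 20]}
results = run_all(groups, measure, average, subtract)
print(results)
```

The shared worker pool is what keeps cores busy: when one group is waiting on its slowest member, the workers are free to pick up items from the other groups.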