DRP Parallelization Needs

I’ve finished my first attempt to describe DRP’s parallelization needs here:


I’ve asked @price to do the formal ticket review (DM-5715), but I’d also be interested in a few specific opinions:

  • @ktl, @gpdf, @robkooper : What needs to be described in more detail, and how could I present what I do have better?
  • @rhl : Do you agree with where I’ve drawn the line between DAG-driven processing and algorithm-level code? Are there any algorithms you think will require parallelization patterns other than the ones I’ve described?

Comments and questions from everyone else are of course quite welcome as well.

At the Python level, we’re dealing with high-level algorithms that we don’t want to logically split up and stitch back together using the orchestration layer; it’s important for the readability and maintainability of the code that we express the parallelization in the algorithm implementation itself.

(Emphasis mine.) This is well expressed, and I think it should be put down as a requirement. This is what I’ve attempted to communicate before by saying that we need to be passing python variables to python functions and receiving back python variables: the parallelisation must be integral to the algorithm implementation (in python). We can’t have this written as separate shell commands. I would like to see this written as something more like:

gatheredData = task.processScatter(scatteredData)
newScatterData = task.crunchGathered(gatheredData)

rather than the decorator magic suggested in the document (under Sensors in Visit; but note that Jim explicitly said the examples he’s given aren’t to be taken as suggesting a design) or by creating a DAG object and adding nodes to it. The code should, as much as possible, express the writer’s intent rather than the implementation details.

I don’t think I agree with the position of the boundary between the orchestration layer and the algorithms layer. I think that the boundary needs to be higher (perhaps even at the tippy-top). It’s not very clear, but the idea expressed seems to be much like we have done with reduceFrames.py in HSC where the algorithms layer is allocated by the orchestration layer so many cores for its scatter/gather operations on one element (e.g., visit or tract), but I have argued that this is inefficient – some pieces end up waiting for others, and because they’re tied to cores those cores sit idle. I would like to see us use some system where a core that is waiting for input be used for some other process. But that means that the algorithm layer needs to operate on multiple elements (e.g., visits, tracts) at a time, rather than just one.

A very big open question is whether this mode of parallelization would need to control work beyond a single node (I don’t think it does).

I was surprised by this statement, but perhaps I just don’t know what kind of memory you’re expecting nodes to have. I would have thought that 189 CCDs is going to be a bit much to keep in memory on a single node: the images are “only” 30 GB (10 B/pix), but I think catalogs, work space and inefficiencies will mean you’re going to need at least a few times that. Maybe that’s small potatoes in a few years?

Another question is how effectively we can use true threading in Python in the presence of the Global Interpreter Lock; if we end up spending most of our time in C++ anyway we should be able to release the GIL at the language boundary.

This statement surprised and scared me. We’ve spent the last few years pulling things from C++ into python, and now everyone is in the mode of writing stuff in python whenever it’s feasible, and C++ only when necessary. I think that’s the right thing to do, because it speeds development. But the GIL means that threading is only worth doing for operations that involve a lot of blocking (network I/O is the classic example), and I don’t think we spend nearly enough time in C++ to put us in that regime. For example, I recently cleaned up a few of the dependencies in ci_hsc so we could get some more parallelism, but the parts I managed to make more parallel were all python code (reading data from the butler) and SCons parallelises with threads, and so I ended up getting the process up to something like 150% CPU usage on top, while running with 8 threads. Maybe we haven’t told SWIG to give up the GIL and so my example doesn’t really count, but if we want to go this way we need to lean back towards putting more functionality in C++, which scares me. But, it shouldn’t really be a concern because processes are just as easy to use as threads (and I see concurrent.futures is going to make them interchangeable). So let’s not even mention python threading at all, OK?

At the C++ level, I think we’ll at least occasionally want to use OpenMP to utilize multiple threads in C++; how much may depend on the hardware architecture and memory/CPU.

This really scares me. I’ve heard that we’re expecting the memory-to-CPU ratio to be such as to encourage the use of low-level parallelisation (like OpenMP) so that we need to get multiple cores working on the same data in memory. If you want to use low-level parallelisation and you want to use your cores efficiently, then you need to have that low-level parallelisation built into everything. If the low-level parallelisation is not built into everything, then you either need to let cores sit idle while you’re not doing low-level parallelisation, or you load up those cores using higher-level parallelisation, contrary to the operational model. So either we need to adopt the operational model of using low-level parallelisation everywhere, or we need to get out of thinking that we’re going to be operating with a low memory-to-CPU ratio (and reflect that thinking in our cluster node design and by dropping talk about OpenMP). There may be some operations in our pipeline where the data forces us into a low memory-to-CPU ratio and it’s worth always using OpenMP (e.g., jointcal; but I’m basing this on meas_mosaic which is an awful memory hog and maybe jointcal does better), but I don’t see how we can build low-level parallelism into much else in a complete fashion.

1 Like
gatheredData = task.processScatter(scatteredData)
newScatterData = task.crunchGathered(gatheredData)

This actually looks like it’s entirely consistent the decorator I proposed; I think at least one of these methods has to be thusly-decorated (or something equivalent) in order to actually run in parallel or do any gather/scatter. Are you just proposing that we include verbs indicating parallelization actions in the method names? I wouldn’t necessarily have a problem with that, though I think it’s important that the method names also describe the algorithmic steps they’re taking.

Yes, I was imagining something like this, but with the number of jobs smaller than the number of core-groups, so they wouldn’t all be executing in parallel and hence you could do some load balancing. I think that ought to make a big difference.

I’m surprised to hear you say that you think the orchestration level is uniformly too low - I was attempting to draw it at the level our CmdLineTasks are at now, thinking that anything we’re willing to string together by hand we’d be willing to let he DAG string together instead. And while I’ve split up some things (e.g. warping and coaddition) we’ve combined into the BatchPoolTask drivers, I based that on your preference for Spark (etc) as a better way to manage the dependencies than the explicit parallelization calls in our drivers can do.

Are we just disagreeing because we have different expectations on how the we set up the orchestration layer’s DAG, or are there places where you think the high-level algorithm is complex enough that we want to write it as code rather than a DAG submission?

For the big clusters, I think it will be small potatoes, but I also can’t imagine us going beyond a factor of two of the image data size (at least for single-frame processing, where blending is low) unless we’re extremely wasteful. Maybe that’s optimistic, though.

This is part of why I’ve been a little hesitant to jump on the “Python first; C++ only when needed” bandwagon. (The other part is that I think very complex systems are sometimes best implemented in a stricter language, but I don’t want this thread to turn into a flame war).

I do think a lot of the problem is that we’re not telling Swig to release the GIL right now; until we do that, no experiments with threading will be at all meaningful.

And if we’re not spending most of our time in C++ (or other C-implemented calls, like Numpy operations), then I’d say we’re flat out Doing It Wrong. We may well be, and that could be a problem we have to fix

I do think threading is going to be our best option, though it may be it’s all down at the C++ level (more on that later). And there are other experts who probably know better than I do. But I think we have to be able to write algorithms that can see more memory than is available on a single core (like a deblender running simultaneously on ~20 flavors of coadd), and threads are the most obvious way to do that.

I agree that we don’t want to use low-level parallelization for everything; where possible, I’ve tried to split things up some other way. But I don’t think it’s hard to add vectorization to simple operations like Image arithmetic, even if we don’t use it when we run pipelines that only have one core per image.

And I certainly agree that using low-level parallelization makes it harder to keep all the cores busy. I think we’ll be saved here by the fact that we only need to keep them all busy during the steps that take most of the time; I think that’s a much lower bar. I’ve also been told by some HPC experts that memory locality is frequently a big problem for codes with “large scale” parallelization; there isn’t enough cache to keep all the cores busy (this is why CSES wants us to use as few cores as possible on Orbital, I believe). Low-level parallelization that lets multiple threads use the same cache is one way to get around that.

I think I screwed up. I was thinking that the scatteredData is a special distributed data type (a la Spark), but it’s probably more like:

scatteredData = sc.parallelize(localData)
gatheredData = scatteredData.map(task.processScatter)
newScatterData = gatheredData.reduce(task.crunchGathered)

or (with Dataflow):

gatheredData = scatteredData | dataflow.Map('scatter', task.processScatter)
newScatterData = gatheredData | dataflow.GroupByKey('gather')
newScatterData | dataflow.Map('scatter2', task.processScatterAgain)

Hmm. Not sure I really like either of those. Maybe you’re right about hiding the ugliness behind decorators.

Maybe we differ over our understanding of what the orchestration layer is. I get the feeling that you’re seeing the orchestration layer as something like Slurm where you fire off jobs that carve out their own section of the cluster. That kind of static use of the cluster (this process and only this process will use these N cores and only these N cores) is the source of the inefficiencies I’ve been complaining about. It’s very similar to the problem where “using low-level parallelization makes it harder to keep all the cores busy”.

In my mind, the DAG solver and executor are the orchestration layer but the DAG itself should be part of the algorithmic layer, and written in python code rather than a list of shell commands. And the DAG should encompass not just one visit, but all visits. I think that we should be able to write the DAG for the entire pipeline as a single script and have it run as a single entity rather than a bunch of shell commands. That allows for more dynamic use of the cluster.

Here’s how I’d envisioned the orchestration layer working. I’m quite flexible on the details of how this would work, but hopefully it will still be useful to clarify things:

  • We define the full pipeline as a bunch of components and data products with dependencies using some sort of declarative Python syntax (I’m imagining something like SCons’ syntax for actions and targets). This doesn’t actually define a DAG (because we haven’t provided any data IDs), but it does define a complete configuration tree, and some sort of “DAG factory”. This happens in the codebase, not a rerun repository.

  • We create an output repository by feeding configuration overrides and the full set of data IDs we want to process (probably as a set of tracts and a visit blacklist) to the “DAG factory”. This defines the DAG to the extent we can before launching - at least some of of the details will have to be created on the fly (e.g. multifit jobs), but I think that can be hidden from the user.

  • We tell the orchestration layer either execute the full DAG on a cluster, and it handles all the load-balancing and the order data is processed (by talking to the data access middleware) from there.

  • If we want to do testing or debugging, we can also execute any subgraph of the DAG (specifying an endpoint data product, in the same way we would specify a target to a build system), and it should reuse any existing data products instead of regenerating them (with some way to opt not to do that, of course). Just running a bunch of these processes separately would result in poor load-balancing, so we wouldn’t do that for production.