Debugging during multiband process

I’m running the multiband on the emulated images, but I need to speed things up to do some debugging on the emulation part of the code.

Is there a way to ask the tasks to compute the photometry on a reduced number of sources?

Back in the days, Steve Bickerton handed me a source code to retarget the “detection” part so that it would only run the photometry measurement on a reduced number of sources (see below), which is obviously broken given the significant changes since then.

What (and how) should I retarget now to do similar things?

import random
import lsst.pex.config as pexConfig
import lsst.pipe.base as pipeBase
import lsst.afw.table as afwTable
import lsst.meas.algorithms as measAlg

class FewerSourceDetectionConfig(measAlg.SourceDetectionConfig):
    nObjects = pexConfig.Field(doc="Number of sources to select", dtype=int, optional=False, default=200)

class FewerSourceDetectionTask(measAlg.SourceDetectionTask):
    """This task serves only to cull the source list and make measurement faster"""

    ConfigClass = FewerSourceDetectionConfig

    def makeSourceCatalog(self, table, exposure, doSmooth=True, sigma=None, clearMask=True):
        if self.negativeFlagKey is not None and self.negativeFlagKey not in table.getSchema():
            raise ValueError("Table has incorrect Schema")

        # detect the footprints as usual
        fpSets = self.detectFootprints(exposure=exposure, doSmooth=doSmooth, sigma=sigma,
                                       clearMask=clearMask)

        # shuffle the footprints to ensure they're random across the frame
        n = self.config.nObjects
        fpPos = fpSets.positive.getFootprints()
        random.shuffle(fpPos)

        # delete the excess footprints, and the negative footprints
        del fpPos[n:]
        fpSets.numPos = n
        if fpSets.negative:
            del fpSets.negative.getFootprints()[0:]
            fpSets.negative = None

        # make sources
        sources = afwTable.SourceCatalog(table)
        table.preallocate(fpSets.numPos)
        if fpSets.positive:
            fpSets.positive.makeSources(sources)

        return pipeBase.Struct(sources=sources, fpSets=fpSets)

It looks like a good approach, but I expect you’re hitting some problems when calling the retargeted code. You should be able to work around it quite easily by simply passing the new arguments along:

class FewerSourceDetectionTask(measAlg.SourceDetectionTask):
    ...
    def makeSourceCatalog(self, table, exposure, *args, **kwargs):
        ...
        fpSets = self.detectFootprints(exposure=exposure, *args, **kwargs)
        ...

ideally I’d like to cull sources at the merge step, but applying a similar algorithm as above to the merge task, I’m being said:

type object 'MergeDetectionsTask' has no attribute 'retarget'

not all tasks can be retargeted?

How do I know what ones can?

Can you give me instructions to reproduce the problem?

Command run:

    mergeCoaddDetections.py $ROOTDIR --rerun=multiband \
      --id tract=8524 patch=5,5 filter=HSC-R^HSC-I  \
      --clobber-config --no-versions \
      -C mergeCoaddDetections_debug.py

mergeCoaddDetections_debug.py:

import sys
sys.path.insert(0, "./python")
import mergeFewer
config.mergeCoaddDetections.retarget(mergeFewer.FewerMergeDetectionsTask)

mergeFewer.py:

import lsst.pex.config as pexConfig

import lsst.pipe.tasks as pipeTask

class FewerMergeDetectionsConfig(pipeTask.multiBand.MergeDetectionsConfig):
    nObjects = pexConfig.Field(doc="Number of sources to select", dtype=int, optional=False, default=200)

class FewerMergeDetectionsTask(pipeTask.multiBand.MergeDetectionsTask):
    """This task serves only to cull the source list and make measurement faster"""

    ConfigClass = FewerMergeDetectionsConfig

    def mergeCatalogs(self, catalogs, patchRef):
        """!
        \brief Merge multiple catalogs.

        After ordering the catalogs and filters in priority order,
        \ref getMergedSourceCatalog of the \ref FootprintMergeList_ "FootprintMergeList" created by
        \ref \_\_init\_\_ is used to perform the actual merging. Finally, \ref cullPeaks is used to remove
        garbage peaks detected around bright objects.

        \param[in]  catalogs
        \param[in]  patchRef
        \param[out] mergedList
        """

        # Convert distance to tract coordinate
        skyInfo = getSkyInfo(coaddName=self.config.coaddName, patchRef=patchRef)
        tractWcs = skyInfo.wcs
        peakDistance = self.config.minNewPeak / tractWcs.getPixelScale().asArcseconds()
        samePeakDistance = self.config.maxSamePeak / tractWcs.getPixelScale().asArcseconds()

        # Put catalogs, filters in priority order
        orderedCatalogs = [catalogs[band] for band in self.config.priorityList if band in catalogs.keys()]
        orderedBands = [getShortFilterName(band) for band in self.config.priorityList
                        if band in catalogs.keys()]

        mergedList = self.merged.getMergedSourceCatalog(orderedCatalogs, orderedBands, peakDistance,
                                                        self.schema, self.makeIdFactory(patchRef),
                                                        samePeakDistance)

        #
        # Add extra sources that correspond to blank sky
        #
        skySeed = patchRef.get(self.config.coaddName + "MergedCoaddId")
        skySourceFootprints = self.getSkySourceFootprints(mergedList, skyInfo, skySeed)
        if skySourceFootprints:
            key = mergedList.schema.find("merge_footprint_%s" % self.config.skyFilterName).key
            for foot in skySourceFootprints:
                s = mergedList.addNew()
                s.setFootprint(foot)
                s.set(key, True)

        # DEBUGGING_JC
        # numpy.random.seed(seed=20091982)
        n = 100
        self.log.info("DEBUGGING: Keep {} sources".format(n))
        # mergedList = numpy.random.shuffle(mergedList)
        mergedList = mergedList[:n]



        # Sort Peaks from brightest to faintest
        for record in mergedList:
            record.getFootprint().sortPeaks()
        self.log.info("Merged to %d sources" % len(mergedList))
        # Attempt to remove garbage peaks
        self.cullPeaks(mergedList)
        return mergedList

When I run mergeCoaddDetections.py like that, I get:

mergeCoaddDetections.py: error: cannot load config file 'mergeConfig.py': 'MergeDetectionsConfig' object has no attribute 'mergeCoaddDetections'

Which makes sense: there’s no mergeCoaddDetections field in the MergeDetectionsConfig. Another way of saying that is that you can’t override the top-level Task.

Did you mean to run this under multibandDriver.py?

right, sorry it should have been:

from lsst.pipe.tasks.multiBand import MergeDetectionsTask
MergeDetectionsTask.retarget(mergeFewer.FewerMergeDetectionsTask)

That’s your problem right there. You can’t retarget a Task. You can retarget a ConfigurableField in the config:

config.mergeDetections.retarget(mergeFewer.FewerMergeDetectionsTask)

So - if I understand correctly - I would need

  1. to create a parent task that would call mergeCoaddDetection as a ConfigurableField
  2. or apply the redirection when calling multibandDriver.py

For 2) it seems to work (just need to fix your previous command to (config.mergeCoaddDetections.retarget(mergeFewer.FewerMergeDetectionsTask) but I get this error which I think means that I need to load some additional stuff in the FewerMergeDetections script:

KeyError: "Field with name 'merge_peak_i2' not found"

it seems it’s not so simple in the end: the full error tree below suggests some schema is missing. Looking at multibandDriver.py, it looks like it has to do with the schema being loaded during the init.

Any idea?

  File "/Users/coupon/local/source/hscPipe6/stack/miniconda3-4.3.21-10a4fa6/DarwinX86/pipe_drivers/6.1.1-hsc+7/bin/multiBandDriver.py", line 3, in <module>
    MultiBandDriverTask.parseAndSubmit()
  File "/Users/coupon/local/source/hscPipe6/stack/miniconda3-4.3.21-10a4fa6/DarwinX86/ctrl_pool/6.0-hsc+2/python/lsst/ctrl/pool/parallel.py", line 442, in parseAndSubmit
    if not cls.RunnerClass(cls, batchArgs.parent).precall(batchArgs.parent):  # Write config, schema
  File "/Users/coupon/local/source/hscPipe6/stack/miniconda3-4.3.21-10a4fa6/DarwinX86/pipe_base/6.0-hsc+2/python/lsst/pipe/base/cmdLineTask.py", line 347, in precall
    task = self.makeTask(parsedCmd=parsedCmd)
  File "/Users/coupon/local/source/hscPipe6/stack/miniconda3-4.3.21-10a4fa6/DarwinX86/pipe_drivers/6.1.1-hsc+7/python/lsst/pipe/drivers/multiBandDriver.py", line 131, in makeTask
    return self.TaskClass(config=self.config, log=self.log, butler=butler, reuse=self.reuse)
  File "/Users/coupon/local/source/hscPipe6/stack/miniconda3-4.3.21-10a4fa6/DarwinX86/pipe_drivers/6.1.1-hsc+7/python/lsst/pipe/drivers/multiBandDriver.py", line 168, in __init__
    self.measureCoaddSources.schema))
  File "/Users/coupon/local/source/hscPipe6/stack/miniconda3-4.3.21-10a4fa6/DarwinX86/pipe_base/6.0-hsc+2/python/lsst/pipe/base/task.py", line 299, in makeSubtask
    subtask = taskField.apply(name=name, parentTask=self, **keyArgs)
  File "/Users/coupon/local/source/hscPipe6/stack/miniconda3-4.3.21-10a4fa6/DarwinX86/pex_config/6.0b1-hsc+9/python/lsst/pex/config/configurableField.py", line 83, in apply
    return self.target(*args, config=self.value, **kw)
  File "/Users/coupon/local/source/hscPipe6/stack/miniconda3-4.3.21-10a4fa6/DarwinX86/pipe_tasks/6.5.1-hsc+2/python/lsst/pipe/tasks/multiBand.py", line 1304, in __init__
    peakKey = inputSchema.find("merge_peak_%s" % short).key
KeyError: "Field with name 'merge_peak_i2' not found"

When you retarget, the configuration overrides already applied are lost, so you need to explicitly load them back. Add to your mergeCoaddDetections_debug.py after the retarget:

import os
obsSubaru = os.environ["OBS_SUBARU_DIR"]
overrides = [os.path.join(obsSubaru, "config", "mergeCoaddDetections.py"),
             os.path.join(obsSubaru, "config", "hsc", "mergeCoaddDetections.py"),
             ]
for filename in overrides:
    if os.path.exists(filename):
        config.mergeCoaddDetections.load(filename)

Fantastic ;). It works.

Below are the corrected codes:

mergeCoaddDetections_debug.py:

import sys
sys.path.insert(0, "./python")

import mergeFewer

config.mergeCoaddDetections.retarget(mergeFewer.FewerMergeDetectionsTask)

import os
obsSubaru = os.environ["OBS_SUBARU_DIR"]
overrides = [os.path.join(obsSubaru, "config", "mergeCoaddDetections.py"),
             os.path.join(obsSubaru, "config", "hsc", "mergeCoaddDetections.py"),
             ]
for filename in overrides:
    if os.path.exists(filename):
        config.mergeCoaddDetections.load(filename)

mergeFewer.py:


import lsst.pex.config as pexConfig
import lsst.pipe.tasks as pipeTask
from lsst.pipe.tasks.coaddBase import getSkyInfo
from lsst.pipe.tasks.multiBand import getShortFilterName

class FewerMergeDetectionsConfig(pipeTask.multiBand.MergeDetectionsConfig):
    nObjects = pexConfig.Field(
        doc="Number of sources to select",
        dtype=int, optional=False, default=100)

class FewerMergeDetectionsTask(pipeTask.multiBand.MergeDetectionsTask):
    """This task serves only to cull the source list and make measurement faster"""

    _DefaultName = "FewerMergeCoaddMeasurements"
    ConfigClass = FewerMergeDetectionsConfig

    def mergeCatalogs(self, catalogs, patchRef):
        """!
        \brief Merge multiple catalogs.

        After ordering the catalogs and filters in priority order,
        \ref getMergedSourceCatalog of the \ref FootprintMergeList_ "FootprintMergeList" created by
        \ref \_\_init\_\_ is used to perform the actual merging. Finally, \ref cullPeaks is used to remove
        garbage peaks detected around bright objects.

        \param[in]  catalogs
        \param[in]  patchRef
        \param[out] mergedList
        """

        # print("test")

        # Convert distance to tract coordinate
        skyInfo = getSkyInfo(coaddName=self.config.coaddName, patchRef=patchRef)
        tractWcs = skyInfo.wcs
        peakDistance = self.config.minNewPeak / tractWcs.getPixelScale().asArcseconds()
        samePeakDistance = self.config.maxSamePeak / tractWcs.getPixelScale().asArcseconds()

        # Put catalogs, filters in priority order
        orderedCatalogs = [catalogs[band] for band in self.config.priorityList if band in catalogs.keys()]
        orderedBands = [getShortFilterName(band) for band in self.config.priorityList
                        if band in catalogs.keys()]

        mergedList = self.merged.getMergedSourceCatalog(orderedCatalogs, orderedBands, peakDistance,
                                                        self.schema, self.makeIdFactory(patchRef),
                                                        samePeakDistance)

        #
        # Add extra sources that correspond to blank sky
        #
        skySeed = patchRef.get(self.config.coaddName + "MergedCoaddId")
        skySourceFootprints = self.getSkySourceFootprints(mergedList, skyInfo, skySeed)
        if skySourceFootprints:
            key = mergedList.schema.find("merge_footprint_%s" % self.config.skyFilterName).key
            for foot in skySourceFootprints:
                s = mergedList.addNew()
                s.setFootprint(foot)
                s.set(key, True)

        # pick only the first 100 merged sources
        self.log.info("DEBUGGING: Keep {} sources".format(self.config.nObjects))
        mergedList = mergedList[:self.config.nObjects]

        # Sort Peaks from brightest to faintest
        for record in mergedList:
            record.getFootprint().sortPeaks()
        self.log.info("Merged to %d sources" % len(mergedList))
        # Attempt to remove garbage peaks
        self.cullPeaks(mergedList)
        return mergedList

The command line remains the same:

  multiBandDriver.py $ROOTDIR --rerun=multiband \
    --id tract=8524 patch=5,5 filter=HSC-R^HSC-I  \
    --clobber-config --no-versions \
    --nodes 1 --procs 1 --do-exec \
    --config measureCoaddSources.doPropagateFlags=False \
    -C mergeCoaddDetections_debug.py


1 Like

I think the command-line is multibandDriver.py, right?

you’re right. Previous post fixed.