Adding columns and concatenating afw tables


I would like to add new columns to an already existing afw table coming from the butler, as well as concatenate several of these tables in order to get one single table. What would be the most efficient way to do that?



This example reads a set of SourceCatalogs from a Butler, appends an integer value from the data ID to each catalog as a new column (as an example), and combines all of the SourceCatalogs into a single one with contiguous memory. You could then use the result to e.g. make plots or get an astropy.table view.

import lsst.afw.table

def process_catalogs(butler, data_ids, dataset, extra_col):
    # Load the schema used by all of the catalogs we're about to load
    schema = butler.get(dataset + "_schema", immediate=True).schema
    # Create a mapper that defines a new Schema with the additional
    # column and knows how to copy records from the old Schema.
    mapper = lsst.afw.table.SchemaMapper(schema)
    # Copy all of the existing fields from the old Schema (doesn't copy
    # the values now, but sets up the mapper to do it later).
    mapper.addMinimalSchema(schema, True)
    # Add another for the value from the data ID.
    extra_key = mapper.editOutputSchema().addField(
        extra_col, type=int,
        doc="extra column from data ID"
    )
    # Make the full output catalog.
    full_catalog = lsst.afw.table.SourceCatalog(mapper.getOutputSchema())
    for data_id in data_ids:
        # If you don't want the Footprints, you can save a lot of time
        # and memory by not loading them.
        catalog = butler.get(
            dataset, dataId=data_id,
            flags=lsst.afw.table.SOURCE_IO_NO_FOOTPRINTS,
            immediate=True
        )
        # Make a temporary table with the new schema, using the existing
        # "table" object (think of it as a pool of memory for records)
        # from the full catalog for efficiency.
        tmp_catalog = lsst.afw.table.SourceCatalog(full_catalog.table)
        # Copy in the old records, using the mapper.
        # We allocate space for the new records up front to ensure
        # all the records we are about to add will be contiguous in
        # memory.
        tmp_catalog.extend(catalog, mapper=mapper)
        # Fill in the new column.  Note the extra "[:]"; this gets a
        # NumPy view and sets it in place.  We should support
        # __setitem__ directly, but I don't think we do right
        # now for assigning scalars.
        tmp_catalog[extra_key][:] = data_id[extra_col]
        # Add all of the updated records to the concatenated catalog.
        # Note that we don't need to use the mapper here because both
        # catalogs already have the same schema.
        full_catalog.extend(tmp_catalog)
    # In order to access columns of the full catalog, we need to do a
    # deep copy to make it contiguous.
    return full_catalog.copy(deep=True)

Note: this code has not been tested. Please report any bugs you find.
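The `[:]` idiom in the comments relies on ordinary NumPy view semantics: indexing by key returns a view into the column's memory, and slice assignment writes through that view. A minimal stand-alone illustration with a plain NumPy structured array (no afw involved, names purely illustrative):

```python
import numpy as np

# A structured array stands in for the column storage behind a catalog.
records = np.zeros(5, dtype=[("id", np.int64), ("extra", np.int64)])

# records["extra"] is a view, not a copy: slice-assigning through it
# updates the underlying buffer in place, which is what
# tmp_catalog[extra_key][:] = value does for a catalog column.
extra = records["extra"]
extra[:] = 42

print(records["extra"])  # -> [42 42 42 42 42]
```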

Note that Jim’s technique for concatenating tables here, while straightforward and easy to understand and use, is subject to memory fragmentation because it allocates a bunch of throwaway blocks and then a large block that is held. (This is a common problem in concatenating large amounts of memory, and not particular to the LSST code.) That may not be an issue if your input catalogs are small in number and size compared to the total available memory, but if you find yourself running out of memory (or you’re trying to write robust code) then you need to first loop through the inputs to sum the number of rows, allocate your output catalog, and then loop through the inputs again and assign them into the output catalog. Unfortunately, the butler currently doesn’t make it easy to get the length of a catalog (I’ll put in a feature request); thankfully the pipeline usually records the schema, or that would also make things difficult.
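The two-pass pattern described above is not particular to afw; here is a sketch of the same idea with plain NumPy arrays standing in for catalogs (the names are illustrative, not part of any LSST API):

```python
import numpy as np

chunks = [np.arange(n, dtype=np.int64) for n in (3, 4, 5)]

# Pass 1: sum the lengths so the output can be allocated exactly once.
total = sum(len(chunk) for chunk in chunks)
out = np.empty(total, dtype=np.int64)

# Pass 2: copy each input into its slot in the preallocated block, so
# no throwaway intermediate allocations are created and then discarded.
index = 0
for chunk in chunks:
    out[index:index + len(chunk)] = chunk
    index += len(chunk)

print(out)  # -> [0 1 2 0 1 2 3 0 1 2 3 4]
```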

Here’s my suggested implementation, which I’ve tested on a ci_hsc run:

from lsst.daf.persistence import Butler
import lsst.afw.table
import lsst.afw.image

def concatenateCatalogs(butler, dataset, dataIdList):
    filenames = (butler.get(dataset + "_filename", dataId, immediate=True)[0] for dataId in dataIdList)
    headers = (lsst.afw.image.readMetadata(fn, 2) for fn in filenames)
    size = sum(md.get("NAXIS2") for md in headers)
    schema = butler.get(dataset + "_schema", immediate=True).schema
    mapper = lsst.afw.table.SchemaMapper(schema)
    mapper.addMinimalSchema(schema, True)
    newKey = mapper.editOutputSchema().addField("new_column", type=int, doc="new column")
    catalog = lsst.afw.table.SourceCatalog(mapper.getOutputSchema())
    catalog.reserve(size)
    index = 0
    for ii, dataId in enumerate(dataIdList):
        cat = butler.get(dataset, dataId, flags=lsst.afw.table.SOURCE_IO_NO_FOOTPRINTS, immediate=True)
        print "Reading catalog %d: %s" % (ii, dataId)
        catalog.extend(cat, mapper)
        catalog[newKey][index:] = ii
        index += len(cat)
    return catalog

def main():
    butler = Butler("DATA/rerun/ci_hsc")
    catalog = concatenateCatalogs(butler, "deepCoadd_meas",
                                  [dict(tract=0, patch="5,4", filter="HSC-R"),
                                   dict(tract=0, patch="5,4", filter="HSC-I"),
                                   ])
    print len(catalog)
    print catalog["new_column"]
    print (catalog["new_column"] == 0).sum()
    print (catalog["new_column"] == 1).sum()

if __name__ == "__main__":
    main()
The output was:

$ python ~/ 
CameraMapper WARN: Calibration root directory not found: DATA/rerun/ci_hsc/CALIB
Reading catalog 0: {'filter': 'HSC-R', 'patch': '5,4', 'tract': 0}
Reading catalog 1: {'filter': 'HSC-I', 'patch': '5,4', 'tract': 0}
[0 0 0 ..., 1 1 1]

(No surprise the input catalogs are the same length: they’re from the multiband measurement stage, where we measure the same objects in each filter.)

After DM-8212 merges, you should be able to say:

size = sum(butler.get(dataset + "_len", dataId) for dataId in dataIdList)

and do away with the first two lines of my concatenateCatalogs function.

Thanks a lot. That works and saved me time and memory. For now I'm using the v12.0 version of the stack; I'll use this new feature once I've updated to the next stable release.