How to handle connection abortion for job.fetch_result().to_table()

Hello!

I am trying to detect microlensing signals in DP1 light curves. My code first reads all the IDs (those that have at least 10 positive flux values in r), then reads the light curves in batches of 500 and applies my detection algorithm to each batch in parallel across all available cores.
After a couple of batches of 500 light curves, I get a connection-aborted error, shown below. Do you have any suggestions for fixing this issue?

Traceback (most recent call last):
  File "/opt/lsst/software/stack/conda/envs/lsst-scipipe-10.0.0/lib/python3.12/site-packages/urllib3/connectionpool.py", line 787, in urlopen
    response = self._make_request(
               ^^^^^^^^^^^^^^^^^^^
  File "/opt/lsst/software/stack/conda/envs/lsst-scipipe-10.0.0/lib/python3.12/site-packages/urllib3/connectionpool.py", line 534, in _make_request
    response = conn.getresponse()
               ^^^^^^^^^^^^^^^^^^
  File "/opt/lsst/software/stack/conda/envs/lsst-scipipe-10.0.0/lib/python3.12/site-packages/urllib3/connection.py", line 565, in getresponse
    httplib_response = super().getresponse()
                       ^^^^^^^^^^^^^^^^^^^^^
  File "/opt/lsst/software/stack/conda/envs/lsst-scipipe-10.0.0/lib/python3.12/http/client.py", line 1430, in getresponse
    response.begin()
  File "/opt/lsst/software/stack/conda/envs/lsst-scipipe-10.0.0/lib/python3.12/http/client.py", line 331, in begin
    version, status, reason = self._read_status()
                              ^^^^^^^^^^^^^^^^^^^
  File "/opt/lsst/software/stack/conda/envs/lsst-scipipe-10.0.0/lib/python3.12/http/client.py", line 300, in _read_status
    raise RemoteDisconnected("Remote end closed connection without"
http.client.RemoteDisconnected: Remote end closed connection without response

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/opt/lsst/software/stack/conda/envs/lsst-scipipe-10.0.0/lib/python3.12/site-packages/requests/adapters.py", line 667, in send
    resp = conn.urlopen(
           ^^^^^^^^^^^^^
  File "/opt/lsst/software/stack/conda/envs/lsst-scipipe-10.0.0/lib/python3.12/site-packages/urllib3/connectionpool.py", line 841, in urlopen
    retries = retries.increment(
              ^^^^^^^^^^^^^^^^^^
  File "/opt/lsst/software/stack/conda/envs/lsst-scipipe-10.0.0/lib/python3.12/site-packages/urllib3/util/retry.py", line 474, in increment
    raise reraise(type(error), error, _stacktrace)
          ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/lsst/software/stack/conda/envs/lsst-scipipe-10.0.0/lib/python3.12/site-packages/urllib3/util/util.py", line 38, in reraise
    raise value.with_traceback(tb)
  File "/opt/lsst/software/stack/conda/envs/lsst-scipipe-10.0.0/lib/python3.12/site-packages/urllib3/connectionpool.py", line 787, in urlopen
    response = self._make_request(
               ^^^^^^^^^^^^^^^^^^^
  File "/opt/lsst/software/stack/conda/envs/lsst-scipipe-10.0.0/lib/python3.12/site-packages/urllib3/connectionpool.py", line 534, in _make_request
    response = conn.getresponse()
               ^^^^^^^^^^^^^^^^^^
  File "/opt/lsst/software/stack/conda/envs/lsst-scipipe-10.0.0/lib/python3.12/site-packages/urllib3/connection.py", line 565, in getresponse
    httplib_response = super().getresponse()
                       ^^^^^^^^^^^^^^^^^^^^^
  File "/opt/lsst/software/stack/conda/envs/lsst-scipipe-10.0.0/lib/python3.12/http/client.py", line 1430, in getresponse
    response.begin()
  File "/opt/lsst/software/stack/conda/envs/lsst-scipipe-10.0.0/lib/python3.12/http/client.py", line 331, in begin
    version, status, reason = self._read_status()
                              ^^^^^^^^^^^^^^^^^^^
  File "/opt/lsst/software/stack/conda/envs/lsst-scipipe-10.0.0/lib/python3.12/http/client.py", line 300, in _read_status
    raise RemoteDisconnected("Remote end closed connection without"
urllib3.exceptions.ProtocolError: ('Connection aborted.', RemoteDisconnected('Remote end closed connection without response'))

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/opt/lsst/software/stack/conda/envs/lsst-scipipe-10.0.0/lib/python3.12/site-packages/pyvo/dal/tap.py", line 1032, in fetch_result
    response = self._session.get(self.result_uri, stream=True)
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/lsst/software/stack/conda/envs/lsst-scipipe-10.0.0/lib/python3.12/site-packages/pyvo/auth/authsession.py", line 64, in get
    return self._request('GET', url, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/lsst/software/stack/conda/envs/lsst-scipipe-10.0.0/lib/python3.12/site-packages/pyvo/auth/authsession.py", line 108, in _request
    return session.request(http_method, url, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/lsst/software/stack/conda/envs/lsst-scipipe-10.0.0/lib/python3.12/site-packages/requests/sessions.py", line 589, in request
    resp = self.send(prep, **send_kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/lsst/software/stack/conda/envs/lsst-scipipe-10.0.0/lib/python3.12/site-packages/requests/sessions.py", line 724, in send
    history = [resp for resp in gen]
                                ^^^
  File "/opt/lsst/software/stack/conda/envs/lsst-scipipe-10.0.0/lib/python3.12/site-packages/requests/sessions.py", line 265, in resolve_redirects
    resp = self.send(
           ^^^^^^^^^^
  File "/opt/lsst/software/stack/conda/envs/lsst-scipipe-10.0.0/lib/python3.12/site-packages/requests/sessions.py", line 703, in send
    r = adapter.send(request, **kwargs)
        ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/lsst/software/stack/conda/envs/lsst-scipipe-10.0.0/lib/python3.12/site-packages/requests/adapters.py", line 682, in send
    raise ConnectionError(err, request=request)
requests.exceptions.ConnectionError: ('Connection aborted.', RemoteDisconnected('Remote end closed connection without response'))

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "<frozen runpy>", line 198, in _run_module_as_main
  File "<frozen runpy>", line 88, in _run_code
  File "/home/somayeh91/microlensing/rsp_tools/mulens_detector_V2.py", line 174, in <module>
    run(args)
  File "/home/somayeh91/microlensing/rsp_tools/mulens_detector_V2.py", line 121, in run
    forced_sources = job.fetch_result().to_table()
                     ^^^^^^^^^^^^^^^^^^
  File "/opt/lsst/software/stack/conda/envs/lsst-scipipe-10.0.0/lib/python3.12/site-packages/pyvo/dal/tap.py", line 1038, in fetch_result
    raise DALServiceError.from_except(ex, self.url)
pyvo.dal.exceptions.DALServiceError: ('Connection aborted.', RemoteDisconnected('Remote end closed connection without response'))

You are saying that the first few connections work and then they fail? Are you running from an RSP terminal?

Can you send the few lines of code that you are using to do the query? (An error message alone, without any context of how you are setting up the job, is hard to follow.)

Sure!

I am running through the terminal, and it fails after two batches. Here is the code:

def chunked(lst, n):
    for i in range(0, len(lst), n):
        yield lst[i:i + n]

def process_light_curve(dict_lc):
    ID = list(dict_lc.keys())[0]
    time = dict_lc[ID]['time']
    flux = dict_lc[ID]['flux']
    flux_err = dict_lc[ID]['flux_err']
    delta_chi_squared_kmt, (t0, t_eff, f1, f0) = run_kmtnet_fit(time, flux, flux_err)
    if delta_chi_squared_kmt > 0.9:
        return f"{ID} {dict_lc[ID]['ra_coor']} {dict_lc[ID]['dec_coor']} {delta_chi_squared_kmt} {t0} {t_eff} {f1} {f0}\n"
    else:
        return None

def run(args):

    # Set up the TAP service and the data butler
    service = get_tap_service("tap")
    assert service is not None
    butler = Butler('dp1', collections="LSSTComCam/DP1")
    assert butler is not None

    # Define the search region
    ra_cen = args.ra
    dec_cen = args.dec
    radius = args.radius
    region = sphgeom.Region.from_ivoa_pos(f"CIRCLE {ra_cen} {dec_cen} {radius}")

    # Fetch a table of all DiaObjects within this search radius that have
    # more than 10 positive r-band flux measurements
    query = f"""
        SELECT
          fsodo.diaObjectId,
          fsodo.coord_ra,
          fsodo.coord_dec,
          COUNT(*) AS positive_flux_count
        FROM dp1.ForcedSourceOnDiaObject AS fsodo
        JOIN dp1.Visit AS vis ON fsodo.visit = vis.visit
        WHERE
          fsodo.band = 'r'
          AND fsodo.psfFlux > 0
          AND CONTAINS(
            POINT('ICRS', fsodo.coord_ra, fsodo.coord_dec),
            CIRCLE('ICRS', {ra_cen}, {dec_cen}, {radius})
          ) = 1
        GROUP BY
          fsodo.diaObjectId,
          fsodo.coord_ra,
          fsodo.coord_dec
        HAVING
          COUNT(*) > 10
    """

    job = service.submit_job(query)
    job.run()
    job.wait(phases=['COMPLETED', 'ERROR'])
    print('Job phase is', job.phase)
    if job.phase == 'ERROR':
        job.raise_if_error()
    assert job.phase == 'COMPLETED'
    objtab = job.fetch_result().to_table()

    # multiprocessing.set_start_method('spawn')
    num_cores = multiprocessing.cpu_count()
    print('Number of cores:', num_cores)

    IDs = np.array(objtab['diaObjectId'].data)
    n_chunks = len(IDs) // 500 + 1

    with open('./mulens_candidates.dat', 'w') as fout:
        fout.write('# diaObjectId   coord_ra  coord_dec  delta_chi_squared_kmt  t0  t_eff  f1  f0\n')

        chunk_counter = 0
        for chunk in chunked(IDs, 500):
            print('Starting chunk %i/%i:' % (chunk_counter, n_chunks))
            ids_str = ",".join(str(i) for i in chunk)

            # Fetch the forced-source light curves for this chunk of IDs
            query = f"""
                SELECT fsodo.diaObjectId, fsodo.coord_ra, fsodo.coord_dec,
                       fsodo.visit, fsodo.band,
                       fsodo.psfDiffFlux, fsodo.psfDiffFluxErr,
                       fsodo.psfFlux AS psfFlux, fsodo.psfFluxErr,
                       vis.expMidptMJD
                FROM dp1.ForcedSourceOnDiaObject AS fsodo
                JOIN dp1.Visit AS vis ON vis.visit = fsodo.visit
                WHERE fsodo.diaObjectId IN ({ids_str})
            """

            job = service.submit_job(query)
            job.run()
            job.wait(phases=['COMPLETED', 'ERROR'])

            if job.phase == 'ERROR':
                job.raise_if_error()
            assert job.phase == 'COMPLETED'

            forced_sources = job.fetch_result().to_table()
            all_data = forced_sources[forced_sources['band'] == 'r']
            all_data_list = []
            for ID in ids_str.split(','):
                target = objtab[objtab['diaObjectId'] == int(ID)]
                all_data_list.append(
                    {ID: {'ra_coor': target['coord_ra'],
                          'dec_coor': target['coord_dec'],
                          'time': np.array(all_data[all_data['diaObjectId'] == int(ID)]['expMidptMJD'].data),
                          'flux': np.array(all_data[all_data['diaObjectId'] == int(ID)]['psfFlux'].data),
                          'flux_err': np.array(all_data[all_data['diaObjectId'] == int(ID)]['psfFluxErr'].data)
                          }}
                )

            counter = 0
            while len(all_data_list) > 0:
                print('Step: ', counter)
                num_to_process = min(num_cores, len(all_data_list))

                batch = all_data_list[:num_to_process]

                # Remove the processed objects from the list
                all_data_list = all_data_list[num_to_process:]

                pool = multiprocessing.Pool(processes=num_to_process)
                results = pool.map(process_light_curve, batch)
                pool.close()
                pool.join()

                for res in results:
                    if res is not None:
                        fout.write(res)

                counter += 1

            print('Finished chunk %i/%i.' % (chunk_counter, n_chunks))
            chunk_counter += 1

This is going to cause you a lot of problems: the OS is reporting 32 cores, but you only have access to about 4 CPUs. You need to base your multiprocessing on the allocated number of cores, which you can get from the CPU_LIMIT environment variable.
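A minimal sketch of that check (assuming CPU_LIMIT is set in your RSP environment; the cpu_count() fallback is just for running elsewhere):

import multiprocessing
import os

# On the RSP, CPU_LIMIT holds the container's CPU allocation;
# multiprocessing.cpu_count() reports the host's cores instead.
cpu_limit = os.environ.get("CPU_LIMIT")
num_cores = int(float(cpu_limit)) if cpu_limit else multiprocessing.cpu_count()
print("Usable cores:", num_cores)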

Specifically, if you are using the RSP Notebook aspect, you have access to 4 CPUs.

This is what the screen is showing you: [screenshot of the RSP resource panel omitted]

So running more than 4 jobs in parallel just makes everything worse for you; the more likely explanation for the failures is that some of your jobs are starving out the others.
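And if an occasional transient disconnect still gets through, one option is to wrap the fetch in a simple retry. A minimal sketch, with an arbitrary retry count and wait time (fetch_with_retry is a hypothetical helper, not part of pyvo):

import time

from pyvo.dal.exceptions import DALServiceError

def fetch_with_retry(job, max_tries=3, wait_seconds=30):
    # Retry fetch_result() on transient service errors; re-raise on the
    # last attempt so real failures still surface.
    for attempt in range(1, max_tries + 1):
        try:
            return job.fetch_result().to_table()
        except DALServiceError:
            if attempt == max_tries:
                raise
            print(f"fetch_result failed (attempt {attempt}), retrying...")
            time.sleep(wait_seconds)

# e.g. in the batch loop:
# forced_sources = fetch_with_retry(job)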

That’s true, thanks for catching that! I will fix it and let it run again.