Skip to content

DM-47970: Make raw deletion more thorough. #315

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion python/activator/activator.py
Original file line number Diff line number Diff line change
Expand Up @@ -878,7 +878,7 @@ def process_visit(expected_visit: FannedOutVisit):
_get_local_repo().name,
_get_local_cache())
# TODO: pipeline execution requires a clean run until DM-38041.
cleanups.callback(mwi.clean_local_repo, expid_set)
cleanups.callback(mwi.clean_local_repo)
# Copy calibrations for this detector/visit
mwi.prep_butler()

Expand Down
27 changes: 11 additions & 16 deletions python/activator/middleware_interface.py
Original file line number Diff line number Diff line change
Expand Up @@ -1720,28 +1720,23 @@ def _query_datasets_by_storage_class(self, butler, exposure_ids, collections, st
) for t in matching_types
)

def clean_local_repo(self, exposure_ids: set[int]) -> None:
def clean_local_repo(self) -> None:
"""Remove local repo content that is only needed for a single visit.

This includes raws and pipeline outputs.

Parameter
---------
exposure_ids : `set` [`int`]
Identifiers of the exposures to be removed.
"""
with lsst.utils.timer.time_this(_log, msg="clean_local_repo", level=logging.DEBUG):
self.butler.registry.refresh()
if exposure_ids:
raws = self.butler.query_datasets(
'raw',
collections=self.instrument.makeDefaultRawIngestRunName(),
where=f"exposure in ({', '.join(str(x) for x in exposure_ids)})",
explain=False, # Raws might not have been ingested.
instrument=self.visit.instrument,
detector=self.visit.detector,
)
self.butler.pruneDatasets(raws, disassociate=True, unstore=True, purge=True)
# Clean out raws
raws = self.butler.query_datasets(
'raw',
collections=self.instrument.makeDefaultRawIngestRunName(),
explain=False, # Raws might not have been ingested.
instrument=self.visit.instrument,
detector=self.visit.detector,
)
self.butler.pruneDatasets(raws, disassociate=True, unstore=True, purge=True)

# Outputs are all in their own runs, so just drop them.
preload_run = runs.get_preload_run(self.instrument, self._deployment, self._day_obs)
_remove_run_completely(self.butler, preload_run)
Expand Down
2 changes: 1 addition & 1 deletion tests/test_middleware_interface.py
Original file line number Diff line number Diff line change
Expand Up @@ -1019,7 +1019,7 @@ def test_clean_local_repo(self):
self._assert_in_collection(butler, "*", "bias", calib_data_id_2)
self._assert_in_collection(butler, "*", "bias", calib_data_id_3)

self.interface.clean_local_repo({raw_data_id["exposure"]})
self.interface.clean_local_repo()
self._assert_not_in_collection(butler, "*", "raw", raw_data_id)
self._assert_not_in_collection(butler, "*", "src", processed_data_id)
self._assert_not_in_collection(butler, "*", "calexp", processed_data_id)
Expand Down