Merge pull request #244 from lsst/tickets/DM-46249
DM-46249: Add SasquatchDatastore to enable uploading analysis_tools metrics
BrunoSanchez authored Feb 19, 2025
2 parents 1a3ea5e + dd3d62d commit 763dcd8
Showing 5 changed files with 101 additions and 16 deletions.
30 changes: 28 additions & 2 deletions bin/run_ci_dataset.sh
@@ -35,33 +35,57 @@ print_error() {

usage() {
print_error
print_error "Usage: $0 -d DATASET [-g NUM] [-p PATH] [-h]"
print_error "Usage: $0 -d DATASET [-g NUM] [-p PATH] [-n NAMESPACE] [-u URL] [-h]"
print_error
print_error "Specific options:"
print_error " -d Dataset name"
print_error " -g Middleware generation number (int) [currently unused]"
print_error " -p Pipeline to run"
print_error " -n Namespace for metrics upload (optional, but required if -u is set)"
print_error " -u URL for metrics upload (optional, but required if -n is set)"
print_error " -h show this message"
exit 1
}

while getopts "d:g:p:h" option; do
while getopts "d:g:p:n:u:h" option; do
case "$option" in
d) DATASET="$OPTARG";;
g) GEN="$OPTARG";;
p) PIPE="$OPTARG";;
n) NAMESPACE="$OPTARG";;
u) URL="$OPTARG";;
h) usage;;
*) usage;;
esac
done

if [[ -z "${DATASET}" ]]; then
print_error "$0: mandatory argument -- d"
usage
exit 1
fi

# Ensure both NAMESPACE and URL exist, or neither does
if { [[ -n "${NAMESPACE}" ]] && [[ -z "${URL}" ]]; } || { [[ -z "${NAMESPACE}" ]] && [[ -n "${URL}" ]]; }; then
print_error "Error: Both -n (namespace) and -u (URL) must be provided together, or neither."
usage
exit 1
fi

# Set PIPE argument if provided
if [[ -n "${PIPE}" ]]; then
PIPE="--pipeline ${PIPE}"
fi

# Set NAMESPACE and URL arguments if both are provided
if [[ -n "${NAMESPACE}" && -n "${URL}" ]]; then
NAMESPACE_ARG="--namespace ${NAMESPACE}"
URL_ARG="--restProxyUrl ${URL}"
else
NAMESPACE_ARG=""
URL_ARG=""
fi

shift $((OPTIND-1))

PRODUCT_DIR=${AP_VERIFY_DIR}
@@ -92,4 +92,6 @@ ap_verify.py --dataset "${DATASET}" \
${PIPE} \
--output "${WORKSPACE}" \
--processes "${NUMPROC}" \
${NAMESPACE_ARG} \
${URL_ARG} \
&>> "${WORKSPACE}"/apVerify.log
13 changes: 11 additions & 2 deletions doc/lsst.ap.verify/command-line-reference.rst
@@ -65,7 +65,7 @@ Required arguments are :option:`--dataset` and :option:`--output`.
Multiple copies of this argument are allowed.

If this argument is omitted, then all data IDs in the dataset will be processed.

.. option:: --dataset <dataset_package>

**Input dataset package.**
@@ -87,6 +87,15 @@ Required arguments are :option:`--dataset` and :option:`--output`.

If this argument is omitted, ``ap_verify`` creates an SQLite database inside the directory indicated by :option:`--output`.

.. option:: --namespace <sasquatch_namespace>

The Sasquatch namespace to use for the ``ap_verify`` metrics upload.
If this is provided, then a valid REST proxy URL must be provided with :option:`--restProxyUrl`.

.. option:: --restProxyUrl <sasquatch_proxy_url>

A URI string identifying the Sasquatch REST proxy URL to use for the ``ap_verify`` metrics upload. If this is provided, then a valid :option:`--namespace` must also be provided.

.. option:: -h, --help

**Print help.**
@@ -98,7 +107,7 @@ Required arguments are :option:`--dataset` and :option:`--output`.
**Number of processes to use.**

When ``processes`` is larger than 1 the pipeline may use the Python `multiprocessing` module to parallelize processing of multiple datasets across multiple processors.

.. option:: --output <workspace_dir>

**Output and intermediate product path.**
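
The two new options above must be paired; a hedged example of a full ap_verify invocation using them (dataset, output path, namespace, and URL are placeholders):

    # Illustrative invocation; every value below is a placeholder.
    ap_verify.py --dataset ci_hits2015 \
        --output /tmp/ap_verify_workspace \
        --namespace lsst.example \
        --restProxyUrl https://example.org/sasquatch-rest-proxy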
8 changes: 4 additions & 4 deletions python/lsst/ap/verify/ap_verify.py
@@ -36,7 +36,7 @@
import lsst.log

from .dataset import Dataset
from .ingestion import ingestDatasetGen3
from .ingestion import ingestDatasetGen3, IngestionParser
from .pipeline_driver import ApPipeParser, runApPipeGen3
from .workspace import WorkspaceGen3

Expand Down Expand Up @@ -92,7 +92,7 @@ def __init__(self):
self,
description='Executes the LSST DM AP pipeline and analyzes its performance using metrics.',
epilog='',
parents=[_InputOutputParser(), _ProcessingParser(), ApPipeParser(), ],
parents=[IngestionParser(), _InputOutputParser(), _ProcessingParser(), ApPipeParser(), ],
add_help=True)


@@ -109,7 +109,7 @@ def __init__(self):
'passing the same --output argument, or by other programs that accept '
'Butler repositories as input.',
epilog='',
parents=[_InputOutputParser(), _ProcessingParser()],
parents=[IngestionParser(), _InputOutputParser(), _ProcessingParser()],
add_help=True)


@@ -150,7 +150,7 @@ def runApVerify(cmdLine=None):
log.debug('Command-line arguments: %s', args)

workspace = WorkspaceGen3(args.output)
ingestDatasetGen3(args.dataset, workspace, processes=args.processes)
ingestDatasetGen3(args.dataset, workspace, args.namespace, args.restProxyUrl, processes=args.processes)
log.info('Running pipeline...')
# Gen 3 pipeline includes both AP and metrics
return runApPipeGen3(workspace, args, processes=args.processes)
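
Because IngestionParser is a parent parser with add_help=False, the composition above works as in this minimal sketch (the description string and argument values are illustrative):

    import argparse

    from lsst.ap.verify.ingestion import IngestionParser

    # The composing parser owns -h; IngestionParser only contributes
    # --namespace and --restProxyUrl (both default to None).
    parser = argparse.ArgumentParser(
        description='Demo of parent-parser composition',  # illustrative
        parents=[IngestionParser()],
        add_help=True)
    args = parser.parse_args(['--namespace', 'lsst.example',
                              '--restProxyUrl', 'https://example.org/proxy'])
    print(args.namespace, args.restProxyUrl)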
21 changes: 18 additions & 3 deletions python/lsst/ap/verify/dataset.py
@@ -155,7 +155,7 @@ def __repr__(self):
"""
return f"Dataset({self._id!r})"

def makeCompatibleRepoGen3(self, repoDir):
def makeCompatibleRepoGen3(self, repoDir, sasquatchNamespace, sasquatchRestProxyUrl):
"""Set up a directory as a Gen 3 repository compatible with this ap_verify dataset.
If the repository already exists, this call has no effect.
@@ -170,10 +170,25 @@ def makeCompatibleRepoGen3(self, repoDir):
seedConfig = dafButler.Config()
# Checksums greatly slow importing of large repositories
seedConfig["datastore", "checksum"] = False
transfMode = "auto"
if sasquatchRestProxyUrl is not None:
seedConfig[
"datastore", "cls"] = "lsst.daf.butler.datastores.chainedDatastore.ChainedDatastore"

datastores = [
{"cls": "lsst.daf.butler.datastores.fileDatastore.FileDatastore",
"root": "<butlerRoot>",
},
{"cls": "lsst.analysis.tools.interfaces.datastore.SasquatchDatastore",
"restProxyUrl": sasquatchRestProxyUrl,
"namespace": sasquatchNamespace,
},
]
seedConfig["datastore", "datastores"] = datastores
transfMode = "direct"
repoConfig = dafButler.Butler.makeRepo(repoDir, config=seedConfig)
butler = dafButler.Butler(repoConfig, writeable=True)
butler.import_(directory=self._preloadedRepo, filename=self._preloadedExport,
transfer="auto")
butler.import_(directory=self._preloadedRepo, filename=self._preloadedExport, transfer=transfMode)
except FileExistsError:
pass

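
For clarity, a self-contained sketch of the seed configuration this branch builds when a REST proxy URL is supplied; it assumes lsst.daf.butler is importable, and the URL and namespace values are placeholders:

    import lsst.daf.butler as dafButler

    seedConfig = dafButler.Config()
    # Checksums greatly slow importing of large repositories.
    seedConfig["datastore", "checksum"] = False
    # Chain a plain file datastore with SasquatchDatastore so that datasets
    # written to the repository are also posted to the Sasquatch REST proxy.
    seedConfig["datastore", "cls"] = (
        "lsst.daf.butler.datastores.chainedDatastore.ChainedDatastore")
    seedConfig["datastore", "datastores"] = [
        {"cls": "lsst.daf.butler.datastores.fileDatastore.FileDatastore",
         "root": "<butlerRoot>"},
        {"cls": "lsst.analysis.tools.interfaces.datastore.SasquatchDatastore",
         "restProxyUrl": "https://example.org/sasquatch-rest-proxy",  # placeholder
         "namespace": "lsst.example"},                                # placeholder
    ]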
45 changes: 40 additions & 5 deletions python/lsst/ap/verify/ingestion.py
@@ -29,6 +29,7 @@

__all__ = ["Gen3DatasetIngestConfig", "ingestDatasetGen3"]

import argparse
import fnmatch
import os
import shutil
@@ -45,6 +46,24 @@
_LOG = logging.getLogger(__name__)


class IngestionParser(argparse.ArgumentParser):
"""An argument parser for data needed by ingestion.
This parser is not complete, and is designed to be passed to another parser
using the `parents` parameter.
"""

def __init__(self):
# Help and documentation will be handled by main program's parser
argparse.ArgumentParser.__init__(self, add_help=False)

self.add_argument('--namespace', dest='namespace', default=None,
help='The Sasquatch namespace to use for the ap_verify metrics upload.')

self.add_argument('--restProxyUrl', dest='restProxyUrl', default=None,
help='The Sasquatch REST proxy URL to use for the ap_verify metrics upload.')


class Gen3DatasetIngestConfig(pexConfig.Config):
"""Settings and defaults for `Gen3DatasetIngestTask`.
@@ -74,6 +93,10 @@ class Gen3DatasetIngestConfig(pexConfig.Config):
"supersedes ``dataFiles``.",
)

def setDefaults(self):
super().setDefaults()
self.ingester.transfer = "copy"


class Gen3DatasetIngestTask(pipeBase.Task):
"""Task for automating ingestion of a ap_verify dataset.
@@ -95,18 +118,23 @@ class Gen3DatasetIngestTask(pipeBase.Task):
# Suffix is de-facto convention for distinguishing Gen 2 and Gen 3 config overrides
_DefaultName = "datasetIngest-gen3"

def __init__(self, dataset, workspace, *args, **kwargs):
def __init__(self, dataset, workspace, namespace, url, *args, **kwargs):
super().__init__(*args, **kwargs)
self.workspace = workspace
self.dataset = dataset
self.namespace = namespace
self.url = url
# workspace.workButler is undefined until the repository is created
self.dataset.makeCompatibleRepoGen3(self.workspace.repo)
self.dataset.makeCompatibleRepoGen3(self.workspace.repo, self.namespace, self.url)
if self.url is not None:
self.transferMode = "copy"
self.makeSubtask("ingester", butler=self.workspace.workButler)
self.makeSubtask("visitDefiner", butler=self.workspace.workButler)

def _reduce_kwargs(self):
# Add extra parameters to pickle
return dict(**super()._reduce_kwargs(), dataset=self.dataset, workspace=self.workspace)
return dict(**super()._reduce_kwargs(), dataset=self.dataset,
workspace=self.workspace, namespace=self.namespace, url=self.url)

def run(self, processes=1):
"""Ingest the contents of a dataset into a Butler repository.
@@ -242,7 +270,7 @@ def _copyConfigs(self):
self.log.info("Configs are now stored in %s.", self.workspace.pipelineDir)


def ingestDatasetGen3(dataset, workspace, processes=1):
def ingestDatasetGen3(dataset, workspace, sasquatchNamespace, sasquatchUrl, processes=1):
"""Ingest the contents of an ap_verify dataset into a Gen 3 Butler repository.
The original data directory is not modified.
@@ -254,12 +282,19 @@ def ingestDatasetGen3(dataset, workspace, processes=1):
workspace : `lsst.ap.verify.workspace.WorkspaceGen3`
The abstract location where the repository is to be created, if it does
not already exist.
sasquatchNamespace : `str` or `None`
The name of the Sasquatch namespace to post the ap_verify metrics to, or `None` to skip uploading.
sasquatchUrl : `str` or `None`
The URL of the Sasquatch REST proxy server to post the ap_verify metrics to, or `None` to skip uploading.
processes : `int`
The number of processes to use for ingestion.
"""
log = _LOG.getChild("ingestDataset")

ingester = Gen3DatasetIngestTask(dataset, workspace, config=_getConfig(Gen3DatasetIngestTask, dataset))
ingester = Gen3DatasetIngestTask(
dataset, workspace, sasquatchNamespace, sasquatchUrl,
config=_getConfig(Gen3DatasetIngestTask, dataset)
)
ingester.run(processes=processes)
log.info("Data ingested")

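
Finally, a hedged end-to-end sketch of the new ingestDatasetGen3 signature; the dataset name, workspace path, namespace, and URL are placeholders, and passing None for the last two preserves the old no-upload behavior:

    from lsst.ap.verify.dataset import Dataset
    from lsst.ap.verify.ingestion import ingestDatasetGen3
    from lsst.ap.verify.workspace import WorkspaceGen3

    dataset = Dataset("ci_hits2015")                       # placeholder name
    workspace = WorkspaceGen3("/tmp/ap_verify_workspace")
    ingestDatasetGen3(dataset, workspace,
                      "lsst.example",                              # namespace (placeholder)
                      "https://example.org/sasquatch-rest-proxy",  # proxy URL (placeholder)
                      processes=1)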
