DM-46249: Add SasquatchDatastore to enable uploading analysis_tools metrics #244

Merged 2 commits on Feb 19, 2025
30 changes: 28 additions & 2 deletions bin/run_ci_dataset.sh
@@ -35,33 +35,57 @@ print_error() {

usage() {
print_error
print_error "Usage: $0 -d DATASET [-g NUM] [-p PATH] [-h]"
print_error "Usage: $0 -d DATASET [-g NUM] [-p PATH] [-n NAMESPACE] [-u URL] [-h]"
print_error
print_error "Specific options:"
print_error " -d Dataset name"
print_error " -g Middleware generation number (int) [currently unused]"
print_error " -p Pipeline to run"
print_error " -n Namespace for metrics upload (optional, but required if -u is set)"
print_error " -u URL for metrics upload (optional, but required if -n is set)"
print_error " -h show this message"
exit 1
}

while getopts "d:g:p:h" option; do
while getopts "d:g:p:n:u:h" option; do
case "$option" in
d) DATASET="$OPTARG";;
g) GEN="$OPTARG";;
p) PIPE="$OPTARG";;
n) NAMESPACE="$OPTARG";;
u) URL="$OPTARG";;
h) usage;;
*) usage;;
esac
done

if [[ -z "${DATASET}" ]]; then
print_error "$0: mandatory argument -- d"
usage
exit 1
fi

# Ensure both NAMESPACE and URL exist, or neither does
if { [[ -n "${NAMESPACE}" ]] && [[ -z "${URL}" ]]; } || { [[ -z "${NAMESPACE}" ]] && [[ -n "${URL}" ]]; }; then
print_error "Error: Both -n (namespace) and -u (URL) must be provided together, or neither."
usage
exit 1
fi

# Set PIPE argument if provided
if [[ -n "${PIPE}" ]]; then
PIPE="--pipeline ${PIPE}"
fi

# Set NAMESPACE and URL arguments if both are provided
if [[ -n "${NAMESPACE}" && -n "${URL}" ]]; then
NAMESPACE_ARG="--namespace ${NAMESPACE}"
URL_ARG="--restProxyUrl ${URL}"
else
NAMESPACE_ARG=""
URL_ARG=""
fi

shift $((OPTIND-1))

PRODUCT_DIR=${AP_VERIFY_DIR}
@@ -92,4 +116,6 @@ ap_verify.py --dataset "${DATASET}" \
${PIPE} \
--output "${WORKSPACE}" \
--processes "${NUMPROC}" \
${NAMESPACE_ARG} \
${URL_ARG} \
&>> "${WORKSPACE}"/apVerify.log
13 changes: 11 additions & 2 deletions doc/lsst.ap.verify/command-line-reference.rst
@@ -65,7 +65,7 @@ Required arguments are :option:`--dataset` and :option:`--output`.
Multiple copies of this argument are allowed.

If this argument is omitted, then all data IDs in the dataset will be processed.

.. option:: --dataset <dataset_package>

**Input dataset package.**
@@ -87,6 +87,15 @@ Required arguments are :option:`--dataset` and :option:`--output`.

If this argument is omitted, ``ap_verify`` creates an SQLite database inside the directory indicated by :option:`--output`.

.. option:: --namespace <sasquatch_namespace>

The Sasquatch namespace to use for the ap_verify metrics upload.
If this is provided, then a valid REST proxy URL must be provided with :option:`--restProxyUrl`.

.. option:: --restProxyUrl <sasquatch_proxy_url>

A URI string identifying the Sasquatch REST proxy URL to use for the ap_verify metrics upload.
If this is provided, then a valid :option:`--namespace` must also be provided.

.. option:: -h, --help

**Print help.**
@@ -98,7 +107,7 @@ Required arguments are :option:`--dataset` and :option:`--output`.
**Number of processes to use.**

When ``processes`` is larger than 1 the pipeline may use the Python `multiprocessing` module to parallelize processing of multiple datasets across multiple processors.

.. option:: --output <workspace_dir>

**Output and intermediate product path.**
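
For illustration, a hypothetical ap_verify.py invocation combining the two new options documented above (the dataset package, output path, namespace, and URL are placeholder values):

    ap_verify.py --dataset DATASET_PACKAGE \
        --output /path/to/workspace \
        --namespace lsst.example.namespace \
        --restProxyUrl https://example.org/sasquatch-rest-proxy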
8 changes: 4 additions & 4 deletions python/lsst/ap/verify/ap_verify.py
@@ -36,7 +36,7 @@
import lsst.log

from .dataset import Dataset
from .ingestion import ingestDatasetGen3
from .ingestion import ingestDatasetGen3, IngestionParser
from .pipeline_driver import ApPipeParser, runApPipeGen3
from .workspace import WorkspaceGen3

@@ -92,7 +92,7 @@ def __init__(self):
self,
description='Executes the LSST DM AP pipeline and analyzes its performance using metrics.',
epilog='',
parents=[_InputOutputParser(), _ProcessingParser(), ApPipeParser(), ],
parents=[IngestionParser(), _InputOutputParser(), _ProcessingParser(), ApPipeParser(), ],
add_help=True)


@@ -109,7 +109,7 @@ def __init__(self):
'passing the same --output argument, or by other programs that accept '
'Butler repositories as input.',
epilog='',
parents=[_InputOutputParser(), _ProcessingParser()],
parents=[IngestionParser(), _InputOutputParser(), _ProcessingParser()],
add_help=True)


@@ -150,7 +150,7 @@ def runApVerify(cmdLine=None):
log.debug('Command-line arguments: %s', args)

workspace = WorkspaceGen3(args.output)
ingestDatasetGen3(args.dataset, workspace, processes=args.processes)
ingestDatasetGen3(args.dataset, workspace, args.namespace, args.restProxyUrl, processes=args.processes)
log.info('Running pipeline...')
# Gen 3 pipeline includes both AP and metrics
return runApPipeGen3(workspace, args, processes=args.processes)
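
For context, here is a minimal self-contained sketch (not part of this PR) of the argparse parents mechanism that lets IngestionParser contribute --namespace and --restProxyUrl to the parsers above; the stand-in parser below mirrors IngestionParser's two arguments:

    import argparse

    # Stand-in mirroring IngestionParser: add_help=False lets the composing
    # parser define its own -h/--help without a conflict.
    ingestionParser = argparse.ArgumentParser(add_help=False)
    ingestionParser.add_argument("--namespace", default=None)
    ingestionParser.add_argument("--restProxyUrl", default=None)

    # parents=[...] copies the parent parser's arguments into this parser.
    mainParser = argparse.ArgumentParser(parents=[ingestionParser])
    args = mainParser.parse_args(["--namespace", "lsst.example.namespace",
                                  "--restProxyUrl", "https://example.org/proxy"])
    assert args.namespace == "lsst.example.namespace"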
21 changes: 18 additions & 3 deletions python/lsst/ap/verify/dataset.py
@@ -155,7 +155,7 @@ def __repr__(self):
"""
return f"Dataset({self._id!r})"

def makeCompatibleRepoGen3(self, repoDir):
def makeCompatibleRepoGen3(self, repoDir, sasquatchNamespace, sasquatchRestProxyUrl):
"""Set up a directory as a Gen 3 repository compatible with this ap_verify dataset.

If the repository already exists, this call has no effect.
@@ -170,10 +170,25 @@ def makeCompatibleRepoGen3(self, repoDir):
seedConfig = dafButler.Config()
# Checksums greatly slow importing of large repositories
seedConfig["datastore", "checksum"] = False
transfMode = "auto"
if sasquatchRestProxyUrl is not None:
seedConfig[
"datastore", "cls"] = "lsst.daf.butler.datastores.chainedDatastore.ChainedDatastore"

datastores = [
{"cls": "lsst.daf.butler.datastores.fileDatastore.FileDatastore",
"root": "<butlerRoot>",
},
{"cls": "lsst.analysis.tools.interfaces.datastore.SasquatchDatastore",
"restProxyUrl": sasquatchRestProxyUrl,
"namespace": sasquatchNamespace,
},
]
seedConfig["datastore", "datastores"] = datastores
transfMode = "direct"
repoConfig = dafButler.Butler.makeRepo(repoDir, config=seedConfig)
butler = dafButler.Butler(repoConfig, writeable=True)
butler.import_(directory=self._preloadedRepo, filename=self._preloadedExport,
transfer="auto")
butler.import_(directory=self._preloadedRepo, filename=self._preloadedExport, transfer=transfMode)
except FileExistsError:
pass

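
As a standalone illustration of the chained-datastore setup above, the following sketch builds the same seed config and creates a repository from it; the repository path, namespace, and URL are assumed example values, and the config keys mirror those used in makeCompatibleRepoGen3:

    import lsst.daf.butler as dafButler

    seedConfig = dafButler.Config()
    seedConfig["datastore", "checksum"] = False
    # Chain a conventional file datastore with the SasquatchDatastore
    # provided by analysis_tools.
    seedConfig["datastore", "cls"] = (
        "lsst.daf.butler.datastores.chainedDatastore.ChainedDatastore")
    seedConfig["datastore", "datastores"] = [
        {"cls": "lsst.daf.butler.datastores.fileDatastore.FileDatastore",
         "root": "<butlerRoot>"},
        {"cls": "lsst.analysis.tools.interfaces.datastore.SasquatchDatastore",
         "restProxyUrl": "https://example.org/sasquatch-rest-proxy",  # assumed
         "namespace": "lsst.example.namespace"},  # assumed
    ]
    repoConfig = dafButler.Butler.makeRepo("/tmp/demo_repo", config=seedConfig)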
45 changes: 40 additions & 5 deletions python/lsst/ap/verify/ingestion.py
@@ -29,6 +29,7 @@

__all__ = ["Gen3DatasetIngestConfig", "ingestDatasetGen3"]

import argparse
import fnmatch
import os
import shutil
@@ -45,6 +46,24 @@
_LOG = logging.getLogger(__name__)


class IngestionParser(argparse.ArgumentParser):
"""An argument parser for data needed by ingestion.

This parser is not complete on its own, and is designed to be passed to
another parser via the ``parents`` parameter.
"""

def __init__(self):
# Help and documentation will be handled by main program's parser
argparse.ArgumentParser.__init__(self, add_help=False)

self.add_argument('--namespace', dest='namespace', default=None,
help='The Sasquatch namespace to use for the ap_verify metrics upload.')

self.add_argument('--restProxyUrl', dest='restProxyUrl', default=None,
help='The Sasquatch REST proxy URL to use for the ap_verify metrics upload.')


class Gen3DatasetIngestConfig(pexConfig.Config):
"""Settings and defaults for `Gen3DatasetIngestTask`.

@@ -74,6 +93,10 @@ class Gen3DatasetIngestConfig(pexConfig.Config):
"supersedes ``dataFiles``.",
)

def setDefaults(self):
super().setDefaults()
self.ingester.transfer = "copy"


class Gen3DatasetIngestTask(pipeBase.Task):
"""Task for automating ingestion of a ap_verify dataset.
@@ -95,18 +118,23 @@ class Gen3DatasetIngestTask(pipeBase.Task):
# Suffix is de-facto convention for distinguishing Gen 2 and Gen 3 config overrides
_DefaultName = "datasetIngest-gen3"

def __init__(self, dataset, workspace, *args, **kwargs):
def __init__(self, dataset, workspace, namespace, url, *args, **kwargs):
super().__init__(*args, **kwargs)
self.workspace = workspace
self.dataset = dataset
self.namespace = namespace
self.url = url
# workspace.workButler is undefined until the repository is created
self.dataset.makeCompatibleRepoGen3(self.workspace.repo)
self.dataset.makeCompatibleRepoGen3(self.workspace.repo, self.namespace, self.url)
if self.url is not None:
self.transferMode = "copy"
self.makeSubtask("ingester", butler=self.workspace.workButler)
self.makeSubtask("visitDefiner", butler=self.workspace.workButler)

def _reduce_kwargs(self):
# Add extra parameters to pickle
return dict(**super()._reduce_kwargs(), dataset=self.dataset, workspace=self.workspace)
return dict(**super()._reduce_kwargs(), dataset=self.dataset,
workspace=self.workspace, namespace=self.namespace, url=self.url)

def run(self, processes=1):
"""Ingest the contents of a dataset into a Butler repository.
@@ -242,7 +270,7 @@ def _copyConfigs(self):
self.log.info("Configs are now stored in %s.", self.workspace.pipelineDir)


def ingestDatasetGen3(dataset, workspace, processes=1):
def ingestDatasetGen3(dataset, workspace, sasquatchNamespace, sasquatchUrl, processes=1):
"""Ingest the contents of an ap_verify dataset into a Gen 3 Butler repository.

The original data directory is not modified.
@@ -254,12 +282,19 @@
workspace : `lsst.ap.verify.workspace.WorkspaceGen3`
The abstract location where the repository is to be created, if it does
not already exist.
sasquatchNamespace : `str`
The name of the namespace to post the ap_verify metrics to.
sasquatchUrl : `str`
The URL of the server to post the ap_verify metrics to.
processes : `int`
The number of processes to use for ingestion.
"""
log = _LOG.getChild("ingestDataset")

ingester = Gen3DatasetIngestTask(dataset, workspace, config=_getConfig(Gen3DatasetIngestTask, dataset))
ingester = Gen3DatasetIngestTask(
dataset, workspace, sasquatchNamespace, sasquatchUrl,
config=_getConfig(Gen3DatasetIngestTask, dataset)
)
ingester.run(processes=processes)
log.info("Data ingested")

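
Finally, a hypothetical call site for the updated function; the dataset package name, workspace path, namespace, and URL are all placeholders, and per the logic in makeCompatibleRepoGen3, passing None for both upload parameters skips the Sasquatch datastore entirely:

    from lsst.ap.verify.dataset import Dataset
    from lsst.ap.verify.ingestion import ingestDatasetGen3
    from lsst.ap.verify.workspace import WorkspaceGen3

    dataset = Dataset("dataset_package_name")  # placeholder dataset package
    workspace = WorkspaceGen3("/tmp/ap_verify_workspace")
    ingestDatasetGen3(dataset, workspace,
                      "lsst.example.namespace",                    # namespace
                      "https://example.org/sasquatch-rest-proxy",  # REST proxy URL
                      processes=4)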