Skip to content

Commit

Permalink
Add SasquatchDatastore to enable uploading analysis_tools metrics
Browse files Browse the repository at this point in the history
Update ci run script to take namespace and url for sasquatch upload.

Add optional ChainedDatastore to host SasquatchDatastore

Add new parameters to accommodate Sasquatch metric upload
  • Loading branch information
BrunoSanchez committed Feb 17, 2025
1 parent 1a3ea5e commit 5048a9e
Show file tree
Hide file tree
Showing 4 changed files with 89 additions and 14 deletions.
30 changes: 28 additions & 2 deletions bin/run_ci_dataset.sh
Original file line number Diff line number Diff line change
Expand Up @@ -35,33 +35,57 @@ print_error() {

usage() {
    # Print the help text to stderr and exit with failure.
    # (The diff render had left both the old and the new "Usage:" line in
    # place; only the updated one, with -n/-u, is kept.)
    print_error
    print_error "Usage: $0 -d DATASET [-g NUM] [-p PATH] [-n NAMESPACE] [-u URL] [-h]"
    print_error
    print_error "Specific options:"
    print_error " -d Dataset name"
    print_error " -g Middleware generation number (int) [currently unused]"
    print_error " -p Pipeline to run"
    print_error " -n Namespace for metrics upload (optional, but required if -u is set)"
    print_error " -u URL for metrics upload (optional, but required if -n is set)"
    print_error " -h show this message"
    exit 1
}

# Parse command-line options; -d/-g/-p/-n/-u take a value, -h prints usage.
# The stale pre-change `while getopts "d:g:p:h"` line (a diff-render
# artifact) is removed: two `while ... do` lines in a row would not parse.
while getopts "d:g:p:n:u:h" option; do
    case "$option" in
        d) DATASET="$OPTARG";;
        g) GEN="$OPTARG";;
        p) PIPE="$OPTARG";;
        n) NAMESPACE="$OPTARG";;
        u) URL="$OPTARG";;
        h) usage;;
        *) usage;;
    esac
done

# The dataset name is the one mandatory argument.
if [[ -z "${DATASET}" ]]; then
    print_error "$0: mandatory argument -- d"
    usage
    exit 1
fi

# -n and -u are a pair: reject either one without the other.
if [[ -n "${NAMESPACE}" && -z "${URL}" ]] || [[ -z "${NAMESPACE}" && -n "${URL}" ]]; then
    print_error "Error: Both -n (namespace) and -u (URL) must be provided together, or neither."
    usage
    exit 1
fi

# Rewrite PIPE as the flag+value fragment expected by ap_verify.py.
if [[ -n "${PIPE}" ]]; then
    PIPE="--pipeline ${PIPE}"
fi

# Build the optional Sasquatch upload fragments (empty when not requested).
NAMESPACE_ARG=""
URL_ARG=""
if [[ -n "${NAMESPACE}" && -n "${URL}" ]]; then
    NAMESPACE_ARG="--namespace ${NAMESPACE}"
    URL_ARG="--restProxyUrl ${URL}"
fi

shift $((OPTIND-1))

PRODUCT_DIR=${AP_VERIFY_DIR}
Expand Down Expand Up @@ -92,4 +116,6 @@ ap_verify.py --dataset "${DATASET}" \
${PIPE} \
--output "${WORKSPACE}" \
--processes "${NUMPROC}" \
${NAMESPACE_ARG} \
${URL_ARG} \
&>> "${WORKSPACE}"/apVerify.log
8 changes: 4 additions & 4 deletions python/lsst/ap/verify/ap_verify.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@
import lsst.log

from .dataset import Dataset
from .ingestion import ingestDatasetGen3
from .ingestion import ingestDatasetGen3, IngestionParser
from .pipeline_driver import ApPipeParser, runApPipeGen3
from .workspace import WorkspaceGen3

Expand Down Expand Up @@ -92,7 +92,7 @@ def __init__(self):
self,
description='Executes the LSST DM AP pipeline and analyzes its performance using metrics.',
epilog='',
parents=[_InputOutputParser(), _ProcessingParser(), ApPipeParser(), ],
parents=[IngestionParser(), _InputOutputParser(), _ProcessingParser(), ApPipeParser(), ],
add_help=True)


Expand All @@ -109,7 +109,7 @@ def __init__(self):
'passing the same --output argument, or by other programs that accept '
'Butler repositories as input.',
epilog='',
parents=[_InputOutputParser(), _ProcessingParser()],
parents=[IngestionParser(), _InputOutputParser(), _ProcessingParser()],
add_help=True)


Expand Down Expand Up @@ -150,7 +150,7 @@ def runApVerify(cmdLine=None):
log.debug('Command-line arguments: %s', args)

workspace = WorkspaceGen3(args.output)
ingestDatasetGen3(args.dataset, workspace, processes=args.processes)
ingestDatasetGen3(args.dataset, workspace, args.namespace, args.restProxyUrl, processes=args.processes)
log.info('Running pipeline...')
# Gen 3 pipeline includes both AP and metrics
return runApPipeGen3(workspace, args, processes=args.processes)
Expand Down
20 changes: 17 additions & 3 deletions python/lsst/ap/verify/dataset.py
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,7 @@ def __repr__(self):
"""
return f"Dataset({self._id!r})"

def makeCompatibleRepoGen3(self, repoDir):
def makeCompatibleRepoGen3(self, repoDir, sasquatchNamespace, sasquatchRestProxyUrl):
"""Set up a directory as a Gen 3 repository compatible with this ap_verify dataset.
If the repository already exists, this call has no effect.
Expand All @@ -170,10 +170,24 @@ def makeCompatibleRepoGen3(self, repoDir):
seedConfig = dafButler.Config()
# Checksums greatly slow importing of large repositories
seedConfig["datastore", "checksum"] = False
transfMode = "auto"
if sasquatchRestProxyUrl is not None:
seedConfig["datastore", "cls"] = "lsst.daf.butler.datastores.chainedDatastore.ChainedDatastore"

Check failure on line 175 in python/lsst/ap/verify/dataset.py

View workflow job for this annotation

GitHub Actions / call-workflow / lint

E501

line too long (111 > 110 characters)

datastores = [
{"cls": "lsst.daf.butler.datastores.fileDatastore.FileDatastore",
"root": "<butlerRoot>",

Check failure on line 179 in python/lsst/ap/verify/dataset.py

View workflow job for this annotation

GitHub Actions / call-workflow / lint

E128

continuation line under-indented for visual indent
},

Check failure on line 180 in python/lsst/ap/verify/dataset.py

View workflow job for this annotation

GitHub Actions / call-workflow / lint

E124

closing bracket does not match visual indentation
{"cls": "lsst.analysis.tools.interfaces.datastore.SasquatchDatastore",
"restProxyUrl": sasquatchRestProxyUrl,

Check failure on line 182 in python/lsst/ap/verify/dataset.py

View workflow job for this annotation

GitHub Actions / call-workflow / lint

E128

continuation line under-indented for visual indent
"namespace": sasquatchNamespace,

Check failure on line 183 in python/lsst/ap/verify/dataset.py

View workflow job for this annotation

GitHub Actions / call-workflow / lint

E128

continuation line under-indented for visual indent
},

Check failure on line 184 in python/lsst/ap/verify/dataset.py

View workflow job for this annotation

GitHub Actions / call-workflow / lint

E124

closing bracket does not match visual indentation
]
seedConfig["datastore", "datastores"] = datastores
transfMode = "direct"
repoConfig = dafButler.Butler.makeRepo(repoDir, config=seedConfig)
butler = dafButler.Butler(repoConfig, writeable=True)
butler.import_(directory=self._preloadedRepo, filename=self._preloadedExport,
transfer="auto")
butler.import_(directory=self._preloadedRepo, filename=self._preloadedExport, transfer=transfMode)
except FileExistsError:
pass

Expand Down
45 changes: 40 additions & 5 deletions python/lsst/ap/verify/ingestion.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@

__all__ = ["Gen3DatasetIngestConfig", "ingestDatasetGen3"]

import argparse
import fnmatch
import os
import shutil
Expand All @@ -45,6 +46,24 @@
_LOG = logging.getLogger(__name__)


class IngestionParser(argparse.ArgumentParser):
    """An argument parser for data needed by ingestion.

    This parser is not complete, and is designed to be passed to another
    parser using the ``parents`` parameter.
    """

    def __init__(self):
        # Help and documentation will be handled by main program's parser
        super().__init__(add_help=False)

        # Both default to None; downstream code treats None as
        # "no Sasquatch upload requested".
        self.add_argument('--namespace', dest='namespace', default=None,
                          help='The Sasquatch namespace to use for the ap_verify metrics upload.')

        self.add_argument('--restProxyUrl', dest='restProxyUrl', default=None,
                          help='The Sasquatch URL to use for the ap_verify metrics upload.')


class Gen3DatasetIngestConfig(pexConfig.Config):
"""Settings and defaults for `Gen3DatasetIngestTask`.
Expand Down Expand Up @@ -74,6 +93,10 @@ class Gen3DatasetIngestConfig(pexConfig.Config):
"supersedes ``dataFiles``.",
)

def setDefaults(self):
    """Set config defaults: ingest with ``transfer="copy"``.

    NOTE(review): presumably copying (rather than linking) is what keeps
    the original dataset directory unmodified, as the module's ingestion
    docs promise -- confirm against the ingester task's transfer semantics.
    """
    super().setDefaults()
    self.ingester.transfer = "copy"


class Gen3DatasetIngestTask(pipeBase.Task):
"""Task for automating ingestion of a ap_verify dataset.
Expand All @@ -95,18 +118,23 @@ class Gen3DatasetIngestTask(pipeBase.Task):
# Suffix is de-facto convention for distinguishing Gen 2 and Gen 3 config overrides
_DefaultName = "datasetIngest-gen3"

def __init__(self, dataset, workspace, namespace, url, *args, **kwargs):
    """Construct the ingestion task.

    Parameters
    ----------
    dataset : `lsst.ap.verify.dataset.Dataset`
        The ap_verify dataset to ingest.
    workspace : `lsst.ap.verify.workspace.WorkspaceGen3`
        The workspace holding the repository to ingest into.
    namespace : `str` or `None`
        Sasquatch namespace for metrics upload; `None` disables upload.
    url : `str` or `None`
        Sasquatch REST proxy URL for metrics upload; `None` disables upload.
    """
    super().__init__(*args, **kwargs)
    self.workspace = workspace
    self.dataset = dataset
    self.namespace = namespace
    self.url = url
    # workspace.workButler is undefined until the repository is created.
    # The stale pre-change call makeCompatibleRepoGen3(self.workspace.repo)
    # (a diff-render artifact, wrong arity for the new signature) is removed.
    self.dataset.makeCompatibleRepoGen3(self.workspace.repo, self.namespace, self.url)
    if self.url is not None:
        # NOTE(review): transferMode is not read anywhere visible in this
        # change; the ingester's transfer mode is set via config
        # setDefaults -- confirm this attribute is actually needed.
        self.transferMode = "copy"
    self.makeSubtask("ingester", butler=self.workspace.workButler)
    self.makeSubtask("visitDefiner", butler=self.workspace.workButler)

def _reduce_kwargs(self):
    """Add the constructor-only arguments to the pickle state so the task
    can be reconstructed (e.g. by multiprocessing workers).

    The stale pre-change ``return`` (a diff-render artifact) is removed:
    it would have executed first and silently dropped ``namespace`` and
    ``url`` from the pickled state.
    """
    return dict(**super()._reduce_kwargs(), dataset=self.dataset,
                workspace=self.workspace, namespace=self.namespace, url=self.url)

def run(self, processes=1):
"""Ingest the contents of a dataset into a Butler repository.
Expand Down Expand Up @@ -242,7 +270,7 @@ def _copyConfigs(self):
self.log.info("Configs are now stored in %s.", self.workspace.pipelineDir)


def ingestDatasetGen3(dataset, workspace, sasquatchNamespace, sasquatchUrl, processes=1):
    """Ingest the contents of an ap_verify dataset into a Gen 3 Butler repository.

    The original data directory is not modified.

    Parameters
    ----------
    dataset : `lsst.ap.verify.dataset.Dataset`
        The ap_verify dataset to ingest.
    workspace : `lsst.ap.verify.workspace.WorkspaceGen3`
        The abstract location where the repository is to be created, if it
        does not already exist.
    sasquatchNamespace : `str`
        The name of the namespace to post the ap_verify metrics to.
    sasquatchUrl : `str`
        The URL of the server to post the ap_verify metrics to.
    processes : `int`
        The number of processes to use to ingest.
    """
    log = _LOG.getChild("ingestDataset")

    # The stale pre-change def line and one-line ingester construction
    # (diff-render artifacts) are removed; only the Sasquatch-aware
    # versions remain.
    ingester = Gen3DatasetIngestTask(
        dataset, workspace, sasquatchNamespace, sasquatchUrl,
        config=_getConfig(Gen3DatasetIngestTask, dataset)
    )
    ingester.run(processes=processes)
    log.info("Data ingested")

Expand Down

0 comments on commit 5048a9e

Please sign in to comment.