From 54e969762cc842e2de1a7105884d0f80ec1aa38f Mon Sep 17 00:00:00 2001
From: Toby Jennings
Date: Wed, 29 Jan 2025 10:49:25 -0600
Subject: [PATCH] feat(allocate): Allocate wms resources at end of daemon iteration

---
 src/lsst/cmservice/common/daemon.py   | 112 +++++++++++++++++++++++++-
 src/lsst/cmservice/common/htcondor.py |  16 ++--
 src/lsst/cmservice/config.py          |  29 +++++--
 3 files changed, 145 insertions(+), 12 deletions(-)

diff --git a/src/lsst/cmservice/common/daemon.py b/src/lsst/cmservice/common/daemon.py
index e897586d..7760f407 100644
--- a/src/lsst/cmservice/common/daemon.py
+++ b/src/lsst/cmservice/common/daemon.py
@@ -1,12 +1,16 @@
+import importlib.util
+import os
+import sys
 from datetime import datetime, timedelta
 
 from sqlalchemy.ext.asyncio import async_scoped_session
 from sqlalchemy.future import select
 
-from ..common.logging import LOGGER
 from ..config import config
 from ..db.queue import Queue
 from ..db.script import Script
+from .htcondor import build_htcondor_submit_environment
+from .logging import LOGGER
 
 logger = LOGGER.bind(module=__name__)
 
@@ -20,6 +24,7 @@ async def daemon_iteration(session: async_scoped_session) -> None:
     # TODO: should the daemon check any campaigns with a state == prepared that
     # do not have queues? Queue creation should not be a manual step.
     queue_entry: Queue
+    processed_nodes = 0
     for (queue_entry,) in queue_entries:
         try:
             queued_node = await queue_entry.get_node(session)
@@ -30,9 +35,11 @@ async def daemon_iteration(session: async_scoped_session) -> None:
             ):
                 logger.info("Processing queue_entry %s", queued_node.fullname)
                 await queue_entry.process_node(session)
+                processed_nodes += 1
                 sleep_time = await queue_entry.node_sleep_time(session)
             else:
                 # Put this entry to sleep for a while
+                logger.debug("Not processing queue_entry %s", queued_node.fullname)
                 sleep_time = config.daemon.processing_interval
             time_next_check = iteration_start + timedelta(seconds=sleep_time)
             queue_entry.time_next_check = time_next_check
@@ -41,3 +48,106 @@ async def daemon_iteration(session: async_scoped_session) -> None:
             logger.exception()
             continue
         await session.commit()
+
+    # Try to allocate resources at the end of the loop, but do not crash if it
+    # doesn't work.
+    # FIXME this could be run async
+    try:
+        if config.daemon.allocate_resources and processed_nodes > 0:
+            allocate_resources()
+    except Exception:
+        logger.exception()
+
+
+def allocate_resources() -> None:
+    """Allocate resources for htcondor jobs submitted during the daemon
+    iteration.
+    """
+    if (htcondor := sys.modules.get("htcondor")) is not None:
+        pass
+    elif (importlib.util.find_spec("htcondor")) is not None:
+        htcondor = importlib.import_module("htcondor")
+
+    if htcondor is None:
+        logger.warning("HTCondor not available, will not allocate resources")
+        return
+
+    # Ensure environment is configured for htcondor operations
+    # FIXME: the python process needs the correct condor env set up. An
+    # alternative to setting these values JIT in os.environ would be to hack
+    # a way to have the config.htcondor submodel's validation_alias match the
+    # serialization_alias, e.g., "_CONDOR_value"
+    condor_environment = config.htcondor.model_dump(by_alias=True)
+    os.environ |= condor_environment
+
+    coll = htcondor.Collector(config.htcondor.collector_host)
+
+    # Do we need to allocate resources? i.e., are there idle condor jobs for
+    # which we are responsible?
+
+    # TODO condor query for idle jobs with our batch_name
+    # FIXME we should round-robin submits to available schedds and approximate
+    # a global query for our idle jobs.
+
+    # schedds = coll.locateAll(htcondor.DaemonTypes.Schedd)
+
+    # Mapping of schedd ad to a list of its idle jobs
+    # idle_jobs = {
+    #     ad: htcondor.Schedd(ad).query(
+    #         projection=["ClusterId"],
+    #         constraint="(JobStatus == 1)",
+    #         opts=htcondor.QueryOpts.DefaultMyJobsOnly,
+    #     )
+    #     for ad in schedds
+    # }
+
+    # # Filter query result to those schedds with idle jobs
+    # idle_job_schedds = [k for k, v in idle_jobs.items() if v]
+
+    # if not idle_job_schedds:
+    #     return
+
+    # the schedd to which we need to submit this job should be one where idle
+    # jobs are available. Pick one per daemon iteration; if there are multiple
+    # schedds with idle jobs, the next loop will pick it up.
+    # schedd = htcondor.Schedd(idle_job_schedds.pop())
+
+    # FIXME only queries the single schedd to which we are submitting jobs
+    schedd_ad = coll.locate(htcondor.DaemonTypes.Schedd, name=config.htcondor.schedd_host)
+    schedd = htcondor.Schedd(schedd_ad)
+
+    idle_jobs = schedd.query(
+        projection=["ClusterId"],
+        constraint="(JobStatus == 1)",
+        opts=htcondor.QueryOpts.DefaultMyJobsOnly,
+    )
+    if not idle_jobs:
+        return
+
+    # Set the htcondor config in the submission environment
+    # The environment command in the submit file is a double-quoted,
+    # whitespace-delimited list of name=value pairs where literal quote marks
+    # are doubled ("" or '').
+    submission_environment = " ".join([f"{k}={v}" for k, v in build_htcondor_submit_environment().items()])
+
+    # The minimum necessary submission spec submits a resource allocation
+    # script to the local universe and does not preserve the output.
+    submission_spec = {
+        "executable": f"{config.htcondor.remote_user_home}/.local/bin/allocateNodes.py",
+        "arguments": (
+            f"--auto --account {config.slurm.account} -n 50 -m 4-00:00:00 "
+            f"-q {config.slurm.partition} -g 240 {config.slurm.platform}"
+        ),
+        "environment": f'"{submission_environment}"',
+        "initialdir": config.htcondor.working_directory,
+        "batch_name": config.htcondor.batch_name,
+        "universe": "local",
+        # "output": "allocate_resources.out",
+        # "error": "allocate_resources.err",
+    }
+    submit = htcondor.Submit(submission_spec)
+
+    # job cluster id of our resource allocation script; fire and forget
+    cluster_id = schedd.submit(submit)
+    logger.info("Allocating Resources with condor job %s", cluster_id.cluster())
+    logger.debug(cluster_id)
diff --git a/src/lsst/cmservice/common/htcondor.py b/src/lsst/cmservice/common/htcondor.py
index ea24052c..dbb82d49 100644
--- a/src/lsst/cmservice/common/htcondor.py
+++ b/src/lsst/cmservice/common/htcondor.py
@@ -186,23 +186,27 @@ def build_htcondor_submit_environment() -> Mapping[str, str]:
     should closer match the environment of an interactive sdfianaXXX user at
     SLAC.
     """
+    # TODO use all configured htcondor config settings
+    # condor_environment = config.htcondor.model_dump(by_alias=True)
+    # TODO we should not always use the same schedd host. We could get a list
+    # of all schedds from the collector and pick one at random.
     return dict(
-        CONDOR_CONFIG="ONLY_ENV",
+        CONDOR_CONFIG=config.htcondor.config_source,
         _CONDOR_CONDOR_HOST=config.htcondor.collector_host,
         _CONDOR_COLLECTOR_HOST=config.htcondor.collector_host,
         _CONDOR_SCHEDD_HOST=config.htcondor.schedd_host,
         _CONDOR_SEC_CLIENT_AUTHENTICATION_METHODS=config.htcondor.authn_methods,
-        _CONDOR_DAGMAN_MANAGER_JOB_APPEND_GETENV=str(config.htcondor.dagman_job_append_get_env),
-        DAF_BUTLER_REPOSITORY_INDEX=config.butler.repository_index,
+        _CONDOR_DAGMAN_MANAGER_JOB_APPEND_GETENV="True",
         FS_REMOTE_DIR=config.htcondor.fs_remote_dir,
-        HOME=config.htcondor.user_home,
+        DAF_BUTLER_REPOSITORY_INDEX=config.butler.repository_index,
+        HOME=config.htcondor.remote_user_home,
         LSST_VERSION=config.bps.lsst_version,
         LSST_DISTRIB_DIR=config.bps.lsst_distrib_dir,
         # FIX: because there is no db-auth.yaml in lsstsvc1's home directory
-        PGPASSFILE=f"{config.htcondor.user_home}/.lsst/postgres-credentials.txt",
+        PGPASSFILE=f"{config.htcondor.remote_user_home}/.lsst/postgres-credentials.txt",
         PGUSER=config.butler.default_username,
         PATH=(
-            f"{config.htcondor.user_home}/.local/bin:{config.htcondor.user_home}/bin:{config.slurm.home}:"
+            f"{config.htcondor.remote_user_home}/.local/bin:{config.htcondor.remote_user_home}/bin:{config.slurm.home}:"
             f"/usr/local/bin:/usr/bin:/usr/local/sbin:/usr/sbin"
         ),
     )
diff --git a/src/lsst/cmservice/config.py b/src/lsst/cmservice/config.py
index 1b416ccd..15c1875b 100644
--- a/src/lsst/cmservice/config.py
+++ b/src/lsst/cmservice/config.py
@@ -114,9 +114,16 @@ class HTCondorConfiguration(BaseModel):
     their serialization alias.
     """
 
-    user_home: str = Field(
+    config_source: str = Field(
+        description="Source of htcondor configuration",
+        default="ONLY_ENV",
+        serialization_alias="CONDOR_CONFIG",
+    )
+
+    remote_user_home: str = Field(
         description=("Path to the user's home directory, as resolvable from an htcondor access node."),
         default="/sdf/home/l/lsstsvc1",
+        exclude=True,
     )
 
     condor_home: str = Field(
@@ -207,10 +214,12 @@ class HTCondorConfiguration(BaseModel):
         serialization_alias="FS_REMOTE_DIR",
     )
 
-    # FIXME: unclear if this is at all necessary
-    dagman_job_append_get_env: bool = Field(
-        description="...", default=True, serialization_alias="_CONDOR_DAGMAN_MANAGER_JOB_APPEND_GETENV"
-    )
+    # FIXME: unclear if this is necessary or specific to bps submit jobs
+    # dagman_job_append_get_env: str = Field(
+    #     description="...",
+    #     default="true",
+    #     serialization_alias="_CONDOR_DAGMAN_MANAGER_JOB_APPEND_GETENV",
+    # )
 
 
 # TODO deprecate and remove "slurm"-specific logic from cm-service; it is
@@ -249,6 +258,11 @@ class SlurmConfiguration(BaseModel):
         default="milano",
     )
 
+    platform: str = Field(
+        description="Platform requested when submitting a slurm job.",
+        default="s3df",
+    )
+
 
 class AsgiConfiguration(BaseModel):
     """Configuration for the application's ASGI web server."""
@@ -309,6 +323,11 @@ class DaemonConfiguration(BaseModel):
     Set according to DAEMON__FIELD environment variables.
     """
 
+    allocate_resources: bool = Field(
+        default=False,
+        description="Whether the daemon should try to allocate its own htcondor or slurm resources.",
+    )
+
     processing_interval: int = Field(
         default=30,
         description=(