# feat: dashboard for jobs (#139)
## changes

- [x] starting a job writes job info to disk
- [x] ~~streamlit app with dashboard that reads from cached jobs~~
- [x] ~~embedding of streamlit app in jupyter notebook~~
- [x] auto reload of dashboard
sg-s authored Jan 15, 2025
1 parent 08086a0 commit 4bff328
Showing 2 changed files with 152 additions and 0 deletions.
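For orientation, a minimal sketch (not part of this commit) of where the cached job records land on disk. It assumes only what the diffs below show: `_ensure_do_folder()` plus a `jobs` subfolder, with one `<job_id>.json` file per started job.

```python
# Illustrative only: list the per-job cache files written by this change.
# _ensure_do_folder() and the "jobs" subfolder come from the diff below.
from deeporigin.utils.core import _ensure_do_folder

jobs_dir = _ensure_do_folder() / "jobs"
for path in sorted(jobs_dir.glob("*.json")):
    print(path.name)  # one <job_id>.json file per job started via this client
```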
src/tools/run.py: 15 additions & 0 deletions
@@ -1,10 +1,13 @@
"""This module contains functions to start runs of various first-party tools on the Deep Origin platform"""

import json
import os
from typing import Optional

from beartype import beartype
from deeporigin.data_hub import api
from deeporigin.tools.utils import make_payload, run_tool
from deeporigin.utils.core import _ensure_do_folder


@beartype
@@ -315,5 +318,17 @@ def _process_job(
    execution_id = response.attributes.executionId
    job_id = response.id

    # Define the cache directory and file path
    cache_dir = _ensure_do_folder() / "jobs"
    if not cache_dir.exists():
        cache_dir.mkdir(parents=True)
    cache_file = os.path.join(cache_dir, f"{job_id}.json")

    # Ensure the cache directory exists
    os.makedirs(cache_dir, exist_ok=True)

    with open(cache_file, "w") as file:
        json.dump(response, file, indent=4)

    print(f"🧬 Job started with ID: {job_id}, execution ID: {execution_id}")
    return job_id
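A hedged sketch of reading one of these cached records back. The exact payload schema depends on the platform response and `"JOB-123"` is a placeholder ID, but the fields accessed are the ones the dashboard code in `utils.py` relies on.

```python
# Illustrative only: load a cached job record written by _process_job.
import json

from box import Box
from deeporigin.utils.core import _ensure_do_folder

cache_file = _ensure_do_folder() / "jobs" / "JOB-123.json"  # placeholder job ID
with open(cache_file, "r") as f:
    job = Box(json.load(f))

print(job.attributes.status, job.attributes.executionId)
```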
src/tools/utils.py: 137 additions & 0 deletions
@@ -1,13 +1,23 @@
"""This module contains utility functions used by tool execution. In general, you will not need to use many of these functions directly."""

import functools
import json
import os
import time
from concurrent.futures import ThreadPoolExecutor
from typing import Optional

from beartype import beartype
from box import Box
from deeporigin.config import get_value
from deeporigin.platform import clusters, tools
from deeporigin.platform.tools import execute_tool
from deeporigin.utils.core import _ensure_do_folder

JOBS_CACHE_DIR = _ensure_do_folder() / "jobs"

TERMINAL_STATES = {"Succeeded", "Failed"}
NON_TERMINAL_STATES = {"Created", "Queued", "Running"}


@beartype
@@ -26,6 +36,18 @@ def query_run_status(job_id: str) -> str:
        org_friendly_id=get_value()["organization_id"], resource_id=job_id
    )

    # Define the cache directory and file path

    if not JOBS_CACHE_DIR.exists():
        JOBS_CACHE_DIR.mkdir(parents=True)
    cache_file = os.path.join(JOBS_CACHE_DIR, f"{job_id}.json")

    # Ensure the cache directory exists
    os.makedirs(JOBS_CACHE_DIR, exist_ok=True)

    with open(cache_file, "w") as file:
        json.dump(data, file, indent=4)

    return data.attributes.status
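Assumed usage of `query_run_status` after this change: polling a job also rewrites its cache file, so the dashboard helpers added below pick up the fresh status.

```python
# Illustrative only; "JOB-123" is a placeholder job ID.
from deeporigin.tools.utils import query_run_status

status = query_run_status("JOB-123")  # also refreshes JOB-123.json in the jobs cache
print(status)  # e.g. "Running", "Succeeded", or "Failed"
```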


@@ -57,6 +79,46 @@ def wait_for_job(
        bs = "".join(["\b" for _ in range(txt_length)])


@beartype
def wait_for_jobs(
    refresh_time: int = 3,
    hide_succeeded: bool = True,
):
    """Wait for all jobs started via this client to complete

    Args:
        refresh_time (int, optional): number of seconds to wait between polling. Defaults to 3.
        hide_succeeded (bool, optional): whether to hide jobs that have already completed. Defaults to True.

    Note that this function signature is explicitly not annotated with a return type to avoid importing pandas outside this function.

    Returns:
        pd.DataFrame: dataframe of all jobs.
    """
    df = get_job_dataframe(update=True)

    if hide_succeeded:
        df = df[df["Status"] != "Succeeded"]

    from IPython.display import clear_output, display

    while len(set(df["Status"]).difference(TERMINAL_STATES)) != 0:
        df = get_job_dataframe(update=True)

        if hide_succeeded:
            df = df[df["Status"] != "Succeeded"]

        display(df)
        time.sleep(refresh_time)

        clear_output(wait=True)

    print("✔️ All jobs completed")
    df = get_job_dataframe()
    return df

@beartype
def _resolve_column_name(column_name: str, cols: list) -> str:
"""resolve column IDs from column name
@@ -184,3 +246,78 @@ def _column_name_to_column_id(data: dict, cols: list) -> dict:
                for item in value
            ]
    return data


def get_job_dataframe(update: bool = False):
    """returns a dataframe of all jobs and statuses, reading from local cache

    Args:
        update (bool, optional): Whether to check for updates on non-terminal jobs. Defaults to False.

    Note that this function is deliberately not annotated with a return type because pandas is imported inside this function.

    Returns:
        pd.DataFrame: A dataframe containing job information"""
    jobs = read_jobs()

    if update:
        _update_all_jobs(jobs)
        jobs = read_jobs()

    import pandas as pd

    df = pd.DataFrame(
        {
            "Job Id": [job.id for job in jobs],
            "Execution ID": [job.attributes.executionId for job in jobs],
            "Status": [job.attributes.status for job in jobs],
            "Tool": [job.attributes.toolId for job in jobs],
        }
    )
    return df


@beartype
def _update_all_jobs(jobs: list) -> None:
    """Update all job response files with the latest status, in parallel.

    This is an internal function. Do not use.
    """

    def should_update(job):
        return job.attributes.status in NON_TERMINAL_STATES

    with ThreadPoolExecutor() as executor:
        futures = [
            executor.submit(query_run_status, job.id)
            for job in jobs
            if should_update(job)
        ]

        for future in futures:
            try:
                future.result()
            except Exception as e:
                print(f"An error occurred: {e}")


@beartype
def read_jobs() -> list:
    """read jobs from files in the jobs cache directory"""

    jobs = []

    if os.path.exists(JOBS_CACHE_DIR):
        for file_name in os.listdir(JOBS_CACHE_DIR):
            if file_name.endswith(".json"):
                file_path = os.path.join(JOBS_CACHE_DIR, file_name)
                try:
                    with open(file_path, "r") as f:
                        job_data = json.load(f)
                        jobs.append(Box(job_data))
                except Exception as e:
                    print(f"Failed to load {file_path}: {e}")
    else:
        print(f"Directory {JOBS_CACHE_DIR} does not exist.")

    return jobs
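A hedged usage sketch for the new dashboard helpers, based only on the signatures above; `wait_for_jobs` renders via `IPython.display`, so it is intended for notebook use.

```python
# Illustrative only: inspect and watch jobs from the local cache.
from deeporigin.tools.utils import get_job_dataframe, wait_for_jobs

# one row per cached job; update=True re-polls jobs still in Created/Queued/Running
df = get_job_dataframe(update=True)
print(df[["Job Id", "Status", "Tool"]])

# block until every cached job reaches a terminal state (Succeeded/Failed),
# re-rendering the table every 3 seconds
wait_for_jobs(refresh_time=3, hide_succeeded=True)
```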
