From 5c8ca94a05ff4eb0a6d99fd4b27a26b153d271a5 Mon Sep 17 00:00:00 2001
From: Sean Rastatter
Date: Mon, 6 Mar 2023 17:05:54 -0500
Subject: [PATCH] removing build dir

---
 .gitignore                               |   1 +
 build/lib/AutoMLOps/AutoMLOps.py         | 812 -----------------------
 build/lib/AutoMLOps/AutoMLOps_test.py    |  21 -
 build/lib/AutoMLOps/BuilderUtils.py      | 333 ----------
 build/lib/AutoMLOps/CloudRunBuilder.py   | 391 -----------
 build/lib/AutoMLOps/ComponentBuilder.py  | 199 ------
 build/lib/AutoMLOps/JupyterUtilsMagic.py |  71 --
 build/lib/AutoMLOps/PipelineBuilder.py   | 318 ---------
 build/lib/AutoMLOps/__init__.py          |  28 -
 9 files changed, 1 insertion(+), 2173 deletions(-)
 delete mode 100644 build/lib/AutoMLOps/AutoMLOps.py
 delete mode 100644 build/lib/AutoMLOps/AutoMLOps_test.py
 delete mode 100644 build/lib/AutoMLOps/BuilderUtils.py
 delete mode 100644 build/lib/AutoMLOps/CloudRunBuilder.py
 delete mode 100644 build/lib/AutoMLOps/ComponentBuilder.py
 delete mode 100644 build/lib/AutoMLOps/JupyterUtilsMagic.py
 delete mode 100644 build/lib/AutoMLOps/PipelineBuilder.py
 delete mode 100644 build/lib/AutoMLOps/__init__.py

diff --git a/.gitignore b/.gitignore
index 07b9178..9afe373 100644
--- a/.gitignore
+++ b/.gitignore
@@ -12,6 +12,7 @@ __pycache__/
 # Distribution / packaging
 .Python
 build/
+build/*
 develop-eggs/
 dist/
 downloads/

diff --git a/build/lib/AutoMLOps/AutoMLOps.py b/build/lib/AutoMLOps/AutoMLOps.py
deleted file mode 100644
index 4c7fe7e..0000000
--- a/build/lib/AutoMLOps/AutoMLOps.py
+++ /dev/null
@@ -1,812 +0,0 @@
-# Copyright 2023 Google LLC. All Rights Reserved.
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
- -"""AutoMLOps is a tool that generates a production-style MLOps pipeline - from Jupyter Notebooks.""" - -# pylint: disable=C0103 -# pylint: disable=unused-import -# pylint: disable=line-too-long - -import logging -import os -import re -import subprocess -from typing import Dict, List - -from AutoMLOps import BuilderUtils -from AutoMLOps import ComponentBuilder -from AutoMLOps import PipelineBuilder -from AutoMLOps import CloudRunBuilder -from AutoMLOps import JupyterUtilsMagic - -logger = logging.getLogger() -log_level = os.environ.get('LOG_LEVEL', 'INFO') -logger.setLevel(log_level) - -TOP_LVL_NAME = 'AutoMLOps/' -DEFAULTS_FILE = TOP_LVL_NAME + 'configs/defaults.yaml' -PIPELINE_SPEC_SH_FILE = TOP_LVL_NAME + 'scripts/build_pipeline_spec.sh' -BUILD_COMPONENTS_SH_FILE = TOP_LVL_NAME + 'scripts/build_components.sh' -RUN_PIPELINE_SH_FILE = TOP_LVL_NAME + 'scripts/run_pipeline.sh' -RUN_ALL_SH_FILE = TOP_LVL_NAME + 'scripts/run_all.sh' -RESOURCES_SH_FILE = TOP_LVL_NAME + 'scripts/create_resources.sh' -SUBMIT_JOB_FILE = TOP_LVL_NAME + 'scripts/submit_to_runner_svc.sh' -CLOUDBUILD_FILE = TOP_LVL_NAME + 'cloudbuild.yaml' -PIPELINE_FILE = TOP_LVL_NAME + 'pipelines/pipeline.py' -IMPORTS_FILE = '.imports.py' -DEFAULT_IMAGE = 'python:3.9' -COMPONENT_BASE = TOP_LVL_NAME + 'components/component_base' -COMPONENT_BASE_SRC = TOP_LVL_NAME + 'components/component_base/src' -OUTPUT_DIR = BuilderUtils.TMPFILES_DIR -DIRS = [ - TOP_LVL_NAME, - TOP_LVL_NAME + 'components', - TOP_LVL_NAME + 'components/component_base', - TOP_LVL_NAME + 'components/component_base/src', - TOP_LVL_NAME + 'configs', - TOP_LVL_NAME + 'images', - TOP_LVL_NAME + 'pipelines', - TOP_LVL_NAME + 'pipelines/runtime_parameters', - TOP_LVL_NAME + 'scripts', - TOP_LVL_NAME + 'scripts/pipeline_spec'] - -def go(project_id: str, - pipeline_params: Dict, - af_registry_location: str = 'us-central1', - af_registry_name: str = 'vertex-mlops-af', - cb_trigger_location: str = 'us-central1', - cb_trigger_name: str = 'automlops-trigger', - cloud_run_location: str = 'us-central1', - cloud_run_name: str = 'run-pipeline', - cloud_tasks_queue_location: str = 'us-central1', - cloud_tasks_queue_name: str = 'queueing-svc', - csr_branch_name: str = 'automlops', - csr_name: str = 'AutoMLOps-repo', - custom_training_job_specs: List[Dict] = None, - gs_bucket_location: str = 'us-central1', - gs_bucket_name: str = None, - pipeline_runner_sa: str = None, - run_local: bool = True, - schedule_location: str = 'us-central1', - schedule_name: str = 'AutoMLOps-schedule', - schedule_pattern: str = 'No Schedule Specified', - use_kfp_spec: bool = False, - vpc_connector: str = 'No VPC Specified'): - """Generates relevant pipeline and component artifacts, - then builds, compiles, and submits the PipelineJob. - - Args: - project_id: The project ID. - pipeline_params: Dictionary containing runtime pipeline parameters. - af_registry_location: Region of the Artifact Registry. - af_registry_name: Artifact Registry name where components are stored. - cb_trigger_location: The location of the cloudbuild trigger. - cb_trigger_name: The name of the cloudbuild trigger. - cloud_run_location: The location of the cloud runner service. - cloud_run_name: The name of the cloud runner service. - cloud_tasks_queue_location: The location of the cloud tasks queue. - cloud_tasks_queue_name: The name of the cloud tasks queue. - csr_branch_name: The name of the csr branch to push to to trigger cb job. - csr_name: The name of the cloud source repo to use. 
- custom_training_job_specs: Specifies the specs to run the training job with. - gs_bucket_location: Region of the GS bucket. - gs_bucket_name: GS bucket name where pipeline run metadata is stored. - pipeline_runner_sa: Service Account to runner PipelineJobs. - run_local: Flag that determines whether to use Cloud Run CI/CD. - schedule_location: The location of the scheduler resource. - schedule_name: The name of the scheduler resource. - schedule_pattern: Cron formatted value used to create a Scheduled retrain job. - use_kfp_spec: Flag that determines the format of the component yamls. - vpc_connector: The name of the vpc connector to use. - """ - generate(project_id, pipeline_params, af_registry_location, - af_registry_name, cb_trigger_location, cb_trigger_name, - cloud_run_location, cloud_run_name, cloud_tasks_queue_location, - cloud_tasks_queue_name, csr_branch_name, csr_name, - custom_training_job_specs, gs_bucket_location, gs_bucket_name, - pipeline_runner_sa, run_local, schedule_location, - schedule_name, schedule_pattern, use_kfp_spec, - vpc_connector) - run(run_local) - -def generate(project_id: str, - pipeline_params: Dict, - af_registry_location: str = 'us-central1', - af_registry_name: str = 'vertex-mlops-af', - cb_trigger_location: str = 'us-central1', - cb_trigger_name: str = 'automlops-trigger', - cloud_run_location: str = 'us-central1', - cloud_run_name: str = 'run-pipeline', - cloud_tasks_queue_location: str = 'us-central1', - cloud_tasks_queue_name: str = 'queueing-svc', - csr_branch_name: str = 'automlops', - csr_name: str = 'AutoMLOps-repo', - custom_training_job_specs: List[Dict] = None, - gs_bucket_location: str = 'us-central1', - gs_bucket_name: str = None, - pipeline_runner_sa: str = None, - run_local: bool = True, - schedule_location: str = 'us-central1', - schedule_name: str = 'AutoMLOps-schedule', - schedule_pattern: str = 'No Schedule Specified', - use_kfp_spec: bool = False, - vpc_connector: str = 'No VPC Specified'): - """Generates relevant pipeline and component artifacts. - - Args: See go() function. 
- """ - BuilderUtils.validate_schedule(schedule_pattern, run_local) - default_bucket_name = f'{project_id}-bucket' if gs_bucket_name is None else gs_bucket_name - default_pipeline_runner_sa = f'vertex-pipelines@{project_id}.iam.gserviceaccount.com' if pipeline_runner_sa is None else pipeline_runner_sa - BuilderUtils.make_dirs(DIRS) - _create_default_config(af_registry_location, af_registry_name, cb_trigger_location, - cb_trigger_name, cloud_run_location, cloud_run_name, - cloud_tasks_queue_location, cloud_tasks_queue_name, csr_branch_name, - csr_name, gs_bucket_location, default_bucket_name, - default_pipeline_runner_sa, project_id, schedule_location, - schedule_name, schedule_pattern, vpc_connector) - - _create_scripts(run_local) - _create_cloudbuild_config(run_local) - # copy tmp pipeline file over to AutoMLOps dir - BuilderUtils.execute_process(f'cp {BuilderUtils.PIPELINE_TMPFILE} {PIPELINE_FILE}', to_null=False) - # Create components and pipelines - components_path_list = BuilderUtils.get_components_list() - for path in components_path_list: - ComponentBuilder.formalize(path, TOP_LVL_NAME, DEFAULTS_FILE, use_kfp_spec) - PipelineBuilder.formalize(custom_training_job_specs, DEFAULTS_FILE, pipeline_params, TOP_LVL_NAME) - if not use_kfp_spec: - _autoflake_srcfiles() - _create_requirements(use_kfp_spec) - _create_dockerfile() - if not run_local: - CloudRunBuilder.formalize(TOP_LVL_NAME, DEFAULTS_FILE) - -def run(run_local: bool): - """Builds, compiles, and submits the PipelineJob. - - Args: - run_local: Flag that determines whether to use Cloud Run CI/CD. - """ - BuilderUtils.execute_process('./'+RESOURCES_SH_FILE, to_null=False) - if run_local: - os.chdir(TOP_LVL_NAME) - BuilderUtils.execute_process('./scripts/run_all.sh', to_null=False) - os.chdir('../') - else: - _push_to_csr() - _resources_generation_manifest(run_local) - -def _resources_generation_manifest(run_local: bool): - """Logs urls of generated resources. - - Args: - run_local: Flag that determines whether to use Cloud Run CI/CD. 
- """ - defaults = BuilderUtils.read_yaml_file(DEFAULTS_FILE) - logging.info('\n' - '#################################################################\n' - '# #\n' - '# RESOURCES MANIFEST #\n' - '#---------------------------------------------------------------#\n' - '# Generated resources can be found at the following urls #\n' - '# #\n' - '#################################################################\n') - # pylint: disable=logging-fstring-interpolation - logging.info(f'''Google Cloud Storage Bucket: https://console.cloud.google.com/storage/{defaults['gcp']['gs_bucket_name']}''') - logging.info(f'''Artifact Registry: https://console.cloud.google.com/artifacts/docker/{defaults['gcp']['project_id']}/{defaults['gcp']['af_registry_location']}/{defaults['gcp']['af_registry_name']}''') - logging.info(f'''Service Accounts: https://console.cloud.google.com/iam-admin/serviceaccounts?project={defaults['gcp']['project_id']}''') - logging.info('APIs: https://console.cloud.google.com/apis') - logging.info(f'''Cloud Source Repository: https://source.cloud.google.com/{defaults['gcp']['project_id']}/{defaults['gcp']['cloud_source_repository']}/+/{defaults['gcp']['cloud_source_repository_branch']}:''') - logging.info(f'''Cloud Build Jobs: https://console.cloud.google.com/cloud-build/builds;region={defaults['gcp']['cb_trigger_location']}''') - logging.info('Vertex AI Pipeline Runs: https://console.cloud.google.com/vertex-ai/pipelines/runs') - if not run_local: - logging.info(f'''Cloud Build Trigger: https://console.cloud.google.com/cloud-build/triggers;region={defaults['gcp']['cb_trigger_location']}''') - logging.info(f'''Cloud Run Service: https://console.cloud.google.com/run/detail/{defaults['gcp']['cloud_run_location']}/{defaults['gcp']['cloud_run_name']}''') - logging.info(f'''Cloud Tasks Queue: https://console.cloud.google.com/cloudtasks/queue/{defaults['gcp']['cloud_tasks_queue_location']}/{defaults['gcp']['cloud_tasks_queue_name']}/tasks''') - if defaults['gcp']['cloud_schedule_pattern'] != 'No Schedule Specified': - logging.info('Cloud Scheduler Job: https://console.cloud.google.com/cloudscheduler') - -def _push_to_csr(): - """Initializes a git repo if one doesn't already exist, - then pushes to the specified branch and triggers the cloudbuild job. 
- """ - defaults = BuilderUtils.read_yaml_file(DEFAULTS_FILE) - if not os.path.exists('.git'): - BuilderUtils.execute_process('git init', to_null=False) - BuilderUtils.execute_process('''git config --global credential.'https://source.developers.google.com'.helper gcloud.sh''', to_null=False) - BuilderUtils.execute_process(f'''git remote add origin https://source.developers.google.com/p/{defaults['gcp']['project_id']}/r/{defaults['gcp']['cloud_source_repository']}''', to_null=False) - BuilderUtils.execute_process(f'''git checkout -B {defaults['gcp']['cloud_source_repository_branch']}''', to_null=False) - has_remote_branch = subprocess.check_output([f'''git ls-remote origin {defaults['gcp']['cloud_source_repository_branch']}'''], shell=True, stderr=subprocess.STDOUT) - if not has_remote_branch: - # This will initialize the branch, a second push will be required to trigger the cloudbuild job after initializing - BuilderUtils.execute_process('touch .gitkeep', to_null=False) # needed to keep dir here - BuilderUtils.execute_process('git add .gitkeep', to_null=False) - BuilderUtils.execute_process('''git commit -m 'init' ''', to_null=False) - BuilderUtils.execute_process(f'''git push origin {defaults['gcp']['cloud_source_repository_branch']} --force''', to_null=False) - - BuilderUtils.execute_process(f'touch {TOP_LVL_NAME}scripts/pipeline_spec/.gitkeep', to_null=False) # needed to keep dir here - BuilderUtils.execute_process('git add .', to_null=False) - BuilderUtils.execute_process('''git commit -m 'Run AutoMLOps' ''', to_null=False) - BuilderUtils.execute_process(f'''git push origin {defaults['gcp']['cloud_source_repository_branch']} --force''', to_null=False) - # pylint: disable=logging-fstring-interpolation - logging.info(f'''Pushing code to {defaults['gcp']['cloud_source_repository_branch']} branch, triggering cloudbuild...''') - logging.info(f'''Cloudbuild job running at: https://console.cloud.google.com/cloud-build/builds;region={defaults['gcp']['cb_trigger_location']}''') - -def _create_default_config(af_registry_location: str, - af_registry_name: str, - cb_trigger_location: str, - cb_trigger_name: str, - cloud_run_location: str, - cloud_run_name: str, - cloud_tasks_queue_location: str, - cloud_tasks_queue_name: str, - csr_branch_name: str, - csr_name: str, - gs_bucket_location: str, - gs_bucket_name: str, - pipeline_runner_sa: str, - project_id: str, - schedule_location: str, - schedule_name: str, - schedule_pattern: str, - vpc_connector: str): - """Writes default variables to defaults.yaml. This defaults - file is used by subsequent functions and by the pipeline - files themselves. - - Args: - af_registry_location: Region of the Artifact Registry. - af_registry_name: Artifact Registry name where components are stored. - cb_trigger_location: The location of the cloudbuild trigger. - cb_trigger_name: The name of the cloudbuild trigger. - cloud_run_location: The location of the cloud runner service. - cloud_run_name: The name of the cloud runner service. - cloud_tasks_queue_location: The location of the cloud tasks queue. - cloud_tasks_queue_name: The name of the cloud tasks queue. - csr_branch_name: The name of the csr branch to push to to trigger cb job. - csr_name: The name of the cloud source repo to use. - gs_bucket_location: Region of the GS bucket. - gs_bucket_name: GS bucket name where pipeline run metadata is stored. - pipeline_runner_sa: Service Account to runner PipelineJobs. - project_id: The project ID. - schedule_location: The location of the scheduler resource. 
- schedule_name: The name of the scheduler resource. - schedule_pattern: Cron formatted value used to create a Scheduled retrain job. - vpc_connector: The name of the vpc connector to use. - """ - defaults = (BuilderUtils.LICENSE + - f'# These values are descriptive only - do not change.\n' - f'# Rerun AutoMLOps.generate() to change these values.\n' - f'gcp:\n' - f' af_registry_location: {af_registry_location}\n' - f' af_registry_name: {af_registry_name}\n' - f' cb_trigger_location: {cb_trigger_location}\n' - f' cb_trigger_name: {cb_trigger_name}\n' - f' cloud_run_location: {cloud_run_location}\n' - f' cloud_run_name: {cloud_run_name}\n' - f' cloud_tasks_queue_location: {cloud_tasks_queue_location}\n' - f' cloud_tasks_queue_name: {cloud_tasks_queue_name}\n' - f' cloud_schedule_location: {schedule_location}\n' - f' cloud_schedule_name: {schedule_name}\n' - f' cloud_schedule_pattern: {schedule_pattern}\n' - f' cloud_source_repository: {csr_name}\n' - f' cloud_source_repository_branch: {csr_branch_name}\n' - f' gs_bucket_name: {gs_bucket_name}\n' - f' pipeline_runner_service_account: {pipeline_runner_sa}\n' - f' project_id: {project_id}\n' - f' vpc_connector: {vpc_connector}\n' - f'\n' - f'pipelines:\n' - f' parameter_values_path: {BuilderUtils.PARAMETER_VALUES_PATH}\n' - f' pipeline_component_directory: components\n' - f' pipeline_job_spec_path: {BuilderUtils.PIPELINE_JOB_SPEC_PATH}\n' - f' pipeline_region: {gs_bucket_location}\n' - f' pipeline_storage_path: gs://{gs_bucket_name}/pipeline_root\n') - BuilderUtils.write_file(DEFAULTS_FILE, defaults, 'w+') - -def _create_scripts(run_local: bool): - """Writes various shell scripts used for pipeline and component - construction, as well as pipeline execution. - - Args: - run_local: Flag that determines whether to use Cloud Run CI/CD. - """ - build_pipeline_spec = ( - '#!/bin/bash\n' + BuilderUtils.LICENSE + - '# Builds the pipeline specs\n' - f'# This script should run from the {TOP_LVL_NAME} directory\n' - '# Change directory in case this is not the script root.\n' - '\n' - 'CONFIG_FILE=configs/defaults.yaml\n' - '\n' - 'python3 -m pipelines.pipeline --config $CONFIG_FILE\n') - build_components = ( - '#!/bin/bash\n' + BuilderUtils.LICENSE + - '# Submits a Cloud Build job that builds and deploys the components\n' - f'# This script should run from the {TOP_LVL_NAME} directory\n' - '# Change directory in case this is not the script root.\n' - '\n' - 'gcloud builds submit .. --config cloudbuild.yaml --timeout=3600\n') - run_pipeline = ( - '#!/bin/bash\n' + BuilderUtils.LICENSE + - '# Submits the PipelineJob to Vertex AI\n' - f'# This script should run from the {TOP_LVL_NAME} directory\n' - '# Change directory in case this is not the script root.\n' - '\n' - 'CONFIG_FILE=configs/defaults.yaml\n' - '\n' - 'python3 -m pipelines.pipeline_runner --config $CONFIG_FILE\n') - run_all = ( - '#!/bin/bash\n' + BuilderUtils.LICENSE + - '# Builds components, pipeline specs, and submits the PipelineJob.\n' - f'# This script should run from the {TOP_LVL_NAME} directory\n' - '# Change directory in case this is not the script root.\n' - '\n' - '''GREEN='\033[0;32m'\n''' - '''NC='\033[0m'\n''' - '\n' - 'echo -e "${GREEN} BUILDING COMPONENTS ${NC}"\n' - 'gcloud builds submit .. 
--config cloudbuild.yaml --timeout=3600\n' - '\n' - 'echo -e "${GREEN} BUILDING PIPELINE SPEC ${NC}"\n' - './scripts/build_pipeline_spec.sh\n' - '\n' - 'echo -e "${GREEN} RUNNING PIPELINE JOB ${NC}"\n' - './scripts/run_pipeline.sh\n') - BuilderUtils.write_and_chmod(PIPELINE_SPEC_SH_FILE, build_pipeline_spec) - BuilderUtils.write_and_chmod(BUILD_COMPONENTS_SH_FILE, build_components) - BuilderUtils.write_and_chmod(RUN_PIPELINE_SH_FILE, run_pipeline) - BuilderUtils.write_and_chmod(RUN_ALL_SH_FILE, run_all) - _create_resources_scripts(run_local) - -def _create_resources_scripts(run_local: bool): - """Writes create_resources.sh and create_scheduler.sh, which creates a specified - artifact registry and gs bucket if they do not already exist. Also creates - a service account to run Vertex AI Pipelines. Requires a defaults.yaml - config to pull config vars from. - - Args: - run_local: Flag that determines whether to use Cloud Run CI/CD. - """ - defaults = BuilderUtils.read_yaml_file(DEFAULTS_FILE) - left_bracket = '{' - right_bracket = '}' - newline = '\n' - # pylint: disable=anomalous-backslash-in-string - create_resources_script = ( - '#!/bin/bash\n' + BuilderUtils.LICENSE + - f'# This script will create an artifact registry and gs bucket if they do not already exist.\n' - f'\n' - f'''GREEN='\033[0;32m'\n''' - f'''NC='\033[0m'\n''' - f'''AF_REGISTRY_NAME={defaults['gcp']['af_registry_name']}\n''' - f'''AF_REGISTRY_LOCATION={defaults['gcp']['af_registry_location']}\n''' - f'''PROJECT_ID={defaults['gcp']['project_id']}\n''' - f'''PROJECT_NUMBER=`gcloud projects describe {defaults['gcp']['project_id']} --format 'value(projectNumber)'`\n''' - f'''BUCKET_NAME={defaults['gcp']['gs_bucket_name']}\n''' - f'''BUCKET_LOCATION={defaults['pipelines']['pipeline_region']}\n''' - f'''SERVICE_ACCOUNT_NAME={defaults['gcp']['pipeline_runner_service_account'].split('@')[0]}\n''' - f'''SERVICE_ACCOUNT_FULL={defaults['gcp']['pipeline_runner_service_account']}\n''' - f'''CLOUD_SOURCE_REPO={defaults['gcp']['cloud_source_repository']}\n''' - f'''CLOUD_SOURCE_REPO_BRANCH={defaults['gcp']['cloud_source_repository_branch']}\n''' - f'''CB_TRIGGER_LOCATION={defaults['gcp']['cb_trigger_location']}\n''' - f'''CB_TRIGGER_NAME={defaults['gcp']['cb_trigger_name']}\n''' - f'''CLOUD_TASKS_QUEUE_LOCATION={defaults['gcp']['cloud_tasks_queue_location']}\n''' - f'''CLOUD_TASKS_QUEUE_NAME={defaults['gcp']['cloud_tasks_queue_name']}\n''' - f'\n' - f'echo -e "$GREEN Updating required API services in project $PROJECT_ID $NC"\n' - f'gcloud services enable cloudresourcemanager.googleapis.com \{newline}' - f' aiplatform.googleapis.com \{newline}' - f' artifactregistry.googleapis.com \{newline}' - f' cloudbuild.googleapis.com \{newline}' - f' cloudscheduler.googleapis.com \{newline}' - f' cloudtasks.googleapis.com \{newline}' - f' compute.googleapis.com \{newline}' - f' iam.googleapis.com \{newline}' - f' iamcredentials.googleapis.com \{newline}' - f' ml.googleapis.com \{newline}' - f' run.googleapis.com \{newline}' - f' storage.googleapis.com \{newline}' - f' sourcerepo.googleapis.com\n' - f'\n' - f'echo -e "$GREEN Checking for Artifact Registry: $AF_REGISTRY_NAME in project $PROJECT_ID $NC"\n' - f'if ! 
(gcloud artifacts repositories list --project="$PROJECT_ID" --location=$AF_REGISTRY_LOCATION | grep --fixed-strings "(^|[[:blank:]])$AF_REGISTRY_NAME($|[[:blank:]]))"; then\n' - f'\n' - f' echo "Creating Artifact Registry: ${left_bracket}AF_REGISTRY_NAME{right_bracket} in project $PROJECT_ID"\n' - f' gcloud artifacts repositories create "$AF_REGISTRY_NAME" \{newline}' - f' --repository-format=docker \{newline}' - f' --location=$AF_REGISTRY_LOCATION \{newline}' - f' --project="$PROJECT_ID" \{newline}' - f' --description="Artifact Registry ${left_bracket}AF_REGISTRY_NAME{right_bracket} in ${left_bracket}AF_REGISTRY_LOCATION{right_bracket}." \n' - f'\n' - f'else\n' - f'\n' - f' echo "Artifact Registry: ${left_bracket}AF_REGISTRY_NAME{right_bracket} already exists in project $PROJECT_ID"\n' - f'\n' - f'fi\n' - f'\n' - f'\n' - f'echo -e "$GREEN Checking for GS Bucket: $BUCKET_NAME in project $PROJECT_ID $NC"\n' - f'if !(gsutil ls -b gs://$BUCKET_NAME | grep --fixed-strings "(^|[[:blank:]])$BUCKET_NAME($|[[:blank:]]))"; then\n' - f'\n' - f' echo "Creating GS Bucket: ${left_bracket}BUCKET_NAME{right_bracket} in project $PROJECT_ID"\n' - f' gsutil mb -l ${left_bracket}BUCKET_LOCATION{right_bracket} gs://$BUCKET_NAME\n' - f'\n' - f'else\n' - f'\n' - f' echo "GS Bucket: ${left_bracket}BUCKET_NAME{right_bracket} already exists in project $PROJECT_ID"\n' - f'\n' - f'fi\n' - f'\n' - f'echo -e "$GREEN Checking for Service Account: $SERVICE_ACCOUNT_NAME in project $PROJECT_ID $NC"\n' - f'if ! (gcloud iam service-accounts list --project="$PROJECT_ID" | grep --fixed-strings "(^|[[:blank:]])$SERVICE_ACCOUNT_FULL($|[[:blank:]]))"; then\n' - f'\n' - f' echo "Creating Service Account: ${left_bracket}SERVICE_ACCOUNT_NAME{right_bracket} in project $PROJECT_ID"\n' - f' gcloud iam service-accounts create $SERVICE_ACCOUNT_NAME \{newline}' - f' --description="For submitting PipelineJobs" \{newline}' - f' --display-name="Pipeline Runner Service Account"\n' - f'else\n' - f'\n' - f' echo "Service Account: ${left_bracket}SERVICE_ACCOUNT_NAME{right_bracket} already exists in project $PROJECT_ID"\n' - f'\n' - f'fi\n' - f'\n' - f'echo -e "$GREEN Updating required IAM roles in project $PROJECT_ID $NC"\n' - f'gcloud projects add-iam-policy-binding $PROJECT_ID \{newline}' - f' --member="serviceAccount:$SERVICE_ACCOUNT_FULL" \{newline}' - f' --role="roles/aiplatform.user" \{newline}' - f' --no-user-output-enabled\n' - f'\n' - f'gcloud projects add-iam-policy-binding $PROJECT_ID \{newline}' - f' --member="serviceAccount:$SERVICE_ACCOUNT_FULL" \{newline}' - f' --role="roles/artifactregistry.reader" \{newline}' - f' --no-user-output-enabled\n' - f'\n' - f'gcloud projects add-iam-policy-binding $PROJECT_ID \{newline}' - f' --member="serviceAccount:$SERVICE_ACCOUNT_FULL" \{newline}' - f' --role="roles/bigquery.user" \{newline}' - f' --no-user-output-enabled\n' - f'\n' - f'gcloud projects add-iam-policy-binding $PROJECT_ID \{newline}' - f' --member="serviceAccount:$SERVICE_ACCOUNT_FULL" \{newline}' - f' --role="roles/bigquery.dataEditor" \{newline}' - f' --no-user-output-enabled\n' - f'\n' - f'gcloud projects add-iam-policy-binding $PROJECT_ID \{newline}' - f' --member="serviceAccount:$SERVICE_ACCOUNT_FULL" \{newline}' - f' --role="roles/iam.serviceAccountUser" \{newline}' - f' --no-user-output-enabled\n' - f'\n' - f'gcloud projects add-iam-policy-binding $PROJECT_ID \{newline}' - f' --member="serviceAccount:$SERVICE_ACCOUNT_FULL" \{newline}' - f' --role="roles/storage.admin" \{newline}' - f' --no-user-output-enabled\n' - f'\n' - 
f'gcloud projects add-iam-policy-binding $PROJECT_ID \{newline}' - f' --member="serviceAccount:$SERVICE_ACCOUNT_FULL" \{newline}' - f' --role="roles/run.admin" \{newline}' - f' --no-user-output-enabled\n' - f'\n' - f'gcloud projects add-iam-policy-binding $PROJECT_ID \{newline}' - f' --member="serviceAccount:$PROJECT_NUMBER@cloudbuild.gserviceaccount.com" \{newline}' - f' --role="roles/run.admin" \{newline}' - f' --no-user-output-enabled\n' - f'\n' - f'gcloud projects add-iam-policy-binding $PROJECT_ID \{newline}' - f' --member="serviceAccount:$PROJECT_NUMBER@cloudbuild.gserviceaccount.com" \{newline}' - f' --role="roles/iam.serviceAccountUser" \{newline}' - f' --no-user-output-enabled\n' - f'\n' - f'gcloud projects add-iam-policy-binding $PROJECT_ID \{newline}' - f' --member="serviceAccount:$PROJECT_NUMBER@cloudbuild.gserviceaccount.com" \{newline}' - f' --role="roles/cloudtasks.enqueuer" \{newline}' - f' --no-user-output-enabled\n' - f'\n' - f'gcloud projects add-iam-policy-binding $PROJECT_ID \{newline}' - f' --member="serviceAccount:$PROJECT_NUMBER@cloudbuild.gserviceaccount.com" \{newline}' - f' --role="roles/cloudscheduler.admin" \{newline}' - f' --no-user-output-enabled\n' - f'\n' - f'echo -e "$GREEN Checking for Cloud Source Repository: $CLOUD_SOURCE_REPO in project $PROJECT_ID $NC"\n' - f'if ! (gcloud source repos list --project="$PROJECT_ID" | grep --fixed-strings "(^|[[:blank:]])$CLOUD_SOURCE_REPO($|[[:blank:]]))"; then\n' - f'\n' - f' echo "Creating Cloud Source Repository: ${left_bracket}CLOUD_SOURCE_REPO{right_bracket} in project $PROJECT_ID"\n' - f' gcloud source repos create $CLOUD_SOURCE_REPO\n' - f'\n' - f'else\n' - f'\n' - f' echo "Cloud Source Repository: ${left_bracket}CLOUD_SOURCE_REPO{right_bracket} already exists in project $PROJECT_ID"\n' - f'\n' - f'fi\n') - if not run_local: - create_resources_script += ( - f'\n' - f'# Create cloud tasks queue\n' - f'echo -e "$GREEN Checking for Cloud Tasks Queue: $CLOUD_TASKS_QUEUE_NAME in project $PROJECT_ID $NC"\n' - f'if ! (gcloud tasks queues list --location $CLOUD_TASKS_QUEUE_LOCATION | grep --fixed-strings "(^|[[:blank:]])$CLOUD_TASKS_QUEUE_NAME($|[[:blank:]]))"; then\n' - f'\n' - f' echo "Creating Cloud Tasks Queue: ${left_bracket}CLOUD_TASKS_QUEUE_NAME{right_bracket} in project $PROJECT_ID"\n' - f' gcloud tasks queues create $CLOUD_TASKS_QUEUE_NAME \{newline}' - f' --location=$CLOUD_TASKS_QUEUE_LOCATION\n' - f'\n' - f'else\n' - f'\n' - f' echo "Cloud Tasks Queue: ${left_bracket}CLOUD_TASKS_QUEUE_NAME{right_bracket} already exists in project $PROJECT_ID"\n' - f'\n' - f'fi\n' - f'\n' - f'# Create cloud build trigger\n' - f'echo -e "$GREEN Checking for Cloudbuild Trigger: $CB_TRIGGER_NAME in project $PROJECT_ID $NC"\n' - f'if ! 
(gcloud beta builds triggers list --project="$PROJECT_ID" --region="$CB_TRIGGER_LOCATION" | grep --fixed-strings "(^|[[:blank:]])name: $CB_TRIGGER_NAME($|[[:blank:]]))"; then\n' - f'\n' - f' echo "Creating Cloudbuild Trigger on branch $CLOUD_SOURCE_REPO_BRANCH in project $PROJECT_ID for repo ${left_bracket}CLOUD_SOURCE_REPO{right_bracket}"\n' - f' gcloud beta builds triggers create cloud-source-repositories \{newline}' - f' --region=$CB_TRIGGER_LOCATION \{newline}' - f' --name=$CB_TRIGGER_NAME \{newline}' - f' --repo=$CLOUD_SOURCE_REPO \{newline}' - f' --branch-pattern="$CLOUD_SOURCE_REPO_BRANCH" \{newline}' - f' --build-config={TOP_LVL_NAME}cloudbuild.yaml\n' - f'\n' - f'else\n' - f'\n' - f' echo "Cloudbuild Trigger already exists in project $PROJECT_ID for repo ${left_bracket}CLOUD_SOURCE_REPO{right_bracket}"\n' - f'\n' - f'fi\n') - BuilderUtils.write_and_chmod(RESOURCES_SH_FILE, create_resources_script) - -def _create_cloudbuild_config(run_local: bool): - """Writes a cloudbuild.yaml to the base directory. - Requires a defaults.yaml config to pull config vars from. - - Args: - run_local: Flag that determines whether to use Cloud Run CI/CD. - """ - defaults = BuilderUtils.read_yaml_file(DEFAULTS_FILE) - vpc_connector = defaults['gcp']['vpc_connector'] - vpc_connector_tail = '' - if vpc_connector != 'No VPC Specified': - vpc_connector_tail = ( - f'\n' - f' "--ingress", "internal",\n' - f' "--vpc-connector", "{vpc_connector}",\n' - f' "--vpc-egress", "all-traffic"') - vpc_connector_tail += ']\n' - - cloudbuild_comp_config = (BuilderUtils.LICENSE + - f'steps:\n' - f'# ==============================================================================\n' - f'# BUILD & PUSH CUSTOM COMPONENT IMAGES\n' - f'# ==============================================================================\n' - f'\n' - f''' # build the component_base image\n''' - f''' - name: "gcr.io/cloud-builders/docker"\n''' - f''' args: [ "build", "-t", "{defaults['gcp']['af_registry_location']}-docker.pkg.dev/{defaults['gcp']['project_id']}/{defaults['gcp']['af_registry_name']}/components/component_base:latest", "." ]\n''' - f''' dir: "{TOP_LVL_NAME}components/component_base"\n''' - f''' id: "build_component_base"\n''' - f''' waitFor: ["-"]\n''' - f'\n' - f''' # push the component_base image\n''' - f''' - name: "gcr.io/cloud-builders/docker"\n''' - f''' args: ["push", "{defaults['gcp']['af_registry_location']}-docker.pkg.dev/{defaults['gcp']['project_id']}/{defaults['gcp']['af_registry_name']}/components/component_base:latest"]\n''' - f''' dir: "{TOP_LVL_NAME}components/component_base"\n''' - f''' id: "push_component_base"\n''' - f''' waitFor: ["build_component_base"]\n''') - cloudbuild_cloudrun_config = ( - f'\n' - f'# ==============================================================================\n' - f'# BUILD & PUSH CLOUD RUN IMAGES\n' - f'# ==============================================================================\n' - f'\n' - f''' # build the run_pipeline image\n''' - f''' - name: 'gcr.io/cloud-builders/docker'\n''' - f''' args: [ "build", "-t", "{defaults['gcp']['af_registry_location']}-docker.pkg.dev/{defaults['gcp']['project_id']}/{defaults['gcp']['af_registry_name']}/run_pipeline:latest", "-f", "cloud_run/run_pipeline/Dockerfile", "." 
]\n''' - f''' dir: "{TOP_LVL_NAME}"\n''' - f''' id: "build_pipeline_runner_svc"\n''' - f''' waitFor: ['push_component_base']\n''' - f'\n' - f''' # push the run_pipeline image\n''' - f''' - name: "gcr.io/cloud-builders/docker"\n''' - f''' args: ["push", "{defaults['gcp']['af_registry_location']}-docker.pkg.dev/{defaults['gcp']['project_id']}/{defaults['gcp']['af_registry_name']}/run_pipeline:latest"]\n''' - f''' dir: "{TOP_LVL_NAME}"\n''' - f''' id: "push_pipeline_runner_svc"\n''' - f''' waitFor: ["build_pipeline_runner_svc"]\n''' - f'\n' - f''' # deploy the cloud run service\n''' - f''' - name: "gcr.io/google.com/cloudsdktool/cloud-sdk"\n''' - f''' entrypoint: gcloud\n''' - f''' args: ["run",\n''' - f''' "deploy",\n''' - f''' "{defaults['gcp']['cloud_run_name']}",\n''' - f''' "--image",\n''' - f''' "{defaults['gcp']['af_registry_location']}-docker.pkg.dev/{defaults['gcp']['project_id']}/{defaults['gcp']['af_registry_name']}/run_pipeline:latest",\n''' - f''' "--region",\n''' - f''' "{defaults['gcp']['cloud_run_location']}",\n''' - f''' "--service-account",\n''' - f''' "{defaults['gcp']['pipeline_runner_service_account']}",{vpc_connector_tail}''' - f''' id: "deploy_pipeline_runner_svc"\n''' - f''' waitFor: ["push_pipeline_runner_svc"]\n''' - f'\n' - f''' # Copy runtime parameters\n''' - f''' - name: 'gcr.io/cloud-builders/gcloud'\n''' - f''' entrypoint: bash\n''' - f''' args:\n''' - f''' - '-e'\n''' - f''' - '-c'\n''' - f''' - |\n''' - f''' cp -r {TOP_LVL_NAME}cloud_run/queueing_svc .\n''' - f''' id: "setup_queueing_svc"\n''' - f''' waitFor: ["deploy_pipeline_runner_svc"]\n''' - f'\n' - f''' # Install dependencies\n''' - f''' - name: python\n''' - f''' entrypoint: pip\n''' - f''' args: ["install", "-r", "queueing_svc/requirements.txt", "--user"]\n''' - f''' id: "install_queueing_svc_deps"\n''' - f''' waitFor: ["setup_queueing_svc"]\n''' - f'\n' - f''' # Submit to queue\n''' - f''' - name: python\n''' - f''' entrypoint: python\n''' - f''' args: ["queueing_svc/main.py", "--setting", "queue_job"]\n''' - f''' id: "submit_job_to_queue"\n''' - f''' waitFor: ["install_queueing_svc_deps"]\n''') - cloudbuild_scheduler_config = ( - '\n' - ''' # Create Scheduler Job\n''' - ''' - name: python\n''' - ''' entrypoint: python\n''' - ''' args: ["queueing_svc/main.py", "--setting", "schedule_job"]\n''' - ''' id: "schedule_job"\n''' - ''' waitFor: ["submit_job_to_queue"]\n''') - custom_comp_image = ( - f'\n' - f'images:\n' - f''' # custom component images\n''' - f''' - "{defaults['gcp']['af_registry_location']}-docker.pkg.dev/{defaults['gcp']['project_id']}/{defaults['gcp']['af_registry_name']}/components/component_base:latest"\n''') - cloudrun_image = ( - f''' # Cloud Run image\n''' - f''' - "{defaults['gcp']['af_registry_location']}-docker.pkg.dev/{defaults['gcp']['project_id']}/{defaults['gcp']['af_registry_name']}/run_pipeline:latest"\n''') - - if run_local: - cb_file_contents = cloudbuild_comp_config + custom_comp_image - else: - if defaults['gcp']['cloud_schedule_pattern'] == 'No Schedule Specified': - cb_file_contents = cloudbuild_comp_config + cloudbuild_cloudrun_config + custom_comp_image + cloudrun_image - else: - cb_file_contents = cloudbuild_comp_config + cloudbuild_cloudrun_config + cloudbuild_scheduler_config + custom_comp_image + cloudrun_image - BuilderUtils.write_file(CLOUDBUILD_FILE, cb_file_contents, 'w+') - -def _autoflake_srcfiles(): - """Removes unused imports from the python srcfiles. By default, - all imports listed in the imports cell will be written to - each srcfile. 
Autoflake removes the ones not being used.""" - BuilderUtils.execute_process(f'python3 -m autoflake --in-place --remove-all-unused-imports {COMPONENT_BASE_SRC}/*.py', to_null=False) - -def _create_requirements(use_kfp_spec: bool): - """Writes a requirements.txt to the component_base directory. - If not using kfp spec, infers pip requirements from the - python srcfiles using pipreqs. Some default gcp packages - are included, as well as packages that are often missing - in setup.py files (e.g db_types, pyarrow, gcsfs, fsspec). - - Args: - use_kfp_spec: Flag that determines the format of the component yamls. - """ - reqs_filename = f'{COMPONENT_BASE}/requirements.txt' - if use_kfp_spec: - BuilderUtils.delete_file(reqs_filename) - components_path_list = BuilderUtils.get_components_list() - for component_path in components_path_list: - component_spec = BuilderUtils.read_yaml_file(component_path) - reqs = component_spec['implementation']['container']['command'][2] - formatted_reqs = re.findall('\'([^\']*)\'', reqs) - reqs_str = ''.join(r+'\n' for r in formatted_reqs) - BuilderUtils.write_file(reqs_filename, reqs_str, 'a+') - else: - gcp_reqs = ( - 'google-cloud-aiplatform\n' - 'google-cloud-appengine-logging\n' - 'google-cloud-audit-log\n' - 'google-cloud-bigquery\n' - 'google-cloud-bigquery-storage\n' - 'google-cloud-bigtable\n' - 'google-cloud-core\n' - 'google-cloud-dataproc\n' - 'google-cloud-datastore\n' - 'google-cloud-dlp\n' - 'google-cloud-firestore\n' - 'google-cloud-kms\n' - 'google-cloud-language\n' - 'google-cloud-logging\n' - 'google-cloud-monitoring\n' - 'google-cloud-notebooks\n' - 'google-cloud-pipeline-components\n' - 'google-cloud-pubsub\n' - 'google-cloud-pubsublite\n' - 'google-cloud-recommendations-ai\n' - 'google-cloud-resource-manager\n' - 'google-cloud-scheduler\n' - 'google-cloud-spanner\n' - 'google-cloud-speech\n' - 'google-cloud-storage\n' - 'google-cloud-tasks\n' - 'google-cloud-translate\n' - 'google-cloud-videointelligence\n' - 'google-cloud-vision\n' - 'db_dtypes\n' - 'pyarrow\n' - 'gcsfs\n' - 'fsspec\n') - BuilderUtils.execute_process(f'python3 -m pipreqs.pipreqs {COMPONENT_BASE} --mode no-pin --force', to_null=False) - BuilderUtils.write_file(reqs_filename, gcp_reqs, 'a') - -def _create_dockerfile(): - """Writes a Dockerfile to the component_base directory.""" - # pylint: disable=anomalous-backslash-in-string - dockerfile = (BuilderUtils.LICENSE + - f'FROM {DEFAULT_IMAGE}\n' - f'RUN python -m pip install --upgrade pip\n' - f'COPY requirements.txt .\n' - f'RUN python -m pip install -r \ \n' - f' requirements.txt --quiet --no-cache-dir \ \n' - f' && rm -f requirements.txt\n' - f'COPY ./src /pipelines/component/src\n' - f'ENTRYPOINT ["/bin/bash"]\n') - BuilderUtils.write_file(f'{COMPONENT_BASE}/Dockerfile', dockerfile, 'w') - -def makeComponent(name: str, - params: list, - description: str = None): - """Wrapper function that creates a tmp component scaffold - which will be used by the ComponentBuilder formalize function. - - Args: - name: Component name. - params: Component parameters. A list of dictionaries, - each param is a dict containing keys: - 'name': required, str param name. - 'type': required, python primitive type. - 'description': optional, str param desc. - description: Optional description of the component. 
- """ - ComponentBuilder.create_component_scaffold(name, - params, description) - -def makePipeline(name: str, - params: list, - pipeline: list, - description: str = None): - """Wrapper function that creates a tmp pipeline scaffold - which will be used by the PipelineBuilder formalize function. - - Args: - name: Pipeline name. - params: Pipeline parameters. A list of dictionaries, - each param is a dict containing keys: - 'name': required, str param name. - 'type': required, python primitive type. - 'description': optional, str param desc. - pipeline: Defines the components to use in the pipeline, - their order, and a mapping of component params to - pipeline params. A list of dictionaries, each dict - specifies a custom component and contains keys: - 'component_name': name of the component - 'param_mapping': a list of tuples mapping -> - (component_param_name, pipeline_param_name) - description: Optional description of the pipeline. - """ - PipelineBuilder.create_pipeline_scaffold(name, - params, pipeline, description) diff --git a/build/lib/AutoMLOps/AutoMLOps_test.py b/build/lib/AutoMLOps/AutoMLOps_test.py deleted file mode 100644 index a77612d..0000000 --- a/build/lib/AutoMLOps/AutoMLOps_test.py +++ /dev/null @@ -1,21 +0,0 @@ -# Copyright 2023 Google LLC. All Rights Reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -"""Placeholder - tests""" - -# pylint: disable=C0103 - -def test_go(): - """Placeholder""" - assert True diff --git a/build/lib/AutoMLOps/BuilderUtils.py b/build/lib/AutoMLOps/BuilderUtils.py deleted file mode 100644 index 7b874d4..0000000 --- a/build/lib/AutoMLOps/BuilderUtils.py +++ /dev/null @@ -1,333 +0,0 @@ -# Copyright 2023 Google LLC. All Rights Reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. 
- -"""Utility functions and globals to be used by all - other modules in this directory.""" - -# pylint: disable=C0103 -# pylint: disable=line-too-long - -import os -import subprocess -import yaml - -TMPFILES_DIR = '.tmpfiles' -IMPORTS_TMPFILE = f'{TMPFILES_DIR}/imports.py' -CELL_TMPFILE = f'{TMPFILES_DIR}/cell.py' -PIPELINE_TMPFILE = f'{TMPFILES_DIR}/pipeline_scaffold.py' -PARAMETER_VALUES_PATH = 'pipelines/runtime_parameters/pipeline_parameter_values.json' -PIPELINE_JOB_SPEC_PATH = 'scripts/pipeline_spec/pipeline_job.json' -LICENSE = ( - '# Licensed under the Apache License, Version 2.0 (the "License");\n' - '# you may not use this file except in compliance with the License.\n' - '# You may obtain a copy of the License at\n' - '#\n' - '# http://www.apache.org/licenses/LICENSE-2.0\n' - '#\n' - '# Unless required by applicable law or agreed to in writing, software\n' - '# distributed under the License is distributed on an "AS IS" BASIS,\n' - '# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.\n' - '# See the License for the specific language governing permissions and\n' - '# limitations under the License.\n' - '#\n' - '# DISCLAIMER: This code is generated as part of the AutoMLOps output.\n' - '\n') - -def make_dirs(directories: list): - """Makes directories with the specified names. - - Args: - directories: Path of the directories to make. - """ - for d in directories: - try: - os.makedirs(d) - except FileExistsError: - pass - -def read_yaml_file(filepath: str) -> dict: - """Reads a yaml and returns file contents as a dict. - Defaults to utf-8 encoding. - - Args: - filepath: Path to the yaml. - Returns: - dict: Contents of the yaml. - Raises: - Exception: If an error is encountered reading the file. - """ - try: - with open(filepath, 'r', encoding='utf-8') as file: - file_dict = yaml.safe_load(file) - file.close() - except yaml.YAMLError as err: - raise yaml.YAMLError(f'Error reading file. {err}') from err - return file_dict - -def write_yaml_file(filepath: str, contents: dict, mode: str): - """Writes a dictionary to yaml. Defaults to utf-8 encoding. - - Args: - filepath: Path to the file. - contents: Dictionary to be written to yaml. - mode: Read/write mode to be used. - Raises: - Exception: If an error is encountered writing the file. - """ - try: - with open(filepath, mode, encoding='utf-8') as file: - yaml.safe_dump(contents, file, sort_keys=False) - file.close() - except yaml.YAMLError as err: - raise yaml.YAMLError(f'Error writing to file. {err}') from err - -def read_file(filepath: str) -> str: - """Reads a file and returns contents as a string. - Defaults to utf-8 encoding. - - Args: - filepath: Path to the yaml. - Returns: - dict: Contents of the yaml. - Raises: - Exception: If an error is encountered reading the file. - """ - try: - with open(filepath, 'r', encoding='utf-8') as file: - contents = file.read() - file.close() - except FileNotFoundError as err: - raise FileNotFoundError(f'Error reading file. {err}') from err - return contents - -def write_file(filepath: str, text: str, mode: str): - """Writes a file at the specified path. Defaults to utf-8 encoding. - - Args: - filepath: Path to the file. - text: Text to be written to file. - mode: Read/write mode to be used. - Raises: - Exception: If an error is encountered writing the file. - """ - try: - with open(filepath, mode, encoding='utf-8') as file: - file.write(text) - file.close() - except OSError as err: - raise OSError(f'Error writing to file. 
{err}') from err - -def write_and_chmod(filepath: str, text: str): - """Writes a file at the specified path and chmods the file - to allow for execution. - - Args: - filepath: Path to the file. - text: Text to be written to file. - Raises: - Exception: If an error is encountered chmod-ing the file. - """ - write_file(filepath, text, 'w+') - try: - st = os.stat(filepath) - os.chmod(filepath, st.st_mode | 0o111) - except OSError as err: - raise OSError(f'Error chmod-ing file. {err}') from err - -def delete_file(filepath: str): - """Deletes a file at the specified path. - If it does not exist, pass. - - Args: - filepath: Path to the file. - """ - try: - os.remove(filepath) - except OSError: - pass - -def get_components_list(full_path: bool = True) -> list: - """Reads yamls in tmpfiles directory, verifies they are component - yamls, and returns the name of the files. - - Args: - full_path: Boolean; if false, stores only the filename w/o extension. - Returns: - list: Contains the names or paths of all component yamls in the dir. - """ - components_list = [] - elements = os.listdir(TMPFILES_DIR) - for file in list(filter(lambda y: ('.yaml' or '.yml') in y, elements)): - path = os.path.join(TMPFILES_DIR, file) - if is_component_config(path): - if full_path: - components_list.append(path) - else: - components_list.append(os.path.basename(file).split('.')[0]) - return components_list - -def is_component_config(filepath: str) -> bool: - """Checks to see if the given file is a component yaml. - - Args: - filepath: Path to a yaml file. - Returns: - bool: Whether the given file is a component yaml. - """ - required_keys = ['name','inputs','implementation'] - file_dict = read_yaml_file(filepath) - return all(key in file_dict.keys() for key in required_keys) - -def execute_process(command: str, to_null: bool): - """Executes an external shell process. - - Args: - command: The string of the command to execute. - to_null: Determines where to send output. - Raises: - Exception: If an error occurs in executing the script. - """ - stdout = subprocess.DEVNULL if to_null else None - try: - subprocess.run([command], shell=True, check=True, - stdout=stdout, - stderr=subprocess.STDOUT) - except subprocess.CalledProcessError as err: - raise RuntimeError(f'Error executing process. {err}') from err - -def validate_schedule(schedule_pattern: str, run_local: str): - """Validates that the inputted schedule parameter. - - Args: - schedule_pattern: Cron formatted value used to create a Scheduled retrain job. - run_local: Flag that determines whether to use Cloud Run CI/CD. - Raises: - Exception: If schedule is not cron formatted or run_local validation fails. - """ - if schedule_pattern != 'No Schedule Specified' and run_local: - raise ValueError('run_local must be set to False to use Cloud Scheduler.') - -def validate_name(name: str): - """Validates that the inputted name parameter is of type str. - - Args: - name: The name of a component or pipeline. - Raises: - Exception: If the name is not of type str. - """ - if not isinstance(name, str): - raise TypeError('Pipeline and Component names must be of type string.') - -def validate_params(params: list): - """Verifies that the inputted params follow the correct - specification. - - Args: - params: Pipeline parameters. A list of dictionaries, - each param is a dict containing keys: - 'name': required, str param name. - 'type': required, python primitive type. - 'description': optional, str param desc. - Raises: - Exception: If incorrect params specification. 
- """ - s = set() - for param in params: - try: - name = param['name'] - if not isinstance(name, str): - raise TypeError('Parameter name must be of type string.') - param_type = param['type'] - if not isinstance(param_type, type): - raise TypeError('Parameter type must be a valid python type.') - except KeyError as err: - raise ValueError(f'Parameter {param} does not contain ' - f'required keys. {err}') from err - if param['name'] in s: - raise ValueError(f'''Duplicate parameter {param['name']} found.''') - else: - s.add(param['name']) - if 'description' not in param.keys(): - param['description'] = 'No description provided.' - -def validate_pipeline_structure(pipeline: list): - """Verifies that the pipeline follows the correct - specification. - - Args: - pipeline: Defines the components to use in the pipeline, - their order, and a mapping of component params to - pipeline params. A list of dictionaries, each dict - specifies a custom component and contains keys: - 'component_name': name of the component - 'param_mapping': a list of tuples mapping -> - (component_param, pipeline_param) - Raises: - Exception: If incorrect pipeline specification. - """ - components_list = get_components_list(full_path=False) - for component in pipeline: - try: - component_name = component['component_name'] - if component_name not in components_list: - raise ValueError(f'Component {component_name} not found - ' - f'No matching yaml definition in tmpfiles directory.') - param_mapping = component['param_mapping'] - except KeyError as err: - raise ValueError(f'Component {component} does not contain ' - f'required keys. {err}') from err - for param_tuple in param_mapping: - if not isinstance(param_tuple, tuple): - raise TypeError(f'Mapping contains a non-tuple ' - f'element {param_tuple}') - elif len(param_tuple) != 2: - raise TypeError(f'Mapping must contain only 2 elements, ' - f'tuple {param_tuple} is invalid.') - else: - for item in param_tuple: - if not isinstance(item, str): - raise TypeError(f'Mapping must be str-str, ' - f'tuple {param_tuple} is invalid.') - -def update_params(params: list) -> list: - """Converts the parameter types from Python types - to Kubeflow types. Currently only supports - Python primitive types. - - Args: - params: Pipeline parameters. A list of dictionaries, - each param is a dict containing keys: - 'name': required, str param name. - 'type': required, python primitive type. - 'description': optional, str param desc. - Returns: - list: Params list with converted types. - Raises: - Exception: If an inputted type is not a primitive. - """ - python_kfp_types_mapper = { - int: 'Integer', - str: 'String', - float: 'Float', - bool: 'Bool', - list: 'List', - dict: 'Dict' - } - for param in params: - try: - param['type'] = python_kfp_types_mapper[param['type']] - except KeyError as err: - raise ValueError(f'Unsupported python type - we only support ' - f'primitive types at this time. {err}') from err - return params diff --git a/build/lib/AutoMLOps/CloudRunBuilder.py b/build/lib/AutoMLOps/CloudRunBuilder.py deleted file mode 100644 index 42ce899..0000000 --- a/build/lib/AutoMLOps/CloudRunBuilder.py +++ /dev/null @@ -1,391 +0,0 @@ -# Copyright 2023 Google LLC. All Rights Reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. 
-# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -"""Builds cloud_run files.""" - -# pylint: disable=C0103 -# pylint: disable=line-too-long - -from AutoMLOps import BuilderUtils - -def formalize(top_lvl_name: str, - defaults_file: str,): - """Constructs and writes a Dockerfile, requirements.txt, and - main.py to the cloud_run/run_pipeline directory. Also - constructs and writes a main.py, requirements.txt, and - pipeline_parameter_values.json to the - cloud_run/queueing_svc directory. - - Args: - top_lvl_name: Top directory name. - defaults_file: Path to the default config variables yaml. - """ - BuilderUtils.make_dirs([top_lvl_name + 'cloud_run', - top_lvl_name + 'cloud_run/run_pipeline', - top_lvl_name + 'cloud_run/queueing_svc']) - create_dockerfile(top_lvl_name) - create_requirements(top_lvl_name) - create_mains(top_lvl_name, defaults_file) - # copy runtime parameters over to queueing_svc dir - BuilderUtils.execute_process(f'''cp -r {top_lvl_name + BuilderUtils.PARAMETER_VALUES_PATH} {top_lvl_name + 'cloud_run/queueing_svc'}''', to_null=False) - -def create_dockerfile(top_lvl_name: str): - """Writes a Dockerfile to the cloud_run/run_pipeline directory. - - Args: - top_lvl_name: Top directory name. - """ - cloudrun_base = top_lvl_name + 'cloud_run/run_pipeline' - dockerfile = (BuilderUtils.LICENSE + - 'FROM python:3.9\n' - '\n' - '# Allow statements and log messages to immediately appear in the Knative logs\n' - 'ENV PYTHONUNBUFFERED True\n' - '\n' - '# Copy local code to the container image.\n' - 'ENV APP_HOME /app\n' - 'WORKDIR $APP_HOME\n' - 'COPY ./ ./\n' - '\n' - '# Upgrade pip\n' - 'RUN python -m pip install --upgrade pip\n' - '# Install requirements\n' - 'RUN pip install --no-cache-dir -r /app/cloud_run/run_pipeline/requirements.txt\n' - '# Compile pipeline spec\n' - 'RUN ./scripts/build_pipeline_spec.sh\n' - '# Change Directories\n' - 'WORKDIR "/app/cloud_run/run_pipeline"\n' - '# Run flask api server\n' - 'CMD exec gunicorn --bind :$PORT --workers 1 --threads 8 --timeout 0 main:app\n' - ) - BuilderUtils.write_file(f'{cloudrun_base}/Dockerfile', dockerfile, 'w') - -def create_requirements(top_lvl_name: str): - """Writes a requirements.txt to the cloud_run/run_pipeline - directory, and a requirements.txt to the cloud_run/queueing_svc - directory. - - Args: - top_lvl_name: Top directory name. - """ - cloudrun_base = top_lvl_name + 'cloud_run/run_pipeline' - queueing_svc_base = top_lvl_name + 'cloud_run/queueing_svc' - cloud_run_reqs = ( - 'kfp\n' - 'google-cloud-aiplatform\n' - 'google-cloud-pipeline-components\n' - 'Flask\n' - 'gunicorn\n' - 'pyyaml\n' - ) - queueing_svc_reqs = ( - 'google-cloud\n' - 'google-cloud-tasks\n' - 'google-api-python-client\n' - 'google-cloud-run\n' - 'google-cloud-scheduler\n' - ) - BuilderUtils.write_file(f'{cloudrun_base}/requirements.txt', cloud_run_reqs, 'w') - BuilderUtils.write_file(f'{queueing_svc_base}/requirements.txt', queueing_svc_reqs, 'w') - -def create_mains(top_lvl_name: str, - defaults_file: str): - """Writes a main.py to the cloud_run/run_pipeline - directory. This file contains code for running - a flask service that will act as a pipeline - runner service. 
Also writes a main.py to - the cloud_run/queueing_svc directory. This file - contains code for submitting a job to the cloud - runner service, and creating a cloud scheduler job. - - Args: - top_lvl_name: Top directory name. - defaults_file: Path to the default config variables yaml. - """ - defaults = BuilderUtils.read_yaml_file(defaults_file) - cloudrun_base = top_lvl_name + 'cloud_run/run_pipeline' - queueing_svc_base = top_lvl_name + 'cloud_run/queueing_svc' - left_bracket = '{' - right_bracket = '}' - cloud_run_code = (BuilderUtils.LICENSE + - f'''"""Cloud Run to run pipeline spec"""\n''' - f'''import logging\n''' - f'''import os\n''' - f'''from typing import Tuple\n''' - f'\n' - f'''import flask\n''' - f'''from google.cloud import aiplatform\n''' - f'''import yaml\n''' - f'\n' - f'''app = flask.Flask(__name__)\n''' - f'\n' - f'''logger = logging.getLogger()\n''' - f'''log_level = os.environ.get('LOG_LEVEL', 'INFO')\n''' - f'''logger.setLevel(log_level)\n''' - f'\n' - f'''CONFIG_FILE = '../../configs/defaults.yaml'\n''' - f'''PIPELINE_SPEC_PATH_LOCAL = '../../scripts/pipeline_spec/pipeline_job.json'\n''' - f'\n' - f'''@app.route('/', methods=['POST'])\n''' - f'''def process_request() -> flask.Response:\n''' - f''' """HTTP web service to trigger pipeline execution.\n''' - f'\n' - f''' Returns:\n''' - f''' The response text, or any set of values that can be turned into a\n''' - f''' Response object using `make_response`\n''' - f''' .\n''' - f''' """\n''' - f''' content_type = flask.request.headers['content-type']\n''' - f''' if content_type == 'application/json':\n''' - f''' request_json = flask.request.json\n''' - f'\n' - f''' logging.debug('JSON Recieved:')\n''' - f''' logging.debug(request_json)\n''' - f'\n' - f''' with open(CONFIG_FILE, 'r', encoding='utf-8') as config_file:\n''' - f''' config = yaml.load(config_file, Loader=yaml.FullLoader)\n''' - f'\n' - f''' logging.debug('Calling run_pipeline()')\n''' - f''' dashboard_uri, resource_name = run_pipeline(\n''' - f''' project_id=config['gcp']['project_id'],\n''' - f''' pipeline_root=config['pipelines']['pipeline_storage_path'],\n''' - f''' pipeline_runner_sa=config['gcp']['pipeline_runner_service_account'],\n''' - f''' pipeline_params=request_json,\n''' - f''' pipeline_spec_path=PIPELINE_SPEC_PATH_LOCAL)\n''' - f''' return flask.make_response({left_bracket}\n''' - f''' 'dashboard_uri': dashboard_uri,\n''' - f''' 'resource_name': resource_name\n''' - f''' {right_bracket}, 200)\n''' - f'\n' - f''' else:\n''' - f''' raise ValueError(f'Unknown content type: {left_bracket}content_type{right_bracket}')\n''' - f'\n' - f'''def run_pipeline(\n''' - f''' project_id: str,\n''' - f''' pipeline_root: str,\n''' - f''' pipeline_runner_sa: str,\n''' - f''' pipeline_params: dict,\n''' - f''' pipeline_spec_path: str,\n''' - f''' display_name: str = 'mlops-pipeline-run',\n''' - f''' enable_caching: bool = False) -> Tuple[str, str]:\n''' - f''' """Executes a pipeline run.\n''' - f'\n' - f''' Args:\n''' - f''' project_id: The project_id.\n''' - f''' pipeline_root: GCS location of the pipeline runs metadata.\n''' - f''' pipeline_runner_sa: Service Account to runner PipelineJobs.\n''' - f''' pipeline_params: Pipeline parameters values.\n''' - f''' pipeline_spec_path: Location of the pipeline spec JSON.\n''' - f''' display_name: Name to call the pipeline.\n''' - f''' enable_caching: Should caching be enabled (Boolean)\n''' - f''' """\n''' - f''' logging.debug('Pipeline Parms Configured:')\n''' - f''' logging.debug(pipeline_params)\n''' - f'\n' - f''' 
aiplatform.init(project=project_id)\n''' - f''' job = aiplatform.PipelineJob(\n''' - f''' display_name = display_name,\n''' - f''' template_path = pipeline_spec_path,\n''' - f''' pipeline_root = pipeline_root,\n''' - f''' parameter_values = pipeline_params,\n''' - f''' enable_caching = enable_caching)\n''' - f''' logging.debug('AI Platform job built. Submitting...')\n''' - f''' job.submit(service_account=pipeline_runner_sa)\n''' - f''' logging.debug('Job sent!')\n''' - f''' dashboard_uri = job._dashboard_uri()\n''' - f''' resource_name = job.resource_name\n''' - f''' return dashboard_uri, resource_name\n''' - f'\n' - f'''if __name__ == '__main__':\n''' - f''' app.run(debug=True, host='0.0.0.0', port=int(os.environ.get('PORT', 8080)))\n''') - - queueing_svc_code = (BuilderUtils.LICENSE + - f'''"""Submit pipeline job using Cloud Tasks and create Cloud Scheduler Job."""\n''' - f'''import argparse\n''' - f'''import json\n''' - f'\n' - f'''from google.cloud import run_v2\n''' - f'''from google.cloud import scheduler_v1\n''' - f'''from google.cloud import tasks_v2\n''' - f'\n' - f'''CLOUD_RUN_LOCATION = '{defaults['gcp']['cloud_run_location']}'\n''' - f'''CLOUD_RUN_NAME = '{defaults['gcp']['cloud_run_name']}'\n''' - f'''CLOUD_TASKS_QUEUE_LOCATION = '{defaults['gcp']['cloud_tasks_queue_location']}'\n''' - f'''CLOUD_TASKS_QUEUE_NAME = '{defaults['gcp']['cloud_tasks_queue_name']}'\n''' - f'''PARAMETER_VALUES_PATH = 'queueing_svc/pipeline_parameter_values.json'\n''' - f'''PIPELINE_RUNNER_SA = '{defaults['gcp']['pipeline_runner_service_account']}'\n''' - f'''PROJECT_ID = '{defaults['gcp']['project_id']}'\n''' - f'''SCHEDULE_LOCATION = '{defaults['gcp']['cloud_schedule_location']}'\n''' - f'''SCHEDULE_NAME = '{defaults['gcp']['cloud_schedule_name']}'\n''' - f'''SCHEDULE_PATTERN = '{defaults['gcp']['cloud_schedule_pattern']}'\n''' - f'\n' - f'''def get_runner_svc_uri(\n''' - f''' cloud_run_location: str,\n''' - f''' cloud_run_name: str,\n''' - f''' project_id: str):\n''' - f''' """Fetches the uri for the given cloud run instance.\n''' - f'\n' - f''' Args:\n''' - f''' cloud_run_location: The location of the cloud runner service.\n''' - f''' cloud_run_name: The name of the cloud runner service.\n''' - f''' project_id: The project ID.\n''' - f''' Returns:\n''' - f''' str: Uri of the Cloud Run instance.\n''' - f''' """\n''' - f''' client = run_v2.ServicesClient()\n''' - f''' parent = client.service_path(project_id, cloud_run_location, cloud_run_name)\n''' - f''' request = run_v2.GetServiceRequest(name=parent)\n''' - f''' response = client.get_service(request=request)\n''' - f''' return response.uri\n''' - f'\n' - f'''def get_json_bytes(file_path: str):\n''' - f''' """Reads a json file at the specified path and returns as bytes.\n''' - f'\n' - f''' Args:\n''' - f''' file_path: Path of the json file.\n''' - f''' Returns:\n''' - f''' bytes: Encode bytes of the file.\n''' - f''' """\n''' - f''' try:\n''' - f''' with open(file_path, 'r', encoding='utf-8') as file:\n''' - f''' data = json.load(file)\n''' - f''' file.close()\n''' - f''' except OSError as err:\n''' - f''' raise Exception(f'Error reading json file. 
{left_bracket}err{right_bracket}') from err\n''' - f''' return json.dumps(data).encode()\n''' - f'\n' - f'''def create_cloud_task(\n''' - f''' cloud_tasks_queue_location: str,\n''' - f''' cloud_tasks_queue_name: str,\n''' - f''' parameter_values_path: str,\n''' - f''' pipeline_runner_sa: str,\n''' - f''' project_id: str,\n''' - f''' runner_svc_uri: str):\n''' - f''' """Create a task to the queue with the runtime parameters.\n''' - f'\n' - f''' Args:\n''' - f''' cloud_run_location: The location of the cloud runner service.\n''' - f''' cloud_run_name: The name of the cloud runner service.\n''' - f''' cloud_tasks_queue_location: The location of the cloud tasks queue.\n''' - f''' cloud_tasks_queue_name: The name of the cloud tasks queue.\n''' - f''' parameter_values_path: Path to json pipeline params.\n''' - f''' pipeline_runner_sa: Service Account to runner PipelineJobs.\n''' - f''' project_id: The project ID.\n''' - f''' runner_svc_uri: Uri of the Cloud Run instance.\n''' - f''' """\n''' - f''' client = tasks_v2.CloudTasksClient()\n''' - f''' parent = client.queue_path(project_id, cloud_tasks_queue_location, cloud_tasks_queue_name)\n''' - f''' task = {left_bracket}\n''' - f''' 'http_request': {left_bracket}\n''' - f''' 'http_method': tasks_v2.HttpMethod.POST,\n''' - f''' 'url': runner_svc_uri,\n''' - f''' 'oidc_token': {left_bracket}\n''' - f''' 'service_account_email': pipeline_runner_sa,\n''' - f''' 'audience': runner_svc_uri\n''' - f''' {right_bracket},\n''' - f''' 'headers': {left_bracket}\n''' - f''' 'Content-Type': 'application/json'\n''' - f''' {right_bracket}\n''' - f''' {right_bracket}\n''' - f''' {right_bracket}\n''' - f''' task['http_request']['body'] = get_json_bytes(parameter_values_path)\n''' - f''' response = client.create_task(request={left_bracket}'parent': parent, 'task': task{right_bracket})\n''' - f''' print(f'Created task {left_bracket}response.name{right_bracket}')\n''' - f'\n' - f'''def create_cloud_scheduler_job(\n''' - f''' parameter_values_path: str,\n''' - f''' pipeline_runner_sa: str,\n''' - f''' project_id: str,\n''' - f''' runner_svc_uri: str,\n''' - f''' schedule_location: str,\n''' - f''' schedule_name: str,\n''' - f''' schedule_pattern: str):\n''' - f''' """Creates a scheduled pipeline job.\n''' - f'\n' - f''' Args:\n''' - f''' parameter_values_path: Path to json pipeline params.\n''' - f''' pipeline_runner_sa: Service Account to runner PipelineJobs.\n''' - f''' project_id: The project ID.\n''' - f''' runner_svc_uri: Uri of the Cloud Run instance.\n''' - f''' schedule_location: The location of the scheduler resource.\n''' - f''' schedule_name: The name of the scheduler resource.\n''' - f''' schedule_pattern: Cron formatted value used to create a Scheduled retrain job.\n''' - f''' """\n''' - f''' client = scheduler_v1.CloudSchedulerClient()\n''' - f''' parent = f'projects/{left_bracket}project_id{right_bracket}/locations/{left_bracket}schedule_location{right_bracket}'\n''' - f''' name = f'{left_bracket}parent{right_bracket}/jobs/{left_bracket}schedule_name{right_bracket}'\n''' - f'\n' - f''' request = scheduler_v1.ListJobsRequest(parent=parent)\n''' - f''' page_result = client.list_jobs(request=request)\n''' - f''' for response in page_result:\n''' - f''' if response.name == name:\n''' - f''' print(f'Cloud Scheduler {left_bracket}schedule_name{right_bracket} resource already exists in '\n''' - f''' f'project {left_bracket}project_id{right_bracket}.')\n''' - f''' return\n''' - f'\n' - f''' oidc_token = scheduler_v1.OidcToken(\n''' - f''' 
service_account_email=pipeline_runner_sa,\n''' - f''' audience=runner_svc_uri)\n''' - f'\n' - f''' target = scheduler_v1.HttpTarget(\n''' - f''' uri=runner_svc_uri,\n''' - f''' http_method=scheduler_v1.HttpMethod(1), # HTTP POST\n''' - f''' headers={left_bracket}'Content-Type': 'application/json'{right_bracket},\n''' - f''' body=get_json_bytes(parameter_values_path),\n''' - f''' oidc_token=oidc_token)\n''' - f'\n' - f''' job = scheduler_v1.Job(\n''' - f''' name=f'{left_bracket}parent{right_bracket}/jobs/{left_bracket}schedule_name{right_bracket}',\n''' - f''' description='AutoMLOps cloud scheduled run.',\n''' - f''' http_target=target,\n''' - f''' schedule=schedule_pattern)\n''' - f'\n' - f''' request = scheduler_v1.CreateJobRequest(\n''' - f''' parent=parent,\n''' - f''' job=job)\n''' - f'\n' - f''' response = client.create_job(request=request)\n''' - f''' print(response)\n''' - f'\n' - f'''if __name__ == '__main__':\n''' - f''' parser = argparse.ArgumentParser()\n''' - f''' parser.add_argument('--setting', type=str,\n''' - f''' help='The config file for setting default values.')\n''' - f''' args = parser.parse_args()\n''' - f'\n' - f''' uri = get_runner_svc_uri(\n''' - f''' cloud_run_location=CLOUD_RUN_LOCATION,\n''' - f''' cloud_run_name=CLOUD_RUN_NAME,\n''' - f''' project_id=PROJECT_ID)\n''' - f'\n' - f''' if args.setting == 'queue_job':\n''' - f''' create_cloud_task(\n''' - f''' cloud_tasks_queue_location=CLOUD_TASKS_QUEUE_LOCATION,\n''' - f''' cloud_tasks_queue_name=CLOUD_TASKS_QUEUE_NAME,\n''' - f''' parameter_values_path=PARAMETER_VALUES_PATH,\n''' - f''' pipeline_runner_sa=PIPELINE_RUNNER_SA,\n''' - f''' project_id=PROJECT_ID,\n''' - f''' runner_svc_uri=uri)\n''' - f'\n' - f''' if args.setting == 'schedule_job':\n''' - f''' create_cloud_scheduler_job(\n''' - f''' parameter_values_path=PARAMETER_VALUES_PATH,\n''' - f''' pipeline_runner_sa=PIPELINE_RUNNER_SA,\n''' - f''' project_id=PROJECT_ID,\n''' - f''' runner_svc_uri=uri,\n''' - f''' schedule_location=SCHEDULE_LOCATION,\n''' - f''' schedule_name=SCHEDULE_NAME,\n''' - f''' schedule_pattern=SCHEDULE_PATTERN)\n''') - BuilderUtils.write_file(f'{cloudrun_base}/main.py', cloud_run_code, 'w') - BuilderUtils.write_file(f'{queueing_svc_base}/main.py', queueing_svc_code, 'w') diff --git a/build/lib/AutoMLOps/ComponentBuilder.py b/build/lib/AutoMLOps/ComponentBuilder.py deleted file mode 100644 index 2a52886..0000000 --- a/build/lib/AutoMLOps/ComponentBuilder.py +++ /dev/null @@ -1,199 +0,0 @@ -# Copyright 2023 Google LLC. All Rights Reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -"""Builds component files.""" - -# pylint: disable=C0103 -# pylint: disable=line-too-long - -from AutoMLOps import BuilderUtils - -def formalize(component_path: str, - top_lvl_name: str, - defaults_file: str, - use_kfp_spec: bool): - """Constructs and writes component.yaml and {component_name}.py files. - component.yaml: Contains the Kubeflow custom component definition. - {component_name}.py: Contains the python code from the Jupyter cell. 
- - Args: - component_path: Path to the temporary component yaml. This file - is used to create the permanent component.yaml, and deleted - after calling AutoMLOps.generate(). - top_lvl_name: Top directory name. - defaults_file: Path to the default config variables yaml. - use_kfp_spec: Flag that determines the format of the component yamls. - """ - component_spec = BuilderUtils.read_yaml_file(component_path) - if use_kfp_spec: - component_spec['name'] = component_spec['name'].replace(' ', '_').lower() - component_dir = top_lvl_name + 'components/' + component_spec['name'] - task_filepath = (top_lvl_name + 'components/component_base/src/' + - component_spec['name'] + '.py') - BuilderUtils.make_dirs([component_dir]) - create_task(component_spec, task_filepath, use_kfp_spec) - create_component(component_spec, component_dir, defaults_file) - -def create_task(component_spec: dict, task_filepath: str, use_kfp_spec: bool): - """Writes cell python code to a file with required imports. - - Args: - component_spec: Component definition dictionary. - Contains cell code which is temporarily stored in - component_spec['implementation']['container']['command'] - task_filepath: Path to the file to be written. - use_kfp_spec: Flag that determines the format of the component yamls. - Raises: - Exception: If the imports tmpfile does not exist. - """ - if use_kfp_spec: - custom_imports = '' - custom_code = component_spec['implementation']['container']['command'][-1] - else: - custom_imports = BuilderUtils.read_file(BuilderUtils.IMPORTS_TMPFILE) - custom_code = component_spec['implementation']['container']['command'] - default_imports = (BuilderUtils.LICENSE + - 'import argparse\n' - 'import json\n' - 'from kfp.v2.components import executor\n') - main_func = ( - '\n' - '''def main():\n''' - ''' """Main executor."""\n''' - ''' parser = argparse.ArgumentParser()\n''' - ''' parser.add_argument('--executor_input', type=str)\n''' - ''' parser.add_argument('--function_to_execute', type=str)\n''' - '\n' - ''' args, _ = parser.parse_known_args()\n''' - ''' executor_input = json.loads(args.executor_input)\n''' - ''' function_to_execute = globals()[args.function_to_execute]\n''' - '\n' - ''' executor.Executor(\n''' - ''' executor_input=executor_input,\n''' - ''' function_to_execute=function_to_execute).execute()\n''' - '\n' - '''if __name__ == '__main__':\n''' - ''' main()\n''') - f_contents = default_imports + custom_imports + custom_code + main_func - BuilderUtils.write_file(task_filepath, f_contents, 'w+') - -def create_component(component_spec: dict, - component_dir: str, - defaults_file: str): - """Updates the component_spec to include the correct image - and startup command, then writes the component.yaml. - Requires a defaults.yaml config to pull config vars from. - - Args: - component_spec: Component definition dictionary. - component_dir: Path of the component directory. - defaults_file: Path to the default config variables yaml. - Raises: - Exception: If an error is encountered writing the file. 
- """ - defaults = BuilderUtils.read_yaml_file(defaults_file) - component_spec['implementation']['container']['image'] = ( - f'''{defaults['gcp']['af_registry_location']}-docker.pkg.dev/''' - f'''{defaults['gcp']['project_id']}/''' - f'''{defaults['gcp']['af_registry_name']}/''' - f'''components/component_base:latest''') - component_spec['implementation']['container']['command'] = [ - 'python3', - f'''/pipelines/component/src/{component_spec['name']+'.py'}'''] - filename = component_dir + '/component.yaml' - BuilderUtils.write_file(filename, BuilderUtils.LICENSE, 'w') - BuilderUtils.write_yaml_file(filename, component_spec, 'a') - -def create_component_scaffold(name: str, - params: list, - description: str): - """Creates a tmp component scaffold which will be used by - the formalize function. Code is temporarily stored in - component_spec['implementation']['container']['command']. - - Args: - name: Component name. - params: Component parameters. A list of dictionaries, - each param is a dict containing keys: - 'name': required, str param name. - 'type': required, python primitive type. - 'description': optional, str param desc. - description: Optional description of the component. - """ - BuilderUtils.validate_name(name) - BuilderUtils.validate_params(params) - func_def = get_func_definition(name, params, description) - params = BuilderUtils.update_params(params) - code = BuilderUtils.read_file(BuilderUtils.CELL_TMPFILE) - code = filter_and_indent_cell(code) - BuilderUtils.delete_file(BuilderUtils.CELL_TMPFILE) - # make yaml - component_spec = {} - component_spec['name'] = name - if description: - component_spec['description'] = description - component_spec['inputs'] = params - component_spec['implementation'] = {} - component_spec['implementation']['container'] = {} - component_spec['implementation']['container']['image'] = 'TBD' - component_spec['implementation']['container']['command'] = func_def + code - component_spec['implementation']['container']['args'] = ['--executor_input', - {'executorInput': None}, '--function_to_execute', name] - filename = BuilderUtils.TMPFILES_DIR + f'/{name}.yaml' - BuilderUtils.write_yaml_file(filename, component_spec, 'w') - -def get_func_definition(name: str, - params: list, - description: str): - """Generates a python function definition to be used in - the {component_name}.py file (this file will contain - Jupyter cell code). - - Args: - name: Component name. - params: Component parameters. A list of dictionaries, - each param is a dict containing keys: - 'name': required, str param name. - 'type': required, python primitive type. - 'description': optional, str param desc. - description: Optional description of the component. - """ - newline = '\n' - return ( - f'\n' - f'def {name}(\n' - f'''{newline.join(f" {param['name']}: {param['type'].__name__}," for param in params)}\n''' - f'):\n' - f' """{description}\n' - f'\n' - f' Args:\n' - f'''{newline.join(f" {param['name']}: {param['description']}," for param in params)}\n''' - f' """' - ) - -def filter_and_indent_cell(code: str) -> str: - """Remove unwanted makeComponent function call - and indent cell code. - - Args: - code: String contains the contents of the - Jupyter cell. - Return: - str: Indented cell code with removed func call. 
- """ - code = code.replace(code[code.find('AutoMLOps.makeComponent('):code.find(')')+1], '') - indented_code = '' - for line in code.splitlines(): - indented_code += ' ' + line + '\n' - return indented_code diff --git a/build/lib/AutoMLOps/JupyterUtilsMagic.py b/build/lib/AutoMLOps/JupyterUtilsMagic.py deleted file mode 100644 index 8715b40..0000000 --- a/build/lib/AutoMLOps/JupyterUtilsMagic.py +++ /dev/null @@ -1,71 +0,0 @@ -# Copyright 2023 Google LLC. All Rights Reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -"""Jupyter Magics to be imported into notebook.""" - -# pylint: disable=C0103 - -import os - -from IPython import get_ipython -from IPython.core.magic import Magics, cell_magic, magics_class - -from AutoMLOps import BuilderUtils - -@magics_class -class JupyterUtilsMagic(Magics): - """Magics for writing imports and components cells.""" - @cell_magic - def define_imports(self, _, cell: str): - """Use for imports cells, saves and runs the cell.""" - make_tmpfiles_dir() - with open(BuilderUtils.IMPORTS_TMPFILE, 'w', encoding='utf-8') as file: - file.write(cell) - self.shell.run_cell(cell) - - @cell_magic - def define_component(self, _, cell: str): - """Use for component cells, saves and runs the cell.""" - make_tmpfiles_dir() - with open(BuilderUtils.CELL_TMPFILE, 'w', encoding='utf-8') as file: - file.write(cell) - # Execute just the makeComponent function call from the cell - code_to_exec = cell[ - cell.find('AutoMLOps.makeComponent('):cell.find(')')+1 - ] - self.shell.run_cell(code_to_exec) - - @cell_magic - def define_kfp_pipeline(self, _, cell: str): - """Use for component cells, saves and runs the cell.""" - make_tmpfiles_dir() - with open(BuilderUtils.PIPELINE_TMPFILE, 'w', encoding='utf-8') as file: - file.write(cell) - self.shell.run_cell(cell) - -def make_tmpfiles_dir(): - """Creates a tmpfiles directory to store intermediate files.""" - try: - os.makedirs(BuilderUtils.TMPFILES_DIR) - except FileExistsError: - pass - -try: - ipy = get_ipython() - ipy.register_magics(JupyterUtilsMagic) - make_tmpfiles_dir() - -except AttributeError as err: - raise AttributeError(f'Cannot load JupyterUtilsMagic, ' - f'this is not a notebook. {err}') from err diff --git a/build/lib/AutoMLOps/PipelineBuilder.py b/build/lib/AutoMLOps/PipelineBuilder.py deleted file mode 100644 index 328f8ec..0000000 --- a/build/lib/AutoMLOps/PipelineBuilder.py +++ /dev/null @@ -1,318 +0,0 @@ -# Copyright 2023 Google LLC. All Rights Reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. 
- -"""Builds pipeline files.""" - -# pylint: disable=C0103 -# pylint: disable=line-too-long - -import json -from typing import Dict, List - -from AutoMLOps import BuilderUtils - -def formalize(custom_training_job_specs: List[Dict], - defaults_file: str, - pipeline_parameter_values: dict, - top_lvl_name: str): - """Constructs and writes pipeline.py, pipeline_runner.py, and pipeline_parameter_values.json files. - pipeline.py: Generates a Kubeflow pipeline spec from custom components. - pipeline_runner.py: Sends a PipelineJob to Vertex AI using pipeline spec. - pipeline_parameter_values.json: Provides runtime parameters for the PipelineJob. - - Args: - custom_training_job_specs: Specifies the specs to run the training job with. - defaults_file: Path to the default config variables yaml. - pipeline_parameter_values: Dictionary of runtime parameters for the PipelineJob. - top_lvl_name: Top directory name. - Raises: - Exception: If an error is encountered reading/writing to a file. - """ - defaults = BuilderUtils.read_yaml_file(defaults_file) - pipeline_file = top_lvl_name + 'pipelines/pipeline.py' - pipeline_runner_file = top_lvl_name + 'pipelines/pipeline_runner.py' - pipeline_params_file = top_lvl_name + BuilderUtils.PARAMETER_VALUES_PATH - # construct pipeline.py - pipeline_imports = get_pipeline_imports(custom_training_job_specs, defaults['gcp']['project_id']) - pipeline_argparse = get_pipeline_argparse() - try: - with open(pipeline_file, 'r+', encoding='utf-8') as file: - pipeline_scaffold = file.read() - file.seek(0, 0) - file.write(BuilderUtils.LICENSE) - file.write(pipeline_imports) - for line in pipeline_scaffold.splitlines(): - file.write(' ' + line + '\n') - file.write(pipeline_argparse) - file.close() - except OSError as err: - raise OSError(f'Error interacting with file. {err}') from err - # construct pipeline_runner.py - BuilderUtils.write_file(pipeline_runner_file, get_pipeline_runner(), 'w+') - # construct pipeline_parameter_values.json - serialized_params = json.dumps(pipeline_parameter_values, indent=4) - BuilderUtils.write_file(pipeline_params_file, serialized_params, 'w+') - -def get_pipeline_imports(custom_training_job_specs: List[Dict], project_id: str) -> str: - """Generates python code that imports modules and loads all custom components. - Args: - custom_training_job_specs: Specifies the specs to run the training job with. - project_id: The project_id to run the pipeline. - - Returns: - str: Python pipeline_imports code. 
- """ - components_list = BuilderUtils.get_components_list(full_path=False) - gcpc_imports = ( - 'from functools import partial\n' - 'from google_cloud_pipeline_components.v1.custom_job import create_custom_training_job_op_from_component\n') - quote = '\'' - newline_tab = '\n ' - return ( - f'''import argparse\n''' - f'''import os\n''' - f'''{gcpc_imports if custom_training_job_specs else ''}''' - f'''import kfp\n''' - f'''from kfp.v2 import compiler, dsl\n''' - f'''from kfp.v2.dsl import pipeline, component, Artifact, Dataset, Input, Metrics, Model, Output, InputPath, OutputPath\n''' - f'''import yaml\n''' - f'\n' - f'''def load_custom_component(component_name: str):\n''' - f''' component_path = os.path.join('components',\n''' - f''' component_name,\n''' - f''' 'component.yaml')\n''' - f''' return kfp.components.load_component_from_file(component_path)\n''' - f'\n' - f'''def create_training_pipeline(pipeline_job_spec_path: str):\n''' - f''' {newline_tab.join(f'{component} = load_custom_component(component_name={quote}{component}{quote})' for component in components_list)}\n''' - f'\n' - f'''{get_custom_job_specs(custom_training_job_specs, project_id)}''') - -def get_custom_job_specs(custom_training_job_specs: List[Dict], project_id: str) -> str: - """Generates python code that creates a custom training op from the specified component. - Args: - custom_training_job_specs: Specifies the specs to run the training job with. - project_id: The project_id to run the pipeline. - - Returns: - str: Python custom training op code. - """ - quote = '\'' - newline_tab = '\n ' - output_string = '' if not custom_training_job_specs else ( - f''' {newline_tab.join(f'{spec["component_spec"]}_custom_training_job_specs = {format_spec_dict(spec)}' for spec in custom_training_job_specs)}''' - f'\n' - f''' {newline_tab.join(f'{spec["component_spec"]}_job_op = create_custom_training_job_op_from_component(**{spec["component_spec"]}_custom_training_job_specs)' for spec in custom_training_job_specs)}''' - f'\n' - f''' {newline_tab.join(f'{spec["component_spec"]} = partial({spec["component_spec"]}_job_op, project={quote}{project_id}{quote})' for spec in custom_training_job_specs)}''' - f'\n') - return output_string - -def format_spec_dict(job_spec: dict) -> str: - """Takes in a job spec dictionary and removes the quotes around the component op name. - e.g. 'component_spec': 'train_model' becomes 'component_spec': train_model. - This is necessary to in order for the op to be callable within the Python code. - - Args: - job_spec: Dictionary with job spec info. - - Returns: - str: Python formatted dictionary code. - """ - quote = '\'' - left_bracket = '{' - right_bracket = '}' - newline = '\n' - - return ( - f'''{left_bracket}\n''' - f''' {f'{newline} '.join(f" {quote}{k}{quote}: {quote if k != 'component_spec' else ''}{v}{quote if k != 'component_spec' else ''}," for k, v in job_spec.items())}{newline}''' - f''' {right_bracket}\n''') - -def get_pipeline_argparse() -> str: - """Generates python code that loads default pipeline parameters from the defaults config_file. - - Returns: - str: Python pipeline_argparse code. 
-    """
-    return (
-        '''\n'''
-        '''    compiler.Compiler().compile(\n'''
-        '''        pipeline_func=pipeline,\n'''
-        '''        package_path=pipeline_job_spec_path)\n'''
-        '\n'
-        '''if __name__ == '__main__':\n'''
-        '''    parser = argparse.ArgumentParser()\n'''
-        '''    parser.add_argument('--config', type=str,\n'''
-        '''                        help='The config file for setting default values.')\n'''
-        '\n'
-        '''    args = parser.parse_args()\n'''
-        '\n'
-        '''    with open(args.config, 'r', encoding='utf-8') as config_file:\n'''
-        '''        config = yaml.load(config_file, Loader=yaml.FullLoader)\n'''
-        '\n'
-        '''    pipeline = create_training_pipeline(\n'''
-        '''        pipeline_job_spec_path=config['pipelines']['pipeline_job_spec_path'])\n''')
-
-def get_pipeline_runner() -> str:
-    """Generates python code that sends a PipelineJob to Vertex AI.
-
-    Returns:
-        str: Python pipeline_runner code.
-    """
-    return (BuilderUtils.LICENSE +
-        '''import argparse\n'''
-        '''import json\n'''
-        '''import logging\n'''
-        '''import os\n'''
-        '''import yaml\n'''
-        '\n'
-        '''from google.cloud import aiplatform\n'''
-        '\n'
-        '''logger = logging.getLogger()\n'''
-        '''log_level = os.environ.get('LOG_LEVEL', 'INFO')\n'''
-        '''logger.setLevel(log_level)\n'''
-        '\n'
-        '''def run_pipeline(\n'''
-        '''    project_id: str,\n'''
-        '''    pipeline_root: str,\n'''
-        '''    pipeline_runner_sa: str,\n'''
-        '''    parameter_values_path: str,\n'''
-        '''    pipeline_spec_path: str,\n'''
-        '''    display_name: str = 'mlops-pipeline-run',\n'''
-        '''    enable_caching: bool = False):\n'''
-        '''    """Executes a pipeline run.\n'''
-        '\n'
-        '''    Args:\n'''
-        '''        project_id: The project_id.\n'''
-        '''        pipeline_root: GCS location of the pipeline runs metadata.\n'''
-        '''        pipeline_runner_sa: Service Account to run PipelineJobs.\n'''
-        '''        parameter_values_path: Location of parameter values JSON.\n'''
-        '''        pipeline_spec_path: Location of the pipeline spec JSON.\n'''
-        '''        display_name: Name to call the pipeline.\n'''
-        '''        enable_caching: Should caching be enabled (Boolean)\n'''
-        '''    """\n'''
-        '''    with open(parameter_values_path, 'r') as file:\n'''
-        '''        try:\n'''
-        '''            pipeline_params = json.load(file)\n'''
-        '''        except ValueError as exc:\n'''
-        '''            print(exc)\n'''
-        '''    logging.debug('Pipeline Params Configured:')\n'''
-        '''    logging.debug(pipeline_params)\n'''
-        '\n'
-        '''    aiplatform.init(project=project_id)\n'''
-        '''    job = aiplatform.PipelineJob(\n'''
-        '''        display_name = display_name,\n'''
-        '''        template_path = pipeline_spec_path,\n'''
-        '''        pipeline_root = pipeline_root,\n'''
-        '''        parameter_values = pipeline_params,\n'''
-        '''        enable_caching = enable_caching)\n'''
-        '''    logging.debug('AI Platform job built. 
Submitting...')\n''' - ''' job.submit(service_account=pipeline_runner_sa)\n''' - ''' logging.debug('Job sent!')\n''' - '\n' - '''if __name__ == '__main__':\n''' - ''' parser = argparse.ArgumentParser()\n''' - ''' parser.add_argument('--config', type=str,\n''' - ''' help='The config file for setting default values.')\n''' - ''' args = parser.parse_args()\n''' - '\n' - ''' with open(args.config, 'r', encoding='utf-8') as config_file:\n''' - ''' config = yaml.load(config_file, Loader=yaml.FullLoader)\n''' - '\n' - ''' run_pipeline(project_id=config['gcp']['project_id'],\n''' - ''' pipeline_root=config['pipelines']['pipeline_storage_path'],\n''' - ''' pipeline_runner_sa=config['gcp']['pipeline_runner_service_account'],\n''' - ''' parameter_values_path=config['pipelines']['parameter_values_path'],\n''' - ''' pipeline_spec_path=config['pipelines']['pipeline_job_spec_path']) \n''') - -def create_pipeline_scaffold(name: str, - params: list, - pipeline: list, - description: str = None): - """Creates a temporary pipeline scaffold which will - be used by the formalize function. - - Args: - name: Pipeline name. - params: Pipeline parameters. A list of dictionaries, - each param is a dict containing keys: - 'name': required, str param name. - 'type': required, python primitive type. - 'description': optional, str param desc. - pipeline: Defines the components to use in the pipeline, - their order, and a mapping of component params to - pipeline params. A list of dictionaries, each dict - specifies a custom component and contains keys: - 'component_name': name of the component - 'param_mapping': a list of tuples mapping -> - (component_param, pipeline_param) - description: Optional description of the pipeline. - """ - BuilderUtils.validate_name(name) - BuilderUtils.validate_params(params) - BuilderUtils.validate_pipeline_structure(pipeline) - BuilderUtils.make_dirs([BuilderUtils.TMPFILES_DIR]) # if it doesn't already exist - pipeline_scaffold = get_pipeline_scaffold(name, params, pipeline, description) - BuilderUtils.write_file(BuilderUtils.PIPELINE_TMPFILE, pipeline_scaffold, 'w') - -def get_pipeline_scaffold(name: str, - params: list, - pipeline: list, - description: str): - """Generates the Kubeflow pipeline definition. Uses a - queue to define .after() ordering in the pipeline. - - Args: - name: Pipeline name. - params: Pipeline parameters. A list of dictionaries, - each param is a dict containing keys: - 'name': required, str param name. - 'type': required, python primitive type. - 'description': optional, str param desc. - pipeline: Defines the components to use in the pipeline, - their order, and a mapping of component params to - pipeline params. A list of dictionaries, each dict - specifies a custom component and contains keys: - 'component_name': name of the component - 'param_mapping': a list of tuples mapping -> - (component_param, pipeline_param) - description: Optional description of the pipeline. 
- """ - newline = '\n' - queue = [''] - for idx, component in enumerate(pipeline): - if idx != len(pipeline)-1: - queue.append(f'''.after({component['component_name']}_task)''') - return ( - f'\n' - f'@dsl.pipeline(\n' - f''' name='{name}',\n''' - f''' description='{description}')\n''' - f'def pipeline(\n' - f'''{newline.join(f" {param['name']}: {param['type'].__name__}," for param in params)}\n''' - f'):\n' - f''' """{description}\n''' - f'\n' - f' Args:\n' - f'''{newline.join(f" {param['name']}: {param['description']}," for param in params)}\n''' - f' """\n' - f"""{newline.join( - f''' {component['component_name']}_task = {component['component_name']}({newline}''' - f''' {f'{newline} '.join(f" {param[0]}={param[1]}," for param in component['param_mapping'])}{newline}''' - f''' ){queue.pop(0)}{newline}''' - for component in pipeline - )}""" - f'\n' - ) diff --git a/build/lib/AutoMLOps/__init__.py b/build/lib/AutoMLOps/__init__.py deleted file mode 100644 index ee1c357..0000000 --- a/build/lib/AutoMLOps/__init__.py +++ /dev/null @@ -1,28 +0,0 @@ -# Copyright 2023 Google LLC. All Rights Reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -""" -AutoMLOps - -AutoMLOps is a service that generates a production-style MLOps pipeline -from Jupyter Notebooks. The tool currently operates as a local package -import, with the end goal of becoming a Jupyter plugin to Vertex -Workbench managed notebooks. The tool will generate yaml-component -definitions, complete with Dockerfiles and requirements.txts for all -Kubeflow components defined in a notebook. It will also generate a -series of directories to support the creation of Vertex Pipelines. -""" -# pylint: disable=invalid-name -__version__ = '1.0.2' -__author__ = 'Sean Rastatter' -__credits__ = 'Google'