Merge pull request #21 from GoogleCloudPlatform/feature/cleanup-cb
Feature/cleanup cb
srastatter authored May 31, 2023
2 parents 47c069d + 6d4ab29 commit 3ccaaf7
Showing 34 changed files with 1,032 additions and 651 deletions.
1 change: 1 addition & 0 deletions .gitignore
@@ -1,4 +1,5 @@
.tmpfiles/
.AutoMLOps-cache/
.DS_Store

# Byte-compiled / optimized / DLL files
24 changes: 15 additions & 9 deletions AutoMLOps/AutoMLOps.py
@@ -46,10 +46,13 @@
logging.basicConfig(stream=sys.stdout, level=logging.INFO, format='%(message)s')
logger = logging.getLogger()

make_dirs([OUTPUT_DIR])

def go(project_id: str,
pipeline_params: Dict,
af_registry_location: Optional[str] = 'us-central1',
af_registry_name: Optional[str] = 'vertex-mlops-af',
base_image: Optional[str] = 'python:3.9-slim',
cb_trigger_location: Optional[str] = 'us-central1',
cb_trigger_name: Optional[str] = 'automlops-trigger',
cloud_run_location: Optional[str] = 'us-central1',
@@ -66,7 +69,6 @@ def go(project_id: str,
schedule_location: Optional[str] = 'us-central1',
schedule_name: Optional[str] = 'AutoMLOps-schedule',
schedule_pattern: Optional[str] = 'No Schedule Specified',
use_kfp_spec: Optional[bool] = False,
vpc_connector: Optional[str] = 'No VPC Specified'):
"""Generates relevant pipeline and component artifacts,
then builds, compiles, and submits the PipelineJob.
@@ -76,6 +78,7 @@ def go(project_id: str,
pipeline_params: Dictionary containing runtime pipeline parameters.
af_registry_location: Region of the Artifact Registry.
af_registry_name: Artifact Registry name where components are stored.
base_image: The image to use in the component base dockerfile.
cb_trigger_location: The location of the cloudbuild trigger.
cb_trigger_name: The name of the cloudbuild trigger.
cloud_run_location: The location of the cloud runner service.
@@ -92,23 +95,22 @@ def go(project_id: str,
schedule_location: The location of the scheduler resource.
schedule_name: The name of the scheduler resource.
schedule_pattern: Cron formatted value used to create a Scheduled retrain job.
use_kfp_spec: Flag that determines the format of the component yamls.
vpc_connector: The name of the vpc connector to use.
"""
generate(project_id, pipeline_params, af_registry_location,
af_registry_name, cb_trigger_location, cb_trigger_name,
af_registry_name, base_image, cb_trigger_location, cb_trigger_name,
cloud_run_location, cloud_run_name, cloud_tasks_queue_location,
cloud_tasks_queue_name, csr_branch_name, csr_name,
custom_training_job_specs, gs_bucket_location, gs_bucket_name,
pipeline_runner_sa, run_local, schedule_location,
schedule_name, schedule_pattern, use_kfp_spec,
vpc_connector)
schedule_name, schedule_pattern, vpc_connector)
run(run_local)

def generate(project_id: str,
pipeline_params: Dict,
af_registry_location: Optional[str] = 'us-central1',
af_registry_name: Optional[str] = 'vertex-mlops-af',
base_image: Optional[str] = 'python:3.9-slim',
cb_trigger_location: Optional[str] = 'us-central1',
cb_trigger_name: Optional[str] = 'automlops-trigger',
cloud_run_location: Optional[str] = 'us-central1',
@@ -125,7 +127,6 @@ def generate(project_id: str,
schedule_location: Optional[str] = 'us-central1',
schedule_name: Optional[str] = 'AutoMLOps-schedule',
schedule_pattern: Optional[str] = 'No Schedule Specified',
use_kfp_spec: Optional[bool] = False,
vpc_connector: Optional[str] = 'No VPC Specified'):
"""Generates relevant pipeline and component artifacts.
@@ -145,13 +146,12 @@ def generate(project_id: str,

# Build files required to run a Kubeflow Pipeline
KfpBuilder.build(project_id, pipeline_params, af_registry_location,
af_registry_name, cb_trigger_location, cb_trigger_name,
af_registry_name, base_image, cb_trigger_location, cb_trigger_name,
cloud_run_location, cloud_run_name, cloud_tasks_queue_location,
cloud_tasks_queue_name, csr_branch_name, csr_name,
custom_training_job_specs, gs_bucket_location, default_bucket_name,
default_pipeline_runner_sa, run_local, schedule_location,
schedule_name, schedule_pattern, use_kfp_spec,
vpc_connector)
schedule_name, schedule_pattern, vpc_connector)

CloudBuildBuilder.build(af_registry_location, af_registry_name, cloud_run_location,
cloud_run_name, default_pipeline_runner_sa, project_id,
@@ -307,3 +307,9 @@ def pipeline(bq_table: str,
func=func,
name=name,
description=description)

def clear_cache():
"""Deletes all temporary files stored in the cache directory."""
execute_process(f'rm -rf {OUTPUT_DIR}', to_null=False)
make_dirs([OUTPUT_DIR])
logging.info('Cache cleared.')
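
For orientation, below is a minimal usage sketch of the updated public interface, assuming the module is imported as from AutoMLOps import AutoMLOps; the project ID, bucket, table, and schedule values are placeholders, not values from this commit. Note that base_image is a new keyword argument, use_kfp_spec is no longer accepted, and the new clear_cache() wipes the OUTPUT_DIR cache (presumably the .AutoMLOps-cache/ directory added to .gitignore above).

# Hypothetical usage sketch; all values below are placeholders.
from AutoMLOps import AutoMLOps

pipeline_params = {
    'bq_table': 'my-project.my_dataset.my_table',
    'model_directory': 'gs://my-bucket/trained_models',
}

AutoMLOps.go(
    project_id='my-project',
    pipeline_params=pipeline_params,
    base_image='python:3.9-slim',   # new argument in this change
    run_local=False,
    schedule_pattern='59 11 * * 0',
)                                    # use_kfp_spec is no longer passed

# New in this change: delete the generated cache directory and recreate it empty.
AutoMLOps.clear_cache()
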
2 changes: 1 addition & 1 deletion AutoMLOps/__init__.py
@@ -23,6 +23,6 @@
series of directories to support the creation of Vertex Pipelines.
"""
# pylint: disable=invalid-name
__version__ = '1.1.1'
__version__ = '1.1.2'
__author__ = 'Sean Rastatter'
__credits__ = 'Google'
78 changes: 29 additions & 49 deletions AutoMLOps/frameworks/kfp/builder.py
@@ -24,23 +24,22 @@
get_components_list,
make_dirs,
read_yaml_file,
is_using_kfp_spec,
write_and_chmod,
write_file,
write_yaml_file
)
from AutoMLOps.utils.constants import (
BASE_DIR,
DEFAULT_IMAGE,
GENERATED_BUILD_COMPONENTS_SH_FILE,
GENERATED_CLOUDBUILD_FILE,
GENERATED_DEFAULTS_FILE,
GENERATED_COMPONENT_BASE,
GENERATED_PIPELINE_FILE,
GENERATED_PIPELINE_SPEC_SH_FILE,
GENERATED_RESOURCES_SH_FILE,
GENERATED_RUN_PIPELINE_SH_FILE,
GENERATED_RUN_ALL_SH_FILE,
PIPELINE_TMPFILE,
PIPELINE_CACHE_FILE,
GENERATED_LICENSE,
GENERATED_PARAMETER_VALUES_PATH
)
@@ -53,6 +52,7 @@ def build(project_id: str,
pipeline_params: Dict,
af_registry_location: Optional[str],
af_registry_name: Optional[str],
base_image: Optional[str],
cb_trigger_location: Optional[str],
cb_trigger_name: Optional[str],
cloud_run_location: Optional[str],
@@ -69,13 +69,13 @@ def build(project_id: str,
schedule_location: Optional[str],
schedule_name: Optional[str],
schedule_pattern: Optional[str],
use_kfp_spec: Optional[bool],
vpc_connector: Optional[str]):
"""Constructs scripts for resource deployment and running Kubeflow pipelines.
Args:
af_registry_location: Region of the Artifact Registry.
af_registry_name: Artifact Registry name where components are stored.
base_image: The image to use in the component base dockerfile.
cb_trigger_location: The location of the cloudbuild trigger.
cb_trigger_name: The name of the cloudbuild trigger.
cloud_run_location: The location of the cloud runner service.
@@ -84,7 +84,6 @@ def build(project_id: str,
cloud_tasks_queue_name: The name of the cloud tasks queue.
csr_branch_name: The name of the csr branch to push to to trigger cb job.
csr_name: The name of the cloud source repo to use.
default_image: The image to use in the dockerfile.
gs_bucket_location: Region of the GS bucket.
gs_bucket_name: GS bucket name where pipeline run metadata is stored.
pipeline_runner_sa: Service Account to runner PipelineJobs.
@@ -93,16 +92,15 @@ def build(project_id: str,
schedule_location: The location of the scheduler resource.
schedule_name: The name of the scheduler resource.
schedule_pattern: Cron formatted value used to create a Scheduled retrain job.
base_dir: Top directory name.
vpc_connector: The name of the vpc connector to use.
"""

# Get scripts builder object
kfp_scripts = KfpScripts(
af_registry_location, af_registry_name, cb_trigger_location,
af_registry_location, af_registry_name, base_image, cb_trigger_location,
cb_trigger_name, cloud_run_location, cloud_run_name,
cloud_tasks_queue_location, cloud_tasks_queue_name, csr_branch_name,
csr_name, DEFAULT_IMAGE, gs_bucket_location, gs_bucket_name,
csr_name, gs_bucket_location, gs_bucket_name,
pipeline_runner_sa, project_id, run_local, schedule_location,
schedule_name, schedule_pattern, BASE_DIR, vpc_connector)

@@ -115,18 +113,17 @@ def build(project_id: str,
write_and_chmod(GENERATED_RUN_PIPELINE_SH_FILE, kfp_scripts.run_pipeline)
write_and_chmod(GENERATED_RUN_ALL_SH_FILE, kfp_scripts.run_all)

# Write scripts to create resources and cloud build config
# Write scripts to create resources
write_and_chmod(GENERATED_RESOURCES_SH_FILE, kfp_scripts.create_resources_script)
write_file(GENERATED_CLOUDBUILD_FILE, kfp_scripts.create_cloudbuild_config, 'w+')

# Copy tmp pipeline file over to AutoMLOps directory
execute_process(f'cp {PIPELINE_TMPFILE} {GENERATED_PIPELINE_FILE}', to_null=False)
execute_process(f'cp {PIPELINE_CACHE_FILE} {GENERATED_PIPELINE_FILE}', to_null=False)

# Create components and pipelines
components_path_list = get_components_list()
for path in components_path_list:
build_component(path, BASE_DIR, GENERATED_DEFAULTS_FILE, use_kfp_spec)
build_pipeline(custom_training_job_specs, GENERATED_DEFAULTS_FILE, pipeline_params, BASE_DIR)
build_component(path)
build_pipeline(custom_training_job_specs, pipeline_params)

# Write dockerfile to the component base directory
write_file(f'{GENERATED_COMPONENT_BASE}/Dockerfile', kfp_scripts.dockerfile, 'w')
@@ -136,13 +133,9 @@ def build(project_id: str,

# Build the cloud run files
if not run_local:
build_cloudrun(BASE_DIR, GENERATED_DEFAULTS_FILE)
build_cloudrun()


def build_component(component_path: str,
base_dir: str,
defaults_file: str,
use_kfp_spec: bool):
def build_component(component_path: str):
"""Constructs and writes component.yaml and {component_name}.py files.
component.yaml: Contains the Kubeflow custom component definition.
{component_name}.py: Contains the python code from the Jupyter cell.
@@ -151,27 +144,24 @@ def build_component(component_path: str,
component_path: Path to the temporary component yaml. This file
is used to create the permanent component.yaml, and deleted
after calling AutoMLOps.generate().
base_dir: Top directory name.
defaults_file: Path to the default config variables yaml.
use_kfp_spec: Flag that determines the format of the component yamls.
"""
# Read in component specs
component_spec = read_yaml_file(component_path)

# If using kfp, remove spaces in name and convert to lowercase
if use_kfp_spec:
if is_using_kfp_spec(component_spec['implementation']['container']['image']):
component_spec['name'] = component_spec['name'].replace(' ', '_').lower()

# Set and create directory for component, and set directory for task
component_dir = base_dir + 'components/' + component_spec['name']
task_filepath = (base_dir
component_dir = BASE_DIR + 'components/' + component_spec['name']
task_filepath = (BASE_DIR
+ 'components/component_base/src/'
+ component_spec['name']
+ '.py')
make_dirs([component_dir])

# Initialize component scripts builder
kfp_comp = KfpComponent(component_spec, defaults_file)
kfp_comp = KfpComponent(component_spec, GENERATED_DEFAULTS_FILE)

# Write task script to component base
write_file(task_filepath, kfp_comp.task, 'w+')
@@ -187,31 +177,26 @@ def build_component(component_path: str,
write_file(filename, GENERATED_LICENSE, 'w')
write_yaml_file(filename, component_spec, 'a')


def build_pipeline(custom_training_job_specs: List[Dict],
defaults_file: str,
pipeline_parameter_values: dict,
base_dir: str):
pipeline_parameter_values: dict):
"""Constructs and writes pipeline.py, pipeline_runner.py, and pipeline_parameter_values.json files.
pipeline.py: Generates a Kubeflow pipeline spec from custom components.
pipeline_runner.py: Sends a PipelineJob to Vertex AI using pipeline spec.
pipeline_parameter_values.json: Provides runtime parameters for the PipelineJob.
Args:
custom_training_job_specs: Specifies the specs to run the training job with.
defaults_file: Path to the default config variables yaml.
pipeline_parameter_values: Dictionary of runtime parameters for the PipelineJob.
base_dir: Top directory name.
Raises:
Exception: If an error is encountered reading/writing to a file.
"""
# Set paths
pipeline_file = base_dir + 'pipelines/pipeline.py'
pipeline_runner_file = base_dir + 'pipelines/pipeline_runner.py'
pipeline_params_file = base_dir + GENERATED_PARAMETER_VALUES_PATH
pipeline_file = BASE_DIR + 'pipelines/pipeline.py'
pipeline_runner_file = BASE_DIR + 'pipelines/pipeline_runner.py'
pipeline_params_file = BASE_DIR + GENERATED_PARAMETER_VALUES_PATH

# Initializes pipeline scripts builder
kfp_pipeline = KfpPipeline(custom_training_job_specs, defaults_file)
kfp_pipeline = KfpPipeline(custom_training_job_specs, GENERATED_DEFAULTS_FILE)
try:
with open(pipeline_file, 'r+', encoding='utf-8') as file:
pipeline_scaffold = file.read()
@@ -232,29 +217,24 @@ def build_pipeline(custom_training_job_specs: List[Dict],
serialized_params = json.dumps(pipeline_parameter_values, indent=4)
write_file(pipeline_params_file, serialized_params, 'w+')

def build_cloudrun(base_dir: str,
defaults_file: str,):
def build_cloudrun():
"""Constructs and writes a Dockerfile, requirements.txt, and
main.py to the cloud_run/run_pipeline directory. Also
constructs and writes a main.py, requirements.txt, and
pipeline_parameter_values.json to the
cloud_run/queueing_svc directory.
Args:
base_dir: Top directory name.
defaults_file: Path to the default config variables yaml.
"""
# Make new directories
make_dirs([base_dir + 'cloud_run',
base_dir + 'cloud_run/run_pipeline',
base_dir + 'cloud_run/queueing_svc'])
make_dirs([BASE_DIR + 'cloud_run',
BASE_DIR + 'cloud_run/run_pipeline',
BASE_DIR + 'cloud_run/queueing_svc'])

# Initialize cloud run scripts object
cloudrun_scripts = KfpCloudRun(defaults_file)
cloudrun_scripts = KfpCloudRun(GENERATED_DEFAULTS_FILE)

# Set new folders as variables
cloudrun_base = base_dir + 'cloud_run/run_pipeline'
queueing_svc_base = base_dir + 'cloud_run/queueing_svc'
cloudrun_base = BASE_DIR + 'cloud_run/run_pipeline'
queueing_svc_base = BASE_DIR + 'cloud_run/queueing_svc'

# Write cloud run dockerfile
write_file(f'{cloudrun_base}/Dockerfile', cloudrun_scripts.dockerfile, 'w')
@@ -268,4 +248,4 @@ def build_cloudrun(base_dir: str,
write_file(f'{queueing_svc_base}/main.py', cloudrun_scripts.queueing_svc, 'w')

# Copy runtime parameters over to queueing_svc dir
execute_process(f'''cp -r {base_dir + GENERATED_PARAMETER_VALUES_PATH} {base_dir + 'cloud_run/queueing_svc'}''', to_null=False)
execute_process(f'''cp -r {BASE_DIR + GENERATED_PARAMETER_VALUES_PATH} {BASE_DIR + 'cloud_run/queueing_svc'}''', to_null=False)
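
The removed use_kfp_spec flag is now inferred per component via is_using_kfp_spec(), imported from AutoMLOps.utils.utils. Its implementation is not part of the hunks shown here; the following is only a plausible sketch, assuming AutoMLOps-generated component specs carry a sentinel image string (the exact value is an assumption) while specs authored with the KFP SDK reference a real container image.

def is_using_kfp_spec(image: str) -> bool:
    """Guess whether a component spec was authored with the KFP SDK.

    Assumption: specs generated by AutoMLOps use a sentinel image value that
    is later replaced by the built component_base image, so any other image
    string indicates a user-authored KFP component spec.
    """
    return image != 'AutoMLOps_image_tbd'  # sentinel value is an assumption
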
12 changes: 8 additions & 4 deletions AutoMLOps/frameworks/kfp/constructs/component.py
@@ -17,6 +17,7 @@
# pylint: disable=line-too-long

from AutoMLOps.utils.constants import GENERATED_LICENSE
from AutoMLOps.utils.utils import is_using_kfp_spec
from AutoMLOps.frameworks.base import Component

class KfpComponent(Component):
@@ -42,16 +43,19 @@ def _create_task(self):
str: Contents of component base source code.
"""
custom_code = self._component_spec['implementation']['container']['command'][-1]
default_imports = (
GENERATED_LICENSE +
default_imports = (GENERATED_LICENSE +
'import argparse\n'
'import json\n'
'from kfp.v2.components import executor\n')
if not is_using_kfp_spec(self._component_spec['implementation']['container']['image']):
custom_imports = ('\n'
'import kfp\n'
'from kfp.v2 import dsl\n'
'from kfp.v2.components import executor\n'
'from kfp.v2.dsl import *\n'
'from typing import *\n'
'\n')
else:
custom_imports = '' # the above is already included as part of the kfp spec
main_func = (
'\n'
'''def main():\n'''
@@ -70,7 +74,7 @@ def _create_task(self):
'\n'
'''if __name__ == '__main__':\n'''
''' main()\n''')
return default_imports + custom_code + main_func
return default_imports + custom_imports + custom_code + main_func

def _create_compspec_image(self):
"""Write the correct image for the component spec.
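
Taken together, the change to _create_task() means a generated component_base/src/{component_name}.py now starts with the default imports, followed by the extra KFP imports only when the component spec was not authored with the KFP SDK. A rough sketch of the resulting layout, with the license header, the user's cell code, and the main() body elided since they are not fully shown in this diff:

# --- default_imports: always emitted ---
import argparse
import json
from kfp.v2.components import executor

# --- custom_imports: only when is_using_kfp_spec(...) is False ---
import kfp
from kfp.v2 import dsl
from kfp.v2.components import executor
from kfp.v2.dsl import *
from typing import *

# --- custom_code: the component function from the Jupyter cell (elided) ---

# --- main_func: argument parsing and executor dispatch (body elided in this diff) ---
def main():
    ...

if __name__ == '__main__':
    main()
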
[Diff for the remaining 29 changed files not shown.]
