Commit

Merge branch 'main' into feature/model_monitoring
srastatter authored Jul 28, 2023
2 parents a46a31d + be7883e commit 4b6ac82
Showing 21 changed files with 93 additions and 5 deletions.
8 changes: 8 additions & 0 deletions AutoMLOps/AutoMLOps.py
@@ -48,6 +48,7 @@

make_dirs([OUTPUT_DIR])


def go(project_id: str,
pipeline_params: Dict,
af_registry_location: Optional[str] = 'us-central1',
@@ -106,6 +107,7 @@ def go(project_id: str,
schedule_name, schedule_pattern, vpc_connector)
run(run_local)


def generate(project_id: str,
pipeline_params: Dict,
af_registry_location: Optional[str] = 'us-central1',
@@ -157,6 +159,7 @@ def generate(project_id: str,
cloud_run_name, default_pipeline_runner_sa, project_id,
run_local, schedule_pattern, vpc_connector)


def run(run_local: bool):
"""Builds, compiles, and submits the PipelineJob.
@@ -180,6 +183,7 @@ def run(run_local: bool):
# Log generated resources
_resources_generation_manifest(run_local)


def _resources_generation_manifest(run_local: bool):
"""Logs urls of generated resources.
@@ -210,6 +214,7 @@ def _resources_generation_manifest(run_local: bool):
if defaults['gcp']['cloud_schedule_pattern'] != 'No Schedule Specified':
logging.info('Cloud Scheduler Job: https://console.cloud.google.com/cloudscheduler')


def _push_to_csr():
"""Initializes a git repo if one doesn't already exist,
then pushes to the specified branch and triggers the cloudbuild job.
@@ -248,6 +253,7 @@ def _push_to_csr():
logging.info(f'''Pushing code to {defaults['gcp']['cloud_source_repository_branch']} branch, triggering cloudbuild...''')
logging.info(f'''Cloudbuild job running at: https://console.cloud.google.com/cloud-build/builds;region={defaults['gcp']['cb_trigger_location']}''')


def component(func: Optional[Callable] = None,
*,
packages_to_install: Optional[List[str]] = None):
@@ -275,6 +281,7 @@ def my_function_one(input: str, output: Output[Model]):
func=func,
packages_to_install=packages_to_install)


def pipeline(func: Optional[Callable] = None,
*,
name: Optional[str] = None,
@@ -313,6 +320,7 @@ def pipeline(bq_table: str,
name=name,
description=description)


def clear_cache():
"""Deletes all temporary files stored in the cache directory."""
execute_process(f'rm -rf {OUTPUT_DIR}', to_null=False)
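The hunks in this file add blank lines ahead of top-level definitions (the two-blank-line spacing PEP 8 recommends); the entry points themselves are unchanged. For orientation, here is a minimal usage sketch of the go(), component(), and pipeline() entry points touched above, with placeholder project values and keyword names assumed from the signatures and docstring fragments shown in the hunks:

from AutoMLOps import AutoMLOps
from kfp.v2.dsl import Model, Output  # artifact types referenced in the docstring example above

# Placeholder component, modeled on the my_function_one docstring example;
# the body is illustrative only.
@AutoMLOps.component(packages_to_install=['pandas'])
def my_function_one(input: str, output: Output[Model]):
    ...

# Placeholder pipeline, modeled on the pipeline() docstring example.
@AutoMLOps.pipeline(name='example-pipeline', description='illustrative only')
def pipeline(bq_table: str):
    # Output artifacts are supplied by the pipeline runtime (KFP convention),
    # so only the plain input is passed here.
    my_function_one(input=bq_table)

# Hypothetical call; per the hunk above, go() finishes by calling run(run_local),
# so run_local is assumed to be one of its keyword arguments.
AutoMLOps.go(
    project_id='my-project-id',
    pipeline_params={'bq_table': 'my-project.my_dataset.my_table'},
    af_registry_location='us-central1',  # default shown in the signature above
    run_local=True,
)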
1 change: 1 addition & 0 deletions AutoMLOps/deployments/cloudbuild/builder.py
@@ -23,6 +23,7 @@
)
from AutoMLOps.deployments.cloudbuild.constructs.scripts import CloudBuildScripts


def build(af_registry_location: str,
af_registry_name: str,
cloud_run_location: str,
1 change: 1 addition & 0 deletions AutoMLOps/deployments/cloudbuild/constructs/scripts.py
@@ -18,6 +18,7 @@

from AutoMLOps.utils.constants import GENERATED_LICENSE


class CloudBuildScripts():
"""Generates CloudBuild yaml config file."""
def __init__(self,
3 changes: 3 additions & 0 deletions AutoMLOps/frameworks/base.py
@@ -18,8 +18,10 @@
# pylint: disable=line-too-long

from typing import Dict, List

from AutoMLOps.utils.utils import read_yaml_file


class Component():
"""Parent class that defines a general abstraction of a Component."""
def __init__(self, component_spec: dict, defaults_file: str):
@@ -38,6 +40,7 @@ def __init__(self, component_spec: dict, defaults_file: str):
self._project_id = defaults['gcp']['project_id']
self._af_registry_name = defaults['gcp']['af_registry_name']


class Pipeline():
"""Parent class that defines a general abstraction of a Pipeline """
def __init__(self, custom_training_job_specs: List[Dict], defaults_file: str):
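The Component and Pipeline base classes above pull their configuration out of a defaults file via read_yaml_file. As a rough sketch of that structure (keys taken only from attributes referenced elsewhere in this diff; the values are placeholders, not from the repository):

# Hypothetical shape of the dict returned by read_yaml_file(defaults_file).
defaults = {
    'gcp': {
        'project_id': 'my-project-id',                      # used by Component.__init__ above
        'af_registry_name': 'my-artifact-registry',         # used by Component.__init__ above
        'cloud_schedule_pattern': 'No Schedule Specified',   # checked in _resources_generation_manifest
        'cloud_source_repository_branch': 'main',            # used by _push_to_csr
        'cb_trigger_location': 'us-central1',                # used in _push_to_csr logging
    }
}

# Mirrors what Component.__init__ does with it, per the hunk above.
project_id = defaults['gcp']['project_id']
af_registry_name = defaults['gcp']['af_registry_name']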
4 changes: 4 additions & 0 deletions AutoMLOps/frameworks/kfp/builder.py
@@ -48,6 +48,7 @@
from AutoMLOps.frameworks.kfp.constructs.pipeline import KfpPipeline
from AutoMLOps.frameworks.kfp.constructs.scripts import KfpScripts


def build(project_id: str,
pipeline_params: Dict,
af_registry_location: Optional[str],
@@ -141,6 +142,7 @@ def build(project_id: str,
if not run_local:
build_cloudrun()


def build_component(component_path: str):
"""Constructs and writes component.yaml and {component_name}.py files.
component.yaml: Contains the Kubeflow custom component definition.
@@ -183,6 +185,7 @@ def build_component(component_path: str):
write_file(filename, GENERATED_LICENSE, 'w')
write_yaml_file(filename, component_spec, 'a')


def build_pipeline(custom_training_job_specs: List[Dict],
pipeline_parameter_values: dict):
"""Constructs and writes pipeline.py, pipeline_runner.py, and pipeline_parameter_values.json files.
@@ -223,6 +226,7 @@ def build_pipeline(custom_training_job_specs: List[Dict],
serialized_params = json.dumps(pipeline_parameter_values, indent=4)
write_file(pipeline_params_file, serialized_params, 'w+')


def build_cloudrun():
"""Constructs and writes a Dockerfile, requirements.txt, and
main.py to the cloud_run/run_pipeline directory. Also
1 change: 1 addition & 0 deletions AutoMLOps/frameworks/kfp/constructs/cloudrun.py
@@ -25,6 +25,7 @@
RIGHT_BRACKET
)


class KfpCloudRun():
"""Generates files related to cloud runner service."""
def __init__(self, defaults_file: str):
1 change: 1 addition & 0 deletions AutoMLOps/frameworks/kfp/constructs/component.py
@@ -20,6 +20,7 @@
from AutoMLOps.utils.utils import is_using_kfp_spec
from AutoMLOps.frameworks.base import Component


class KfpComponent(Component):
"""Child class that generates files related to kfp components."""
def __init__(self, component_spec: dict, defaults_file: str):
3 changes: 1 addition & 2 deletions AutoMLOps/frameworks/kfp/constructs/pipeline.py
@@ -22,6 +22,7 @@
from AutoMLOps.utils.constants import GENERATED_LICENSE
from AutoMLOps.frameworks.base import Pipeline


class KfpPipeline(Pipeline):
"""Child class that generates files related to kfp pipelines."""
def __init__(self, custom_training_job_specs: List[Dict], defaults_file: str):
@@ -57,7 +58,6 @@ def custom_specs_helper(self, custom_training_job_specs):
f'\n')
return custom_specs


def _get_pipeline_imports(self):
"""Generates python code that imports modules and loads all custom components.
@@ -71,7 +71,6 @@ def _get_pipeline_imports(self):
quote = '\''
newline_tab = '\n '


# If there is a custom training job specified, write those to feed to pipeline imports
custom_specs = self.custom_specs_helper(self._custom_training_job_specs)

1 change: 1 addition & 0 deletions AutoMLOps/frameworks/kfp/constructs/scripts.py
@@ -36,6 +36,7 @@
RIGHT_BRACKET
)


class KfpScripts():
"""Generates files related to running kubeflow pipelines."""
def __init__(self,
7 changes: 7 additions & 0 deletions AutoMLOps/frameworks/kfp/scaffold.py
@@ -39,6 +39,7 @@

T = TypeVar('T')


def create_component_scaffold(func: Optional[Callable] = None,
*,
packages_to_install: Optional[List[str]] = None):
@@ -77,6 +78,7 @@ def create_component_scaffold(func: Optional[Callable] = None,
make_dirs([CACHE_DIR])
write_yaml_file(filename, component_spec, 'w')


def get_packages_to_install_command(func: Optional[Callable] = None,
packages_to_install: Optional[List[str]] = None):
"""Returns a list of formatted list of commands, including code for tmp storage.
@@ -103,6 +105,7 @@ def get_packages_to_install_command(func: Optional[Callable] = None,
src_code = get_function_source_definition(func)
return ['sh', '-c', install_python_packages_script, src_code]


def get_function_parameters(func: Callable) -> dict:
"""Returns a formatted list of parameters.
@@ -137,6 +140,7 @@ def get_function_parameters(func: Callable) -> dict:
f'''Please specify the type for this parameter.''')
return update_params(parameter_holder)


def maybe_strip_optional_from_annotation(annotation: T) -> T:
"""Strips 'Optional' from 'Optional[<type>]' if applicable.
For example::
@@ -153,6 +157,7 @@ def maybe_strip_optional_from_annotation(annotation: T) -> T:
else:
return annotation


def create_pipeline_scaffold(func: Optional[Callable] = None,
*,
name: Optional[str] = None,
@@ -174,6 +179,7 @@ def create_pipeline_scaffold(func: Optional[Callable] = None,
make_dirs([CACHE_DIR]) # if it doesn't already exist
write_file(PIPELINE_CACHE_FILE, pipeline_scaffold, 'w')


def get_pipeline_decorator(name: Optional[str] = None,
description: Optional[str] = None):
"""Creates the kfp pipeline decorator.
@@ -191,6 +197,7 @@ def get_pipeline_decorator(name: Optional[str] = None,
ending_str = ')\n'
return '@dsl.pipeline' + name_str + desc_str + ending_str


def get_compile_step(func_name: str):
"""Creates the compile function call.
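Among the helpers touched here, maybe_strip_optional_from_annotation is documented to unwrap Optional[<type>] annotations. A minimal illustration of that documented behavior (hypothetical calls, not part of the commit):

from typing import Optional

from AutoMLOps.frameworks.kfp.scaffold import maybe_strip_optional_from_annotation

maybe_strip_optional_from_annotation(Optional[str])  # -> str, per the docstring above
maybe_strip_optional_from_annotation(int)            # -> int, returned unchanged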
15 changes: 15 additions & 0 deletions AutoMLOps/utils/utils.py
@@ -32,6 +32,7 @@
PLACEHOLDER_IMAGE
)


def make_dirs(directories: list):
"""Makes directories with the specified names.
@@ -44,6 +45,7 @@ def make_dirs(directories: list):
except FileExistsError:
pass


def read_yaml_file(filepath: str) -> dict:
"""Reads a yaml and returns file contents as a dict.
Defaults to utf-8 encoding.
@@ -63,6 +65,7 @@ def read_yaml_file(filepath: str) -> dict:
raise yaml.YAMLError(f'Error reading file. {err}') from err
return file_dict


def write_yaml_file(filepath: str, contents: dict, mode: str):
"""Writes a dictionary to yaml. Defaults to utf-8 encoding.
@@ -80,6 +83,7 @@ def write_yaml_file(filepath: str, contents: dict, mode: str):
except yaml.YAMLError as err:
raise yaml.YAMLError(f'Error writing to file. {err}') from err


def read_file(filepath: str) -> str:
"""Reads a file and returns contents as a string.
Defaults to utf-8 encoding.
@@ -99,6 +103,7 @@ def read_file(filepath: str) -> str:
raise FileNotFoundError(f'Error reading file. {err}') from err
return contents


def write_file(filepath: str, text: str, mode: str):
"""Writes a file at the specified path. Defaults to utf-8 encoding.
@@ -116,6 +121,7 @@ def write_file(filepath: str, text: str, mode: str):
except OSError as err:
raise OSError(f'Error writing to file. {err}') from err


def write_and_chmod(filepath: str, text: str):
"""Writes a file at the specified path and chmods the file
to allow for execution.
@@ -133,6 +139,7 @@ def write_and_chmod(filepath: str, text: str):
except OSError as err:
raise OSError(f'Error chmod-ing file. {err}') from err


def delete_file(filepath: str):
"""Deletes a file at the specified path.
If it does not exist, pass.
@@ -145,6 +152,7 @@ def delete_file(filepath: str):
except OSError:
pass


def get_components_list(full_path: bool = True) -> list:
"""Reads yamls in the cache directory, verifies they are component
yamls, and returns the name of the files.
@@ -165,6 +173,7 @@ def get_components_list(full_path: bool = True) -> list:
components_list.append(os.path.basename(file).split('.')[0])
return components_list


def is_component_config(filepath: str) -> bool:
"""Checks to see if the given file is a component yaml.
@@ -177,6 +186,7 @@ def is_component_config(filepath: str) -> bool:
file_dict = read_yaml_file(filepath)
return all(key in file_dict.keys() for key in required_keys)


def execute_process(command: str, to_null: bool):
"""Executes an external shell process.
@@ -196,6 +206,7 @@ def execute_process(command: str, to_null: bool):
except subprocess.CalledProcessError as err:
raise RuntimeError(f'Error executing process. {err}') from err


def validate_schedule(schedule_pattern: str, run_local: str):
"""Validates that the inputted schedule parameter aligns with the run_local configuration.
Note: this function does not validate that schedule_pattern is a properly formatted cron value.
@@ -210,6 +221,7 @@ def validate_schedule(schedule_pattern: str, run_local: str):
if schedule_pattern != 'No Schedule Specified' and run_local:
raise ValueError('run_local must be set to False to use Cloud Scheduler.')


def update_params(params: list) -> list:
"""Converts the parameter types from Python types
to Kubeflow types. Currently only supports
@@ -242,6 +254,7 @@ def update_params(params: list) -> list:
f'primitive types at this time. {err}') from err
return params


def get_function_source_definition(func: Callable) -> str:
"""Returns a formatted string of the source code.
@@ -267,6 +280,7 @@ def get_function_source_definition(func: Callable) -> str:

return '\n'.join(source_code_lines)


def format_spec_dict(job_spec: dict) -> str:
"""Takes in a job spec dictionary and removes the quotes around the component op name.
e.g. 'component_spec': 'train_model' becomes 'component_spec': train_model.
@@ -288,6 +302,7 @@ def format_spec_dict(job_spec: dict) -> str:
f''' {f'{newline} '.join(f" {quote}{k}{quote}: {quote if k != 'component_spec' else ''}{v}{quote if k != 'component_spec' else ''}," for k, v in job_spec.items())}{newline}'''
f''' {right_bracket}\n''')


def is_using_kfp_spec(image: str):
"""Takes in an image string from a component yaml and determines if it came from kfp or not.
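Of the utilities touched here, validate_schedule has its check visible in the hunk above: it only rejects the combination of a cron schedule with run_local. A quick illustration (hypothetical calls):

from AutoMLOps.utils.utils import validate_schedule

validate_schedule('No Schedule Specified', run_local=True)   # OK: nothing to schedule
validate_schedule('0 12 * * *', run_local=False)             # OK: schedule with cloud execution
validate_schedule('0 12 * * *', run_local=True)              # raises ValueError, per the hunk above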
2 changes: 1 addition & 1 deletion requirements.txt
@@ -2,7 +2,7 @@ docopt==0.6.2
docstring-parser==0.15
pandas==1.3.5
pipreqs==0.4.11
-PyYAML==5.4.1
+PyYAML==6.0.1
yarg==0.1.9
mock
pylint
2 changes: 1 addition & 1 deletion setup.py
@@ -33,7 +33,7 @@
install_requires=['docopt==0.6.2',
'docstring-parser==0.15',
'pipreqs==0.4.11',
-'PyYAML==5.4.1',
+'PyYAML==6.0.1',
'yarg==0.1.9'],
classifiers=[
'Development Status :: 4 - Beta',
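Both requirements.txt and setup.py move the PyYAML pin from 5.4.1 to 6.0.1; the hunks shown here touch nothing but the pin itself. Independently of this commit, one general PyYAML 6.x change worth remembering when upgrading is that yaml.load() must be given an explicit Loader, while yaml.safe_load() behaves as before. A small illustrative check (placeholder document, not from the repository):

import yaml

doc = 'gcp:\n  project_id: my-project-id\n'

data = yaml.safe_load(doc)                     # same call on 5.4.x and 6.x
data = yaml.load(doc, Loader=yaml.SafeLoader)  # 6.x requires the Loader argument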
3 changes: 3 additions & 0 deletions tests/unit/frameworks/base_test.py
@@ -43,6 +43,7 @@
}
}


@pytest.fixture(name='defaults_dict', params=[DEFAULTS1, DEFAULTS2])
def fixture_defaults_dict(request: pytest.FixtureRequest, tmpdir: pytest.FixtureRequest):
"""Writes temporary yaml file fixture using defaults parameterized
@@ -61,6 +62,7 @@ def fixture_defaults_dict(request: pytest.FixtureRequest, tmpdir: pytest.Fixture
write_yaml_file(yaml_path, request.param, 'w')
return {'path': yaml_path, 'vals': request.param}


@pytest.mark.parametrize(
'component_spec',
[{'test1': 'val1'}, {'test2': 'val2'}]
@@ -83,6 +85,7 @@ def test_Component(defaults_dict: pytest.FixtureRequest, component_spec: dict):
assert my_component._project_id == defaults['gcp']['project_id']
assert my_component._component_spec == component_spec


@pytest.mark.parametrize(
'custom_training_job_specs',
[
(remaining changed files not shown)

0 comments on commit 4b6ac82
