Skip to content

Commit

Permalink
Merge pull request #13 from GoogleCloudPlatform/feature/makeCompV2
Browse files Browse the repository at this point in the history
Feature/make comp v1.1.0
  • Loading branch information
srastatter authored May 1, 2023
2 parents 5c8ca94 + 52dc358 commit eb63e8e
Show file tree
Hide file tree
Showing 69 changed files with 3,404 additions and 1,168 deletions.
347 changes: 181 additions & 166 deletions AutoMLOps/AutoMLOps.py

Large diffs are not rendered by default.

118 changes: 33 additions & 85 deletions AutoMLOps/BuilderUtils.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,13 @@
# pylint: disable=C0103
# pylint: disable=line-too-long

import inspect
import os
import subprocess

import itertools
import textwrap
from typing import Callable
import yaml

TMPFILES_DIR = '.tmpfiles'
Expand Down Expand Up @@ -97,9 +102,9 @@ def read_file(filepath: str) -> str:
Defaults to utf-8 encoding.
Args:
filepath: Path to the yaml.
filepath: Path to the file.
Returns:
dict: Contents of the yaml.
str: Contents of the file.
Raises:
Exception: If an error is encountered reading the file.
"""
Expand Down Expand Up @@ -218,88 +223,6 @@ def validate_schedule(schedule_pattern: str, run_local: str):
if schedule_pattern != 'No Schedule Specified' and run_local:
raise ValueError('run_local must be set to False to use Cloud Scheduler.')

def validate_name(name: str):
    """Checks that a component or pipeline name is a string.

    Args:
        name: The name of a component or pipeline.
    Raises:
        TypeError: If the name is not of type str.
    """
    if isinstance(name, str):
        return
    raise TypeError('Pipeline and Component names must be of type string.')

def validate_params(params: list):
    """Checks that every parameter spec has the required keys and types.

    Each spec must provide a str 'name' and a python type 'type'. Names
    must be unique across the list. A spec without a 'description' key
    is updated in place with a placeholder description.

    Args:
        params: Pipeline parameters. A list of dictionaries,
            each param is a dict containing keys:
                'name': required, str param name.
                'type': required, python primitive type.
                'description': optional, str param desc.
    Raises:
        TypeError: If a name is not a str or a type is not a python type.
        ValueError: If a required key is missing or a name is duplicated.
    """
    # (key, expected class, error message), checked in this order.
    required_fields = (
        ('name', str, 'Parameter name must be of type string.'),
        ('type', type, 'Parameter type must be a valid python type.'),
    )
    seen_names = set()
    for param in params:
        for key, expected_class, type_error_message in required_fields:
            try:
                value = param[key]
            except KeyError as err:
                raise ValueError(f'Parameter {param} does not contain '
                                 f'required keys. {err}') from err
            if not isinstance(value, expected_class):
                raise TypeError(type_error_message)
        param_name = param['name']
        if param_name in seen_names:
            raise ValueError(f'''Duplicate parameter {param_name} found.''')
        seen_names.add(param_name)
        # Leaves an existing description untouched.
        param.setdefault('description', 'No description provided.')

def validate_pipeline_structure(pipeline: list):
    """Checks that each pipeline step names a known component and
    carries a well-formed parameter mapping.

    Args:
        pipeline: Defines the components to use in the pipeline,
            their order, and a mapping of component params to
            pipeline params. A list of dictionaries, each dict
            specifies a custom component and contains keys:
                'component_name': name of the component
                'param_mapping': a list of tuples mapping ->
                    (component_param, pipeline_param)
    Raises:
        ValueError: If a step is missing a required key, or its component
            name is not among those returned by get_components_list.
        TypeError: If a mapping entry is not a 2-tuple of strings.
    """
    known_components = get_components_list(full_path=False)
    for step in pipeline:
        try:
            step_name = step['component_name']
        except KeyError as err:
            raise ValueError(f'Component {step} does not contain '
                             f'required keys. {err}') from err
        # The name is validated before 'param_mapping' is looked up.
        if step_name not in known_components:
            raise ValueError(f'Component {step_name} not found - '
                             f'No matching yaml definition in tmpfiles directory.')
        try:
            mapping = step['param_mapping']
        except KeyError as err:
            raise ValueError(f'Component {step} does not contain '
                             f'required keys. {err}') from err
        for pair in mapping:
            if not isinstance(pair, tuple):
                raise TypeError(f'Mapping contains a non-tuple '
                                f'element {pair}')
            if len(pair) != 2:
                raise TypeError(f'Mapping must contain only 2 elements, '
                                f'tuple {pair} is invalid.')
            if not all(isinstance(entry, str) for entry in pair):
                raise TypeError(f'Mapping must be str-str, '
                                f'tuple {pair} is invalid.')

def update_params(params: list) -> list:
"""Converts the parameter types from Python types
to Kubeflow types. Currently only supports
Expand Down Expand Up @@ -329,5 +252,30 @@ def update_params(params: list) -> list:
param['type'] = python_kfp_types_mapper[param['type']]
except KeyError as err:
raise ValueError(f'Unsupported python type - we only support '
f'primitive types at this time. {err}') from err
f'primitive types at this time. {err}') from err
return params

def get_function_source_definition(func: Callable) -> str:
    """Returns the dedented source of a function, starting at its def line.

    Any lines that precede the first line beginning with 'def' (such as
    decorators) are dropped.

    Args:
        func: The python function to create a component from. The function
            should have type annotations for all its arguments, indicating how
            it is intended to be used (e.g. as an input/output Artifact object,
            a plain parameter, or a path to a file).
    Returns:
        str: The source code from the inputted function.
    Raises:
        ValueError: If no line of the dedented source starts with 'def'.
    """
    source_code = textwrap.dedent(inspect.getsource(func))
    # dropwhile returns a lazy iterator, which is always truthy. It must be
    # materialized so the emptiness check below can actually fail.
    source_code_lines = list(itertools.dropwhile(
        lambda line: not line.startswith('def'),
        source_code.split('\n')))
    if not source_code_lines:
        raise ValueError(
            f'Failed to dedent and clean up the source of function "{func.__name__}". '
            f'It is probably not properly indented.')

    return '\n'.join(source_code_lines)
2 changes: 1 addition & 1 deletion AutoMLOps/CloudRunBuilder.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ def create_dockerfile(top_lvl_name: str):
"""
cloudrun_base = top_lvl_name + 'cloud_run/run_pipeline'
dockerfile = (BuilderUtils.LICENSE +
'FROM python:3.9\n'
'FROM python:3.9-slim\n'
'\n'
'# Allow statements and log messages to immediately appear in the Knative logs\n'
'ENV PYTHONUNBUFFERED True\n'
Expand Down
165 changes: 103 additions & 62 deletions AutoMLOps/ComponentBuilder.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,15 @@
# pylint: disable=C0103
# pylint: disable=line-too-long

import inspect
from typing import Callable, List, Optional, TypeVar, Union

import docstring_parser

from AutoMLOps import BuilderUtils

T = TypeVar('T')

def formalize(component_path: str,
top_lvl_name: str,
defaults_file: str,
Expand Down Expand Up @@ -57,16 +64,19 @@ def create_task(component_spec: dict, task_filepath: str, use_kfp_spec: bool):
Raises:
Exception: If the imports tmpfile does not exist.
"""
if use_kfp_spec:
custom_imports = ''
custom_code = component_spec['implementation']['container']['command'][-1]
else:
custom_imports = BuilderUtils.read_file(BuilderUtils.IMPORTS_TMPFILE)
custom_code = component_spec['implementation']['container']['command']
custom_code = component_spec['implementation']['container']['command'][-1]
default_imports = (BuilderUtils.LICENSE +
'import argparse\n'
'import json\n'
'from kfp.v2.components import executor\n')
if not use_kfp_spec:
custom_imports = ('import kfp\n'
'from kfp.v2 import dsl\n'
'from kfp.v2.dsl import *\n'
'from typing import *\n'
'\n')
else:
custom_imports = '' # included as part of the kfp spec
main_func = (
'\n'
'''def main():\n'''
Expand Down Expand Up @@ -115,85 +125,116 @@ def create_component(component_spec: dict,
BuilderUtils.write_file(filename, BuilderUtils.LICENSE, 'w')
BuilderUtils.write_yaml_file(filename, component_spec, 'a')

def create_component_scaffold(name: str,
params: list,
description: str):
def create_component_scaffold(func: Optional[Callable] = None,
*,
packages_to_install: Optional[List[str]] = None):
"""Creates a tmp component scaffold which will be used by
the formalize function. Code is temporarily stored in
component_spec['implementation']['container']['command'].
Args:
name: Component name.
params: Component parameters. A list of dictionaries,
each param is a dict containing keys:
'name': required, str param name.
'type': required, python primitive type.
'description': optional, str param desc.
description: Optional description of the component.
func: The python function to create a component from. The function
should have type annotations for all its arguments, indicating how
it is intended to be used (e.g. as an input/output Artifact object,
a plain parameter, or a path to a file).
packages_to_install: A list of optional packages to install before
executing func. These will always be installed at component runtime.
"""
BuilderUtils.validate_name(name)
BuilderUtils.validate_params(params)
func_def = get_func_definition(name, params, description)
params = BuilderUtils.update_params(params)
code = BuilderUtils.read_file(BuilderUtils.CELL_TMPFILE)
code = filter_and_indent_cell(code)
BuilderUtils.delete_file(BuilderUtils.CELL_TMPFILE)
# Todo:
# Figure out what to do with package_to_install
name = func.__name__
parsed_docstring = docstring_parser.parse(inspect.getdoc(func))
description = parsed_docstring.short_description
# make yaml
component_spec = {}
component_spec['name'] = name
if description:
component_spec['description'] = description
component_spec['inputs'] = params
component_spec['inputs'] = get_function_parameters(func)
component_spec['implementation'] = {}
component_spec['implementation']['container'] = {}
component_spec['implementation']['container']['image'] = 'TBD'
component_spec['implementation']['container']['command'] = func_def + code
component_spec['implementation']['container']['command'] = get_packages_to_install_command(func, packages_to_install)
component_spec['implementation']['container']['args'] = ['--executor_input',
{'executorInput': None}, '--function_to_execute', name]
filename = BuilderUtils.TMPFILES_DIR + f'/{name}.yaml'
BuilderUtils.make_dirs([BuilderUtils.TMPFILES_DIR]) # if it doesn't already exist
BuilderUtils.write_yaml_file(filename, component_spec, 'w')

def get_func_definition(name: str,
params: list,
description: str):
"""Generates a python function definition to be used in
the {component_name}.py file (this file will contain
Jupyter cell code).
def get_packages_to_install_command(func: Optional[Callable] = None,
packages_to_install: Optional[List[str]] = None):
"""Returns a list of formatted list of commands, including code for tmp storage.
Args:
name: Component name.
params: Component parameters. A list of dictionaries,
each param is a dict containing keys:
'name': required, str param name.
'type': required, python primitive type.
'description': optional, str param desc.
description: Optional description of the component.
func: The python function to create a component from. The function
should have type annotations for all its arguments, indicating how
it is intended to be used (e.g. as an input/output Artifact object,
a plain parameter, or a path to a file).
packages_to_install: A list of optional packages to install before
executing func. These will always be installed at component runtime.
"""
newline = '\n'
return (
f'\n'
f'def {name}(\n'
f'''{newline.join(f" {param['name']}: {param['type'].__name__}," for param in params)}\n'''
f'):\n'
f' """{description}\n'
f'\n'
f' Args:\n'
f'''{newline.join(f" {param['name']}: {param['description']}," for param in params)}\n'''
f' """'
)

def filter_and_indent_cell(code: str) -> str:
"""Remove unwanted makeComponent function call
and indent cell code.
if not packages_to_install:
packages_to_install = []
concat_package_list = ' '.join(
[repr(str(package)) for package in packages_to_install])
# pylint: disable=anomalous-backslash-in-string
install_python_packages_script = (
f'''if ! [ -x "$(command -v pip)" ]; then{newline}'''
f''' python3 -m ensurepip || python3 -m ensurepip --user || apt-get install python3-pip{newline}'''
f'''fi{newline}'''
f'''PIP_DISABLE_PIP_VERSION_CHECK=1 python3 -m pip install --quiet \{newline}'''
f''' --no-warn-script-location {concat_package_list} && "$0" "$@"{newline}'''
f'''{newline}''')
src_code = BuilderUtils.get_function_source_definition(func)
return ['sh', '-c', install_python_packages_script, src_code]

def get_function_parameters(func: Callable) -> List[dict]:
    """Builds the component parameter list from a function's signature.

    Each parameter becomes a dict with keys 'name', 'description' (taken
    from the function's parsed docstring, None when undocumented) and 'type'
    (the annotation with a wrapping Optional removed). The list is then
    passed through BuilderUtils.update_params.

    Args:
        func: The python function to create a component from. The function
            should have type annotations for all its arguments, indicating how
            it is intended to be used (e.g. as an input/output Artifact object,
            a plain parameter, or a path to a file).
    Returns:
        list: Params list as returned by BuilderUtils.update_params.
    Raises:
        TypeError: If a parameter has no type hint.
    """
    signature = inspect.signature(func)
    parsed_docstring = docstring_parser.parse(inspect.getdoc(func))
    doc_dict = {p.arg_name: p.description for p in parsed_docstring.params}

    parameter_holder = []
    for param in signature.parameters.values():
        # Reject unannotated parameters up front, using the public sentinel
        # rather than the private inspect._empty alias.
        if param.annotation is inspect.Parameter.empty:
            raise TypeError(
                f'''Missing type hint for parameter "{param.name}". '''
                f'''Please specify the type for this parameter.''')
        parameter_holder.append({
            'name': param.name,
            'description': doc_dict.get(param.name),
            'type': maybe_strip_optional_from_annotation(param.annotation),
        })
    return BuilderUtils.update_params(parameter_holder)

def maybe_strip_optional_from_annotation(annotation: T) -> T:
"""Strips 'Optional' from 'Optional[<type>]' if applicable.
For example::
Optional[str] -> str
str -> str
List[int] -> List[int]
Args:
code: String contains the contents of the
Jupyter cell.
Return:
str: Indented cell code with removed func call.
annotation: The original type annotation which may or may not has
`Optional`.
Returns:
The type inside Optional[] if Optional exists, otherwise the original type.
"""
code = code.replace(code[code.find('AutoMLOps.makeComponent('):code.find(')')+1], '')
indented_code = ''
for line in code.splitlines():
indented_code += ' ' + line + '\n'
return indented_code
if getattr(annotation, '__origin__',
None) is Union and annotation.__args__[1] is type(None):
return annotation.__args__[0]
return annotation
Loading

0 comments on commit eb63e8e

Please sign in to comment.