From 2923dd43af3c481db93643815dfb6e576a7ddee2 Mon Sep 17 00:00:00 2001 From: Allegra Noto Date: Thu, 15 Aug 2024 10:28:44 -0400 Subject: [PATCH 1/5] Added initial defaults objects in first step to oop --- google_cloud_automlops/orchestration/base.py | 180 +++++++++++++++---- google_cloud_automlops/orchestration/kfp.py | 51 +++--- google_cloud_automlops/utils/enums.py | 73 +++++++- 3 files changed, 234 insertions(+), 70 deletions(-) diff --git a/google_cloud_automlops/orchestration/base.py b/google_cloud_automlops/orchestration/base.py index f7aef40..f6b47b6 100644 --- a/google_cloud_automlops/orchestration/base.py +++ b/google_cloud_automlops/orchestration/base.py @@ -34,6 +34,14 @@ DEFAULT_PIPELINE_NAME, GENERATED_DEFAULTS_FILE ) +from google_cloud_automlops.utils.enums import ( + GCP, + PipelineSpecs, + Tooling, + Defaults, + GCPLocations, + Metadata +) T = TypeVar('T') @@ -78,10 +86,7 @@ def __init__(self, self.src_code = get_function_source_definition(self.func) # Instantiate attributes to be set during build - self.artifact_repo_location = None - self.artifact_repo_name = None - self.project_id = None - self.naming_prefix = None + self.defaults = None def build(self): """Instantiates an abstract built method to create and write task files. 
Also reads in @@ -92,12 +97,50 @@ def build(self): """ defaults = read_yaml_file(GENERATED_DEFAULTS_FILE) - self.artifact_repo_location = defaults['gcp']['artifact_repo_location'] - self.artifact_repo_name = defaults['gcp']['artifact_repo_name'] - self.project_id = defaults['gcp']['project_id'] - self.naming_prefix = defaults['gcp']['naming_prefix'] - - raise NotImplementedError('Subclass needs to define this.') + gcp = GCP( + artifact_repo_location = GCPLocations(defaults["gcp"]["artifact_repo_location"]), + artifact_repo_name = defaults["gcp"]["artifact_repo_name"], + artifact_repo_type = defaults["gcp"]["artifact_repo_type"], + base_image = defaults["gcp"]["base_image"], + build_trigger_location = defaults["gcp"]["build_trigger_location"], + build_trigger_name = defaults["gcp"]["build_trigger_name"], + naming_prefix = defaults["gcp"]["naming_prefix"], + pipeline_job_runner_service_account = defaults["gcp"]["pipeline_job_runner_service_account"], + pipeline_job_submission_service_location = defaults["gcp"]["pipeline_job_submission_service_location"], + pipeline_job_submission_service_name = defaults["gcp"]["pipeline_job_submission_service_name"], + pipeline_job_submission_service_type = defaults["gcp"]["pipeline_job_submission_service_type"], + project_id = defaults["gcp"]["project_id"], + setup_model_monitoring = defaults["gcp"]["setup_model_monitoring"], + pubsub_topic_name = defaults["gcp"]["pubsub_topic_name"], + schedule_location = defaults["gcp"]["schedule_location"], + schedule_name = defaults["gcp"]["schedule_name"], + schedule_pattern = defaults["gcp"]["schedule_pattern"], + source_repository_branch = defaults["gcp"]["source_repository_branch"], + source_repository_name = defaults["gcp"]["source_repository_name"], + source_repository_type = defaults["gcp"]["source_repository_type"], + storage_bucket_location = defaults["gcp"]["storage_bucket_location"], + storage_bucket_name = defaults["gcp"]["storage_bucket_name"], + vpc_connector = 
defaults["gcp"]["vpc_connector"], + ) + pipeline_specs = PipelineSpecs( + gs_pipeline_job_spec_path = defaults["pipelines"]["gs_pipeline_job_spec_path"], + parameter_values_path = defaults["pipelines"]["parameter_values_path"], + pipeline_component_directory = defaults["pipelines"]["pipeline_component_directory"], + pipeline_job_spec_path = defaults["pipelines"]["pipeline_job_spec_path"], + pipeline_region = defaults["pipelines"]["pipeline_region"], + pipeline_storage_path = defaults["pipelines"]["pipeline_storage_path"], + ) + tooling = Tooling( + deployment_framework = defaults["tooling"]["deployment_framework"], + provisioning_framework = defaults["tooling"]["provisioning_framework"], + orchestration_framework = defaults["tooling"]["orchestration_framework"], + use_ci = defaults["tooling"]["use_ci"], + ) + self.defaults = Defaults( + gcp = gcp, + pipeline_specs = pipeline_specs, + tooling = tooling + ) def _get_function_return_types(self) -> list: """Returns a formatted list of function return types. 
@@ -216,14 +259,9 @@ def __init__(self, self.comps = self.get_pipeline_components(func, comps_dict) # Instantiate attributes to be set at build process - self.base_image = None + self.defaults = None self.custom_training_job_specs = None self.pipeline_params = None - self.pubsub_topic_name = None - self.use_ci = None - self.project_id = None - self.gs_pipeline_job_spec_path = None - self.setup_model_monitoring = None def build(self, pipeline_params: dict, @@ -250,14 +288,50 @@ def build(self, # Extract additional attributes from defaults file defaults = read_yaml_file(GENERATED_DEFAULTS_FILE) - self.project_id = defaults['gcp']['project_id'] - self.gs_pipeline_job_spec_path = defaults['pipelines']['gs_pipeline_job_spec_path'] - self.base_image = defaults['gcp']['base_image'] - self.pubsub_topic_name = defaults['gcp']['pubsub_topic_name'] - self.use_ci = defaults['tooling']['use_ci'] - self.setup_model_monitoring = defaults['gcp']['setup_model_monitoring'] - - raise NotImplementedError('Subclass needs to define this.') + gcp = GCP( + artifact_repo_location = GCPLocations(defaults["gcp"]["artifact_repo_location"]), + artifact_repo_name = defaults["gcp"]["artifact_repo_name"], + artifact_repo_type = defaults["gcp"]["artifact_repo_type"], + base_image = defaults["gcp"]["base_image"], + build_trigger_location = defaults["gcp"]["build_trigger_location"], + build_trigger_name = defaults["gcp"]["build_trigger_name"], + naming_prefix = defaults["gcp"]["naming_prefix"], + pipeline_job_runner_service_account = defaults["gcp"]["pipeline_job_runner_service_account"], + pipeline_job_submission_service_location = defaults["gcp"]["pipeline_job_submission_service_location"], + pipeline_job_submission_service_name = defaults["gcp"]["pipeline_job_submission_service_name"], + pipeline_job_submission_service_type = defaults["gcp"]["pipeline_job_submission_service_type"], + project_id = defaults["gcp"]["project_id"], + setup_model_monitoring = defaults["gcp"]["setup_model_monitoring"], 
+ pubsub_topic_name = defaults["gcp"]["pubsub_topic_name"], + schedule_location = defaults["gcp"]["schedule_location"], + schedule_name = defaults["gcp"]["schedule_name"], + schedule_pattern = defaults["gcp"]["schedule_pattern"], + source_repository_branch = defaults["gcp"]["source_repository_branch"], + source_repository_name = defaults["gcp"]["source_repository_name"], + source_repository_type = defaults["gcp"]["source_repository_type"], + storage_bucket_location = defaults["gcp"]["storage_bucket_location"], + storage_bucket_name = defaults["gcp"]["storage_bucket_name"], + vpc_connector = defaults["gcp"]["vpc_connector"], + ) + pipeline_specs = PipelineSpecs( + gs_pipeline_job_spec_path = defaults["pipelines"]["gs_pipeline_job_spec_path"], + parameter_values_path = defaults["pipelines"]["parameter_values_path"], + pipeline_component_directory = defaults["pipelines"]["pipeline_component_directory"], + pipeline_job_spec_path = defaults["pipelines"]["pipeline_job_spec_path"], + pipeline_region = defaults["pipelines"]["pipeline_region"], + pipeline_storage_path = defaults["pipelines"]["pipeline_storage_path"], + ) + tooling = Tooling( + deployment_framework = defaults["tooling"]["deployment_framework"], + provisioning_framework = defaults["tooling"]["provisioning_framework"], + orchestration_framework = defaults["tooling"]["orchestration_framework"], + use_ci = defaults["tooling"]["use_ci"], + ) + self.defaults = Defaults( + gcp = gcp, + pipeline_specs = pipeline_specs, + tooling = tooling + ) def get_pipeline_components(self, pipeline_func: Callable, @@ -301,12 +375,7 @@ class BaseServices(): def __init__(self) -> None: """Instantiates a generic Services object. 
""" - self.pipeline_storage_path = None - self.pipeline_job_runner_service_account = None - self.pipeline_job_submission_service_type = None - self.project_id = None - self.pipeline_job_submission_service_type = None - self.setup_model_monitoring = None + self.defaults = None # Set directory for files to be written to self.submission_service_base_dir = BASE_DIR + 'services/submission_service' @@ -326,11 +395,50 @@ def build(self): """ # Extract additional attributes from defaults file defaults = read_yaml_file(GENERATED_DEFAULTS_FILE) - self.pipeline_storage_path = defaults['pipelines']['pipeline_storage_path'] - self.pipeline_job_runner_service_account = defaults['gcp']['pipeline_job_runner_service_account'] - self.pipeline_job_submission_service_type = defaults['gcp']['pipeline_job_submission_service_type'] - self.project_id = defaults['gcp']['project_id'] - self.setup_model_monitoring = defaults['gcp']['setup_model_monitoring'] + gcp = GCP( + artifact_repo_location = GCPLocations(defaults["gcp"]["artifact_repo_location"]), + artifact_repo_name = defaults["gcp"]["artifact_repo_name"], + artifact_repo_type = defaults["gcp"]["artifact_repo_type"], + base_image = defaults["gcp"]["base_image"], + build_trigger_location = defaults["gcp"]["build_trigger_location"], + build_trigger_name = defaults["gcp"]["build_trigger_name"], + naming_prefix = defaults["gcp"]["naming_prefix"], + pipeline_job_runner_service_account = defaults["gcp"]["pipeline_job_runner_service_account"], + pipeline_job_submission_service_location = defaults["gcp"]["pipeline_job_submission_service_location"], + pipeline_job_submission_service_name = defaults["gcp"]["pipeline_job_submission_service_name"], + pipeline_job_submission_service_type = defaults["gcp"]["pipeline_job_submission_service_type"], + project_id = defaults["gcp"]["project_id"], + setup_model_monitoring = defaults["gcp"]["setup_model_monitoring"], + pubsub_topic_name = defaults["gcp"]["pubsub_topic_name"], + schedule_location = 
defaults["gcp"]["schedule_location"], + schedule_name = defaults["gcp"]["schedule_name"], + schedule_pattern = defaults["gcp"]["schedule_pattern"], + source_repository_branch = defaults["gcp"]["source_repository_branch"], + source_repository_name = defaults["gcp"]["source_repository_name"], + source_repository_type = defaults["gcp"]["source_repository_type"], + storage_bucket_location = defaults["gcp"]["storage_bucket_location"], + storage_bucket_name = defaults["gcp"]["storage_bucket_name"], + vpc_connector = defaults["gcp"]["vpc_connector"], + ) + pipeline_specs = PipelineSpecs( + gs_pipeline_job_spec_path = defaults["pipelines"]["gs_pipeline_job_spec_path"], + parameter_values_path = defaults["pipelines"]["parameter_values_path"], + pipeline_component_directory = defaults["pipelines"]["pipeline_component_directory"], + pipeline_job_spec_path = defaults["pipelines"]["pipeline_job_spec_path"], + pipeline_region = defaults["pipelines"]["pipeline_region"], + pipeline_storage_path = defaults["pipelines"]["pipeline_storage_path"], + ) + tooling = Tooling( + deployment_framework = defaults["tooling"]["deployment_framework"], + provisioning_framework = defaults["tooling"]["provisioning_framework"], + orchestration_framework = defaults["tooling"]["orchestration_framework"], + use_ci = defaults["tooling"]["use_ci"], + ) + self.defaults = Defaults( + gcp = gcp, + pipeline_specs = pipeline_specs, + tooling = tooling + ) # Set directory for files to be written to self.submission_service_base_dir = BASE_DIR + 'services/submission_service' @@ -339,7 +447,7 @@ def build(self): self._build_submission_services() # Setup model monitoring - if self.setup_model_monitoring: + if self.defaults.gcp.setup_model_monitoring: self._build_monitoring() def _build_monitoring(self): diff --git a/google_cloud_automlops/orchestration/kfp.py b/google_cloud_automlops/orchestration/kfp.py index 59e76ae..c01c3df 100644 --- a/google_cloud_automlops/orchestration/kfp.py +++ 
b/google_cloud_automlops/orchestration/kfp.py @@ -74,11 +74,7 @@ class KFPComponent(BaseComponent): def build(self): """Constructs files for running and managing Kubeflow pipelines. """ - defaults = read_yaml_file(GENERATED_DEFAULTS_FILE) - self.artifact_repo_location = defaults['gcp']['artifact_repo_location'] - self.artifact_repo_name = defaults['gcp']['artifact_repo_name'] - self.project_id = defaults['gcp']['project_id'] - self.naming_prefix = defaults['gcp']['naming_prefix'] + super().build() # Set and create directory for components if it does not already exist component_dir = BASE_DIR + 'components/' + self.name @@ -91,10 +87,10 @@ def build(self): BASE_DIR + 'components/component_base/src/']) compspec_image = ( - f'''{self.artifact_repo_location}-docker.pkg.dev/''' - f'''{self.project_id}/''' - f'''{self.artifact_repo_name}/''' - f'''{self.naming_prefix}/''' + f'''{self.defaults.gcp.artifact_repo_location.value}-docker.pkg.dev/''' + f'''{self.defaults.gcp.project_id}/''' + f'''{self.defaults.gcp.artifact_repo_name}/''' + f'''{self.defaults.gcp.naming_prefix}/''' f'''components/component_base:latest''') # Write component spec @@ -192,19 +188,12 @@ def build(self, to None. 
""" + super().build(pipeline_params, custom_training_job_specs) + # Save parameters as attributes self.custom_training_job_specs = custom_training_job_specs self.pipeline_params = pipeline_params - # Extract additional attributes from defaults file - defaults = read_yaml_file(GENERATED_DEFAULTS_FILE) - self.project_id = defaults['gcp']['project_id'] - self.gs_pipeline_job_spec_path = defaults['pipelines']['gs_pipeline_job_spec_path'] - self.base_image = defaults['gcp']['base_image'] - self.use_ci = defaults['tooling']['use_ci'] - self.pubsub_topic_name = defaults['gcp']['pubsub_topic_name'] if self.use_ci else None - self.setup_model_monitoring = defaults['gcp']['setup_model_monitoring'] - # Build necessary folders make_dirs([ f'{BASE_DIR}scripts/pipeline_spec/', @@ -217,8 +206,8 @@ def build(self, filepath=f'{BASE_DIR}README.md', text=render_jinja( template_path=import_files(KFP_TEMPLATES_PATH) / 'README.md.j2', - setup_model_monitoring=self.setup_model_monitoring, - use_ci=self.use_ci), + setup_model_monitoring=self.defaults.gcp.setup_model_monitoring, + use_ci=self.defaults.tooling.use_ci), mode='w') # components/component_base/dockerfile: Write the component base Dockerfile @@ -226,7 +215,7 @@ def build(self, filepath=f'{GENERATED_COMPONENT_BASE}/Dockerfile', text=render_jinja( template_path=import_files(KFP_TEMPLATES_PATH + '.components.component_base') / 'Dockerfile.j2', - base_image=self.base_image, + base_image=self.defaults.gcp.base_image, generated_license=GENERATED_LICENSE), mode='w') @@ -278,7 +267,7 @@ def build(self, base_dir=BASE_DIR)) # scripts/publish_to_topic.sh: If using CI, write script for publishing to pubsub topic - if self.use_ci: + if self.defaults.tooling.use_ci: write_and_chmod( filepath=GENERATED_PUBLISH_TO_TOPIC_FILE, text=render_jinja( @@ -286,7 +275,7 @@ def build(self, base_dir=BASE_DIR, generated_license=GENERATED_LICENSE, generated_parameter_values_path=GENERATED_PARAMETER_VALUES_PATH, - pubsub_topic_name=self.pubsub_topic_name)) + 
pubsub_topic_name=self.defaults.gcp.pubsub_topic_name)) # pipelines/pipeline.py: Generates a Kubeflow pipeline spec from custom components. components_list = self._get_component_list() @@ -299,7 +288,7 @@ def build(self, custom_training_job_specs=self.custom_training_job_specs, generated_license=GENERATED_LICENSE, pipeline_scaffold_contents=pipeline_scaffold_contents, - project_id=self.project_id), + project_id=self.defaults.gcp.project_id), mode='w') # pipelines/pipeline_runner.py: Sends a PipelineJob to Vertex AI using pipeline spec. @@ -319,7 +308,7 @@ def build(self, mode='w') # pipelines/runtime_parameters/pipeline_parameter_values.json: Provides runtime parameters for the PipelineJob. - self.pipeline_params['gs_pipeline_spec_path'] = self.gs_pipeline_job_spec_path + self.pipeline_params['gs_pipeline_spec_path'] = self.defaults.pipeline_specs.gs_pipeline_job_spec_path serialized_params = json.dumps(self.pipeline_params, indent=4) write_file(BASE_DIR + GENERATED_PARAMETER_VALUES_PATH, serialized_params, 'w') @@ -483,7 +472,7 @@ def _build_submission_services(self): render_jinja( template_path=import_files(KFP_TEMPLATES_PATH + '.services.submission_service') / 'requirements.txt.j2', pinned_kfp_version=PINNED_KFP_VERSION, - pipeline_job_submission_service_type=self.pipeline_job_submission_service_type), + pipeline_job_submission_service_type=self.defaults.gcp.pipeline_job_submission_service_type), 'w') write_file( @@ -491,11 +480,11 @@ render_jinja( template_path=import_files(KFP_TEMPLATES_PATH + '.services.submission_service') / 'main.py.j2', generated_license=GENERATED_LICENSE, - pipeline_root=self.pipeline_storage_path, - pipeline_job_runner_service_account=self.pipeline_job_runner_service_account, - pipeline_job_submission_service_type=self.pipeline_job_submission_service_type, - project_id=self.project_id, - setup_model_monitoring=self.setup_model_monitoring), + pipeline_root=self.defaults.pipeline_specs.pipeline_storage_path, + 
pipeline_job_runner_service_account=self.defaults.gcp.pipeline_job_runner_service_account, + pipeline_job_submission_service_type=self.defaults.gcp.pipeline_job_submission_service_type, + project_id=self.defaults.gcp.project_id, + setup_model_monitoring=self.defaults.gcp.setup_model_monitoring), 'w') write_file( diff --git a/google_cloud_automlops/utils/enums.py b/google_cloud_automlops/utils/enums.py index 033988a..462ec22 100644 --- a/google_cloud_automlops/utils/enums.py +++ b/google_cloud_automlops/utils/enums.py @@ -20,6 +20,7 @@ # pylint: disable=line-too-long from enum import Enum +from pydantic import BaseModel class Deployer(Enum): @@ -58,10 +59,10 @@ class ArtifactRepository(Enum): class CodeRepository(Enum): """Enum representing the available options for source code repositories.""" - BITBUCKET = 'bitbucket' # roadmap item + # BITBUCKET = 'bitbucket' # roadmap item CLOUD_SOURCE_REPOSITORIES = 'cloud-source-repositories' - GITHUB = 'github' - GITLAB = 'gitlab' # roadmap item + # GITHUB = 'github' + # GITLAB = 'gitlab' # roadmap item class PipelineJobSubmitter(Enum): @@ -77,3 +78,69 @@ class PulumiRuntime(Enum): PYTHON = 'python' TYPESCRIPT = 'typescript' GO = 'go' + + +class GCPLocations(Enum): + US_CENTRAL_1 = "us-central1" + US_EAST_1 = "us-east1" + US_EAST_4 = "us-east4" + US_EAST_5 = "us-east5" + US_WEST_1 = "us-west1" + US_WEST_2 = "us-west2" + US_WEST_3 = "us-west3" + US_WEST_4 = "us-west4" + US_SOUTH_1 = "us-south1" + + +class Metadata(BaseModel): + name: str + type: str + description: str + + +class GCP(BaseModel): + artifact_repo_location: GCPLocations + artifact_repo_name: str + artifact_repo_type: str + base_image: str + build_trigger_location: str + build_trigger_name: str + naming_prefix: str + pipeline_job_runner_service_account: str + pipeline_job_submission_service_location: str + pipeline_job_submission_service_name: str + pipeline_job_submission_service_type: str + project_id: str + setup_model_monitoring: bool + pubsub_topic_name: str + 
schedule_location: str + schedule_name: str + schedule_pattern: str + source_repository_branch: str + source_repository_name: str + source_repository_type: str + storage_bucket_location: str + storage_bucket_name: str + vpc_connector: str + + +class PipelineSpecs(BaseModel): + gs_pipeline_job_spec_path: str + parameter_values_path: str + pipeline_component_directory: str + pipeline_job_spec_path: str + pipeline_region: str + pipeline_storage_path: str + + +class Tooling(BaseModel): + deployment_framework: Deployer + provisioning_framework: Provisioner + orchestration_framework: Orchestrator + use_ci: bool + + +class Defaults(BaseModel): + gcp: GCP + pipeline_specs: PipelineSpecs + tooling: Tooling \ No newline at end of file From fd291ee20b29c97eb3596c03291f3fe72574ec33 Mon Sep 17 00:00:00 2001 From: Allegra Noto Date: Thu, 15 Aug 2024 12:35:09 -0400 Subject: [PATCH 2/5] Added metadata as a parameter object type --- google_cloud_automlops/orchestration/base.py | 32 +++++++++++--------- google_cloud_automlops/utils/enums.py | 2 +- 2 files changed, 18 insertions(+), 16 deletions(-) diff --git a/google_cloud_automlops/orchestration/base.py b/google_cloud_automlops/orchestration/base.py index f6b47b6..360ee94 100644 --- a/google_cloud_automlops/orchestration/base.py +++ b/google_cloud_automlops/orchestration/base.py @@ -40,7 +40,7 @@ Tooling, Defaults, GCPLocations, - Metadata + Parameter ) T = TypeVar('T') @@ -167,14 +167,15 @@ def _get_function_return_types(self) -> list: if not (hasattr(annotation,'__annotations__') and isinstance(annotation.__annotations__, dict)): raise TypeError(f'''Return type hint for function "{self.name}" must be a NamedTuple.''') - # Creates a dictionary of metadata for each object returned by component + # Creates a parameter object for each parameter returned by component outputs = [] for name, type_ in annotation.__annotations__.items(): - metadata = {} - metadata['name'] = name - metadata['type'] = type_ - metadata['description'] = None 
- outputs.append(metadata) + p = Parameter( + name=name, + type=type_, + description=None + ) + outputs.append(p) return outputs def _get_function_parameters(self) -> list: @@ -195,16 +196,17 @@ def _get_function_parameters(self) -> list: # Extract parameter metadata parameter_holder = [] for param in parameters: - metadata = {} - metadata['name'] = param.name - metadata['description'] = doc_dict.get(param.name) - metadata['type'] = self.maybe_strip_optional_from_annotation( - param.annotation) - parameter_holder.append(metadata) + p = Parameter( + name=param.name, + type=self.maybe_strip_optional_from_annotation( + param.annotation), + description=doc_dict.get(param.name) + ) + parameter_holder.append(p) # pylint: disable=protected-access - if metadata['type'] == inspect._empty: + if p.type == inspect._empty: raise TypeError( - f'''Missing type hint for parameter "{metadata['name']}". ''' + f'''Missing type hint for parameter "{p.name}". ''' f'''Please specify the type for this parameter.''') return parameter_holder diff --git a/google_cloud_automlops/utils/enums.py b/google_cloud_automlops/utils/enums.py index 462ec22..67d3d8b 100644 --- a/google_cloud_automlops/utils/enums.py +++ b/google_cloud_automlops/utils/enums.py @@ -92,7 +92,7 @@ class GCPLocations(Enum): US_SOUTH_1 = "us-south1" -class Metadata(BaseModel): +class Parameter(BaseModel): name: str type: str description: str From 106000c32d1d10167d817e0fbde96d08bb3add7c Mon Sep 17 00:00:00 2001 From: Allegra Noto Date: Mon, 19 Aug 2024 11:25:35 -0400 Subject: [PATCH 3/5] created get defaults function --- google_cloud_automlops/orchestration/base.py | 151 +------------------ google_cloud_automlops/utils/enums.py | 24 +-- google_cloud_automlops/utils/utils.py | 54 ++++++- 3 files changed, 72 insertions(+), 157 deletions(-) diff --git a/google_cloud_automlops/orchestration/base.py b/google_cloud_automlops/orchestration/base.py index 360ee94..0a1f8b1 100644 --- a/google_cloud_automlops/orchestration/base.py +++ 
b/google_cloud_automlops/orchestration/base.py @@ -26,22 +26,14 @@ import docstring_parser from google_cloud_automlops.utils.utils import ( + get_defaults, get_function_source_definition, - read_yaml_file ) from google_cloud_automlops.utils.constants import ( BASE_DIR, DEFAULT_PIPELINE_NAME, - GENERATED_DEFAULTS_FILE -) -from google_cloud_automlops.utils.enums import ( - GCP, - PipelineSpecs, - Tooling, - Defaults, - GCPLocations, - Parameter ) +from google_cloud_automlops.utils.enums import Parameter T = TypeVar('T') @@ -95,52 +87,7 @@ def build(self): Raises: NotImplementedError: The subclass has not defined the `build` method. """ - - defaults = read_yaml_file(GENERATED_DEFAULTS_FILE) - gcp = GCP( - artifact_repo_location = GCPLocations(defaults["gcp"]["artifact_repo_location"]), - artifact_repo_name = defaults["gcp"]["artifact_repo_name"], - artifact_repo_type = defaults["gcp"]["artifact_repo_type"], - base_image = defaults["gcp"]["base_image"], - build_trigger_location = defaults["gcp"]["build_trigger_location"], - build_trigger_name = defaults["gcp"]["build_trigger_name"], - naming_prefix = defaults["gcp"]["naming_prefix"], - pipeline_job_runner_service_account = defaults["gcp"]["pipeline_job_runner_service_account"], - pipeline_job_submission_service_location = defaults["gcp"]["pipeline_job_submission_service_location"], - pipeline_job_submission_service_name = defaults["gcp"]["pipeline_job_submission_service_name"], - pipeline_job_submission_service_type = defaults["gcp"]["pipeline_job_submission_service_type"], - project_id = defaults["gcp"]["project_id"], - setup_model_monitoring = defaults["gcp"]["setup_model_monitoring"], - pubsub_topic_name = defaults["gcp"]["pubsub_topic_name"], - schedule_location = defaults["gcp"]["schedule_location"], - schedule_name = defaults["gcp"]["schedule_name"], - schedule_pattern = defaults["gcp"]["schedule_pattern"], - source_repository_branch = defaults["gcp"]["source_repository_branch"], - source_repository_name = 
defaults["gcp"]["source_repository_name"], - source_repository_type = defaults["gcp"]["source_repository_type"], - storage_bucket_location = defaults["gcp"]["storage_bucket_location"], - storage_bucket_name = defaults["gcp"]["storage_bucket_name"], - vpc_connector = defaults["gcp"]["vpc_connector"], - ) - pipeline_specs = PipelineSpecs( - gs_pipeline_job_spec_path = defaults["pipelines"]["gs_pipeline_job_spec_path"], - parameter_values_path = defaults["pipelines"]["parameter_values_path"], - pipeline_component_directory = defaults["pipelines"]["pipeline_component_directory"], - pipeline_job_spec_path = defaults["pipelines"]["pipeline_job_spec_path"], - pipeline_region = defaults["pipelines"]["pipeline_region"], - pipeline_storage_path = defaults["pipelines"]["pipeline_storage_path"], - ) - tooling = Tooling( - deployment_framework = defaults["tooling"]["deployment_framework"], - provisioning_framework = defaults["tooling"]["provisioning_framework"], - orchestration_framework = defaults["tooling"]["orchestration_framework"], - use_ci = defaults["tooling"]["use_ci"], - ) - self.defaults = Defaults( - gcp = gcp, - pipeline_specs = pipeline_specs, - tooling = tooling - ) + self.defaults = get_defaults() def _get_function_return_types(self) -> list: """Returns a formatted list of function return types. 
@@ -289,51 +236,7 @@ def build(self, self.pipeline_params = pipeline_params # Extract additional attributes from defaults file - defaults = read_yaml_file(GENERATED_DEFAULTS_FILE) - gcp = GCP( - artifact_repo_location = GCPLocations(defaults["gcp"]["artifact_repo_location"]), - artifact_repo_name = defaults["gcp"]["artifact_repo_name"], - artifact_repo_type = defaults["gcp"]["artifact_repo_type"], - base_image = defaults["gcp"]["base_image"], - build_trigger_location = defaults["gcp"]["build_trigger_location"], - build_trigger_name = defaults["gcp"]["build_trigger_name"], - naming_prefix = defaults["gcp"]["naming_prefix"], - pipeline_job_runner_service_account = defaults["gcp"]["pipeline_job_runner_service_account"], - pipeline_job_submission_service_location = defaults["gcp"]["pipeline_job_submission_service_location"], - pipeline_job_submission_service_name = defaults["gcp"]["pipeline_job_submission_service_name"], - pipeline_job_submission_service_type = defaults["gcp"]["pipeline_job_submission_service_type"], - project_id = defaults["gcp"]["project_id"], - setup_model_monitoring = defaults["gcp"]["setup_model_monitoring"], - pubsub_topic_name = defaults["gcp"]["pubsub_topic_name"], - schedule_location = defaults["gcp"]["schedule_location"], - schedule_name = defaults["gcp"]["schedule_name"], - schedule_pattern = defaults["gcp"]["schedule_pattern"], - source_repository_branch = defaults["gcp"]["source_repository_branch"], - source_repository_name = defaults["gcp"]["source_repository_name"], - source_repository_type = defaults["gcp"]["source_repository_type"], - storage_bucket_location = defaults["gcp"]["storage_bucket_location"], - storage_bucket_name = defaults["gcp"]["storage_bucket_name"], - vpc_connector = defaults["gcp"]["vpc_connector"], - ) - pipeline_specs = PipelineSpecs( - gs_pipeline_job_spec_path = defaults["pipelines"]["gs_pipeline_job_spec_path"], - parameter_values_path = defaults["pipelines"]["parameter_values_path"], - 
pipeline_component_directory = defaults["pipelines"]["pipeline_component_directory"], - pipeline_job_spec_path = defaults["pipelines"]["pipeline_job_spec_path"], - pipeline_region = defaults["pipelines"]["pipeline_region"], - pipeline_storage_path = defaults["pipelines"]["pipeline_storage_path"], - ) - tooling = Tooling( - deployment_framework = defaults["tooling"]["deployment_framework"], - provisioning_framework = defaults["tooling"]["provisioning_framework"], - orchestration_framework = defaults["tooling"]["orchestration_framework"], - use_ci = defaults["tooling"]["use_ci"], - ) - self.defaults = Defaults( - gcp = gcp, - pipeline_specs = pipeline_specs, - tooling = tooling - ) + self.defaults = get_defaults() def get_pipeline_components(self, pipeline_func: Callable, @@ -396,51 +299,7 @@ def build(self): requirements.txt """ # Extract additional attributes from defaults file - defaults = read_yaml_file(GENERATED_DEFAULTS_FILE) - gcp = GCP( - artifact_repo_location = GCPLocations(defaults["gcp"]["artifact_repo_location"]), - artifact_repo_name = defaults["gcp"]["artifact_repo_name"], - artifact_repo_type = defaults["gcp"]["artifact_repo_type"], - base_image = defaults["gcp"]["base_image"], - build_trigger_location = defaults["gcp"]["build_trigger_location"], - build_trigger_name = defaults["gcp"]["build_trigger_name"], - naming_prefix = defaults["gcp"]["naming_prefix"], - pipeline_job_runner_service_account = defaults["gcp"]["pipeline_job_runner_service_account"], - pipeline_job_submission_service_location = defaults["gcp"]["pipeline_job_submission_service_location"], - pipeline_job_submission_service_name = defaults["gcp"]["pipeline_job_submission_service_name"], - pipeline_job_submission_service_type = defaults["gcp"]["pipeline_job_submission_service_type"], - project_id = defaults["gcp"]["project_id"], - setup_model_monitoring = defaults["gcp"]["setup_model_monitoring"], - pubsub_topic_name = defaults["gcp"]["pubsub_topic_name"], - schedule_location = 
defaults["gcp"]["schedule_location"], - schedule_name = defaults["gcp"]["schedule_name"], - schedule_pattern = defaults["gcp"]["schedule_pattern"], - source_repository_branch = defaults["gcp"]["source_repository_branch"], - source_repository_name = defaults["gcp"]["source_repository_name"], - source_repository_type = defaults["gcp"]["source_repository_type"], - storage_bucket_location = defaults["gcp"]["storage_bucket_location"], - storage_bucket_name = defaults["gcp"]["storage_bucket_name"], - vpc_connector = defaults["gcp"]["vpc_connector"], - ) - pipeline_specs = PipelineSpecs( - gs_pipeline_job_spec_path = defaults["pipelines"]["gs_pipeline_job_spec_path"], - parameter_values_path = defaults["pipelines"]["parameter_values_path"], - pipeline_component_directory = defaults["pipelines"]["pipeline_component_directory"], - pipeline_job_spec_path = defaults["pipelines"]["pipeline_job_spec_path"], - pipeline_region = defaults["pipelines"]["pipeline_region"], - pipeline_storage_path = defaults["pipelines"]["pipeline_storage_path"], - ) - tooling = Tooling( - deployment_framework = defaults["tooling"]["deployment_framework"], - provisioning_framework = defaults["tooling"]["provisioning_framework"], - orchestration_framework = defaults["tooling"]["orchestration_framework"], - use_ci = defaults["tooling"]["use_ci"], - ) - self.defaults = Defaults( - gcp = gcp, - pipeline_specs = pipeline_specs, - tooling = tooling - ) + self.defaults = get_defaults() # Set directory for files to be written to self.submission_service_base_dir = BASE_DIR + 'services/submission_service' diff --git a/google_cloud_automlops/utils/enums.py b/google_cloud_automlops/utils/enums.py index 67d3d8b..d61d65f 100644 --- a/google_cloud_automlops/utils/enums.py +++ b/google_cloud_automlops/utils/enums.py @@ -81,15 +81,17 @@ class PulumiRuntime(Enum): class GCPLocations(Enum): - US_CENTRAL_1 = "us-central1" - US_EAST_1 = "us-east1" - US_EAST_4 = "us-east4" - US_EAST_5 = "us-east5" - US_WEST_1 = "us-west1" 
- US_WEST_2 = "us-west2" - US_WEST_3 = "us-west3" - US_WEST_4 = "us-west4" - US_SOUTH_1 = "us-south1" + """Enum representing the available GCP locations. + """ + US_CENTRAL_1 = 'us-central1' + US_EAST_1 = 'us-east1' + US_EAST_4 = 'us-east4' + US_EAST_5 = 'us-east5' + US_WEST_1 = 'us-west1' + US_WEST_2 = 'us-west2' + US_WEST_3 = 'us-west3' + US_WEST_4 = 'us-west4' + US_SOUTH_1 = 'us-south1' class Parameter(BaseModel): @@ -99,6 +101,8 @@ class Parameter(BaseModel): class GCP(BaseModel): + """Class representing all GCP configuration settings. + """ artifact_repo_location: GCPLocations artifact_repo_name: str artifact_repo_type: str @@ -143,4 +147,4 @@ class Tooling(BaseModel): class Defaults(BaseModel): gcp: GCP pipeline_specs: PipelineSpecs - tooling: Tooling \ No newline at end of file + tooling: Tooling diff --git a/google_cloud_automlops/utils/utils.py b/google_cloud_automlops/utils/utils.py index ea4d93e..cdf3150 100644 --- a/google_cloud_automlops/utils/utils.py +++ b/google_cloud_automlops/utils/utils.py @@ -55,8 +55,13 @@ ) from google_cloud_automlops.utils.enums import ( + Defaults, + GCP, + GCPLocations, Orchestrator, - PipelineJobSubmitter + PipelineJobSubmitter, + PipelineSpecs, + Tooling, ) from google_cloud_automlops.utils.enums import ( @@ -1054,3 +1059,50 @@ def git_workflow(): if deployment_framework == Deployer.CLOUDBUILD.value: logging.info( f'''Cloud Build job running at: https://console.cloud.google.com/cloud-build/builds;region={defaults['gcp']['build_trigger_location']}''') + +def get_defaults(): + defaults = read_yaml_file(GENERATED_DEFAULTS_FILE) + gcp = GCP( + artifact_repo_location = GCPLocations(defaults['gcp']['artifact_repo_location']), + artifact_repo_name = defaults['gcp']['artifact_repo_name'], + artifact_repo_type = defaults['gcp']['artifact_repo_type'], + base_image = defaults['gcp']['base_image'], + build_trigger_location = defaults['gcp']['build_trigger_location'], + build_trigger_name = defaults['gcp']['build_trigger_name'], + 
naming_prefix = defaults['gcp']['naming_prefix'], + pipeline_job_runner_service_account = defaults['gcp']['pipeline_job_runner_service_account'], + pipeline_job_submission_service_location = defaults['gcp']['pipeline_job_submission_service_location'], + pipeline_job_submission_service_name = defaults['gcp']['pipeline_job_submission_service_name'], + pipeline_job_submission_service_type = defaults['gcp']['pipeline_job_submission_service_type'], + project_id = defaults['gcp']['project_id'], + setup_model_monitoring = defaults['gcp']['setup_model_monitoring'], + pubsub_topic_name = defaults['gcp']['pubsub_topic_name'], + schedule_location = defaults['gcp']['schedule_location'], + schedule_name = defaults['gcp']['schedule_name'], + schedule_pattern = defaults['gcp']['schedule_pattern'], + source_repository_branch = defaults['gcp']['source_repository_branch'], + source_repository_name = defaults['gcp']['source_repository_name'], + source_repository_type = defaults['gcp']['source_repository_type'], + storage_bucket_location = defaults['gcp']['storage_bucket_location'], + storage_bucket_name = defaults['gcp']['storage_bucket_name'], + vpc_connector = defaults['gcp']['vpc_connector'], + ) + pipeline_specs = PipelineSpecs( + gs_pipeline_job_spec_path = defaults['pipelines']['gs_pipeline_job_spec_path'], + parameter_values_path = defaults['pipelines']['parameter_values_path'], + pipeline_component_directory = defaults['pipelines']['pipeline_component_directory'], + pipeline_job_spec_path = defaults['pipelines']['pipeline_job_spec_path'], + pipeline_region = defaults['pipelines']['pipeline_region'], + pipeline_storage_path = defaults['pipelines']['pipeline_storage_path'], + ) + tooling = Tooling( + deployment_framework = defaults['tooling']['deployment_framework'], + provisioning_framework = defaults['tooling']['provisioning_framework'], + orchestration_framework = defaults['tooling']['orchestration_framework'], + use_ci = defaults['tooling']['use_ci'], + ) + return Defaults( 
+ gcp = gcp, + pipeline_specs = pipeline_specs, + tooling = tooling + ) From c50fb21c72505c05bd437d180ab34e6fef581c5d Mon Sep 17 00:00:00 2001 From: Allegra Noto Date: Tue, 20 Aug 2024 12:39:06 -0400 Subject: [PATCH 4/5] Added type hints to output parameter lists --- google_cloud_automlops/orchestration/base.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/google_cloud_automlops/orchestration/base.py b/google_cloud_automlops/orchestration/base.py index 0a1f8b1..0fe7f7d 100644 --- a/google_cloud_automlops/orchestration/base.py +++ b/google_cloud_automlops/orchestration/base.py @@ -115,7 +115,7 @@ def _get_function_return_types(self) -> list: raise TypeError(f'''Return type hint for function "{self.name}" must be a NamedTuple.''') # Creates a parameter object for each parameter returned by component - outputs = [] + outputs: List[Parameter] = [] for name, type_ in annotation.__annotations__.items(): p = Parameter( name=name, @@ -141,7 +141,7 @@ def _get_function_parameters(self) -> list: doc_dict = {p.arg_name: p.description for p in parsed_docstring.params} # Extract parameter metadata - parameter_holder = [] + parameter_holder: List[Parameter] = [] for param in parameters: p = Parameter( name=param.name, From 6a6d0cb64d3105fd3f78a353da6cbd1bb7f63c23 Mon Sep 17 00:00:00 2001 From: Allegra Noto Date: Wed, 28 Aug 2024 13:23:31 +0000 Subject: [PATCH 5/5] Fixed small errors --- google_cloud_automlops/orchestration/kfp.py | 4 ++-- google_cloud_automlops/utils/enums.py | 15 +++++++++++++-- 2 files changed, 15 insertions(+), 4 deletions(-) diff --git a/google_cloud_automlops/orchestration/kfp.py b/google_cloud_automlops/orchestration/kfp.py index c01c3df..c1bf6ac 100644 --- a/google_cloud_automlops/orchestration/kfp.py +++ b/google_cloud_automlops/orchestration/kfp.py @@ -308,7 +308,7 @@ def build(self, mode='w') # pipelines/runtime_parameters/pipeline_parameter_values.json: Provides runtime parameters for the PipelineJob. 
- self.pipeline_params['gs_pipeline_spec_path'] = self.defaults.gcp.gs_pipeline_job_spec_path + self.pipeline_params['gs_pipeline_spec_path'] = self.defaults.pipeline_specs.gs_pipeline_job_spec_path serialized_params = json.dumps(self.pipeline_params, indent=4) write_file(BASE_DIR + GENERATED_PARAMETER_VALUES_PATH, serialized_params, 'w') @@ -480,7 +480,7 @@ def _build_submission_services(self): render_jinja( template_path=import_files(KFP_TEMPLATES_PATH + '.services.submission_service') / 'main.py.j2', generated_license=GENERATED_LICENSE, - pipeline_root=self.defaults.gcp.pipeline_storage_path, + pipeline_root=self.defaults.pipeline_specs.pipeline_storage_path, pipeline_job_runner_service_account=self.defaults.gcp.pipeline_job_runner_service_account, pipeline_job_submission_service_type=self.defaults.gcp.pipeline_job_submission_service_type, project_id=self.defaults.gcp.project_id, diff --git a/google_cloud_automlops/utils/enums.py b/google_cloud_automlops/utils/enums.py index d61d65f..a83c9b3 100644 --- a/google_cloud_automlops/utils/enums.py +++ b/google_cloud_automlops/utils/enums.py @@ -20,7 +20,7 @@ # pylint: disable=line-too-long from enum import Enum -from pydantic import BaseModel +from pydantic import BaseModel, validator class Deployer(Enum): @@ -96,9 +96,20 @@ class GCPLocations(Enum): class Parameter(BaseModel): name: str - type: str + type: type description: str + @validator("type", pre=True) + def type_to_empty(cls, v: object) -> object: + if v is None: + return "" + return v + @validator("description", pre=True) + def description_to_empty(cls, v: object) -> object: + if v is None: + return "" + return v + class GCP(BaseModel): """Class representing all GCP configuration settings.