diff --git a/google_cloud_automlops/orchestration/base.py b/google_cloud_automlops/orchestration/base.py
index f7aef40..0fe7f7d 100644
--- a/google_cloud_automlops/orchestration/base.py
+++ b/google_cloud_automlops/orchestration/base.py
@@ -26,14 +26,14 @@ import docstring_parser
 from google_cloud_automlops.utils.utils import (
+    get_defaults,
     get_function_source_definition,
-    read_yaml_file
 )
 from google_cloud_automlops.utils.constants import (
     BASE_DIR,
     DEFAULT_PIPELINE_NAME,
-    GENERATED_DEFAULTS_FILE
 )
+from google_cloud_automlops.utils.enums import Parameter
 
 T = TypeVar('T')
 
@@ -78,10 +78,7 @@ def __init__(self,
         self.src_code = get_function_source_definition(self.func)
 
         # Instantiate attributes to be set during build
-        self.artifact_repo_location = None
-        self.artifact_repo_name = None
-        self.project_id = None
-        self.naming_prefix = None
+        self.defaults = None
 
     def build(self):
         """Instantiates an abstract built method to create and write task files. Also reads in
@@ -90,14 +87,7 @@ def build(self):
 
         Raises:
             NotImplementedError: The subclass has not defined the `build` method.
         """
-
-        defaults = read_yaml_file(GENERATED_DEFAULTS_FILE)
-        self.artifact_repo_location = defaults['gcp']['artifact_repo_location']
-        self.artifact_repo_name = defaults['gcp']['artifact_repo_name']
-        self.project_id = defaults['gcp']['project_id']
-        self.naming_prefix = defaults['gcp']['naming_prefix']
-
-        raise NotImplementedError('Subclass needs to define this.')
+        self.defaults = get_defaults()
 
     def _get_function_return_types(self) -> list:
         """Returns a formatted list of function return types.
@@ -124,14 +114,15 @@ def _get_function_return_types(self) -> list:
         if not (hasattr(annotation,'__annotations__') and isinstance(annotation.__annotations__, dict)):
             raise TypeError(f'''Return type hint for function "{self.name}" must be a NamedTuple.''')
 
-        # Creates a dictionary of metadata for each object returned by component
-        outputs = []
+        # Creates a parameter object for each parameter returned by component
+        outputs: List[Parameter] = []
         for name, type_ in annotation.__annotations__.items():
-            metadata = {}
-            metadata['name'] = name
-            metadata['type'] = type_
-            metadata['description'] = None
-            outputs.append(metadata)
+            p = Parameter(
+                name=name,
+                type=type_,
+                description=None
+            )
+            outputs.append(p)
         return outputs
 
     def _get_function_parameters(self) -> list:
@@ -150,18 +141,19 @@ def _get_function_parameters(self) -> list:
         doc_dict = {p.arg_name: p.description for p in parsed_docstring.params}
 
         # Extract parameter metadata
-        parameter_holder = []
+        parameter_holder: List[Parameter] = []
         for param in parameters:
-            metadata = {}
-            metadata['name'] = param.name
-            metadata['description'] = doc_dict.get(param.name)
-            metadata['type'] = self.maybe_strip_optional_from_annotation(
-                param.annotation)
-            parameter_holder.append(metadata)
+            p = Parameter(
+                name=param.name,
+                type=self.maybe_strip_optional_from_annotation(
+                    param.annotation),
+                description=doc_dict.get(param.name)
+            )
+            parameter_holder.append(p)
             # pylint: disable=protected-access
-            if metadata['type'] == inspect._empty:
+            if p.type == inspect._empty:
                 raise TypeError(
-                    f'''Missing type hint for parameter "{metadata['name']}". '''
+                    f'''Missing type hint for parameter "{p.name}". '''
                     f'''Please specify the type for this parameter.''')
 
         return parameter_holder
@@ -216,14 +208,9 @@ def __init__(self,
         self.comps = self.get_pipeline_components(func, comps_dict)
 
         # Instantiate attributes to be set at build process
-        self.base_image = None
+        self.defaults = None
         self.custom_training_job_specs = None
         self.pipeline_params = None
-        self.pubsub_topic_name = None
-        self.use_ci = None
-        self.project_id = None
-        self.gs_pipeline_job_spec_path = None
-        self.setup_model_monitoring = None
 
     def build(self,
               pipeline_params: dict,
@@ -249,15 +236,7 @@ def build(self,
         self.pipeline_params = pipeline_params
 
         # Extract additional attributes from defaults file
-        defaults = read_yaml_file(GENERATED_DEFAULTS_FILE)
-        self.project_id = defaults['gcp']['project_id']
-        self.gs_pipeline_job_spec_path = defaults['pipelines']['gs_pipeline_job_spec_path']
-        self.base_image = defaults['gcp']['base_image']
-        self.pubsub_topic_name = defaults['gcp']['pubsub_topic_name']
-        self.use_ci = defaults['tooling']['use_ci']
-        self.setup_model_monitoring = defaults['gcp']['setup_model_monitoring']
-
-        raise NotImplementedError('Subclass needs to define this.')
+        self.defaults = get_defaults()
 
     def get_pipeline_components(self,
                                 pipeline_func: Callable,
@@ -301,12 +280,7 @@ class BaseServices():
     def __init__(self) -> None:
         """Instantiates a generic Services object.
         """
-        self.pipeline_storage_path = None
-        self.pipeline_job_runner_service_account = None
-        self.pipeline_job_submission_service_type = None
-        self.project_id = None
-        self.pipeline_job_submission_service_type = None
-        self.setup_model_monitoring = None
+        self.defaults = None
 
         # Set directory for files to be written to
         self.submission_service_base_dir = BASE_DIR + 'services/submission_service'
@@ -325,12 +299,7 @@ def build(self):
             requirements.txt
         """
         # Extract additional attributes from defaults file
-        defaults = read_yaml_file(GENERATED_DEFAULTS_FILE)
-        self.pipeline_storage_path = defaults['pipelines']['pipeline_storage_path']
-        self.pipeline_job_runner_service_account = defaults['gcp']['pipeline_job_runner_service_account']
-        self.pipeline_job_submission_service_type = defaults['gcp']['pipeline_job_submission_service_type']
-        self.project_id = defaults['gcp']['project_id']
-        self.setup_model_monitoring = defaults['gcp']['setup_model_monitoring']
+        self.defaults = get_defaults()
 
         # Set directory for files to be written to
         self.submission_service_base_dir = BASE_DIR + 'services/submission_service'
@@ -339,7 +308,7 @@ def build(self):
         self._build_submission_services()
 
         # Setup model monitoring
-        if self.setup_model_monitoring:
+        if self.defaults.gcp.setup_model_monitoring:
             self._build_monitoring()
 
     def _build_monitoring(self):
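With the refactor above, `_get_function_return_types` and `_get_function_parameters` return typed `Parameter` objects instead of ad-hoc dicts. A minimal standalone sketch of the new extraction path (not part of the patch; `train_model` is a hypothetical component function, the model is trimmed to the `description` validator, and it assumes the pydantic v1-style `validator` imported in `enums.py`):

```python
# Minimal sketch of the new parameter extraction; not part of the diff.
import inspect

from pydantic import BaseModel, validator


class Parameter(BaseModel):
    name: str
    type: type
    description: str

    @validator('description', pre=True)
    def description_to_empty(cls, v: object) -> object:
        # Undocumented parameters arrive as None; normalize to ''.
        return '' if v is None else v


def train_model(data_path: str, epochs: int = 10) -> None:
    """Hypothetical component function."""


params = [
    Parameter(name=p.name, type=p.annotation, description=None)
    for p in inspect.signature(train_model).parameters.values()
]
print(params[0])  # name='data_path' type=<class 'str'> description=''
```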
diff --git a/google_cloud_automlops/orchestration/kfp.py b/google_cloud_automlops/orchestration/kfp.py
index 59e76ae..c1bf6ac 100644
--- a/google_cloud_automlops/orchestration/kfp.py
+++ b/google_cloud_automlops/orchestration/kfp.py
@@ -74,11 +74,7 @@ class KFPComponent(BaseComponent):
 
     def build(self):
         """Constructs files for running and managing Kubeflow pipelines.
""" - defaults = read_yaml_file(GENERATED_DEFAULTS_FILE) - self.artifact_repo_location = defaults['gcp']['artifact_repo_location'] - self.artifact_repo_name = defaults['gcp']['artifact_repo_name'] - self.project_id = defaults['gcp']['project_id'] - self.naming_prefix = defaults['gcp']['naming_prefix'] + super().build() # Set and create directory for components if it does not already exist component_dir = BASE_DIR + 'components/' + self.name @@ -91,10 +87,10 @@ def build(self): BASE_DIR + 'components/component_base/src/']) compspec_image = ( - f'''{self.artifact_repo_location}-docker.pkg.dev/''' - f'''{self.project_id}/''' - f'''{self.artifact_repo_name}/''' - f'''{self.naming_prefix}/''' + f'''{self.defaults.gcp.artifact_repo_location.value}-docker.pkg.dev/''' + f'''{self.defaults.gcp.project_id}/''' + f'''{self.defaults.gcp.artifact_repo_name}/''' + f'''{self.defaults.gcp.naming_prefix}/''' f'''components/component_base:latest''') # Write component spec @@ -192,19 +188,12 @@ def build(self, to None. """ + super().build(pipeline_params, custom_training_job_specs) + # Save parameters as attributes self.custom_training_job_specs = custom_training_job_specs self.pipeline_params = pipeline_params - # Extract additional attributes from defaults file - defaults = read_yaml_file(GENERATED_DEFAULTS_FILE) - self.project_id = defaults['gcp']['project_id'] - self.gs_pipeline_job_spec_path = defaults['pipelines']['gs_pipeline_job_spec_path'] - self.base_image = defaults['gcp']['base_image'] - self.use_ci = defaults['tooling']['use_ci'] - self.pubsub_topic_name = defaults['gcp']['pubsub_topic_name'] if self.use_ci else None - self.setup_model_monitoring = defaults['gcp']['setup_model_monitoring'] - # Build necessary folders make_dirs([ f'{BASE_DIR}scripts/pipeline_spec/', @@ -217,8 +206,8 @@ def build(self, filepath=f'{BASE_DIR}README.md', text=render_jinja( template_path=import_files(KFP_TEMPLATES_PATH) / 'README.md.j2', - setup_model_monitoring=self.setup_model_monitoring, - use_ci=self.use_ci), + setup_model_monitoring=self.defaults.gcp.setup_model_monitoring, + use_ci=self.defaults.tooling.use_ci), mode='w') # components/component_base/dockerfile: Write the component base Dockerfile @@ -226,7 +215,7 @@ def build(self, filepath=f'{GENERATED_COMPONENT_BASE}/Dockerfile', text=render_jinja( template_path=import_files(KFP_TEMPLATES_PATH + '.components.component_base') / 'Dockerfile.j2', - base_image=self.base_image, + base_image=self.defaults.gcp.base_image, generated_license=GENERATED_LICENSE), mode='w') @@ -278,7 +267,7 @@ def build(self, base_dir=BASE_DIR)) # scripts/publish_to_topic.sh: If using CI, write script for publishing to pubsub topic - if self.use_ci: + if self.defaults.tooling.use_ci: write_and_chmod( filepath=GENERATED_PUBLISH_TO_TOPIC_FILE, text=render_jinja( @@ -286,7 +275,7 @@ def build(self, base_dir=BASE_DIR, generated_license=GENERATED_LICENSE, generated_parameter_values_path=GENERATED_PARAMETER_VALUES_PATH, - pubsub_topic_name=self.pubsub_topic_name)) + pubsub_topic_name=self.defaults.gcp.pubsub_topic_name)) # pipelines/pipeline.py: Generates a Kubeflow pipeline spec from custom components. 
components_list = self._get_component_list() @@ -299,7 +288,7 @@ def build(self, custom_training_job_specs=self.custom_training_job_specs, generated_license=GENERATED_LICENSE, pipeline_scaffold_contents=pipeline_scaffold_contents, - project_id=self.project_id), + project_id=self.defaults.gcp.project_id), mode='w') # pipelines/pipeline_runner.py: Sends a PipelineJob to Vertex AI using pipeline spec. @@ -319,7 +308,7 @@ def build(self, mode='w') # pipelines/runtime_parameters/pipeline_parameter_values.json: Provides runtime parameters for the PipelineJob. - self.pipeline_params['gs_pipeline_spec_path'] = self.gs_pipeline_job_spec_path + self.pipeline_params['gs_pipeline_spec_path'] = self.defaults.pipeline_specs.gs_pipeline_job_spec_path serialized_params = json.dumps(self.pipeline_params, indent=4) write_file(BASE_DIR + GENERATED_PARAMETER_VALUES_PATH, serialized_params, 'w') @@ -483,7 +472,7 @@ def _build_submission_services(self): render_jinja( template_path=import_files(KFP_TEMPLATES_PATH + '.services.submission_service') / 'requirements.txt.j2', pinned_kfp_version=PINNED_KFP_VERSION, - pipeline_job_submission_service_type=self.pipeline_job_submission_service_type), + pipeline_job_submission_service_type=self.defaults.gcp.pipeline_job_submission_service_type), 'w') write_file( @@ -491,11 +480,11 @@ def _build_submission_services(self): render_jinja( template_path=import_files(KFP_TEMPLATES_PATH + '.services.submission_service') / 'main.py.j2', generated_license=GENERATED_LICENSE, - pipeline_root=self.pipeline_storage_path, - pipeline_job_runner_service_account=self.pipeline_job_runner_service_account, - pipeline_job_submission_service_type=self.pipeline_job_submission_service_type, - project_id=self.project_id, - setup_model_monitoring=self.setup_model_monitoring), + pipeline_root=self.defaults.pipeline_specs.pipeline_storage_path, + pipeline_job_runner_service_account=self.defaults.gcp.pipeline_job_runner_service_account, + pipeline_job_submission_service_type=self.defaults.gcp.pipeline_job_submission_service_type, + project_id=self.defaults.gcp.project_id, + setup_model_monitoring=self.defaults.gcp.setup_model_monitoring), 'w') write_file( diff --git a/google_cloud_automlops/utils/enums.py b/google_cloud_automlops/utils/enums.py index 033988a..a83c9b3 100644 --- a/google_cloud_automlops/utils/enums.py +++ b/google_cloud_automlops/utils/enums.py @@ -20,6 +20,7 @@ # pylint: disable=line-too-long from enum import Enum +from pydantic import BaseModel, validator class Deployer(Enum): @@ -58,10 +59,10 @@ class ArtifactRepository(Enum): class CodeRepository(Enum): """Enum representing the available options for source code repositories.""" - BITBUCKET = 'bitbucket' # roadmap item + # BITBUCKET = 'bitbucket' # roadmap item CLOUD_SOURCE_REPOSITORIES = 'cloud-source-repositories' - GITHUB = 'github' - GITLAB = 'gitlab' # roadmap item + # GITHUB = 'github' + # GITLAB = 'gitlab' # roadmap item class PipelineJobSubmitter(Enum): @@ -77,3 +78,84 @@ class PulumiRuntime(Enum): PYTHON = 'python' TYPESCRIPT = 'typescript' GO = 'go' + + +class GCPLocations(Enum): + """Enum representing the available GCP locations. 
+ """ + US_CENTRAL_1 = 'us-central1' + US_EAST_1 = 'us-east1' + US_EAST_4 = 'us-east4' + US_EAST_5 = 'us-east5' + US_WEST_1 = 'us-west1' + US_WEST_2 = 'us-west2' + US_WEST_3 = 'us-west3' + US_WEST_4 = 'us-west4' + US_SOUTH_1 = 'us-south1' + + +class Parameter(BaseModel): + name: str + type: type + description: str + + @validator("type", pre=True) + def type_to_empty(cls, v: object) -> object: + if v is None: + return "" + return v + @validator("description", pre=True) + def description_to_empty(cls, v: object) -> object: + if v is None: + return "" + return v + + +class GCP(BaseModel): + """Class representing all GCP configuration settings. + """ + artifact_repo_location: GCPLocations + artifact_repo_name: str + artifact_repo_type: str + base_image: str + build_trigger_location: str + build_trigger_name: str + naming_prefix: str + pipeline_job_runner_service_account: str + pipeline_job_submission_service_location: str + pipeline_job_submission_service_name: str + pipeline_job_submission_service_type: str + project_id: str + setup_model_monitoring: bool + pubsub_topic_name: str + schedule_location: str + schedule_name: str + schedule_pattern: str + source_repository_branch: str + source_repository_name: str + source_repository_type: str + storage_bucket_location: str + storage_bucket_name: str + vpc_connector: str + + +class PipelineSpecs(BaseModel): + gs_pipeline_job_spec_path: str + parameter_values_path: str + pipeline_component_directory: str + pipeline_job_spec_path: str + pipeline_region: str + pipeline_storage_path: str + + +class Tooling(BaseModel): + deployment_framework: Deployer + provisioning_framework: Provisioner + orchestration_framework: Orchestrator + use_ci: bool + + +class Defaults(BaseModel): + gcp: GCP + pipeline_specs: PipelineSpecs + tooling: Tooling diff --git a/google_cloud_automlops/utils/utils.py b/google_cloud_automlops/utils/utils.py index ea4d93e..cdf3150 100644 --- a/google_cloud_automlops/utils/utils.py +++ b/google_cloud_automlops/utils/utils.py @@ -55,8 +55,13 @@ ) from google_cloud_automlops.utils.enums import ( + Defaults, + GCP, + GCPLocations, Orchestrator, - PipelineJobSubmitter + PipelineJobSubmitter, + PipelineSpecs, + Tooling, ) from google_cloud_automlops.utils.enums import ( @@ -1054,3 +1059,50 @@ def git_workflow(): if deployment_framework == Deployer.CLOUDBUILD.value: logging.info( f'''Cloud Build job running at: https://console.cloud.google.com/cloud-build/builds;region={defaults['gcp']['build_trigger_location']}''') + +def get_defaults(): + defaults = read_yaml_file(GENERATED_DEFAULTS_FILE) + gcp = GCP( + artifact_repo_location = GCPLocations(defaults['gcp']['artifact_repo_location']), + artifact_repo_name = defaults['gcp']['artifact_repo_name'], + artifact_repo_type = defaults['gcp']['artifact_repo_type'], + base_image = defaults['gcp']['base_image'], + build_trigger_location = defaults['gcp']['build_trigger_location'], + build_trigger_name = defaults['gcp']['build_trigger_name'], + naming_prefix = defaults['gcp']['naming_prefix'], + pipeline_job_runner_service_account = defaults['gcp']['pipeline_job_runner_service_account'], + pipeline_job_submission_service_location = defaults['gcp']['pipeline_job_submission_service_location'], + pipeline_job_submission_service_name = defaults['gcp']['pipeline_job_submission_service_name'], + pipeline_job_submission_service_type = defaults['gcp']['pipeline_job_submission_service_type'], + project_id = defaults['gcp']['project_id'], + setup_model_monitoring = defaults['gcp']['setup_model_monitoring'], + 
diff --git a/google_cloud_automlops/utils/utils.py b/google_cloud_automlops/utils/utils.py
index ea4d93e..cdf3150 100644
--- a/google_cloud_automlops/utils/utils.py
+++ b/google_cloud_automlops/utils/utils.py
@@ -55,8 +55,13 @@
 )
 
 from google_cloud_automlops.utils.enums import (
+    Defaults,
+    GCP,
+    GCPLocations,
     Orchestrator,
-    PipelineJobSubmitter
+    PipelineJobSubmitter,
+    PipelineSpecs,
+    Tooling,
 )
 
 from google_cloud_automlops.utils.enums import (
@@ -1054,3 +1059,52 @@ def git_workflow():
     if deployment_framework == Deployer.CLOUDBUILD.value:
         logging.info(
             f'''Cloud Build job running at: https://console.cloud.google.com/cloud-build/builds;region={defaults['gcp']['build_trigger_location']}''')
+
+
+def get_defaults() -> Defaults:
+    """Reads the generated defaults file and returns its contents as a typed Defaults object."""
+    defaults = read_yaml_file(GENERATED_DEFAULTS_FILE)
+    gcp = GCP(
+        artifact_repo_location = GCPLocations(defaults['gcp']['artifact_repo_location']),
+        artifact_repo_name = defaults['gcp']['artifact_repo_name'],
+        artifact_repo_type = defaults['gcp']['artifact_repo_type'],
+        base_image = defaults['gcp']['base_image'],
+        build_trigger_location = defaults['gcp']['build_trigger_location'],
+        build_trigger_name = defaults['gcp']['build_trigger_name'],
+        naming_prefix = defaults['gcp']['naming_prefix'],
+        pipeline_job_runner_service_account = defaults['gcp']['pipeline_job_runner_service_account'],
+        pipeline_job_submission_service_location = defaults['gcp']['pipeline_job_submission_service_location'],
+        pipeline_job_submission_service_name = defaults['gcp']['pipeline_job_submission_service_name'],
+        pipeline_job_submission_service_type = defaults['gcp']['pipeline_job_submission_service_type'],
+        project_id = defaults['gcp']['project_id'],
+        setup_model_monitoring = defaults['gcp']['setup_model_monitoring'],
+        pubsub_topic_name = defaults['gcp']['pubsub_topic_name'],
+        schedule_location = defaults['gcp']['schedule_location'],
+        schedule_name = defaults['gcp']['schedule_name'],
+        schedule_pattern = defaults['gcp']['schedule_pattern'],
+        source_repository_branch = defaults['gcp']['source_repository_branch'],
+        source_repository_name = defaults['gcp']['source_repository_name'],
+        source_repository_type = defaults['gcp']['source_repository_type'],
+        storage_bucket_location = defaults['gcp']['storage_bucket_location'],
+        storage_bucket_name = defaults['gcp']['storage_bucket_name'],
+        vpc_connector = defaults['gcp']['vpc_connector'],
+    )
+    pipeline_specs = PipelineSpecs(
+        gs_pipeline_job_spec_path = defaults['pipelines']['gs_pipeline_job_spec_path'],
+        parameter_values_path = defaults['pipelines']['parameter_values_path'],
+        pipeline_component_directory = defaults['pipelines']['pipeline_component_directory'],
+        pipeline_job_spec_path = defaults['pipelines']['pipeline_job_spec_path'],
+        pipeline_region = defaults['pipelines']['pipeline_region'],
+        pipeline_storage_path = defaults['pipelines']['pipeline_storage_path'],
+    )
+    tooling = Tooling(
+        deployment_framework = defaults['tooling']['deployment_framework'],
+        provisioning_framework = defaults['tooling']['provisioning_framework'],
+        orchestration_framework = defaults['tooling']['orchestration_framework'],
+        use_ci = defaults['tooling']['use_ci'],
+    )
+    return Defaults(
+        gcp = gcp,
+        pipeline_specs = pipeline_specs,
+        tooling = tooling
+    )
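Taken together, the change collapses the per-class `read_yaml_file(GENERATED_DEFAULTS_FILE)` boilerplate into a single typed `get_defaults()` call made in the base `build()` methods, which subclasses now reach via `super().build()`. A runnable sketch of the resulting flow (class trimmed to one flag; `get_defaults()` is stubbed with `SimpleNamespace` so the example needs no generated defaults file):

```python
# Stubbed sketch of the post-refactor build() flow; not part of the diff.
from types import SimpleNamespace


def get_defaults():
    # Stand-in for utils.get_defaults(), which parses the generated
    # defaults YAML into the typed Defaults model.
    return SimpleNamespace(
        gcp=SimpleNamespace(project_id='my-project',
                            setup_model_monitoring=True))


class BaseServices:
    def __init__(self):
        self.defaults = None  # populated once, at build time

    def build(self):
        self.defaults = get_defaults()
        if self.defaults.gcp.setup_model_monitoring:
            self._build_monitoring()

    def _build_monitoring(self):
        print(f'building monitoring assets in {self.defaults.gcp.project_id}')


BaseServices().build()  # -> building monitoring assets in my-project
```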