diff --git a/cobra/industrialization/__init__.py b/cobra/industrialization/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/cobra/industrialization/integrations/CobraScorer.py b/cobra/industrialization/integrations/CobraScorer.py new file mode 100644 index 0000000..f41d5e7 --- /dev/null +++ b/cobra/industrialization/integrations/CobraScorer.py @@ -0,0 +1,150 @@ +import json +import pickle +from enum import Enum +from enum import IntEnum +from typing import List, Union, Optional + +import numpy as np +from cobra.preprocessing import PreProcessor +from pandas import DataFrame +from smart_open import open + +from cobra.model_building import LogisticRegressionModel, LinearRegressionModel, \ + ForwardFeatureSelection + + +class CobraModelSerializationType(IntEnum): + JSON = 1 + PICKLE = 2 + + +class CobraScorer: + """ + Wrapper around a Cobra model that needs to be scored with data from Big Query + and scores to be stored on Big Query + """ + + def __init__(self, + pipeline_path: str, + model_path: str, + model_serialization_type: CobraModelSerializationType, + continuous_vars: List[str], + discrete_vars: List[str], + id_column_name='id', + score_column_name='score', + **kwargs): + """ + + :param pipeline_path: path to a json file that represents a cobra PreProcessor + :param model_path: path to a json or pickle file with a Cobra Model or Forward Selection + :param model_serialization_type: Type of serialization used for the file in model_path + :param continuous_vars: list of continuous variables to use when scoring the model + :param discrete_vars: list of discrete variables to use when scoring the model + :param id_column_name: name of the column used to identify rows in the observations and + scores dataframes + :param score_column_name: name of the column with scores in the output dataframe + :param kwargs: other generic arguments, such as 'step' for forward selection + """ + + # TODO replace by base class once implemented in Cobra + self.model: Optional[Union[LinearRegressionModel, LogisticRegressionModel]] = None + self.preprocessor: Optional[PreProcessor] = None + + self.id_column_name = id_column_name + self.score_column_name = score_column_name + self.model_serialization_type = model_serialization_type + self.continuous_vars = continuous_vars + self.discrete_vars = discrete_vars + + self.load_pipeline(pipeline_path) + self.load_model(model_path=model_path, **kwargs) + + @classmethod + def deserialize_model(cls, model_dict: dict)\ + -> Union[LinearRegressionModel, LogisticRegressionModel]: + """ + Method to deserialize a Cobra model based on a json file + Fails if the json file does not contain a key meta with a valid model type + TODO build as part as Cobra's model base class deserialize + TODO replace return type when base class is created + :param model_dict: dictionary representing the serialized model + :return: + """ + # TODO build as part as Cobra's model base class deserialize + # dictionary of (meta attribute in json file), (class for that description) + MODEL_META = { + "linear-regression": LinearRegressionModel, + "logistic-regression": LogisticRegressionModel + } + + model_cls = MODEL_META.get(model_dict["meta"]) + + model = model_cls() + model.deserialize(model_dict) + + return model + + def load_pipeline(self, pipeline_path: str): + """ + Method to load a pipeline into the preprocessor attribute + :param pipeline_path: Path to a json file pre processing pipeline serialized as + Supports locations supported by smart_open + :return: + """ + with 
open(pipeline_path) as pipeline_file: + processing_pipeline = json.load(pipeline_file) + self.preprocessor = PreProcessor.from_pipeline(processing_pipeline) + + def load_model(self, model_path: str, **kwargs): + """ + Load a Cobra model from a json file or from a pickle file + + If the stored file represents an instance of ForwardFeatureSelection, then 'step' must be + provided + :param model_path: + :param kwargs: + :return: nothing. Loaded model is stored in the model attribute. + """ + model = None + if self.model_serialization_type == CobraModelSerializationType.JSON: + with open(model_path, "r") as model_file: + model_dict = json.load(model_file) + model = self.deserialize_model(model_dict) + + elif self.model_serialization_type == CobraModelSerializationType.PICKLE: + with open(model_path, "rb") as model_file: + model = pickle.load(model_file) + else: + raise ValueError(f"Invalid CobraModelSerializationType: {self.model_serialization_type}") + + if isinstance(model, ForwardFeatureSelection): + step = kwargs.get('step') + model = model.get_model_from_step(step) + + self.model = model + + def score(self, observations: DataFrame) -> DataFrame: + """ + Method to score a set of observations which have not been processed by a Cobra PreProcessor + yet + :param observations: dataframe with observations + :return: dataframe with scores, with columns self.id_column_name and self.score_column_name + """ + pre_processed_obs = self.preprocessor.transform(observations, + continuous_vars=self.continuous_vars, + discrete_vars=self.discrete_vars) + + return self._do_score(pre_processed_obs) + + def _do_score(self, pre_processed_obs: DataFrame) -> DataFrame: + """ + Internal method to score a dataframe containing observations which have been processed + by a Cobra PreProcessor + precondition id_column_name must exist in the pre_processed_obs + :param pre_processed_obs: + :return: dataframe with scores, with columns self.id_column_name and self.score_column_name + """ + scores = pre_processed_obs[[self.id_column_name]].copy() + scores[self.score_column_name] = self.model.score_model(pre_processed_obs) + + return scores diff --git a/cobra/industrialization/integrations/__init__.py b/cobra/industrialization/integrations/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/cobra/industrialization/integrations/airflow/ABTTablesConfig.py b/cobra/industrialization/integrations/airflow/ABTTablesConfig.py new file mode 100644 index 0000000..7be6c40 --- /dev/null +++ b/cobra/industrialization/integrations/airflow/ABTTablesConfig.py @@ -0,0 +1,94 @@ +import os +from typing import Dict, List + + +class ABTTablesConfig: + """ + Class to abstract the configuration and naming convention of tables and + datasets in ABT + """ + + def __init__(self): + self.dataset_slg_sds = os.getenv("ABT_DATASET_SLG_SDS", "slg_sds_dev") + self.dataset_slg_sds_hist = os.getenv("ABT_DATASET_SLG_SDS_HIST", "slg_sds_hist_dev") + self.input_dataset = os.getenv("ABT_DATASET_INPUT", "input_dev") + self.dataset_intermediate = os.getenv("ABT_DATASET_INTERMEDIATE", "intermediate_dev") + self.dataset_output = os.getenv("ABT_DATASET_OUTPUT", "output_dev") + self.source_project = os.getenv("ABT_SOURCE_PROJECT", "analytical-base-tables-staging") + self.target_project = os.getenv("ABT_TARGET_PROJECT", "analytical-base-tables-staging") + self.bucket_backups = os.getenv("ABT_BUCKET_BACKUPS", "abt_backups_dev") + self.location = os.getenv("ABT_LOCATION", "eu") + self.run_date_format = "%Y%m%d" + + def get_source_tables(self) -> 
List[Dict[str,str]]: + return [ + {"dataset": self.dataset_slg_sds, "table": "communication_stats"}, + {"dataset": self.dataset_slg_sds, "table": "contact_moments"}, + {"dataset": self.dataset_slg_sds, "table": "site_tag_product_hits"}, + {"dataset": self.dataset_slg_sds, "table": "users_maxeda"}, + {"dataset": self.dataset_slg_sds_hist, "table": "COMMUNICATIONDOMAIN"}, + {"dataset": self.dataset_slg_sds_hist, "table": "COMMUNICATIONS"}, + {"dataset": self.dataset_slg_sds_hist, "table": "CONTACTS_TAX_EMAIL"}, + {"dataset": self.dataset_slg_sds_hist, "table": "CONTACTS_TAX_TRANS"}, + {"dataset": self.dataset_slg_sds_hist, "table": "FAV_STORE"}, + {"dataset": self.dataset_slg_sds_hist, "table": "INTERACTIONS"}, + {"dataset": self.dataset_slg_sds_hist, "table": "MAILCLIENTCODES"}, + {"dataset": self.dataset_slg_sds_hist, "table": "MESSAGE"}, + {"dataset": self.dataset_slg_sds_hist, "table": "MESSAGEDELIVERYSTATES"}, + {"dataset": self.dataset_slg_sds_hist, "table": "PROBECODES"}, + {"dataset": self.dataset_slg_sds_hist, "table": "PRODUCTS"}, + {"dataset": self.dataset_slg_sds_hist, "table": "RAW_COMMUNICATIONDOMAIN"}, + {"dataset": self.dataset_slg_sds_hist, "table": "RAW_COMMUNICATIONS"}, + {"dataset": self.dataset_slg_sds_hist, "table": "RAW_DATA_CONSENT"}, + {"dataset": self.dataset_slg_sds_hist, "table": "RAW_FAV_STORE"}, + {"dataset": self.dataset_slg_sds_hist, "table": "RAW_INTERACTIONS"}, + {"dataset": self.dataset_slg_sds_hist, "table": "RAW_LOYALTYCARDS"}, + {"dataset": self.dataset_slg_sds_hist, "table": "RAW_MAILCLIENTCODES"}, + {"dataset": self.dataset_slg_sds_hist, "table": "RAW_MESSAGE"}, + {"dataset": self.dataset_slg_sds_hist, "table": "RAW_MESSAGEDELIVERYSTATES"}, + {"dataset": self.dataset_slg_sds_hist, "table": "RAW_MOBILE_BRICO"}, + {"dataset": self.dataset_slg_sds_hist, "table": "RAW_MOBILE_PRAXIS"}, + {"dataset": self.dataset_slg_sds_hist, "table": "RAW_PROBECODES"}, + {"dataset": self.dataset_slg_sds_hist, "table": "RAW_PRODUCTS"}, + {"dataset": self.dataset_slg_sds_hist, "table": "RAW_RFM_BRICO"}, + {"dataset": self.dataset_slg_sds_hist, "table": "RAW_RFM_PRAXIS"}, + {"dataset": self.dataset_slg_sds_hist, "table": "RAW_SHOPS"}, + {"dataset": self.dataset_slg_sds_hist, "table": "RAW_SITETAGPRODUCT"}, + {"dataset": self.dataset_slg_sds_hist, "table": "RAW_SUBJECTRIGHTS"}, + {"dataset": self.dataset_slg_sds_hist, "table": "RAW_TRANSACTIONLINES"}, + {"dataset": self.dataset_slg_sds_hist, "table": "RAW_TRANSACTIONS"}, + {"dataset": self.dataset_slg_sds_hist, "table": "RAW_USERS_CONTACTS"}, + {"dataset": self.dataset_slg_sds_hist, "table": "RFM_BRICO"}, + {"dataset": self.dataset_slg_sds_hist, "table": "RFM_PRAXIS"}, + {"dataset": self.dataset_slg_sds_hist, "table": "SHOPS"}, + {"dataset": self.dataset_slg_sds_hist, "table": "SITETAGPRODUCT"}, + {"dataset": self.dataset_slg_sds_hist, "table": "TRANSACTIONS"}, + {"dataset": self.dataset_slg_sds_hist, "table": "TRANSACTIONLINES"}, + {"dataset": self.dataset_slg_sds_hist, "table": "USERS_CONTACTS"}, + {"dataset": self.dataset_slg_sds_hist, "table": "purchase_history"}, + {"dataset": self.dataset_slg_sds_hist, "table": "purchase_history_grouped"} + + ] + + def get_target_table_name(self, source_dataset: str, source_table) -> str: + """ + Method to build a target table name based on a source dataset and source table to be used + in the input dataset + + :param source_dataset: name of the original source_dataset + :param source_table: name of the table in the original source_dataset + :return: string with source_dataset and 
source_table joined by an underscore, e.g. "slg_sds_dev_contact_moments"
+        """
+        return f"{source_dataset}_{source_table}"
+
+    def get_target_table_names(self) -> Dict[str, str]:
+        """
+        Method to retrieve a dictionary with the mappings of all table names in the input dataset
+
+        :return: dictionary mapping "table_{source table name}" to the name of the corresponding
+        table in the input dataset
+        """
+        return {
+            f"table_{st['table']}": self.get_target_table_name(st['dataset'], st['table'])
+            for st in self.get_source_tables()
+        }
diff --git a/cobra/industrialization/integrations/airflow/__init__.py b/cobra/industrialization/integrations/airflow/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/cobra/industrialization/integrations/airflow/dag_config.py b/cobra/industrialization/integrations/airflow/dag_config.py
new file mode 100644
index 0000000..4642d0d
--- /dev/null
+++ b/cobra/industrialization/integrations/airflow/dag_config.py
@@ -0,0 +1,84 @@
+import os
+
+from airflow.operators.python import ShortCircuitOperator
+from airflow.operators.trigger_dagrun import TriggerDagRunOperator
+from airflow import DAG
+from airflow.operators.bash import BashOperator
+from datetime import datetime, timedelta
+
+DAG_ID_INITIAL_LOAD = "abt_initial_load"
+DAG_ID_INGEST = "abt_ingest_sources"
+DAG_ID_CREATE_ABT = "create_abt"
+DAG_ID_BACKUP_CLEAN = "backup_and_clean_abt"
+
+AIRFLOW_HOME = os.environ["AIRFLOW_HOME"]
+DAGS_FOLDER = os.getenv("DAGS_FOLDER", os.path.join(AIRFLOW_HOME, "dags"))
+BQ_SCRIPTS_FOLDER = os.path.join(DAGS_FOLDER, "scripts", "bq")
+
+
+def create_next_dags(next_dag_id: str, chain_dags: str, dag: DAG, logical_date_ts: str):
+    """
+    Function to create the tasks that chain the next DAG, guarded by a condition.
+    If chain_dags is the string 'true', the DAG identified by next_dag_id is triggered;
+    otherwise the chain stops here.
+    :param next_dag_id: ID of the next DAG to trigger when chain_dags is 'true'
+    :param chain_dags: 'true' if the next DAG should be triggered
+    :param dag: instance of airflow DAG
+    :param logical_date_ts: logical date (timestamp) passed to the triggered DAG run
+    :return: a ShortCircuitOperator followed by a TriggerDagRunOperator, wired based on
+    chain_dags and next_dag_id
+    """
+    task_check_if_chained_dags = ShortCircuitOperator(
+        task_id='check_if_chained_dags',
+        op_kwargs={"do_run": chain_dags},
+        python_callable=lambda do_run: do_run.lower() == "true",
+        dag=dag
+    )
+
+    task_trigger_next_dag = TriggerDagRunOperator(
+        task_id=f"trigger_{next_dag_id}",
+        trigger_dag_id=next_dag_id,
+        dag=dag,
+        execution_date=logical_date_ts,
+        wait_for_completion=True,  # ensure we don't run this DAG again until the following one is done
+        conf={
+            "chain_dags": chain_dags
+        }
+    )
+
+    task_check_if_chained_dags.set_downstream(task_trigger_next_dag)
+
+    return task_check_if_chained_dags
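+
+
+# Example wiring (illustrative sketch, not used in this module): a DAG that wants to chain
+# the ABT build after its own run could call, e.g.,
+#     create_next_dags(DAG_ID_CREATE_ABT,
+#                      chain_dags="{{ dag_run.conf.get('chain_dags', 'false') }}",
+#                      dag=dag, logical_date_ts="{{ ts }}")
+# The Jinja expressions above are assumptions about how the calling DAG forwards its conf.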
+
+
+def create_auth_task(dag: DAG):
+    """
+    Function to create an airflow task based on a BashOperator.
+    If the env var "NEED_TO_GCLOUD_AUTH" is "true", the task authenticates via gcloud auth;
+    otherwise it just echoes 'No need to authenticate, skipping login'.
+    :param dag: instance of airflow DAG
+    :return: BashOperator with the right bash command to run
+    """
+    need_to_auth = os.getenv("NEED_TO_GCLOUD_AUTH", "False")
+    if need_to_auth.lower() == "true":
+        login_command = f"gcloud auth activate-service-account "\
+                        f"{os.getenv('GCP_SERVICE_ACCOUNT_NAME')} "\
+                        f"--key-file={os.getenv('GCP_SERVICE_ACCOUNT_KEY_PATH')}"
+    else:
+        login_command = "echo 'No need to authenticate, skipping login'"
+
+    task_login = BashOperator(
+        task_id="gcloud_auth_login",
+        bash_command=login_command,
+        dag=dag
+    )
+
+    return task_login
+
+
+def get_partition_date_str(date_str, date_format="%Y%m%d"):
+    """
+    Return the partition date for a given run date: the Monday of the week containing
+    date_str, formatted with date_format (e.g. "20220407" -> "20220404").
+    """
+    run_date = datetime.strptime(date_str, date_format)
+
+    # Partition date is the Monday of the week, in %Y%m%d format
+    partition_date = (run_date - timedelta(days=run_date.weekday())).strftime(date_format)
+    return partition_date
diff --git a/cobra/industrialization/integrations/airflow/requirements.txt b/cobra/industrialization/integrations/airflow/requirements.txt
new file mode 100644
index 0000000..5ae09bd
--- /dev/null
+++ b/cobra/industrialization/integrations/airflow/requirements.txt
@@ -0,0 +1,2 @@
+apache-airflow
+apache-airflow-providers-google==6.7.0
\ No newline at end of file
diff --git a/cobra/industrialization/integrations/airflow/score_garden.py b/cobra/industrialization/integrations/airflow/score_garden.py
new file mode 100644
index 0000000..8ab5141
--- /dev/null
+++ b/cobra/industrialization/integrations/airflow/score_garden.py
@@ -0,0 +1,96 @@
+from datetime import datetime
+
+from airflow import DAG
+from airflow.operators.python import PythonOperator
+from airflow.providers.google.cloud.operators.bigquery import BigQueryInsertJobOperator
+
+from ABTTablesConfig import ABTTablesConfig
+import dag_config as conf
+from BQCobraScorer import BQCobraScorer
+from CobraScorer import CobraModelSerializationType
+
+# DAG to pre-process and score the garden prediction model.
+# With a few changes (see TODOs) the DAG could be made generic for any model, or a loop over
+# configurations could dynamically generate more than one DAG.
+
+dryRun = False
+
+abtConf = ABTTablesConfig()
+
+dict_table_names = abtConf.get_target_table_names()
+
+dag = DAG(
+    dag_id="score_garden_table",
+    start_date=datetime(year=2021, month=1, day=1),
+    # end_date=datetime(year=2022, month=3, day=20),
+    default_args={"email_on_failure": False},
+    description="DAG to run the pre-processing queries and score the garden prediction model",
+    schedule_interval=None,
+    catchup=False,
+    render_template_as_native_obj=False,
+    max_active_runs=1,
+    user_defined_macros={"input_dataset": abtConf.input_dataset,
+                         "dataset_intermediate": abtConf.dataset_intermediate,
+                         "dataset_output": abtConf.dataset_output,
+                         "project": abtConf.target_project,
+                         "bucket_backups": abtConf.bucket_backups,
+                         "location": abtConf.location,
+                         "run_date_format": f"\"{abtConf.run_date_format}\"",
+                         "partition_date": conf.get_partition_date_str,
+                         **dict_table_names  # append table names
+                         }
+)
+
+# sorted list of queries to run
+query_files = [
+    "preprocessing.sql"
+]
+
+with dag:
+
+    tasks = []
+    for idx, query in enumerate(query_files):
+        task_run_query = BigQueryInsertJobOperator(
+            task_id=f"run_query_{query}",
+            location=abtConf.location,
+            configuration={
+                "query": {
+                    "query": f"{{% include 'queries/garden_pre_processing/{query}' %}}",  # path could be more generic
+                    "useLegacySql": False,
+                    "dryRun": dryRun
+                },
+            },
+            dag=dag
+        )
+
+        tasks.append(task_run_query)
+        if idx > 0:
+            task_run_query.set_upstream(tasks[idx - 1])
+
+    # Finally, score the model
+    model_name = "garden_prediction"
+
+    task_score_model = PythonOperator(
+        task_id=f"score_model_{model_name}",
+        python_callable=BQCobraScorer.score_and_save,
+        op_kwargs={
+            'score_date': '{{ds_nodash}}',
+            'input_table': "analytical-base-tables-staging.predictive_test.garden_score_with_postcodes_small",  # remove hardcoded
+            'output_table': "analytical-base-tables-staging.output_devnm.garden_score",  # remove hardcoded
+            'pipeline_path': 'gs://garden-model-stg/20220407/preprocessing_pipeline_garden_project.json',  # read from env?
+            'model_path': 'gs://garden-model-stg/20220407/forward_selection_garden_project.pickle',  # read from env?
+            'model_serialization_type': CobraModelSerializationType.PICKLE,
+            'continuous_vars': ['lifetime_sales_amount', 'sum_mon_dep_tuingereedschap',
+                                'sum_email_click_count_7d', 'ratio_mail_click_delivered_Q2',
+                                'ratio_mail_click_delivered_Q3', 'mon_garden_March'],
+            'discrete_vars': ['NEWHABI'],
+            'id_column_name': 'user_id',
+            'score_column_name': 'score',
+            'date_partition_column_name': 'score_date',
+            'location': 'eu',
+            'step': 5
+        },
+        dag=dag
+    )
+
+    tasks[-1].set_downstream(task_score_model)
diff --git a/cobra/industrialization/integrations/bigquery/BQCobraScorer.py b/cobra/industrialization/integrations/bigquery/BQCobraScorer.py
new file mode 100644
index 0000000..a16dbac
--- /dev/null
+++ b/cobra/industrialization/integrations/bigquery/BQCobraScorer.py
@@ -0,0 +1,254 @@
+import pandas_gbq
+import pandas as pd
+import json
+from google.cloud import bigquery
+from google.oauth2 import service_account
+from typing import List, Optional
+
+from datetime import datetime
+
+from cobra.industrialization.integrations.CobraScorer import CobraScorer, CobraModelSerializationType
+
+
+class BQCobraScorer:
+    """
+    Wrapper around CobraScorer that reads observations from a BigQuery table and writes the
+    resulting scores (plus an audit trail) back to BigQuery.
+    """
+
+    def __init__(self, input_table: str,
+                 output_table: str,
+                 pipeline_path: str,
+                 model_path: str,
+                 model_serialization_type: CobraModelSerializationType,
+                 continuous_vars: List[str],
+                 discrete_vars: List[str],
+                 id_column_name: str = 'id',
+                 id_column_type: str = 'INTEGER',
+                 score_column_name: str = 'score',
+                 score_column_type: str = 'FLOAT',
+                 date_partition_column_name: str = 'score_date',
+                 key_path: Optional[str] = None,
+                 location: str = 'eu',
+                 audit_table: Optional[str] = None,
+                 **kwargs):
+        """
+        :param input_table: BigQuery table containing the observations to score
+        :param output_table: BigQuery table the scores are written to
+        :param pipeline_path: path to a json file that represents a cobra PreProcessor
+        :param model_path: path to a json or pickle file with a Cobra model or forward selection
+        :param model_serialization_type: type of serialization used for the file in model_path
+        :param continuous_vars: list of continuous variables to use when scoring the model
+        :param discrete_vars: list of discrete variables to use when scoring the model
+        :param id_column_name: name of the column used to identify rows
+        :param id_column_type: BigQuery type of the id column
+        :param score_column_name: name of the column with scores in the output table
+        :param score_column_type: BigQuery type of the score column
+        :param date_partition_column_name: name of the date column the output table is partitioned on
+        :param key_path: optional path to a service account key file
+        :param location: BigQuery location
+        :param audit_table: table to log score runs to; defaults to "{output_table}_audit"
+        :param kwargs: other generic arguments, such as 'step' for forward selection
+        """
+
+        self.cobra_scorer = CobraScorer(pipeline_path=pipeline_path,
+                                        model_path=model_path,
+                                        model_serialization_type=model_serialization_type,
+                                        continuous_vars=continuous_vars,
+                                        discrete_vars=discrete_vars,
+                                        id_column_name=id_column_name,
+                                        score_column_name=score_column_name,
+                                        **kwargs)
+        self.model_path = model_path
+        self.model_serialization_type = model_serialization_type
+        self.continuous_vars = continuous_vars
+        self.discrete_vars = discrete_vars
+        self.pipeline_path = pipeline_path
+        self.input_table = input_table
+        self.output_table = output_table
+        self.key_path = key_path
+        self.location = location
+        self.credentials = None
+        self.id_column_name = id_column_name
+        self.id_column_type = id_column_type
+        self.score_column_name = score_column_name
+        self.score_column_type = score_column_type
+        self.date_partition_column_name = date_partition_column_name
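+        # When no audit_table is given, audit rows are written to "<output_table>_audit".
+        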
self.audit_table = audit_table if audit_table else f"{self.output_table}_audit" + self.kwargs = kwargs + + if key_path: + self.credentials = service_account.Credentials.from_service_account_file( + key_path, scopes=["https://www.googleapis.com/auth/cloud-platform"], + ) + self.client = bigquery.Client(credentials=self.credentials, + project=self.credentials.project_id, + location=location) + else: + self.client = bigquery.Client(location=location) + + def write_scores_to_bq(self, scores: pd.DataFrame, score_date_str: str, + date_format: str = "%Y%m%d", overwrite_partition: bool = True): + """ + Method that writes scores to a big query table and writes audit information into the audit + table + + self.output table must exist and contain a column self.date_partition_column_name + :param overwrite_partition: + :param scores: + :param score_date_str: + :param date_format: + :return: + """ + score_date = datetime.strptime(score_date_str, date_format) + scores[self.date_partition_column_name] = score_date + table_schema = [{'name': self.id_column_name, 'type': self.id_column_type}, + {'name': self.score_column_name, 'type': self.score_column_type}, + {'name': self.date_partition_column_name, 'type': 'DATE'}, + ] + + # Workaround due to pandas_gbq not supporting write disposition + # https://github.com/googleapis/python-bigquery-pandas/issues/118 + if overwrite_partition: + self._delete_partition_from_output_table(score_date) + + pandas_gbq.to_gbq(dataframe=scores, + destination_table=self.output_table, + if_exists='append', + credentials=self.credentials, + table_schema=table_schema + ) + + self._log_score_run(score_date, overwrite_partition=overwrite_partition) + + def _delete_partition_from_output_table(self, score_date: datetime): + self.client.delete_table(table=f'{self.output_table}${score_date.strftime("%Y%m%d")}') + + def _log_score_run(self, run_date: datetime, **kwargs): + """ + Method that logs a run into a big query table, including + - score_date: date for which the scores are valid (aka logical_date) + - model_version: model version or path to the model used for scoring + - pipeline version + - exec_date: actual date in which the score was executed + - extra information that might be relevant in the future + :param run_date: date for which the model was scored + :param kwargs: extra arguments to be logged. 
Must be json serializable + :return: - + """ + # Log all attributes except for client, cobra_scorer and credentials + extra_args = {'model_serialization_type': self.model_serialization_type, + 'continuous_vars': self.continuous_vars, + 'discrete_vars': self.discrete_vars, + 'input_table': self.input_table, + 'output_table': self.output_table, + 'key_path': self.key_path, + 'location': self.location, + 'id_column_name': self.id_column_name, + 'id_column_type': self.id_column_type, + 'score_column_name': self.score_column_name, + 'score_column_type': self.score_column_type, + 'date_partition_column_name': self.date_partition_column_name, + 'audit_table': self.audit_table} + + if 'step' in self.kwargs: + extra_args['step'] = self.kwargs['step'] + extra = {**extra_args, **kwargs} + + audit_df = pd.DataFrame.from_dict( + { + "exec_date": [datetime.now()], + "run_date": [run_date], + "model_path": [self.model_path], + "pipeline_path": [self.pipeline_path], + "extra": [json.dumps(extra)] + } + ) + + table_schema = [{'name': 'exec_date', 'type': 'DATETIME'}, + {'name': 'run_date', 'type': 'DATE'}, + {'name': 'model_path', 'type': 'STRING'}, + {'name': 'pipeline_path', 'type': 'STRING'}, + {'name': 'extra', 'type': 'STRING'}] # change to JSON when out of preview + + pandas_gbq.to_gbq(dataframe=audit_df, + destination_table=self.audit_table, + if_exists='append', + credentials=self.credentials, + table_schema=table_schema + ) + + def load_observations(self) -> pd.DataFrame: + """ + Loads observations by selecting all columns and rows from the input table + :return: pandas dataframe with observations + """ + query = f"SELECT * FROM `{self.input_table}`" + + return pandas_gbq.read_gbq(query, credentials=self.credentials) + + def score(self) -> pd.DataFrame: + obs = self.load_observations() + return self.cobra_scorer.score(observations=obs) + + @classmethod + def score_and_save(cls, + score_date: str, + input_table: str, + output_table: str, + pipeline_path: str, + model_path: str, + model_serialization_type: CobraModelSerializationType, + continuous_vars: List[str], + discrete_vars: List[str], + id_column_name: str = 'id', + id_column_type: str = 'INTEGER', + score_column_name: str = 'score', + score_column_type: str = 'FLOAT', + date_partition_column_name: str = 'score_date', + key_path: Optional[str] = None, + location: str = 'eu', + audit_table: Optional[str] = None, + **kwargs + ): + """ + Class method to load, score a model, and save the results to BQ in one go without needing + to instantiate an instance. 
Useful for PythonOperator + :param score_date: string representing the date for which the scores are computed + :param input_table: + :param output_table: + :param pipeline_path: + :param model_path: + :param model_serialization_type: + :param continuous_vars: + :param discrete_vars: + :param id_column_name: + :param id_column_type: + :param score_column_name: + :param score_column_type: + :param date_partition_column_name: + :param key_path: + :param location: + :param audit_table: + :param kwargs: + :return: + """ + + cobra_scorer = BQCobraScorer( + input_table=input_table, + output_table=output_table, + pipeline_path=pipeline_path, + model_path=model_path, + model_serialization_type=model_serialization_type, + continuous_vars=continuous_vars, + discrete_vars=discrete_vars, + id_column_name=id_column_name, + id_column_type=id_column_type, + score_column_name=score_column_name, + score_column_type=score_column_type, + date_partition_column_name=date_partition_column_name, + key_path=key_path, + location=location, + audit_table=audit_table, + **kwargs + ) + + df_scores = cobra_scorer.score() + cobra_scorer.write_scores_to_bq(df_scores, score_date) \ No newline at end of file diff --git a/cobra/industrialization/integrations/bigquery/__init__.py b/cobra/industrialization/integrations/bigquery/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/cobra/industrialization/integrations/bigquery/cobra_scorer.py b/cobra/industrialization/integrations/bigquery/cobra_scorer.py new file mode 100644 index 0000000..c79c7c2 --- /dev/null +++ b/cobra/industrialization/integrations/bigquery/cobra_scorer.py @@ -0,0 +1,84 @@ +import pandas as pd +from src.model.common.CobraScorer import CobraScorer, CobraModelSerializationType +from src.model.common.BQCobraScorer import BQCobraScorer +from google.cloud import bigquery +from google.oauth2 import service_account + +if __name__ == '__main__': + postcodes_path = '/Users/nicolas.morandi/workspace/brico-analytical-base-tables/local/data/Verrijkte_postcodes.xlsx' + # model_path = '/Users/nicolas.morandi/workspace/brico-analytical-base-tables/local/data/forward_selection_garden_project.pickle' + # pipeline_path = '/Users/nicolas.morandi/workspace/brico-analytical-base-tables/local/data/preprocessing_pipeline_garden_project.json' + + pipeline_path = 'gs://garden-model-stg/20220407/preprocessing_pipeline_garden_project.json' + model_path = 'gs://garden-model-stg/20220407/forward_selection_garden_project.pickle' + key_path = '/Users/nicolas.morandi/workspace/brico-analytical-base-tables/local/keys/analytical-base-tables-staging-sa_api.json' + run_date = '20220203' + # + # credentials = service_account.Credentials.from_service_account_file( + # key_path, scopes=["https://www.googleapis.com/auth/cloud-platform"], + # ) + # + # client = bigquery.Client(credentials=credentials, project=credentials.project_id, location='eu') + + + # change select * to columns + # query = """ + # SELECT * + # FROM `analytical-base-tables-staging.predictive_test.garden_score_with_postcodes` + # """ + # query_job = client.query(query) + # + # basetable_model = query_job.result().to_dataframe() + + cont_vars = ['lifetime_sales_amount', 'sum_mon_dep_tuingereedschap', + 'sum_email_click_count_7d', 'ratio_mail_click_delivered_Q2', + 'ratio_mail_click_delivered_Q3', 'mon_garden_March'] + + cat_vars = ['NEWHABI'] + # scorer = CobraScorer(pipeline_path=pipeline_path, + # model_path=model_path, + # model_serialization_type=CobraModelSerializationType.PICKLE, + # 
continuous_vars=cont_vars,
+    #                              discrete_vars=cat_vars,
+    #                              id_column_name='user_id',
+    #                              score_column_name='score',
+    #                              date_partition_column_name='score_date',
+    #                              key_path=key_path,
+    #                              location='eu',
+    #                              step=5
+    #                              )
+    # df_scores = cobra_scorer.score()
+    # print(df_scores)
+    # cobra_scorer.write_scores_to_bq(df_scores, run_date)
+
+    BQCobraScorer.score_and_save(score_date=run_date,
+                                 input_table="analytical-base-tables-staging.predictive_test.garden_score_with_postcodes_small",
+                                 output_table="analytical-base-tables-staging.output_devnm.garden_score",
+                                 pipeline_path=pipeline_path,
+                                 model_path=model_path,
+                                 model_serialization_type=CobraModelSerializationType.PICKLE,
+                                 continuous_vars=cont_vars,
+                                 discrete_vars=cat_vars,
+                                 id_column_name='user_id',
+                                 score_column_name='score',
+                                 date_partition_column_name='score_date',
+                                 key_path=key_path,
+                                 location='eu',
+                                 step=5
+                                 )
diff --git a/cobra/industrialization/integrations/bigquery/requirements.txt b/cobra/industrialization/integrations/bigquery/requirements.txt
new file mode 100644
index 0000000..cfa8268
--- /dev/null
+++ b/cobra/industrialization/integrations/bigquery/requirements.txt
@@ -0,0 +1 @@
+pandas-gbq
\ No newline at end of file
diff --git a/cobra/industrialization/integrations/docker/docker-compose.yml b/cobra/industrialization/integrations/docker/docker-compose.yml
new file mode 100644
index 0000000..3bf208c
--- /dev/null
+++ b/cobra/industrialization/integrations/docker/docker-compose.yml
@@ -0,0 +1,8 @@
+version: '3'
+services:
+  cobra:
+    image: python:3.9-slim
+    volumes:
+      - ../../../../.:/cobra
+    working_dir: /cobra
+    command: pip install -r /cobra/requirements.txt
diff --git a/cobra/industrialization/integrations/postgresql/create_scores_table.sql b/cobra/industrialization/integrations/postgresql/create_scores_table.sql
new file mode 100644
index 0000000..1af4b58
--- /dev/null
+++ b/cobra/industrialization/integrations/postgresql/create_scores_table.sql
@@ -0,0 +1,11 @@
+CREATE TABLE IF NOT EXISTS {{project}}.{{dataset_output}}.garden_score
+(
+    user_id INT64,
+    score FLOAT64,
+    score_date DATE
+)
+PARTITION BY
+    score_date
+OPTIONS(
+    require_partition_filter=true
+)
\ No newline at end of file
diff --git a/cobra/industrialization/integrations/postgresql/requirements.txt b/cobra/industrialization/integrations/postgresql/requirements.txt
new file mode 100644
index 0000000..e69de29
diff --git a/cobra/industrialization/integrations/requirements.txt b/cobra/industrialization/integrations/requirements.txt
new file mode 100644
index 0000000..697d857
--- /dev/null
+++ b/cobra/industrialization/integrations/requirements.txt
@@ -0,0 +1 @@
+smart_open[gcs]==5.2.1
\ No newline at end of file