150 changes: 150 additions & 0 deletions cobra/industrialization/integrations/CobraScorer.py
@@ -0,0 +1,150 @@
import json
import pickle
from enum import Enum
from enum import IntEnum
from typing import List, Union, Optional

import numpy as np
from cobra.preprocessing import PreProcessor
from pandas import DataFrame
from smart_open import open

from cobra.model_building import LogisticRegressionModel, LinearRegressionModel, \
ForwardFeatureSelection


class CobraModelSerializationType(IntEnum):
JSON = 1
PICKLE = 2


class CobraScorer:
"""
    Wrapper around a Cobra model that is scored with data read from BigQuery and whose
    scores are written back to BigQuery.
"""

def __init__(self,
pipeline_path: str,
model_path: str,
model_serialization_type: CobraModelSerializationType,
continuous_vars: List[str],
discrete_vars: List[str],
id_column_name='id',
score_column_name='score',
**kwargs):
"""

:param pipeline_path: path to a json file that represents a cobra PreProcessor
:param model_path: path to a json or pickle file with a Cobra Model or Forward Selection
:param model_serialization_type: Type of serialization used for the file in model_path
:param continuous_vars: list of continuous variables to use when scoring the model
:param discrete_vars: list of discrete variables to use when scoring the model
:param id_column_name: name of the column used to identify rows in the observations and
scores dataframes
:param score_column_name: name of the column with scores in the output dataframe
:param kwargs: other generic arguments, such as 'step' for forward selection
"""

# TODO replace by base class once implemented in Cobra
self.model: Optional[Union[LinearRegressionModel, LogisticRegressionModel]] = None
self.preprocessor: Optional[PreProcessor] = None

self.id_column_name = id_column_name
self.score_column_name = score_column_name
self.model_serialization_type = model_serialization_type
self.continuous_vars = continuous_vars
self.discrete_vars = discrete_vars

self.load_pipeline(pipeline_path)
self.load_model(model_path=model_path, **kwargs)

@classmethod
def deserialize_model(cls, model_dict: dict)\
-> Union[LinearRegressionModel, LogisticRegressionModel]:
"""
        Deserialize a Cobra model from a dictionary loaded from a json file.
        Fails if the dictionary does not contain a "meta" key with a valid model type.
        TODO build as part of Cobra's model base class deserialize
        TODO replace return type when base class is created
        :param model_dict: dictionary representing the serialized model
        :return: the deserialized LinearRegressionModel or LogisticRegressionModel
"""
        # TODO build as part of Cobra's model base class deserialize
        # maps the "meta" attribute in the json file to the corresponding model class
MODEL_META = {
"linear-regression": LinearRegressionModel,
"logistic-regression": LogisticRegressionModel
}

        model_cls = MODEL_META.get(model_dict.get("meta"))
        if model_cls is None:
            raise ValueError(f"Missing or invalid model type in 'meta': {model_dict.get('meta')}")

        model = model_cls()
        model.deserialize(model_dict)

return model

def load_pipeline(self, pipeline_path: str):
"""
        Load a pre-processing pipeline into the preprocessor attribute.
        :param pipeline_path: path to a json file containing a serialized pre-processing
            pipeline. Any location supported by smart_open can be used.
        :return: nothing. The loaded pipeline is stored in the preprocessor attribute.
"""
with open(pipeline_path) as pipeline_file:
processing_pipeline = json.load(pipeline_file)
self.preprocessor = PreProcessor.from_pipeline(processing_pipeline)

def load_model(self, model_path: str, **kwargs):
"""
Load a Cobra model from a json file or from a pickle file

        If the stored file contains a ForwardFeatureSelection instance, then 'step' must
        be provided via kwargs to pick the model from that forward selection step.
        :param model_path: path to the serialized model. Any location supported by
            smart_open can be used.
        :param kwargs: extra arguments, such as 'step' for forward feature selection
        :return: nothing. The loaded model is stored in the model attribute.
"""
model = None
if self.model_serialization_type == CobraModelSerializationType.JSON:
with open(model_path, "r") as model_file:
model_dict = json.load(model_file)
model = self.deserialize_model(model_dict)

elif self.model_serialization_type == CobraModelSerializationType.PICKLE:
with open(model_path, "rb") as model_file:
model = pickle.load(model_file)
else:
raise ValueError(f"Invalid CobraModelSerializationType: {self.model_serialization_type}")

        if isinstance(model, ForwardFeatureSelection):
            step = kwargs.get('step')
            if step is None:
                raise ValueError("'step' must be provided when loading a ForwardFeatureSelection")
            model = model.get_model_from_step(step)

self.model = model

def score(self, observations: DataFrame) -> DataFrame:
"""
        Score a set of observations that have not yet been processed by a Cobra
        PreProcessor.
:param observations: dataframe with observations
:return: dataframe with scores, with columns self.id_column_name and self.score_column_name
"""
pre_processed_obs = self.preprocessor.transform(observations,
continuous_vars=self.continuous_vars,
discrete_vars=self.discrete_vars)

return self._do_score(pre_processed_obs)

def _do_score(self, pre_processed_obs: DataFrame) -> DataFrame:
"""
        Internal method to score a dataframe of observations that have already been
        processed by a Cobra PreProcessor.
        Precondition: id_column_name must exist in pre_processed_obs.
        :param pre_processed_obs: dataframe with pre-processed observations
:return: dataframe with scores, with columns self.id_column_name and self.score_column_name
"""
scores = pre_processed_obs[[self.id_column_name]].copy()
scores[self.score_column_name] = self.model.score_model(pre_processed_obs)

return scores
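To illustrate how CobraScorer is meant to be used, here is a minimal sketch; the paths, feature names and the in-memory dataframe are hypothetical, and the observations would normally be read from BigQuery:

import pandas as pd

from cobra.industrialization.integrations.CobraScorer import (
    CobraScorer, CobraModelSerializationType
)

# Hypothetical paths and feature names, for illustration only
scorer = CobraScorer(
    pipeline_path="gs://my-bucket/models/pipeline.json",   # any smart_open location works
    model_path="gs://my-bucket/models/model.json",
    model_serialization_type=CobraModelSerializationType.JSON,
    continuous_vars=["age", "recency_days"],
    discrete_vars=["fav_store"],
    id_column_name="id",
    score_column_name="score",
    step=5,  # only used when model_path holds a ForwardFeatureSelection
)

# In practice the observations come from BigQuery; a tiny in-memory frame stands in here
observations = pd.DataFrame({"id": [1, 2], "age": [34, 51],
                             "recency_days": [12, 200], "fav_store": ["A", "B"]})
scores = scorer.score(observations)  # DataFrame with columns "id" and "score"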
94 changes: 94 additions & 0 deletions cobra/industrialization/integrations/airflow/ABTTablesConfig.py
@@ -0,0 +1,94 @@
import os
from typing import Dict, List


class ABTTablesConfig:
"""
    Class that abstracts the configuration and naming conventions of the tables and
    datasets in the ABT (analytical base tables) project.
"""

def __init__(self):
self.dataset_slg_sds = os.getenv("ABT_DATASET_SLG_SDS", "slg_sds_dev")
self.dataset_slg_sds_hist = os.getenv("ABT_DATASET_SLG_SDS_HIST", "slg_sds_hist_dev")
self.input_dataset = os.getenv("ABT_DATASET_INPUT", "input_dev")
self.dataset_intermediate = os.getenv("ABT_DATASET_INTERMEDIATE", "intermediate_dev")
self.dataset_output = os.getenv("ABT_DATASET_OUTPUT", "output_dev")
self.source_project = os.getenv("ABT_SOURCE_PROJECT", "analytical-base-tables-staging")
self.target_project = os.getenv("ABT_TARGET_PROJECT", "analytical-base-tables-staging")
self.bucket_backups = os.getenv("ABT_BUCKET_BACKUPS", "abt_backups_dev")
self.location = os.getenv("ABT_LOCATION", "eu")
self.run_date_format = "%Y%m%d"

    def get_source_tables(self) -> List[Dict[str, str]]:
        """Return the source tables, each as a dict with 'dataset' and 'table' keys."""
        return [
{"dataset": self.dataset_slg_sds, "table": "communication_stats"},
{"dataset": self.dataset_slg_sds, "table": "contact_moments"},
{"dataset": self.dataset_slg_sds, "table": "site_tag_product_hits"},
{"dataset": self.dataset_slg_sds, "table": "users_maxeda"},
{"dataset": self.dataset_slg_sds_hist, "table": "COMMUNICATIONDOMAIN"},
{"dataset": self.dataset_slg_sds_hist, "table": "COMMUNICATIONS"},
{"dataset": self.dataset_slg_sds_hist, "table": "CONTACTS_TAX_EMAIL"},
{"dataset": self.dataset_slg_sds_hist, "table": "CONTACTS_TAX_TRANS"},
{"dataset": self.dataset_slg_sds_hist, "table": "FAV_STORE"},
{"dataset": self.dataset_slg_sds_hist, "table": "INTERACTIONS"},
{"dataset": self.dataset_slg_sds_hist, "table": "MAILCLIENTCODES"},
{"dataset": self.dataset_slg_sds_hist, "table": "MESSAGE"},
{"dataset": self.dataset_slg_sds_hist, "table": "MESSAGEDELIVERYSTATES"},
{"dataset": self.dataset_slg_sds_hist, "table": "PROBECODES"},
{"dataset": self.dataset_slg_sds_hist, "table": "PRODUCTS"},
{"dataset": self.dataset_slg_sds_hist, "table": "RAW_COMMUNICATIONDOMAIN"},
{"dataset": self.dataset_slg_sds_hist, "table": "RAW_COMMUNICATIONS"},
{"dataset": self.dataset_slg_sds_hist, "table": "RAW_DATA_CONSENT"},
{"dataset": self.dataset_slg_sds_hist, "table": "RAW_FAV_STORE"},
{"dataset": self.dataset_slg_sds_hist, "table": "RAW_INTERACTIONS"},
{"dataset": self.dataset_slg_sds_hist, "table": "RAW_LOYALTYCARDS"},
{"dataset": self.dataset_slg_sds_hist, "table": "RAW_MAILCLIENTCODES"},
{"dataset": self.dataset_slg_sds_hist, "table": "RAW_MESSAGE"},
{"dataset": self.dataset_slg_sds_hist, "table": "RAW_MESSAGEDELIVERYSTATES"},
{"dataset": self.dataset_slg_sds_hist, "table": "RAW_MOBILE_BRICO"},
{"dataset": self.dataset_slg_sds_hist, "table": "RAW_MOBILE_PRAXIS"},
{"dataset": self.dataset_slg_sds_hist, "table": "RAW_PROBECODES"},
{"dataset": self.dataset_slg_sds_hist, "table": "RAW_PRODUCTS"},
{"dataset": self.dataset_slg_sds_hist, "table": "RAW_RFM_BRICO"},
{"dataset": self.dataset_slg_sds_hist, "table": "RAW_RFM_PRAXIS"},
{"dataset": self.dataset_slg_sds_hist, "table": "RAW_SHOPS"},
{"dataset": self.dataset_slg_sds_hist, "table": "RAW_SITETAGPRODUCT"},
{"dataset": self.dataset_slg_sds_hist, "table": "RAW_SUBJECTRIGHTS"},
{"dataset": self.dataset_slg_sds_hist, "table": "RAW_TRANSACTIONLINES"},
{"dataset": self.dataset_slg_sds_hist, "table": "RAW_TRANSACTIONS"},
{"dataset": self.dataset_slg_sds_hist, "table": "RAW_USERS_CONTACTS"},
{"dataset": self.dataset_slg_sds_hist, "table": "RFM_BRICO"},
{"dataset": self.dataset_slg_sds_hist, "table": "RFM_PRAXIS"},
{"dataset": self.dataset_slg_sds_hist, "table": "SHOPS"},
{"dataset": self.dataset_slg_sds_hist, "table": "SITETAGPRODUCT"},
{"dataset": self.dataset_slg_sds_hist, "table": "TRANSACTIONS"},
{"dataset": self.dataset_slg_sds_hist, "table": "TRANSACTIONLINES"},
{"dataset": self.dataset_slg_sds_hist, "table": "USERS_CONTACTS"},
{"dataset": self.dataset_slg_sds_hist, "table": "purchase_history"},
{"dataset": self.dataset_slg_sds_hist, "table": "purchase_history_grouped"}

]

    def get_target_table_name(self, source_dataset: str, source_table: str) -> str:
        """
        Build the name of the target table in the input dataset from a source dataset
        and source table.

        :param source_dataset: name of the original source dataset
        :param source_table: name of the table in the original source dataset
        :return: target table name, in the form "{source_dataset}_{source_table}"
        """
        return f"{source_dataset}_{source_table}"

def get_target_table_names(self) -> Dict[str, str]:
"""
        Retrieve a dictionary mapping every source table to its name in the input dataset.

        :return: a dictionary mapping "table_{source table name}" to the name of the
            corresponding table in the input dataset
"""
return {
f"table_{st['table']}": self.get_target_table_name(st['dataset'], st['table'])
for st in self.get_source_tables()
}
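A small sketch of how this configuration resolves names; the environment override shown is hypothetical:

import os

os.environ["ABT_DATASET_SLG_SDS"] = "slg_sds_prod"  # hypothetical override of the dev default
config = ABTTablesConfig()

config.get_target_table_name("slg_sds_prod", "contact_moments")
# -> "slg_sds_prod_contact_moments"

config.get_target_table_names()["table_contact_moments"]
# -> "slg_sds_prod_contact_moments"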
84 changes: 84 additions & 0 deletions cobra/industrialization/integrations/airflow/dag_config.py
@@ -0,0 +1,84 @@
import os

from airflow.operators.python import ShortCircuitOperator
from airflow.operators.trigger_dagrun import TriggerDagRunOperator
from airflow import DAG
from airflow.operators.bash import BashOperator
from datetime import datetime, date, timedelta

DAG_ID_INITIAL_LOAD = "abt_initial_load"
DAG_ID_INGEST = "abt_ingest_sources"
DAG_ID_CREATE_ABT = "create_abt"
DAG_ID_BACKUP_CLEAN = "backup_and_clean_abt"

AIRFLOW_HOME = os.environ["AIRFLOW_HOME"]
DAGS_FOLDER = os.getenv("DAGS_FOLDER", os.path.join(AIRFLOW_HOME, "dags"))
BQ_SCRIPTS_FOLDER = os.path.join(DAGS_FOLDER, "scripts", "bq")


def create_next_dags(next_dag_id: str, chain_dags: str, dag: DAG, logical_date_ts: str):
"""
    Create the tasks that trigger the next DAG, based on the chain_dags condition. If
    chain_dags is the string 'true', the DAG identified by next_dag_id is triggered;
    otherwise the trigger task is skipped.
    :param next_dag_id: ID of the next DAG to trigger if chain_dags is 'true'
    :param chain_dags: 'true' if the next DAG should be triggered
    :param dag: instance of the Airflow DAG the tasks belong to
    :param logical_date_ts: logical date (timestamp) passed as execution date to the next DAG
    :return: the ShortCircuitOperator, with a TriggerDagRunOperator for next_dag_id set
        as its downstream task
"""
task_check_if_chained_dags = ShortCircuitOperator(
task_id='check_if_chained_dags',
op_kwargs={"do_run": chain_dags},
python_callable=lambda do_run: do_run.lower() == "true",
dag=dag
)

task_trigger_next_dag = TriggerDagRunOperator(
task_id=f"trigger_{next_dag_id}",
trigger_dag_id=next_dag_id,
dag=dag,
execution_date=logical_date_ts,
wait_for_completion=True, # ensure we don't run this DAG again until the following is done
conf={
"chain_dags": chain_dags
}
)

task_check_if_chained_dags.set_downstream(task_trigger_next_dag)

return task_check_if_chained_dags


def create_auth_task(dag: DAG):
"""
    Create an Airflow task based on a BashOperator.
    If the env var "NEED_TO_GCLOUD_AUTH" is "true", the task authenticates via
    gcloud auth. Otherwise, it just echoes 'No need to authenticate, skipping login'.
:param dag: instance of airflow DAG
:return: BashOperator with the right bash command to run
"""
need_to_auth = os.getenv("NEED_TO_GCLOUD_AUTH", "False")
if need_to_auth.lower() == "true":
login_command = f"gcloud auth activate-service-account "\
f"{os.getenv('GCP_SERVICE_ACCOUNT_NAME')} "\
f"--key-file={os.getenv('GCP_SERVICE_ACCOUNT_KEY_PATH')}"
else:
login_command = "echo 'No need to authenticate, skipping login'"

task_login = BashOperator(
task_id="gcloud_auth_login",
bash_command=login_command,
dag=dag
)

return task_login


def get_partition_date_str(date_str, date_format="%Y%m%d"):
    """
    Return the partition date for a given run date: the Monday of that week, formatted
    with date_format.
    """
    run_date = datetime.strptime(date_str, date_format)

    # The partition date is the Monday of the run date's week
    partition_date = (run_date - timedelta(days=run_date.weekday())).strftime(date_format)
    return partition_date
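A sketch of how these helpers could be wired together in a DAG definition, reusing the imports and DAG_ID constants defined above; the schedule and the chain_dags expression are hypothetical:

# Hypothetical DAG combining the helpers above
with DAG(
    dag_id=DAG_ID_INGEST,
    start_date=datetime(2022, 1, 1),
    schedule_interval=None,
    catchup=False,
) as dag:
    auth = create_auth_task(dag)
    trigger_next = create_next_dags(
        next_dag_id=DAG_ID_CREATE_ABT,
        chain_dags="{{ dag_run.conf.get('chain_dags', 'false') }}",
        dag=dag,
        logical_date_ts="{{ ts }}",
    )
    auth >> trigger_next

# get_partition_date_str("20240117") -> "20240115", the Monday of that week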
2 changes: 2 additions & 0 deletions cobra/industrialization/integrations/airflow/requirements.txt
@@ -0,0 +1,2 @@
apache-airflow
apache-airflow-providers-google==6.7.0