From cd7ddd969c412c4447639a14ff5c4903a8476012 Mon Sep 17 00:00:00 2001 From: Alexandros Ladas Date: Fri, 5 Dec 2025 18:57:42 +0100 Subject: [PATCH 1/2] Add data preparation orchestration module for weekly sales forecasting - Introduced a new preparation module to create weekly sales forecasting data by store. - Added SQL scripts for creating and managing weekly stores and target views. - Implemented data quality checks and statistics calculations for the prepared data. - Enhanced ingestion script to integrate with the new preparation functionality. - Created unit tests for validation and orchestration of the data preparation pipeline. --- integration/snowflake/data/__init__.py | 10 + integration/snowflake/data/preparation.py | 460 ++++++++++++++++++ .../data/sql/preparation/analyze_stores.sql | 13 + .../data/sql/preparation/calculate_target.sql | 39 ++ .../sql/preparation/create_weekly_stores.sql | 67 +++ .../sql/preparation/data_quality_check.sql | 7 + .../sql/preparation/drop_weekly_stores.sql | 2 + .../sql/preparation/overall_statistics.sql | 8 + .../sql/preparation/snapshots_by_store.sql | 11 + .../data/sql/preparation/store_statistics.sql | 12 + .../sql/preparation/weekly_stores_summary.sql | 27 + .../preparation/zero_sales_investigation.sql | 6 + ...prepare_jaffle_shop_data_for_snowflake.py} | 21 +- .../tests/integration/test_pipeline.py | 144 ++++++ .../snowflake/tests/test_preparation.py | 67 +++ .../snowflake/tests/test_sql_loader.py | 25 + 16 files changed, 913 insertions(+), 6 deletions(-) create mode 100644 integration/snowflake/data/preparation.py create mode 100644 integration/snowflake/data/sql/preparation/analyze_stores.sql create mode 100644 integration/snowflake/data/sql/preparation/calculate_target.sql create mode 100644 integration/snowflake/data/sql/preparation/create_weekly_stores.sql create mode 100644 integration/snowflake/data/sql/preparation/data_quality_check.sql create mode 100644 integration/snowflake/data/sql/preparation/drop_weekly_stores.sql create mode 100644 integration/snowflake/data/sql/preparation/overall_statistics.sql create mode 100644 integration/snowflake/data/sql/preparation/snapshots_by_store.sql create mode 100644 integration/snowflake/data/sql/preparation/store_statistics.sql create mode 100644 integration/snowflake/data/sql/preparation/weekly_stores_summary.sql create mode 100644 integration/snowflake/data/sql/preparation/zero_sales_investigation.sql rename integration/snowflake/{ingest_jaffle_shop_data.py => prepare_jaffle_shop_data_for_snowflake.py} (59%) mode change 100644 => 100755 create mode 100644 integration/snowflake/tests/test_preparation.py diff --git a/integration/snowflake/data/__init__.py b/integration/snowflake/data/__init__.py index 7238cc1..05ccc31 100644 --- a/integration/snowflake/data/__init__.py +++ b/integration/snowflake/data/__init__.py @@ -8,12 +8,14 @@ SnowflakeSettings, create_session, ingestion, + preparation, ) settings = SnowflakeSettings.from_env() with create_session(settings) as session: ingestion.load_from_gcs(session, settings=settings) + preparation.create_weekly_sales_by_store_with_target(session, settings=settings) """ from snowflake.snowpark.exceptions import SnowparkSessionException @@ -32,15 +34,23 @@ load_from_gcs, load_from_s3, ) +from .preparation import ( + DEFAULT_POPULATION_TABLE_NAME, + DataPreparationError, + create_weekly_sales_by_store_with_target, +) __all__ = [ "DEFAULT_GCS_BUCKET", + "DEFAULT_POPULATION_TABLE_NAME", "JAFFLE_SHOP_TABLE_NAMES", "BootstrapError", "DataIngestionError", + 
"DataPreparationError", "SnowflakeSettings", "SnowparkSessionException", "create_session", + "create_weekly_sales_by_store_with_target", "ensure_infrastructure", "load_from_gcs", "load_from_s3", diff --git a/integration/snowflake/data/preparation.py b/integration/snowflake/data/preparation.py new file mode 100644 index 0000000..ad02cd2 --- /dev/null +++ b/integration/snowflake/data/preparation.py @@ -0,0 +1,460 @@ +"""Prepare weekly sales forecasting data for getML - BY STORE. + +This module creates: +- weekly_stores table: Store-week combinations with reference_date (Monday week start) +- Population view with target (next week's sales) + +reference_date is the Monday (week start) derived from DATE_TRUNC('week', ordered_at). + +When settings are provided, the module auto-bootstraps required infrastructure +(warehouse and database) if they don't exist. + +SQL queries are externalized in the sql/ directory for better maintainability. + +Usage example: + from data import ( + SnowflakeSettings, + create_session, + create_weekly_sales_by_store_with_target, + ) + + settings = SnowflakeSettings.from_env() + with create_session(settings) as session: + # Auto-bootstraps warehouse + database when settings are provided + population_table = create_weekly_sales_by_store_with_target( + session, + settings=settings, + source_schema="RAW", + target_schema="PREPARED", + ) + arrow_table = session.table(population_table).to_arrow() +""" + +# ruff: noqa: G004, G003, S608, TRY301, E501 + +import logging +from typing import cast + +from snowflake.snowpark import Row, Session + +from ._bootstrap import ensure_infrastructure +from ._settings import SnowflakeSettings +from ._sql_loader import load_sql + +logger: logging.Logger = logging.getLogger(__name__) + + +class DataPreparationError(Exception): + """Raised when data preparation fails.""" + + +# ============================================================================= +# Public API +# ============================================================================= + + +DEFAULT_POPULATION_TABLE_NAME = "WEEKLY_SALES_BY_STORE_WITH_TARGET" + + +def create_weekly_sales_by_store_with_target( + session: Session, + settings: SnowflakeSettings | None = None, + source_schema: str = "RAW", + target_schema: str = "PREPARED", + table_name: str = DEFAULT_POPULATION_TABLE_NAME, +) -> str: + """Create weekly sales forecasting data for getML, grouped by store. + + Creates: + - weekly_stores: Table with store-week combinations (reference_date = Monday) + - Population view with target column (configurable name via table_name) + - Target: Sum of order_total for the 7-day window starting at reference_date + + When settings are provided, auto-bootstraps the warehouse and database + if they don't exist. + + Args: + session: Active Snowflake Snowpark session. + settings: Optional settings for auto-bootstrapping warehouse and database. + When provided, ensures infrastructure exists before preparing data. + source_schema: Schema containing raw_stores and raw_orders tables. + target_schema: Schema where prepared tables/views will be created. + table_name: Name of the population view to create. + + Returns: + Fully qualified table name (e.g., "PREPARED.WEEKLY_SALES_BY_STORE_WITH_TARGET"). + + Raises: + DataPreparationError: If source tables are missing or preparation fails. + BootstrapError: If auto-bootstrap fails due to insufficient privileges. 
+ """ + if settings is not None: + ensure_infrastructure(session, settings) + if settings.warehouse: + session.use_warehouse(settings.warehouse) + if settings.database: + session.use_database(settings.database) + + _validate_source_tables(session, source_schema) + _ensure_target_schema(session, target_schema) + + logger.info(""" +================================================================================ +PREPARING WEEKLY SALES FORECASTING DATA BY STORE FOR GETML +================================================================================ +""") + + _analyze_and_display_stores(session, source_schema) + per_store = _create_weekly_stores_table(session, source_schema, target_schema) + _create_target_view(session, source_schema, target_schema, table_name) + + _display_sample_data(session, target_schema, per_store, table_name) + _display_store_statistics(session, target_schema, table_name) + _display_overall_statistics(session, target_schema, table_name) + _perform_data_quality_check(session, target_schema, table_name) + _display_recent_snapshots(session, target_schema, per_store, table_name) + + qualified_table_name = f"{target_schema}.{table_name}" + + logger.info(f""" +================================================================================ +DATA PREPARATION COMPLETE! +================================================================================ + +Objects created in {target_schema} schema: + - weekly_stores (TABLE) + - {table_name} (VIEW - USE THIS FOR GETML) + +Population table: {qualified_table_name} +For getML integration instructions, see: docs/GETML_WEEKLY_SALES_DATA_PREPARATION.md +""") + + return qualified_table_name + + +# ============================================================================= +# Validation and Schema Setup +# ============================================================================= + + +def _validate_source_tables(session: Session, source_schema: str) -> None: + """Validate that source schema exists and contains required tables.""" + required_tables: list[str] = ["raw_stores", "raw_orders"] + logger.info(f"Validating {source_schema} schema and required tables...") + + try: + for table_name in required_tables: + result: list[Row] = session.sql( + f"SELECT COUNT(*) as count FROM {source_schema}.{table_name} LIMIT 1" + ).collect() + if not result: + raise DataPreparationError( + f"Table {source_schema}.{table_name} does not exist or is not " + "accessible. Please run ingestion first." + ) + except DataPreparationError: + raise + except Exception as e: + raise DataPreparationError( + f"Failed to validate {source_schema} schema. " + f"Ensure data has been loaded. Error: {e}" + ) from e + + logger.info(f"✓ {source_schema} schema validated") + + +def _ensure_target_schema(session: Session, target_schema: str) -> None: + """Create target schema if it doesn't exist.""" + logger.info(f"Creating {target_schema} schema if not exists...") + sql: str = load_sql("common/create_schema.sql", schema_name=target_schema) + _ = session.sql(query=sql).collect() + logger.info(f"✓ {target_schema} schema ready") + + +# ============================================================================= +# Data Preparation Pipeline +# ============================================================================= + + +def _analyze_and_display_stores(session: Session, source_schema: str) -> None: + """Analyze and display store activity.""" + logger.info("\n1. 
Analyzing store data...") + stores: list[Row] = session.sql( + query=load_sql( + path="preparation/analyze_stores.sql", source_schema=source_schema + ) + ).collect() + + logger.info(f"\n{'Store':<30} {'Opened':<20} {'Orders':<12} {'Total Sales':<15}") + logger.info("-" * 80) + for store in stores: + logger.info( + f"{store[1]:<30} {store[2]!s:<20} {store[5]:<12,} ${store[6]:<14,.2f}" + ) + + +def _create_weekly_stores_table( + session: Session, + source_schema: str, + target_schema: str, +) -> list[Row]: + """Create weekly_stores table with store-week combinations. + + Creates one row per store per week using reference_date (Monday week start). + + Returns: + List of store information rows. + """ + logger.info("\n2. Creating weekly_stores table (store-week combinations)...") + logger.info(" reference_date is Monday (week start) from DATE_TRUNC('week', ...)") + + _ = session.sql( + load_sql(path="preparation/drop_weekly_stores.sql", target_schema=target_schema) + ).collect() + _ = session.sql( + query=load_sql( + path="preparation/create_weekly_stores.sql", + source_schema=source_schema, + target_schema=target_schema, + ) + ).collect() + + # Get summary in single query: totals + per-store breakdown + summary: list[Row] = session.sql( + query=load_sql( + path="preparation/weekly_stores_summary.sql", + target_schema=target_schema, + ) + ).collect() + + # First row contains totals (same for all rows due to CROSS JOIN) + num_snapshots: int = cast("int", summary[0][0]) + num_stores: int = cast("int", summary[0][1]) + logger.info( + f" Created {num_snapshots:,} weekly snapshots across {num_stores} stores" + ) + + # Extract per-store data (columns 2-5: store_name, num_snapshots, first, last) + per_store: list[Row] = summary + + logger.info(f"\n {'Store':<30} {'Snapshots':<12} {'First':<20} {'Last':<20}") + logger.info(" " + "-" * 80) + for row in per_store: + logger.info(f" {row[2]:<30} {row[3]:<12,} {row[4]!s:<20} {row[5]!s:<20}") + + return per_store + + +def _create_target_view( + session: Session, + source_schema: str, + target_schema: str, + table_name: str, +) -> None: + """Create view with target - total sales for the following week per store.""" + logger.info("\n3. Creating target view: next week's total sales per store...") + _ = session.sql( + query=load_sql( + path="preparation/calculate_target.sql", + source_schema=source_schema, + target_schema=target_schema, + table_name=table_name, + ) + ).collect() + + +# ============================================================================= +# Display and Analysis Functions +# ============================================================================= + + +def _display_sample_data( + session: Session, + target_schema: str, + per_store: list[Row], + table_name: str, +) -> None: + """Show sample data for first few stores.""" + if not logger.isEnabledFor(logging.DEBUG): + return + + logger.debug("\n4. 
Sample data - first 5 snapshots per store:") + logger.debug("-" * 120) + + for store_info in per_store[:3]: + store_name: str = cast("str", store_info[2]) + logger.debug(f"\n{store_name}:") + sample_query: str = load_sql( + path="preparation/snapshots_by_store.sql", + target_schema=target_schema, + store_name=store_name, + table_name=table_name, + order_direction="", + limit="5", + ) + samples: list[Row] = session.sql(sample_query).collect() + + logger.debug(f"{'ID':<8} {'Week Start':<20} {'Sales $':<15} {'Orders':<10}") + logger.debug("-" * 60) + for row in samples: + logger.debug(f"{row[0]:<8} {row[1]!s:<20} ${row[2]:<14,.2f} {row[3]:<10,}") + + +def _display_store_statistics( + session: Session, + target_schema: str, + table_name: str, +) -> None: + """Display target statistics per store.""" + if not logger.isEnabledFor(logging.DEBUG): + return + + logger.debug("\n5. Target statistics per store:") + logger.debug("-" * 120) + + store_stats: list[Row] = session.sql( + query=load_sql( + path="preparation/store_statistics.sql", + target_schema=target_schema, + table_name=table_name, + ) + ).collect() + + logger.debug( + f"{'Store':<30} {'Snapshots':<12} {'Avg Week $':<15} " + f"{'Min $':<15} {'Max $':<15} {'Std Dev $':<15}" + ) + logger.debug("-" * 120) + for row in store_stats: + logger.debug( + f"{row[0]:<30} {row[1]:<12,} ${row[2]:<14,.2f} " + f"${row[3]:<14,.2f} ${row[4]:<14,.2f} ${row[5]:<14,.2f}" + ) + + +def _display_overall_statistics( + session: Session, + target_schema: str, + table_name: str, +) -> None: + """Display overall dataset statistics.""" + if not logger.isEnabledFor(logging.DEBUG): + return + + overall: Row = session.sql( + query=load_sql( + path="preparation/overall_statistics.sql", + target_schema=target_schema, + table_name=table_name, + ) + ).collect()[0] + + logger.debug(f""" +6. Overall statistics: +-------------------------------------------------------------------------------- + Total snapshots: {overall[0]:,} + Number of stores: {overall[1]:,} + Average weekly sales per store: ${overall[2]:,.2f} + Total sales in dataset: ${overall[3]:,.2f} + Total orders in dataset: {overall[4]:,} +""") + + +def _perform_data_quality_check( + session: Session, + target_schema: str, + table_name: str, +) -> None: + """Perform and display data quality checks.""" + if not logger.isEnabledFor(logging.DEBUG): + return + + quality: Row = session.sql( + query=load_sql( + path="preparation/data_quality_check.sql", + target_schema=target_schema, + table_name=table_name, + ) + ).collect()[0] + + logger.debug(f""" +7. Data quality check: +-------------------------------------------------------------------------------- + Total snapshots: {quality[0]:,} + Weeks with zero sales: {quality[1]:,} + Weeks with zero orders: {quality[2]:,} + Invalid records (sales but no orders): {quality[3]:,} +""") + + zero_sales_count: int = cast("int", quality[1]) + if zero_sales_count > 0: + logger.warning( + f"\n WARNING: {zero_sales_count} snapshots have zero sales. Investigating..." 
+ ) + zero_sales: list[Row] = session.sql( + query=load_sql( + path="preparation/zero_sales_investigation.sql", + target_schema=target_schema, + table_name=table_name, + ) + ).collect() + for row in zero_sales: + logger.warning(f" {row[0]}: {row[1]} weeks with zero sales") + + +def _display_recent_snapshots( + session: Session, + target_schema: str, + per_store: list[Row], + table_name: str, +) -> None: + """Show most recent snapshots for first few stores.""" + if not logger.isEnabledFor(logging.DEBUG): + return + + logger.debug("\n8. Most recent snapshots (last 3 per store):") + logger.debug("-" * 120) + + for store_info in per_store[:3]: + store_name: str = cast("str", store_info[2]) + logger.debug(f"\n{store_name}:") + recent_query: str = load_sql( + path="preparation/snapshots_by_store.sql", + target_schema=target_schema, + store_name=store_name, + table_name=table_name, + order_direction="DESC", + limit="3", + ) + recent: list[Row] = session.sql(recent_query).collect() + + logger.debug(f"{'ID':<8} {'Week Start':<20} {'Sales $':<15} {'Orders':<10}") + logger.debug("-" * 60) + for row in recent: + logger.debug(f"{row[0]:<8} {row[1]!s:<20} ${row[2]:<14,.2f} {row[3]:<10,}") + + +if __name__ == "__main__": + logging.basicConfig( + level=logging.INFO, + format="%(message)s", + ) + + logger.info("This module requires a Snowflake session to be passed.") + logger.info("Example usage:") + logger.info(""" + from data import ( + SnowflakeSettings, + create_session, + create_weekly_sales_by_store_with_target, + ) + + settings = SnowflakeSettings.from_env() + + with create_session(settings) as session: + population_table = create_weekly_sales_by_store_with_target( + session, + table_name="WEEKLY_SALES_BY_STORE_WITH_TARGET", + ) + # Use population_table with session.table(population_table).to_arrow() + """) diff --git a/integration/snowflake/data/sql/preparation/analyze_stores.sql b/integration/snowflake/data/sql/preparation/analyze_stores.sql new file mode 100644 index 0000000..03ab648 --- /dev/null +++ b/integration/snowflake/data/sql/preparation/analyze_stores.sql @@ -0,0 +1,13 @@ +-- Analyze stores and their activity +SELECT + s.id, + s.name, + TRY_TO_TIMESTAMP(s.opened_at) as opened_at, + MIN(TRY_TO_TIMESTAMP(o.ordered_at)) as first_order, + MAX(TRY_TO_TIMESTAMP(o.ordered_at)) as last_order, + COUNT(o.id) as total_orders, + SUM(COALESCE(o.order_total, 0)) / 100.0 as total_sales +FROM {source_schema}.raw_stores s +LEFT JOIN {source_schema}.raw_orders o ON o.store_id = s.id +GROUP BY s.id, s.name, s.opened_at +ORDER BY TRY_TO_TIMESTAMP(s.opened_at) diff --git a/integration/snowflake/data/sql/preparation/calculate_target.sql b/integration/snowflake/data/sql/preparation/calculate_target.sql new file mode 100644 index 0000000..b7ac3b5 --- /dev/null +++ b/integration/snowflake/data/sql/preparation/calculate_target.sql @@ -0,0 +1,39 @@ +-- Create view with target: next week's sales per store +-- +-- This view joins weekly_stores with pre-aggregated order totals. +-- Target is the sum of order_total for the 7-day window starting at reference_date. 
+-- +-- Window: [reference_date, reference_date + 7 days) +-- - reference_date is Monday 00:00:00 (week start) +-- - Target covers Monday through Sunday of that week +-- +-- Optimized: Single aggregation pass instead of correlated subqueries +CREATE OR REPLACE VIEW {target_schema}.{table_name} AS +WITH weekly_order_totals AS ( + SELECT + store_id, + DATE_TRUNC('week', TRY_TO_TIMESTAMP(ordered_at)) as week_start, + SUM(order_total) / 100.0 as week_sales, + COUNT(*) as week_orders + FROM {source_schema}.raw_orders + WHERE ordered_at IS NOT NULL + GROUP BY store_id, DATE_TRUNC('week', TRY_TO_TIMESTAMP(ordered_at)) +) +SELECT + ws.snapshot_id, + ws.store_id, + ws.store_name, + ws.reference_date, + ws.year, + ws.month, + ws.week_number, + ws.days_since_open, + ws.is_full_week_after_opening, + ws.has_order_activity, + ws.has_min_history, + COALESCE(wot.week_sales, 0) as next_week_sales, + COALESCE(wot.week_orders, 0) as next_week_orders +FROM {target_schema}.weekly_stores ws +LEFT JOIN weekly_order_totals wot + ON wot.store_id = ws.store_id + AND wot.week_start = ws.reference_date diff --git a/integration/snowflake/data/sql/preparation/create_weekly_stores.sql b/integration/snowflake/data/sql/preparation/create_weekly_stores.sql new file mode 100644 index 0000000..af3f2ef --- /dev/null +++ b/integration/snowflake/data/sql/preparation/create_weekly_stores.sql @@ -0,0 +1,67 @@ +-- Create store-week combinations for weekly sales forecasting +-- +-- This table creates the base data for getML: one row per store per week. +-- reference_date is the Monday (week start) derived from DATE_TRUNC('week', ordered_at). +-- +-- Filtering logic: +-- - Week must be >= store's opened_at date (store existed) +-- - Week must be < last_order_week (exclude incomplete final week) +-- +-- Boolean flags for data quality filtering: +-- - is_full_week_after_opening: Store had a full week of operation before this week +-- - has_order_activity: Store has order data spanning this week +-- - has_min_history: At least 7 days since store opened +CREATE TABLE {target_schema}.weekly_stores AS +WITH store_activity AS ( + SELECT + s.id as store_id, + s.name as store_name, + TRY_TO_TIMESTAMP(s.opened_at) as opened_at, + DATE_TRUNC('week', TRY_TO_TIMESTAMP(s.opened_at)) + INTERVAL '7 days' as first_full_week, + MIN(TRY_TO_TIMESTAMP(o.ordered_at)) as first_order_date, + MAX(TRY_TO_TIMESTAMP(o.ordered_at)) as last_order_date, + DATE_TRUNC('week', MIN(TRY_TO_TIMESTAMP(o.ordered_at))) as first_order_week, + DATE_TRUNC('week', MAX(TRY_TO_TIMESTAMP(o.ordered_at))) as last_order_week + FROM {source_schema}.raw_stores s + LEFT JOIN {source_schema}.raw_orders o ON o.store_id = s.id + GROUP BY s.id, s.name, TRY_TO_TIMESTAMP(s.opened_at) +), + +all_weeks AS ( + SELECT DISTINCT + DATE_TRUNC('week', TRY_TO_TIMESTAMP(ordered_at)) as reference_date + FROM {source_schema}.raw_orders + WHERE ordered_at IS NOT NULL +), + +store_weeks AS ( + SELECT + sa.store_id, + sa.store_name, + w.reference_date, + sa.opened_at, + sa.first_full_week, + sa.first_order_week, + sa.last_order_week + FROM store_activity sa + CROSS JOIN all_weeks w + WHERE w.reference_date >= sa.opened_at + AND w.reference_date < sa.last_order_week +) + +SELECT + ROW_NUMBER() OVER (ORDER BY reference_date, store_id) as snapshot_id, + store_id, + store_name, + reference_date, + EXTRACT(year FROM reference_date) as year, + EXTRACT(month FROM reference_date) as month, + EXTRACT(week FROM reference_date) as week_number, + DATEDIFF('day', opened_at, reference_date) as days_since_open, + 
reference_date >= first_full_week as is_full_week_after_opening, + first_order_week IS NOT NULL + AND reference_date >= first_order_week + AND reference_date < last_order_week as has_order_activity, + DATEDIFF('day', opened_at, reference_date) >= 7 as has_min_history +FROM store_weeks +ORDER BY reference_date, store_id; diff --git a/integration/snowflake/data/sql/preparation/data_quality_check.sql b/integration/snowflake/data/sql/preparation/data_quality_check.sql new file mode 100644 index 0000000..bce0155 --- /dev/null +++ b/integration/snowflake/data/sql/preparation/data_quality_check.sql @@ -0,0 +1,7 @@ +-- Data quality check +SELECT + COUNT(*) as total, + SUM(CASE WHEN next_week_sales = 0 THEN 1 ELSE 0 END) as zero_sales, + SUM(CASE WHEN next_week_orders = 0 THEN 1 ELSE 0 END) as zero_orders, + SUM(CASE WHEN next_week_sales > 0 AND next_week_orders = 0 THEN 1 ELSE 0 END) as invalid +FROM {target_schema}.{table_name} diff --git a/integration/snowflake/data/sql/preparation/drop_weekly_stores.sql b/integration/snowflake/data/sql/preparation/drop_weekly_stores.sql new file mode 100644 index 0000000..d707af2 --- /dev/null +++ b/integration/snowflake/data/sql/preparation/drop_weekly_stores.sql @@ -0,0 +1,2 @@ +-- Drop the weekly_stores table if it exists +DROP TABLE IF EXISTS {target_schema}.weekly_stores diff --git a/integration/snowflake/data/sql/preparation/overall_statistics.sql b/integration/snowflake/data/sql/preparation/overall_statistics.sql new file mode 100644 index 0000000..0772c2d --- /dev/null +++ b/integration/snowflake/data/sql/preparation/overall_statistics.sql @@ -0,0 +1,8 @@ +-- Overall statistics +SELECT + COUNT(*) as total_snapshots, + COUNT(DISTINCT store_id) as num_stores, + AVG(next_week_sales) as avg_weekly_sales, + SUM(next_week_sales) as total_sales, + SUM(next_week_orders) as total_orders +FROM {target_schema}.{table_name} diff --git a/integration/snowflake/data/sql/preparation/snapshots_by_store.sql b/integration/snowflake/data/sql/preparation/snapshots_by_store.sql new file mode 100644 index 0000000..03b24d7 --- /dev/null +++ b/integration/snowflake/data/sql/preparation/snapshots_by_store.sql @@ -0,0 +1,11 @@ +-- Sample snapshots for a specific store +-- Use order_direction="" for oldest first, "DESC" for newest first +SELECT + snapshot_id, + reference_date, + next_week_sales, + next_week_orders +FROM {target_schema}.{table_name} +WHERE store_name = '{store_name}' +ORDER BY reference_date {order_direction} +LIMIT {limit} diff --git a/integration/snowflake/data/sql/preparation/store_statistics.sql b/integration/snowflake/data/sql/preparation/store_statistics.sql new file mode 100644 index 0000000..6bdeb4e --- /dev/null +++ b/integration/snowflake/data/sql/preparation/store_statistics.sql @@ -0,0 +1,12 @@ +-- Statistics per store +SELECT + store_name, + COUNT(*) as num_snapshots, + AVG(next_week_sales) as avg_weekly_sales, + MIN(next_week_sales) as min_weekly_sales, + MAX(next_week_sales) as max_weekly_sales, + STDDEV(next_week_sales) as stddev_weekly_sales, + SUM(next_week_orders) as total_orders +FROM {target_schema}.{table_name} +GROUP BY store_name +ORDER BY avg_weekly_sales DESC diff --git a/integration/snowflake/data/sql/preparation/weekly_stores_summary.sql b/integration/snowflake/data/sql/preparation/weekly_stores_summary.sql new file mode 100644 index 0000000..9cd38b5 --- /dev/null +++ b/integration/snowflake/data/sql/preparation/weekly_stores_summary.sql @@ -0,0 +1,27 @@ +-- Summary of weekly_stores table: counts and per-store breakdown +-- Returns total 
snapshots, store count, and per-store details in one query +WITH totals AS ( + SELECT + COUNT(*) as total_snapshots, + COUNT(DISTINCT store_id) as num_stores + FROM {target_schema}.weekly_stores +), +per_store AS ( + SELECT + store_name, + COUNT(*) as num_snapshots, + MIN(reference_date) as first_snapshot, + MAX(reference_date) as last_snapshot + FROM {target_schema}.weekly_stores + GROUP BY store_name +) +SELECT + t.total_snapshots, + t.num_stores, + p.store_name, + p.num_snapshots, + p.first_snapshot, + p.last_snapshot +FROM totals t +CROSS JOIN per_store p +ORDER BY p.first_snapshot diff --git a/integration/snowflake/data/sql/preparation/zero_sales_investigation.sql b/integration/snowflake/data/sql/preparation/zero_sales_investigation.sql new file mode 100644 index 0000000..7a03bb7 --- /dev/null +++ b/integration/snowflake/data/sql/preparation/zero_sales_investigation.sql @@ -0,0 +1,6 @@ +-- Investigate stores with zero sales +SELECT store_name, COUNT(*) as count +FROM {target_schema}.{table_name} +WHERE next_week_sales = 0 +GROUP BY store_name +ORDER BY count DESC diff --git a/integration/snowflake/ingest_jaffle_shop_data.py b/integration/snowflake/prepare_jaffle_shop_data_for_snowflake.py old mode 100644 new mode 100755 similarity index 59% rename from integration/snowflake/ingest_jaffle_shop_data.py rename to integration/snowflake/prepare_jaffle_shop_data_for_snowflake.py index 49c766c..0f65abf --- a/integration/snowflake/ingest_jaffle_shop_data.py +++ b/integration/snowflake/prepare_jaffle_shop_data_for_snowflake.py @@ -1,15 +1,15 @@ -"""Ingest Jaffle Shop data into Snowflake for getML Feature Store integration. +#!/usr/bin/env python3 +"""Prepare Jaffle Shop data in Snowflake for getML Feature Store integration. -Loads raw Jaffle Shop dataset (Parquet files) from a public GCS bucket into the -RAW schema in Snowflake. Infrastructure (warehouse, database) is auto-created -if missing. +Loads raw data from GCS and creates the weekly sales forecasting population table. +Infrastructure (warehouse, database) is auto-created if missing. Prerequisites: - Snowflake account with appropriate privileges - SNOWFLAKE_* environment variables set for authentication and configuration Usage: - uv run python ingest_jaffle_shop_data.py + uv run python prepare_jaffle_shop_data_for_snowflake.py """ import logging @@ -19,13 +19,14 @@ SnowflakeSettings, create_session, ingestion, + preparation, ) logger: logging.Logger = logging.getLogger(__name__) def main() -> None: - """Ingest Jaffle Shop raw data into Snowflake. + """Load and prepare Jaffle Shop data for getML. Note: Set basicConfig.level to logging.DEBUG for more verbose output. 
@@ -46,6 +47,14 @@ def main() -> None: destination_schema="RAW", ) + _ = preparation.create_weekly_sales_by_store_with_target( + session, + settings=settings, + source_schema="RAW", + target_schema="PREPARED", + table_name="WEEKLY_SALES_BY_STORE_WITH_TARGET", + ) + if __name__ == "__main__": main() diff --git a/integration/snowflake/tests/integration/test_pipeline.py b/integration/snowflake/tests/integration/test_pipeline.py index ab2f9e8..96ded8a 100644 --- a/integration/snowflake/tests/integration/test_pipeline.py +++ b/integration/snowflake/tests/integration/test_pipeline.py @@ -10,12 +10,15 @@ - GCS_STORAGE_INTEGRATION: Name of the storage integration for GCS access """ +# ruff: noqa: E501 + import logging import pytest from snowflake.snowpark import Row, Session from data.ingestion import JAFFLE_SHOP_TABLE_NAMES +from data.preparation import create_weekly_sales_by_store_with_target from .conftest import StorageConfig, load_jaffle_shop_data @@ -71,3 +74,144 @@ def test_tables_have_expected_columns( ] for col in expected_columns: assert col in column_names, f"Expected column {col} not found" + + +class TestDataPreparation: + """Integration tests for data preparation pipeline.""" + + @pytest.mark.dependency(name="data_preparation", depends=["data_ingestion"]) + def test_prepare_weekly_sales_creates_weekly_stores_table( + self, + snowflake_session: Session, + ) -> None: + """Verify preparation creates weekly_stores table with data.""" + _ = create_weekly_sales_by_store_with_target(snowflake_session) + + # Verify table was created with data + result: list[Row] = snowflake_session.sql(""" + SELECT COUNT(*) as count FROM PREPARED.weekly_stores + """).collect() + + row_count = int(result[0]["COUNT"]) # pyright: ignore[reportArgumentType] + assert row_count > 0, "weekly_stores table should have data" + + @pytest.mark.dependency(depends=["data_preparation"]) + def test_weekly_stores_has_multiple_stores( + self, + snowflake_session: Session, + ) -> None: + """Verify weekly_stores table contains data for multiple stores.""" + result: list[Row] = snowflake_session.sql(""" + SELECT COUNT(DISTINCT store_id) as store_count + FROM PREPARED.weekly_stores + """).collect() + + store_count = int(result[0]["STORE_COUNT"]) # pyright: ignore[reportArgumentType] + assert store_count > 1, "weekly_stores should have multiple stores" + + @pytest.mark.dependency(depends=["data_preparation"]) + def test_weekly_stores_has_correct_columns( + self, + snowflake_session: Session, + ) -> None: + """Verify weekly_stores table has the expected column structure.""" + result: list[Row] = snowflake_session.sql(""" + SELECT COLUMN_NAME + FROM INFORMATION_SCHEMA.COLUMNS + WHERE TABLE_SCHEMA = 'PREPARED' + AND TABLE_NAME = 'WEEKLY_STORES' + ORDER BY ORDINAL_POSITION + """).collect() + + column_names = [str(row["COLUMN_NAME"]) for row in result] # pyright: ignore[reportUnknownArgumentType] + + expected_columns = [ + "SNAPSHOT_ID", + "STORE_ID", + "STORE_NAME", + "REFERENCE_DATE", + "YEAR", + "MONTH", + "WEEK_NUMBER", + "DAYS_SINCE_OPEN", + "IS_FULL_WEEK_AFTER_OPENING", + "HAS_ORDER_ACTIVITY", + "HAS_MIN_HISTORY", + ] + assert column_names == expected_columns + + @pytest.mark.dependency(depends=["data_preparation"]) + def test_target_view_exists_with_sales_columns( + self, + snowflake_session: Session, + ) -> None: + """Verify weekly_sales_by_store_with_target view exists with target columns.""" + result: list[Row] = snowflake_session.sql(""" + SELECT COLUMN_NAME + FROM INFORMATION_SCHEMA.COLUMNS + WHERE TABLE_SCHEMA = 'PREPARED' 
+ AND TABLE_NAME = 'WEEKLY_SALES_BY_STORE_WITH_TARGET' + ORDER BY ORDINAL_POSITION + """).collect() + + column_names = [str(row["COLUMN_NAME"]) for row in result] # pyright: ignore[reportUnknownArgumentType] + + # Should have all weekly_stores columns plus target columns + assert "NEXT_WEEK_SALES" in column_names, "View should have NEXT_WEEK_SALES" + assert "NEXT_WEEK_ORDERS" in column_names, "View should have NEXT_WEEK_ORDERS" + assert "REFERENCE_DATE" in column_names, "View should have REFERENCE_DATE" + + @pytest.mark.dependency(depends=["data_preparation"]) + def test_target_view_has_valid_sales_data( + self, + snowflake_session: Session, + ) -> None: + """Verify the target view calculates sales correctly.""" + result: list[Row] = snowflake_session.sql(""" + SELECT + COUNT(*) as total_rows, + SUM(CASE WHEN next_week_sales > 0 THEN 1 ELSE 0 END) as rows_with_sales, + AVG(next_week_sales) as avg_sales + FROM PREPARED.weekly_sales_by_store_with_target + """).collect() + + total_rows = int(result[0]["TOTAL_ROWS"]) # pyright: ignore[reportArgumentType] + rows_with_sales = int(result[0]["ROWS_WITH_SALES"]) # pyright: ignore[reportArgumentType] + avg_sales = float(result[0]["AVG_SALES"]) # pyright: ignore[reportArgumentType] + + assert total_rows > 0, "View should have rows" + assert rows_with_sales > 0, "Some rows should have sales > 0" + assert avg_sales > 0, "Average sales should be positive" + + @pytest.mark.dependency(depends=["data_preparation"]) + def test_boolean_flags_are_calculated( + self, + snowflake_session: Session, + ) -> None: + """Verify boolean flags are properly calculated in weekly_stores.""" + result: list[Row] = snowflake_session.sql( + query=""" + SELECT + COUNT(*) as total, + SUM(CASE WHEN is_full_week_after_opening THEN 1 ELSE 0 END) as full_week_count, + SUM(CASE WHEN has_order_activity THEN 1 ELSE 0 END) as activity_count, + SUM(CASE WHEN has_min_history THEN 1 ELSE 0 END) as min_history_count + FROM PREPARED.weekly_stores + """ + ).collect() + + total = int(result[0]["TOTAL"]) # pyright: ignore[reportArgumentType] + full_week_count = int(result[0]["FULL_WEEK_COUNT"]) # pyright: ignore[reportArgumentType] + activity_count = int(result[0]["ACTIVITY_COUNT"]) # pyright: ignore[reportArgumentType] + min_history_count = int(result[0]["MIN_HISTORY_COUNT"]) # pyright: ignore[reportArgumentType] + + # All flags should have some TRUE values (not all zeros) + assert full_week_count > 0, ( + "Some rows should have is_full_week_after_opening=TRUE" + ) + assert activity_count > 0, "Some rows should have has_order_activity=TRUE" + assert min_history_count > 0, "Some rows should have has_min_history=TRUE" + + # Not all rows should have all flags TRUE (validates the logic works) + # At minimum, first week after opening shouldn't have full history + assert full_week_count <= total, "Boolean flag counts should not exceed total" diff --git a/integration/snowflake/tests/test_preparation.py b/integration/snowflake/tests/test_preparation.py new file mode 100644 index 0000000..9bf9f9d --- /dev/null +++ b/integration/snowflake/tests/test_preparation.py @@ -0,0 +1,67 @@ +"""Tests for data preparation module. + +Tests validation logic and pipeline orchestration using mocked +Snowflake sessions. 
+""" + +# pyright: reportAny=false + +from unittest.mock import MagicMock + +import pytest + +from data.preparation import ( + DataPreparationError, + _validate_source_tables, # pyright: ignore[reportPrivateUsage] + create_weekly_sales_by_store_with_target, +) + + +class TestValidateSourceTables: + """Tests for source table validation logic.""" + + def test_raises_error_when_table_query_fails(self, mock_session: MagicMock) -> None: + """Verify DataPreparationError raised when required table doesn't exist.""" + mock_session.sql.return_value.collect.side_effect = Exception( + "Table RAW.raw_stores does not exist" + ) + + with pytest.raises(DataPreparationError, match="Failed to validate"): + _validate_source_tables(mock_session, "RAW") + + def test_raises_error_when_result_is_empty(self, mock_session: MagicMock) -> None: + """Verify DataPreparationError raised when table query returns empty result.""" + mock_session.sql.return_value.collect.return_value = [] + + with pytest.raises( + DataPreparationError, match="does not exist or is not accessible" + ): + _validate_source_tables(mock_session, "RAW") + + def test_succeeds_when_tables_exist(self, mock_session: MagicMock) -> None: + """Verify no error when required tables exist and are accessible.""" + mock_session.sql.return_value.collect.return_value = [{"count": 100}] + + _validate_source_tables(mock_session, "RAW") + + def test_uses_provided_source_schema(self, mock_session: MagicMock) -> None: + """Verify provided source schema is used in validation queries.""" + mock_session.sql.return_value.collect.return_value = [{"count": 100}] + + _validate_source_tables(mock_session, "CUSTOM_SOURCE") + + sql_calls = [c[0][0] for c in mock_session.sql.call_args_list] + assert all("CUSTOM_SOURCE" in c for c in sql_calls) + + +class TestCreateWeeklySalesByStoreWithTarget: + """Tests for the main preparation pipeline.""" + + def test_validates_source_tables_first(self, mock_session: MagicMock) -> None: + """Verify pipeline validates source tables before proceeding.""" + mock_session.sql.return_value.collect.side_effect = Exception( + "Table does not exist" + ) + + with pytest.raises(DataPreparationError, match="Failed to validate"): + _ = create_weekly_sales_by_store_with_target(mock_session) diff --git a/integration/snowflake/tests/test_sql_loader.py b/integration/snowflake/tests/test_sql_loader.py index 464cbc2..28e644c 100644 --- a/integration/snowflake/tests/test_sql_loader.py +++ b/integration/snowflake/tests/test_sql_loader.py @@ -21,6 +21,20 @@ def test_loads_existing_sql_file(self) -> None: assert "CREATE SCHEMA" in content assert "PREPARED" in content + def test_applies_formatting_when_kwargs_provided(self) -> None: + """Verify load_sql applies string formatting with provided kwargs.""" + content: str = load_sql( + path="preparation/snapshots_by_store.sql", + target_schema="PREPARED", + table_name="WEEKLY_SALES", + store_name="Test Store", + order_direction="DESC", + limit="5", + ) + + assert "Test Store" in content + assert "PREPARED.WEEKLY_SALES" in content + def test_returns_raw_content_when_no_kwargs(self) -> None: """Verify load_sql returns unmodified content without kwargs.""" content: str = load_sql( @@ -37,3 +51,14 @@ def test_raises_file_not_found_for_missing_file(self) -> None: """Verify FileNotFoundError propagates for nonexistent SQL files.""" with pytest.raises(FileNotFoundError): _ = load_sql(path="nonexistent/missing.sql") + + def test_preserves_placeholders_when_no_kwargs(self) -> None: + """Verify load_sql preserves 
{placeholders} when no kwargs provided.""" + # snapshots_by_store.sql has {store_name}, {table_name}, etc. placeholders + content: str = load_sql(path="preparation/snapshots_by_store.sql") + + # Placeholders should remain in the content + assert "{store_name}" in content + assert "{table_name}" in content + assert "{order_direction}" in content + assert "{limit}" in content From bd4ef2ff341bdc2a460d657270e62d22b7b3417b Mon Sep 17 00:00:00 2001 From: Alexandros Ladas Date: Sun, 14 Dec 2025 23:54:06 +0100 Subject: [PATCH 2/2] Fix SQL syntax by adding missing semicolons in various scripts and update logging messages for consistency in data preparation module. --- integration/snowflake/data/preparation.py | 5 ++--- integration/snowflake/data/sql/bootstrap/create_database.sql | 2 +- .../snowflake/data/sql/bootstrap/create_warehouse.sql | 2 +- integration/snowflake/data/sql/common/create_schema.sql | 2 +- .../data/sql/ingestion/create_parquet_file_format.sql | 2 +- .../snowflake/data/sql/ingestion/create_stage_s3_public.sql | 2 +- .../snowflake/data/sql/preparation/analyze_stores.sql | 2 +- .../snowflake/data/sql/preparation/calculate_target.sql | 2 +- .../snowflake/data/sql/preparation/data_quality_check.sql | 2 +- .../snowflake/data/sql/preparation/drop_weekly_stores.sql | 2 +- .../snowflake/data/sql/preparation/overall_statistics.sql | 2 +- .../snowflake/data/sql/preparation/snapshots_by_store.sql | 2 +- .../snowflake/data/sql/preparation/store_statistics.sql | 2 +- .../snowflake/data/sql/preparation/weekly_stores_summary.sql | 2 +- .../data/sql/preparation/zero_sales_investigation.sql | 2 +- 15 files changed, 16 insertions(+), 17 deletions(-) diff --git a/integration/snowflake/data/preparation.py b/integration/snowflake/data/preparation.py index ad02cd2..d094aa0 100644 --- a/integration/snowflake/data/preparation.py +++ b/integration/snowflake/data/preparation.py @@ -100,7 +100,7 @@ def create_weekly_sales_by_store_with_target( logger.info(""" ================================================================================ -PREPARING WEEKLY SALES FORECASTING DATA BY STORE FOR GETML +PREPARING WEEKLY SALES FORECASTING DATA BY STORE FOR getML ================================================================================ """) @@ -123,10 +123,9 @@ def create_weekly_sales_by_store_with_target( Objects created in {target_schema} schema: - weekly_stores (TABLE) - - {table_name} (VIEW - USE THIS FOR GETML) + - {table_name} (VIEW - USE THIS FOR getML) Population table: {qualified_table_name} -For getML integration instructions, see: docs/GETML_WEEKLY_SALES_DATA_PREPARATION.md """) return qualified_table_name diff --git a/integration/snowflake/data/sql/bootstrap/create_database.sql b/integration/snowflake/data/sql/bootstrap/create_database.sql index b58ebb1..c440b31 100644 --- a/integration/snowflake/data/sql/bootstrap/create_database.sql +++ b/integration/snowflake/data/sql/bootstrap/create_database.sql @@ -1 +1 @@ -CREATE DATABASE IF NOT EXISTS {database_name} +CREATE DATABASE IF NOT EXISTS {database_name}; diff --git a/integration/snowflake/data/sql/bootstrap/create_warehouse.sql b/integration/snowflake/data/sql/bootstrap/create_warehouse.sql index 6e8ef05..38f4549 100644 --- a/integration/snowflake/data/sql/bootstrap/create_warehouse.sql +++ b/integration/snowflake/data/sql/bootstrap/create_warehouse.sql @@ -3,4 +3,4 @@ WITH WAREHOUSE_SIZE = '{warehouse_size}' AUTO_SUSPEND = {auto_suspend_seconds} AUTO_RESUME = TRUE - INITIALLY_SUSPENDED = TRUE + INITIALLY_SUSPENDED = TRUE; diff --git 
a/integration/snowflake/data/sql/common/create_schema.sql b/integration/snowflake/data/sql/common/create_schema.sql index 57faa76..bd0b8df 100644 --- a/integration/snowflake/data/sql/common/create_schema.sql +++ b/integration/snowflake/data/sql/common/create_schema.sql @@ -1 +1 @@ -CREATE SCHEMA IF NOT EXISTS {schema_name} +CREATE SCHEMA IF NOT EXISTS {schema_name}; diff --git a/integration/snowflake/data/sql/ingestion/create_parquet_file_format.sql b/integration/snowflake/data/sql/ingestion/create_parquet_file_format.sql index c6d6504..8e8b795 100644 --- a/integration/snowflake/data/sql/ingestion/create_parquet_file_format.sql +++ b/integration/snowflake/data/sql/ingestion/create_parquet_file_format.sql @@ -2,4 +2,4 @@ -- Uses CREATE OR REPLACE for idempotent execution CREATE OR REPLACE FILE FORMAT {schema_name}.PARQUET_FORMAT TYPE = PARQUET - USE_VECTORIZED_SCANNER = TRUE + USE_VECTORIZED_SCANNER = TRUE; diff --git a/integration/snowflake/data/sql/ingestion/create_stage_s3_public.sql b/integration/snowflake/data/sql/ingestion/create_stage_s3_public.sql index b9f778e..6cbb126 100644 --- a/integration/snowflake/data/sql/ingestion/create_stage_s3_public.sql +++ b/integration/snowflake/data/sql/ingestion/create_stage_s3_public.sql @@ -2,4 +2,4 @@ -- No credentials required for public buckets CREATE OR REPLACE STAGE {schema_name}.{stage_name} URL = '{bucket_url}' - FILE_FORMAT = {schema_name}.PARQUET_FORMAT + FILE_FORMAT = {schema_name}.PARQUET_FORMAT; diff --git a/integration/snowflake/data/sql/preparation/analyze_stores.sql b/integration/snowflake/data/sql/preparation/analyze_stores.sql index 03ab648..ce6aebf 100644 --- a/integration/snowflake/data/sql/preparation/analyze_stores.sql +++ b/integration/snowflake/data/sql/preparation/analyze_stores.sql @@ -10,4 +10,4 @@ SELECT FROM {source_schema}.raw_stores s LEFT JOIN {source_schema}.raw_orders o ON o.store_id = s.id GROUP BY s.id, s.name, s.opened_at -ORDER BY TRY_TO_TIMESTAMP(s.opened_at) +ORDER BY TRY_TO_TIMESTAMP(s.opened_at); diff --git a/integration/snowflake/data/sql/preparation/calculate_target.sql b/integration/snowflake/data/sql/preparation/calculate_target.sql index b7ac3b5..45fc628 100644 --- a/integration/snowflake/data/sql/preparation/calculate_target.sql +++ b/integration/snowflake/data/sql/preparation/calculate_target.sql @@ -36,4 +36,4 @@ SELECT FROM {target_schema}.weekly_stores ws LEFT JOIN weekly_order_totals wot ON wot.store_id = ws.store_id - AND wot.week_start = ws.reference_date + AND wot.week_start = ws.reference_date; diff --git a/integration/snowflake/data/sql/preparation/data_quality_check.sql b/integration/snowflake/data/sql/preparation/data_quality_check.sql index bce0155..f1d62d8 100644 --- a/integration/snowflake/data/sql/preparation/data_quality_check.sql +++ b/integration/snowflake/data/sql/preparation/data_quality_check.sql @@ -4,4 +4,4 @@ SELECT SUM(CASE WHEN next_week_sales = 0 THEN 1 ELSE 0 END) as zero_sales, SUM(CASE WHEN next_week_orders = 0 THEN 1 ELSE 0 END) as zero_orders, SUM(CASE WHEN next_week_sales > 0 AND next_week_orders = 0 THEN 1 ELSE 0 END) as invalid -FROM {target_schema}.{table_name} +FROM {target_schema}.{table_name}; diff --git a/integration/snowflake/data/sql/preparation/drop_weekly_stores.sql b/integration/snowflake/data/sql/preparation/drop_weekly_stores.sql index d707af2..da713c1 100644 --- a/integration/snowflake/data/sql/preparation/drop_weekly_stores.sql +++ b/integration/snowflake/data/sql/preparation/drop_weekly_stores.sql @@ -1,2 +1,2 @@ -- Drop the weekly_stores table if it 
exists -DROP TABLE IF EXISTS {target_schema}.weekly_stores +DROP TABLE IF EXISTS {target_schema}.weekly_stores; diff --git a/integration/snowflake/data/sql/preparation/overall_statistics.sql b/integration/snowflake/data/sql/preparation/overall_statistics.sql index 0772c2d..256572e 100644 --- a/integration/snowflake/data/sql/preparation/overall_statistics.sql +++ b/integration/snowflake/data/sql/preparation/overall_statistics.sql @@ -5,4 +5,4 @@ SELECT AVG(next_week_sales) as avg_weekly_sales, SUM(next_week_sales) as total_sales, SUM(next_week_orders) as total_orders -FROM {target_schema}.{table_name} +FROM {target_schema}.{table_name}; diff --git a/integration/snowflake/data/sql/preparation/snapshots_by_store.sql b/integration/snowflake/data/sql/preparation/snapshots_by_store.sql index 03b24d7..5e3adf5 100644 --- a/integration/snowflake/data/sql/preparation/snapshots_by_store.sql +++ b/integration/snowflake/data/sql/preparation/snapshots_by_store.sql @@ -8,4 +8,4 @@ SELECT FROM {target_schema}.{table_name} WHERE store_name = '{store_name}' ORDER BY reference_date {order_direction} -LIMIT {limit} +LIMIT {limit}; diff --git a/integration/snowflake/data/sql/preparation/store_statistics.sql b/integration/snowflake/data/sql/preparation/store_statistics.sql index 6bdeb4e..d5f9bb7 100644 --- a/integration/snowflake/data/sql/preparation/store_statistics.sql +++ b/integration/snowflake/data/sql/preparation/store_statistics.sql @@ -9,4 +9,4 @@ SELECT SUM(next_week_orders) as total_orders FROM {target_schema}.{table_name} GROUP BY store_name -ORDER BY avg_weekly_sales DESC +ORDER BY avg_weekly_sales DESC; diff --git a/integration/snowflake/data/sql/preparation/weekly_stores_summary.sql b/integration/snowflake/data/sql/preparation/weekly_stores_summary.sql index 9cd38b5..5dac158 100644 --- a/integration/snowflake/data/sql/preparation/weekly_stores_summary.sql +++ b/integration/snowflake/data/sql/preparation/weekly_stores_summary.sql @@ -24,4 +24,4 @@ SELECT p.last_snapshot FROM totals t CROSS JOIN per_store p -ORDER BY p.first_snapshot +ORDER BY p.first_snapshot; diff --git a/integration/snowflake/data/sql/preparation/zero_sales_investigation.sql b/integration/snowflake/data/sql/preparation/zero_sales_investigation.sql index 7a03bb7..68c0fa4 100644 --- a/integration/snowflake/data/sql/preparation/zero_sales_investigation.sql +++ b/integration/snowflake/data/sql/preparation/zero_sales_investigation.sql @@ -3,4 +3,4 @@ SELECT store_name, COUNT(*) as count FROM {target_schema}.{table_name} WHERE next_week_sales = 0 GROUP BY store_name -ORDER BY count DESC +ORDER BY count DESC;
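For reference, a minimal end-to-end sketch of how the new entry point added by this patch is meant to be driven, mirroring the usage example in the preparation.py module docstring and the updated prepare_jaffle_shop_data_for_snowflake.py script. It assumes the SNOWFLAKE_* environment variables are set for authentication; the to_arrow() call is taken from the module docstring rather than independently verified against the installed Snowpark version.

    # Illustrative sketch mirroring the preparation.py docstring and the
    # prepare_jaffle_shop_data_for_snowflake.py script; assumes SNOWFLAKE_*
    # environment variables are configured.
    import logging

    from data import (
        SnowflakeSettings,
        create_session,
        create_weekly_sales_by_store_with_target,
        ingestion,
    )

    logging.basicConfig(level=logging.INFO, format="%(message)s")

    settings = SnowflakeSettings.from_env()
    with create_session(settings) as session:
        # Load the raw Jaffle Shop tables first, then build the population view.
        ingestion.load_from_gcs(session, settings=settings, destination_schema="RAW")
        population_table = create_weekly_sales_by_store_with_target(
            session,
            settings=settings,
            source_schema="RAW",
            target_schema="PREPARED",
        )
        # Per the module docstring, the view can then be pulled into Arrow for getML.
        arrow_table = session.table(population_table).to_arrow()
        print(f"{arrow_table.num_rows:,} rows in {population_table}")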