diff --git a/docs/how_tos/map_time_config.md b/docs/how_tos/map_time_config.md index 1a3aa26..c45f823 100644 --- a/docs/how_tos/map_time_config.md +++ b/docs/how_tos/map_time_config.md @@ -35,7 +35,7 @@ dst_table_name = "ev_charging_datetime" hours_per_year = 12 * 7 * 24 num_time_arrays = 3 df = pd.DataFrame({ - "id": np.concat([np.repeat(i, hours_per_year) for i in range(1, 1 + num_time_arrays)]), + "id": np.concatenate([np.repeat(i, hours_per_year) for i in range(1, 1 + num_time_arrays)]), "month": np.tile(np.repeat(range(1, 13), 7 * 24), num_time_arrays), "day_of_week": np.tile(np.tile(np.repeat(range(7), 24), 12), num_time_arrays), "hour": np.tile(np.tile(range(24), 12 * 7), num_time_arrays), diff --git a/src/chronify/__init__.py b/src/chronify/__init__.py index a61ca6c..8a0c1a9 100644 --- a/src/chronify/__init__.py +++ b/src/chronify/__init__.py @@ -20,13 +20,12 @@ TableSchema, ) from chronify.store import Store -from chronify.time import RepresentativePeriodFormat +from chronify.time import RepresentativePeriodFormat, TimeDataType from chronify.time_configs import ( AnnualTimeRange, DatetimeRange, DatetimeRangeWithTZColumn, - IndexTimeRangeNTZ, - IndexTimeRangeTZ, + IndexTimeRange, IndexTimeRangeWithTZColumn, RepresentativePeriodTimeNTZ, RepresentativePeriodTimeTZ, @@ -42,9 +41,8 @@ "CsvTableSchema", "DatetimeRange", "DatetimeRangeWithTZColumn", + "IndexTimeRange", "IndexTimeRangeWithTZColumn", - "IndexTimeRangeNTZ", - "IndexTimeRangeTZ", "InvalidOperation", "InvalidParameter", "InvalidTable", @@ -59,6 +57,7 @@ "TableSchema", "TimeBaseModel", "TimeBasedDataAdjustment", + "TimeDataType", ) __version__ = metadata.metadata("chronify")["Version"] diff --git a/src/chronify/datetime_range_generator.py b/src/chronify/datetime_range_generator.py index 87574f4..5c036a6 100644 --- a/src/chronify/datetime_range_generator.py +++ b/src/chronify/datetime_range_generator.py @@ -1,15 +1,16 @@ from datetime import datetime, tzinfo from typing import Generator, Optional -from 
zoneinfo import ZoneInfo from itertools import chain +from calendar import isleap import pandas as pd from chronify.time import ( LeapDayAdjustmentType, + TimeDataType, ) from chronify.time_configs import DatetimeRanges, DatetimeRange, DatetimeRangeWithTZColumn -from chronify.time_utils import adjust_timestamp_by_dst_offset, get_tzname +from chronify.time_utils import get_tzname from chronify.time_range_generator_base import TimeRangeGeneratorBase from chronify.exceptions import InvalidValue @@ -25,58 +26,59 @@ def __init__( self._model = model self._adjustment = leap_day_adjustment or LeapDayAdjustmentType.NONE - def _iter_timestamps( - self, start: Optional[datetime] = None - ) -> Generator[datetime, None, None]: - """ + def _list_timestamps(self, start: Optional[datetime] = None) -> list[datetime]: + """Return all timestamps as a list. if start is supplied, override self._model.start """ if start is None: start = self._model.start - tz = start.tzinfo - - for i in range(self._model.length): - if not tz: - cur = adjust_timestamp_by_dst_offset( - start + i * self._model.resolution, self._model.resolution - ) - else: - # always step in standard time - cur_utc = start.astimezone(ZoneInfo("UTC")) + i * self._model.resolution - cur = adjust_timestamp_by_dst_offset( - cur_utc.astimezone(tz), self._model.resolution - ) - - is_leap_year = ( - pd.Timestamp(f"{cur.year}-01-01") + pd.Timedelta(days=365) - ).year == cur.year - if not is_leap_year: - yield pd.Timestamp(cur) - continue - - month = cur.month - day = cur.day - if not ( - self._adjustment == LeapDayAdjustmentType.DROP_FEB29 and month == 2 and day == 29 - ): - if not ( - self._adjustment == LeapDayAdjustmentType.DROP_DEC31 - and month == 12 - and day == 31 - ): - if not ( - self._adjustment == LeapDayAdjustmentType.DROP_JAN1 - and month == 1 - and day == 1 - ): - yield pd.Timestamp(cur) + + timestamps = pd.date_range( + start=start, + periods=self._model.length, + freq=self._model.resolution, + ).tolist() + + match 
self._adjustment: + case LeapDayAdjustmentType.DROP_FEB29: + timestamps = [ + ts + for ts in timestamps + if not (isleap(ts.year) and ts.month == 2 and ts.day == 29) + ] + case LeapDayAdjustmentType.DROP_DEC31: + timestamps = [ + ts + for ts in timestamps + if not (isleap(ts.year) and ts.month == 12 and ts.day == 31) + ] + case LeapDayAdjustmentType.DROP_JAN1: + timestamps = [ + ts + for ts in timestamps + if not (isleap(ts.year) and ts.month == 1 and ts.day == 1) + ] + case _: + pass + + return timestamps # type: ignore + + def _iter_timestamps( + self, start: Optional[datetime] = None + ) -> Generator[datetime, None, None]: + """Generator from pd.date_range(). + Note: Established time library already handles historical changes in time zone conversion to UTC. + (e.g. Algeria (Africa/Algiers) changed from UTC+0 to UTC+1 on April 25, 1980) + """ + for ts in self._list_timestamps(start=start): + yield ts def list_time_columns(self) -> list[str]: return self._model.list_time_columns() def list_distinct_timestamps_from_dataframe(self, df: pd.DataFrame) -> list[datetime]: result = sorted(df[self._model.time_column].unique()) - if not isinstance(result[0], datetime): + if len(result) > 0 and not isinstance(result[0], datetime): result = [pd.Timestamp(x) for x in result] return result @@ -93,7 +95,7 @@ def __init__( assert isinstance(self._model, DatetimeRange) def list_timestamps(self) -> list[datetime]: - return list(self._iter_timestamps()) + return self._list_timestamps() # list(self._iter_timestamps()) class DatetimeRangeGeneratorExternalTimeZone(DatetimeRangeGeneratorBase): @@ -117,42 +119,62 @@ def __init__( ) raise InvalidValue(msg) - def _list_timestamps(self, time_zone: Optional[tzinfo]) -> list[datetime]: - """always return tz-naive timestamps relative to input time_zone""" - if self._model.start_time_is_tz_naive(): - # For clock-time-aligned data, iterate naively without timezone conversion. 
- # All timezones get the same clock times (e.g., midnight everywhere). - start = None - else: - if time_zone: - start = self._model.start.astimezone(time_zone) - else: - start = self._model.start.replace(tzinfo=None) - timestamps = list(self._iter_timestamps(start=start)) - return [x.replace(tzinfo=None) for x in timestamps] + def _list_timestamps_by_time_zone(self, time_zone: Optional[tzinfo]) -> list[datetime]: + """return timestamps for a given time_zone expected in the dataframe + returned timestamp dtype matches that in the dataframe, i.e. self._model.dtype + (e.g., if time_zone is None, return tz-naive timestamps else return tz-aware timestamps) + """ + match (self._model.start_time_is_tz_naive(), self._model.dtype): + case (True, TimeDataType.TIMESTAMP_NTZ): + # aligned_in_local_standard_time of the time zone, + # all time zones must have the same tz-naive timestamps + # timestamps must represent local standard time zone, not local prevailing time zone with DST + start = self._model.start + case (True, TimeDataType.TIMESTAMP_TZ): + # aligned_in_local_standard_time of the time zone, + # all time zones have different tz-aware timestamps that are aligned when adjusted to local standard time zone + start = self._model.start.replace(tzinfo=time_zone) + case (False, TimeDataType.TIMESTAMP_NTZ): + # aligned_in_absolute_time, + # all time zones have different tz-naive timestamps that are aligned when localized to the time zone + if time_zone: + start = self._model.start.astimezone(time_zone).replace(tzinfo=None) + else: + start = self._model.start.replace(tzinfo=None) + case (False, TimeDataType.TIMESTAMP_TZ): + # aligned_in_absolute_time, all time zones have the same tz-aware timestamps + start = self._model.start + case _: + msg = f"Unsupported combination of start_time_is_tz_naive and dtype: {self._model}" + raise InvalidValue(msg) + return self._list_timestamps(start=start) # list(self._iter_timestamps(start=start)) def list_timestamps(self) -> list[datetime]: - 
"""return ordered timestamps across all time zones in the order of the time zones.""" + """return ordered tz-naive timestamps across all time zones in the order of the time zones.""" dct = self.list_timestamps_by_time_zone() return list(chain(*dct.values())) def list_timestamps_by_time_zone(self) -> dict[str, list[datetime]]: - """for each time zone, returns full timestamp iteration (duplicates allowed)""" + """for each time zone, returns full timestamp iteration + (duplicates allowed)""" dct = {} for tz in self._model.get_time_zones(): tz_name = get_tzname(tz) - dct[tz_name] = self._list_timestamps(tz) - + dct[tz_name] = self._list_timestamps_by_time_zone(time_zone=tz) return dct def list_distinct_timestamps_by_time_zone_from_dataframe( self, df: pd.DataFrame ) -> dict[str, list[datetime]]: + """ + from the dataframe, for each time zone, returns distinct timestamps + """ tz_col = self._model.get_time_zone_column() t_col = self._model.time_column df[t_col] = pd.to_datetime(df[t_col]) df2 = df[[tz_col, t_col]].drop_duplicates() dct = {} for tz_name in sorted(df2[tz_col].unique()): - dct[tz_name] = sorted(df2.loc[df2[tz_col] == tz_name, t_col].tolist()) + timestamps = sorted(df2.loc[df2[tz_col] == tz_name, t_col].tolist()) + dct[tz_name] = timestamps return dct diff --git a/src/chronify/sqlalchemy/functions.py b/src/chronify/sqlalchemy/functions.py index 6539ec8..1e882f0 100644 --- a/src/chronify/sqlalchemy/functions.py +++ b/src/chronify/sqlalchemy/functions.py @@ -13,10 +13,11 @@ import pandas as pd from numpy.dtypes import DateTime64DType, ObjectDType from pandas import DatetimeTZDtype +from chronify.time import TimeDataType from sqlalchemy import Connection, Engine, Selectable, text from chronify.exceptions import InvalidOperation, InvalidParameter -from chronify.time_configs import DatetimeRangeBase, DatetimeRange, TimeBaseModel +from chronify.time_configs import DatetimeRangeBase, TimeBaseModel from chronify.utils.path_utils import check_overwrite, 
delete_if_exists, to_path # Copied from Pandas/Polars @@ -35,7 +36,7 @@ def read_database( df = conn.execute(query).cursor.fetch_df() # type: ignore case "sqlite": df = pd.read_sql(query, conn, params=params) - if isinstance(config, DatetimeRange): + if isinstance(config, DatetimeRangeBase): _convert_database_output_for_datetime(df, config) case "hive": df = _read_from_hive(query, conn, config, params) @@ -81,9 +82,9 @@ def _check_one_config_per_datetime_column(configs: Sequence[TimeBaseModel]) -> N def _convert_database_input_for_datetime( - df: pd.DataFrame, config: DatetimeRange, copied: bool + df: pd.DataFrame, config: DatetimeRangeBase, copied: bool ) -> tuple[pd.DataFrame, bool]: - if config.start_time_is_tz_naive(): + if config.dtype == TimeDataType.TIMESTAMP_NTZ: return df, copied if copied: @@ -91,7 +92,6 @@ def _convert_database_input_for_datetime( else: df2 = df.copy() copied = True - if isinstance(df2[config.time_column].dtype, DatetimeTZDtype): df2[config.time_column] = df2[config.time_column].dt.tz_convert("UTC") else: @@ -100,9 +100,9 @@ def _convert_database_input_for_datetime( return df2, copied -def _convert_database_output_for_datetime(df: pd.DataFrame, config: DatetimeRange) -> None: +def _convert_database_output_for_datetime(df: pd.DataFrame, config: DatetimeRangeBase) -> None: if config.time_column in df.columns: - if not config.start_time_is_tz_naive(): + if config.dtype == TimeDataType.TIMESTAMP_TZ: if isinstance(df[config.time_column].dtype, ObjectDType): df[config.time_column] = pd.to_datetime(df[config.time_column], utc=True) else: @@ -120,6 +120,7 @@ def _write_to_duckdb( ) -> None: assert conn._dbapi_connection is not None assert conn._dbapi_connection.driver_connection is not None + match if_table_exists: case "append": query = f"INSERT INTO {table_name} SELECT * FROM df" @@ -131,6 +132,7 @@ def _write_to_duckdb( case _: msg = f"{if_table_exists=}" raise InvalidOperation(msg) + conn._dbapi_connection.driver_connection.sql(query) @@ 
-190,7 +192,7 @@ def _read_from_hive( ) -> pd.DataFrame: df = pd.read_sql_query(query, conn, params=params) if ( - isinstance(config, DatetimeRange) + isinstance(config, DatetimeRangeBase) and config.time_column in df.columns and not config.start_time_is_tz_naive() ): @@ -210,7 +212,7 @@ def _write_to_sqlite( _check_one_config_per_datetime_column(configs) copied = False for config in configs: - if isinstance(config, DatetimeRange): + if isinstance(config, DatetimeRangeBase): df, copied = _convert_database_input_for_datetime(df, config, copied) df.to_sql(table_name, conn, if_exists=if_table_exists, index=False) diff --git a/src/chronify/store.py b/src/chronify/store.py index dec92f6..407b7a6 100644 --- a/src/chronify/store.py +++ b/src/chronify/store.py @@ -51,6 +51,7 @@ from chronify.time_series_checker import check_timestamps from chronify.time_series_mapper import map_time from chronify.time_zone_converter import TimeZoneConverter, TimeZoneConverterByColumn +from chronify.time_zone_localizer import TimeZoneLocalizer, TimeZoneLocalizerByColumn from chronify.utils.path_utils import check_overwrite, to_path from chronify.utils.sqlalchemy_view import create_view @@ -892,7 +893,7 @@ def map_table_time_config( >>> num_time_arrays = 3 >>> df = pd.DataFrame( ... { - ... "id": np.concat( + ... "id": np.concatenate( ... [np.repeat(i, hours_per_year) for i in range(1, 1 + num_time_arrays)] ... ), ... "month": np.tile(np.repeat(range(1, 13), 7 * 24), num_time_arrays), @@ -983,7 +984,7 @@ def convert_time_zone( >>> num_time_arrays = 1 >>> df = pd.DataFrame( ... { - ... "id": np.concat( + ... "id": np.concatenate( ... [np.repeat(i, hours_per_year) for i in range(1, 1 + num_time_arrays)] ... ), ... "timestamp": np.tile( @@ -1072,7 +1073,7 @@ def convert_time_zone_by_column( >>> num_time_arrays = 3 >>> df = pd.DataFrame( ... { - ... "id": np.concat( + ... "id": np.concatenate( ... [np.repeat(i, hours_per_year) for i in range(1, 1 + num_time_arrays)] ... ), ... 
"timestamp": np.tile( @@ -1124,6 +1125,189 @@ def convert_time_zone_by_column( return dst_schema + def localize_time_zone( + self, + src_name: str, + time_zone: tzinfo | None, + scratch_dir: Optional[Path] = None, + output_file: Optional[Path] = None, + check_mapped_timestamps: bool = False, + ) -> TableSchema: + """ + Localize the time zone of the existing table represented by src_name to a specified time zone + + Parameters + ---------- + src_name + Refers to the table name of the source data. + time_zone + Standard time zone to localize to. If None, keep as tz-naive. + scratch_dir + Directory to use for temporary writes. Default to the system's tmp filesystem. + output_file + If set, write the mapped table to this Parquet file. + check_mapped_timestamps + Perform time checks on the result of the mapping operation. This can be slow and + is not required. + + Raises + ------ + TableAlreadyExists + Raised if the dst_schema name already exists. + + Returns + ------- + TableSchema + The schema of the newly created table. + + Examples + -------- + >>> store = Store() + >>> start = datetime(year=2018, month=1, day=1) # tz-naive + >>> freq = timedelta(hours=1) + >>> hours_per_year = 8760 + >>> num_time_arrays = 1 + >>> df = pd.DataFrame( + ... { + ... "id": np.concatenate( + ... [np.repeat(i, hours_per_year) for i in range(1, 1 + num_time_arrays)] + ... ), + ... "timestamp": np.tile( + ... pd.date_range(start, periods=hours_per_year, freq="h"), num_time_arrays + ... ), + ... "value": np.random.random(hours_per_year * num_time_arrays), + ... } + ... ) + >>> schema = TableSchema( + ... name="some_data", + ... time_config=DatetimeRange( + ... time_column="timestamp", + ... start=start, + ... length=hours_per_year, + ... resolution=freq, + ... ), + ... time_array_id_columns=["id"], + ... value_column="value", + ... ) + >>> store.ingest_table(df, schema) + >>> to_time_zone = ZoneInfo("EST") + >>> dst_schema = store.localize_time_zone( + ... 
schema.name, to_time_zone, check_mapped_timestamps=True + ... ) + """ + + src_schema = self._schema_mgr.get_schema(src_name) + tzl = TimeZoneLocalizer(self._engine, self._metadata, src_schema, time_zone) + + dst_schema = tzl.generate_to_schema() + if self.has_table(dst_schema.name): + msg = dst_schema.name + raise TableAlreadyExists(msg) + + tzl.localize_time_zone( + scratch_dir=scratch_dir, + output_file=output_file, + check_mapped_timestamps=check_mapped_timestamps, + ) + + with self._engine.begin() as conn: + self._schema_mgr.add_schema(conn, dst_schema) + + return dst_schema + + def localize_time_zone_by_column( + self, + src_name: str, + time_zone_column: Optional[str] = None, + scratch_dir: Optional[Path] = None, + output_file: Optional[Path] = None, + check_mapped_timestamps: bool = False, + ) -> TableSchema: + """ + Localize the time zone of the existing table represented by src_name to time zones defined by a column + + Parameters + ---------- + src_name + Refers to the table name of the source data. + time_zone_column + Name of the time zone column for localization, default to None + scratch_dir + Directory to use for temporary writes. Default to the system's tmp filesystem. + output_file + If set, write the mapped table to this Parquet file. + check_mapped_timestamps + Perform time checks on the result of the mapping operation. This can be slow and + is not required. + + Raises + ------ + TableAlreadyExists + Raised if the dst_schema name already exists. + + Returns + ------- + TableSchema + The schema of the newly created table. + + Examples + -------- + >>> store = Store() + >>> start = datetime(year=2018, month=1, day=1) # tz-naive + >>> freq = timedelta(hours=1) + >>> hours_per_year = 8760 + >>> num_time_arrays = 3 + >>> df = pd.DataFrame( + ... { + ... "id": np.concatenate( + ... [np.repeat(i, hours_per_year) for i in range(1, 1 + num_time_arrays)] + ... ), + ... "timestamp": np.tile( + ... 
pd.date_range(start, periods=hours_per_year, freq="h"), num_time_arrays + ... ), + ... "time_zone": np.repeat(["EST", "CST", "MST"], hours_per_year), + ... "value": np.random.random(hours_per_year * num_time_arrays), + ... } + ... ) + >>> schema = TableSchema( + ... name="some_data", + ... time_config=DatetimeRange( + ... time_column="timestamp", + ... start=start, + ... length=hours_per_year, + ... resolution=freq, + ... ), + ... time_array_id_columns=["id"], + ... value_column="value", + ... ) + >>> store.ingest_table(df, schema) + >>> time_zone_column = "time_zone" + >>> dst_schema = store.localize_time_zone_by_column( + ... schema.name, + ... time_zone_column, + ... check_mapped_timestamps=True, + ... ) + """ + + src_schema = self._schema_mgr.get_schema(src_name) + tzl = TimeZoneLocalizerByColumn(self._engine, self._metadata, src_schema, time_zone_column) + + dst_schema = tzl.generate_to_schema() + if self.has_table(dst_schema.name): + msg = dst_schema.name + raise TableAlreadyExists(msg) + + tzl.localize_time_zone( + scratch_dir=scratch_dir, + output_file=output_file, + check_mapped_timestamps=check_mapped_timestamps, + ) + + with self._engine.begin() as conn: + self._schema_mgr.add_schema(conn, dst_schema) + + return dst_schema + def read_query( self, name: str, diff --git a/src/chronify/time.py b/src/chronify/time.py index 08651ef..26d273f 100644 --- a/src/chronify/time.py +++ b/src/chronify/time.py @@ -8,13 +8,12 @@ class TimeType(StrEnum): - """Defines the supported time formats in the load data.""" + """Defines the formats of time config / representation.""" DATETIME = "datetime" DATETIME_TZ_COL = "datetime_tz_col" ANNUAL = "annual" - INDEX_NTZ = "index_ntz" - INDEX_TZ = "index_tz" + INDEX = "index" INDEX_TZ_COL = "index_tz_col" REPRESENTATIVE_PERIOD_NTZ = "representative_period_ntz" REPRESENTATIVE_PERIOD_TZ = "representative_period_tz" @@ -23,12 +22,13 @@ class TimeType(StrEnum): YEAR_MONTH_DAY_PERIOD_NTZ = "year_month_day_period" -class 
DatetimeFormat(StrEnum): - """Defines the time format of the datetime config model""" +class TimeDataType(StrEnum): + """Defines the data types of datetime columns in load_data.""" - ALIGNED = "aligned" - LOCAL = "local" - LOCAL_AS_STRINGS = "local_as_strings" + TIMESTAMP_TZ = "timestamp_tz" + TIMESTAMP_NTZ = "timestamp_ntz" + STRING = "string" + TIMESTAMP_IN_PARTS = "timestamp_in_parts" class RepresentativePeriodFormat(StrEnum): diff --git a/src/chronify/time_configs.py b/src/chronify/time_configs.py index e14709c..2cb9f49 100644 --- a/src/chronify/time_configs.py +++ b/src/chronify/time_configs.py @@ -3,7 +3,7 @@ from zoneinfo import ZoneInfo from datetime import datetime, timedelta, tzinfo from typing import Union, Literal, Optional -from pydantic import Field, field_validator +from pydantic import Field, field_validator, ValidationInfo from typing_extensions import Annotated from chronify.base_models import ChronifyBaseModel @@ -11,6 +11,7 @@ DaylightSavingAdjustmentType, LeapDayAdjustmentType, MeasurementType, + TimeDataType, TimeIntervalType, TimeType, RepresentativePeriodFormat, @@ -88,6 +89,10 @@ class DatetimeRange(DatetimeRangeBase): description="Start time of the range. If it includes a time zone, the timestamps in " "the data must be time zone-aware." 
+ ) + dtype: Optional[Literal[TimeDataType.TIMESTAMP_TZ, TimeDataType.TIMESTAMP_NTZ]] = Field( + description="Data type of the timestamps in the time column.", + default=None, + ) def get_time_zone_column(self) -> None: return None @@ -95,6 +100,34 @@ def get_time_zone_column(self) -> None: def get_time_zones(self) -> list[tzinfo | None]: return [] + @field_validator("dtype", mode="after") + @classmethod + def check_dtype_start_time_consistency( + cls, + dtype: Optional[Literal[TimeDataType.TIMESTAMP_TZ, TimeDataType.TIMESTAMP_NTZ]], + info: ValidationInfo, + ) -> Literal[TimeDataType.TIMESTAMP_TZ, TimeDataType.TIMESTAMP_NTZ]: + match (info.data["start"].tzinfo is None, dtype): + # assign default dtype if not provided + case (True, None): + dtype = TimeDataType.TIMESTAMP_NTZ + case (False, None): + dtype = TimeDataType.TIMESTAMP_TZ + # validate dtype if provided + case (True, TimeDataType.TIMESTAMP_TZ): + msg = ( + "DatetimeRange with tz-naive start time must have dtype TIMESTAMP_NTZ: " + f"\n{info.data['start']=}, {dtype=}" + ) + raise InvalidValue(msg) + case (False, TimeDataType.TIMESTAMP_NTZ): + msg = ( + "DatetimeRange with tz-aware start time must have dtype TIMESTAMP_TZ: " + f"\n{info.data['start']=}, {dtype=}" + ) + raise InvalidValue(msg) + return dtype + class DatetimeRangeWithTZColumn(DatetimeRangeBase): """Defines a time range that uses an external time zone column to interpret timestamps.""" @@ -113,6 +146,9 @@ class DatetimeRangeWithTZColumn(DatetimeRangeBase): time_zones: list[tzinfo | ZoneInfo | None] = Field( description="Unique time zones from the table." ) + dtype: Literal[TimeDataType.TIMESTAMP_TZ, TimeDataType.TIMESTAMP_NTZ] = Field( + description="Data type of the timestamps in the time column." 
+ ) def get_time_zone_column(self) -> str: return self.time_zone_column @@ -161,7 +197,7 @@ class IndexTimeRangeBase(TimeBaseModel): start: int = Field(description="starting index") length: int start_timestamp: datetime = Field( - description="The timestamp represented by the starting index." + description="The timestamp represented by the starting index. Can be tz-aware or tz-naive." ) resolution: timedelta = Field(description="The resolution of time represented by the indexes.") time_type: TimeType @@ -174,44 +210,12 @@ def list_time_columns(self) -> list[str]: return [self.time_column] -class IndexTimeRangeNTZ(IndexTimeRangeBase): - """Index time that represents tz-naive timestamps. - start_timestamp is tz-naive - """ - - time_type: Literal[TimeType.INDEX_NTZ] = TimeType.INDEX_NTZ - - @field_validator("start_timestamp") - @classmethod - def check_start_timestamp(cls, start_timestamp: datetime) -> datetime: - if start_timestamp.tzinfo is not None: - msg = "start_timestamp must be tz-naive for IndexTimeRangeNTZ" - raise InvalidValue(msg) - return start_timestamp - - def get_time_zone_column(self) -> None: - return None - - def get_time_zones(self) -> list[tzinfo | None]: - return [] - - -class IndexTimeRangeTZ(IndexTimeRangeBase): - """Index time that represents tz-aware timestamps of a single time zone. - start_timestamp is tz-aware. - Used for dataset where the timeseries for all geographies start at the same - absolute time. +class IndexTimeRange(IndexTimeRangeBase): + """Index time that represents timestamps. 
+ start_timestamp can be tz-aware or tz-naive """ - time_type: Literal[TimeType.INDEX_TZ] = TimeType.INDEX_TZ - - @field_validator("start_timestamp") - @classmethod - def check_start_timestamp(cls, start_timestamp: datetime) -> datetime: - if start_timestamp.tzinfo is None: - msg = "start_timestamp must be tz-aware for IndexTimeRangeTZ" - raise InvalidValue(msg) - return start_timestamp + time_type: Literal[TimeType.INDEX] = TimeType.INDEX def get_time_zone_column(self) -> None: return None @@ -248,8 +252,7 @@ def get_time_zones(self) -> list[tzinfo | None]: IndexTimeRanges = Union[ - IndexTimeRangeNTZ, - IndexTimeRangeTZ, + IndexTimeRange, IndexTimeRangeWithTZColumn, ] @@ -431,8 +434,7 @@ def default_config(cls, length: int, year: int) -> "MonthDayHourTimeNTZ": AnnualTimeRange, DatetimeRange, DatetimeRangeWithTZColumn, - IndexTimeRangeNTZ, - IndexTimeRangeTZ, + IndexTimeRange, IndexTimeRangeWithTZColumn, RepresentativePeriodTimeNTZ, RepresentativePeriodTimeTZ, diff --git a/src/chronify/time_series_checker.py b/src/chronify/time_series_checker.py index 17c94c6..afaef59 100644 --- a/src/chronify/time_series_checker.py +++ b/src/chronify/time_series_checker.py @@ -27,7 +27,10 @@ def check_timestamps( class TimeSeriesChecker: - """Performs checks on time series arrays in a table.""" + """Performs checks on time series arrays in a table. + Timestamps in the table will be checked against expected timestamps generated from the + TableSchema's time_config. TZ-awareness of the generated timestamps will match that of the table. 
+ """ def __init__( self, @@ -86,7 +89,6 @@ def _check_expected_timestamps_with_external_time_zone(self) -> int: stmt = stmt.where(self._table.c[col].is_not(None)) df = read_database(stmt, self._conn, self._schema.time_config) actual_dct = self._time_generator.list_distinct_timestamps_by_time_zone_from_dataframe(df) - if sorted(expected_dct.keys()) != sorted(actual_dct.keys()): msg = ( "Time zone records do not match between expected and actual from table " diff --git a/src/chronify/time_series_mapper_index_time.py b/src/chronify/time_series_mapper_index_time.py index 34c4094..4ef3925 100644 --- a/src/chronify/time_series_mapper_index_time.py +++ b/src/chronify/time_series_mapper_index_time.py @@ -19,7 +19,7 @@ ) from chronify.time_range_generator_factory import make_time_range_generator from chronify.time_series_mapper_datetime import MapperDatetimeToDatetime -from chronify.time import TimeType, DaylightSavingAdjustmentType, AggregationType +from chronify.time import TimeDataType, TimeType, DaylightSavingAdjustmentType, AggregationType from chronify.sqlalchemy.functions import read_database logger = logging.getLogger(__name__) @@ -35,6 +35,7 @@ def __init__( data_adjustment: Optional[TimeBasedDataAdjustment] = None, wrap_time_allowed: bool = False, ) -> None: + # TODO: refactor to use new time configs super().__init__( engine, metadata, from_schema, to_schema, data_adjustment, wrap_time_allowed ) @@ -181,7 +182,7 @@ def _create_local_time_config(self, time_zone: str) -> DatetimeRange: return time_config def _create_interm_map(self) -> tuple[pd.DataFrame, MappingTableSchema, TableSchema]: - """Create mapping dataframe for converting INDEX_TZ or INDEX_NTZ time to its represented datetime""" + """Create mapping dataframe for converting INDEX time to its represented datetime""" mapped_schema = self._create_intermediate_schema() assert isinstance(mapped_schema.time_config, DatetimeRange) mapped_time_col = mapped_schema.time_config.time_column @@ -195,7 +196,6 @@ def 
_create_interm_map(self) -> tuple[pd.DataFrame, MappingTableSchema, TableSch name="mapping_table", time_configs=[from_time_config, mapped_schema.time_config], ) - df = pd.DataFrame( { from_time_col: from_time_data, @@ -251,6 +251,7 @@ def _create_interm_map_with_time_zone( # Update mapped_schema mapped_schema.time_config.start = df[mapped_time_col].min() mapped_schema.time_config.length = df[mapped_time_col].nunique() + mapped_schema.time_config.dtype = TimeDataType.TIMESTAMP_TZ mapping_schema = MappingTableSchema( name="mapping_table_index_time", @@ -316,6 +317,7 @@ def _create_interm_map_with_time_zone_and_dst_adjustment( # Update mapped_schema mapped_schema.time_config.start = df[mapped_time_col].min() mapped_schema.time_config.length = df[mapped_time_col].nunique() + mapped_schema.time_config.dtype = TimeDataType.TIMESTAMP_TZ mapping_schema = MappingTableSchema( name="mapping_table_index_time_with_dst_adjustment", diff --git a/src/chronify/time_zone_converter.py b/src/chronify/time_zone_converter.py index 115c112..a686e80 100644 --- a/src/chronify/time_zone_converter.py +++ b/src/chronify/time_zone_converter.py @@ -20,10 +20,11 @@ from chronify.time_series_mapper_base import apply_mapping from chronify.time_range_generator_factory import make_time_range_generator from chronify.sqlalchemy.functions import read_database -from chronify.time import TimeType +from chronify.time import TimeDataType, TimeType from chronify.time_utils import wrapped_time_timestamps, get_tzname +# TODO - retain original timestamp column def convert_time_zone( engine: Engine, metadata: MetaData, @@ -143,15 +144,16 @@ def __init__( def _check_from_schema(self, from_schema: TableSchema) -> None: msg = "" if not isinstance(from_schema.time_config, DatetimeRange): - msg += "Source schema does not have DatetimeRange time config. 
" - if ( - isinstance(from_schema.time_config, DatetimeRange) - and from_schema.time_config.start_time_is_tz_naive() - ): + msg += "Source schema must have DatetimeRange time config. " + if from_schema.time_config.dtype != TimeDataType.TIMESTAMP_TZ: + msg += "Source schema time config dtype must be Timestamp_TZ. " + if from_schema.time_config.start_time_is_tz_naive(): msg += ( - "Source schema start_time must be timezone-aware. " - "To convert from timezone-naive to timezone-aware, " - "use the TimeSeriesMapperDatetime.map_time() method instead. " + "Source schema time config start time must be timezone-aware. " + "This converter will convert time zones and return timestamps as tz-naive " + "along with time zone information in a column. " + "To localize timestamps from timezone-naive to timezone-aware, " + "use TimeZoneLocalizer() or TimeZoneLocalizerByColumn() instead. " ) if msg != "": raise InvalidParameter(msg) @@ -171,7 +173,17 @@ def convert_time_zone( class TimeZoneConverter(TimeZoneConverterBase): - """Class for time zone conversion of time series data to a specified time zone.""" + """Class for time zone conversion of tz-aware, aligned_in_absolute_time + time series data to a specified time zone. + + Input data table must contain tz-aware timestamps. + Input time config must be of type DatetimeRange with Timestamp_TZ dtype and tz-aware start time. + Output data table will contain tz-naive timestamps with time zone recorded in a column + Output time config will be of type DatetimeRange with Timestamp_NTZ dtype and tz-naive start time. 
+ + # TODO: support DatetimeRangeWithTZColumn as input time config + # TODO: support wrap_time_allowed option + """ def __init__( self, @@ -186,20 +198,20 @@ def __init__( def generate_to_time_config(self) -> DatetimeRangeWithTZColumn: assert isinstance(self._from_schema.time_config, DatetimeRange) # mypy - to_time_config = self._from_schema.time_config.model_copy() - if self._to_time_zone: - to_time_config.start = to_time_config.start.astimezone(self._to_time_zone).replace( - tzinfo=None - ) - else: - to_time_config.start = to_time_config.start.replace(tzinfo=None) - time_kwargs = to_time_config.model_dump() + time_kwargs = self._from_schema.time_config.model_dump() time_kwargs = dict( filter( lambda k_v: k_v[0] in DatetimeRangeWithTZColumn.model_fields, time_kwargs.items(), ) ) + if self._to_time_zone: + time_kwargs["start"] = ( + time_kwargs["start"].astimezone(self._to_time_zone).replace(tzinfo=None) + ) + else: + time_kwargs["start"] = time_kwargs["start"].replace(tzinfo=None) + time_kwargs["dtype"] = TimeDataType.TIMESTAMP_NTZ time_kwargs["time_type"] = TimeType.DATETIME_TZ_COL time_kwargs["time_zone_column"] = "time_zone" time_kwargs["time_zones"] = [self._to_time_zone] @@ -274,7 +286,36 @@ def _create_mapping(self) -> tuple[pd.DataFrame, MappingTableSchema]: class TimeZoneConverterByColumn(TimeZoneConverterBase): - """Class for time zone conversion of time series data based on a time zone column.""" + """Class for time zone conversion of tz-aware, aligned_in_absolute_time + time series data based on a time zone column. + + Input data table must contain tz-aware timestamps and a time zone column. + Input time config must be of type DatetimeRangeWithTZColumn or DatetimeRange with Timestamp_TZ dtype. + - If DatetimeRange is used, time_zone_column must be provided. + - If DatetimeRangeWithTZColumn is used, it is converted to DatetimeRange internally. + time_zone_column, if provided, is ignored and instead taken from the time_config. 
+ Output data table will contain tz-naive timestamps and the original time zone column. + Output time config will be of type DatetimeRangeWithTZColumn with Timestamp_NTZ dtype (see scenarios). + + I/O Time config scenarios: + -------------------------------- + To convert tz-aware timestamps aligned_in_absolute_time to multiple time zones specified in a column: + - wrap_time_allowed = False + - Input time config: DatetimeRange with tz-aware start time, Timestamp_TZ dtype + - Output time config: DatetimeRangeWithTZColumn with tz-aware start time, Timestamp_NTZ dtype + + To convert tz-aware timestamps aligned_in_absolute_time to multiple time zones specified in a column + and aligned_in_local_standard_time: + - wrap_time_allowed = True + - Input time config: DatetimeRange with tz-aware start time, Timestamp_TZ dtype + - Output time config: DatetimeRangeWithTZColumn with tz-naive start time, Timestamp_NTZ dtype + Note: converted time is wrapped within the local time range of the original timestamps. 
+ -------------------------------- + + # TODO: support DatetimeRangeWithTZColumn as input time config + # TODO: add util func to reduce code duplication with TimeZoneLocalizerByColumn + # TODO: add util func to reduce DatetimeRangeWithTZColumn aligned_in_absolute_time to DatetimeRange + """ def __init__( self, @@ -294,16 +335,17 @@ def __init__( def generate_to_time_config(self) -> DatetimeRangeBase: assert isinstance(self._from_schema.time_config, DatetimeRange) # mypy - to_time_config = self._from_schema.time_config.model_copy() - if self._wrap_time_allowed: - to_time_config.start = to_time_config.start.replace(tzinfo=None) - time_kwargs = to_time_config.model_dump() + time_kwargs = self._from_schema.time_config.model_dump() time_kwargs = dict( filter( lambda k_v: k_v[0] in DatetimeRangeWithTZColumn.model_fields, time_kwargs.items(), ) ) + if self._wrap_time_allowed: + time_kwargs["start"] = time_kwargs["start"].replace(tzinfo=None) + + time_kwargs["dtype"] = TimeDataType.TIMESTAMP_NTZ time_kwargs["time_type"] = TimeType.DATETIME_TZ_COL time_kwargs["time_zone_column"] = self.time_zone_column time_kwargs["time_zones"] = self._get_time_zones() diff --git a/src/chronify/time_zone_localizer.py b/src/chronify/time_zone_localizer.py new file mode 100644 index 0000000..721eb56 --- /dev/null +++ b/src/chronify/time_zone_localizer.py @@ -0,0 +1,520 @@ +import abc +from zoneinfo import ZoneInfo +from datetime import tzinfo +from sqlalchemy import Engine, MetaData, Table, select +from typing import Optional +from pathlib import Path +import pandas as pd +from pandas import DatetimeTZDtype + +from chronify.models import TableSchema, MappingTableSchema +from chronify.time_configs import ( + DatetimeRangeBase, + DatetimeRange, + DatetimeRangeWithTZColumn, + TimeBasedDataAdjustment, +) +from chronify.datetime_range_generator import ( + DatetimeRangeGenerator, + DatetimeRangeGeneratorExternalTimeZone, +) +from chronify.exceptions import InvalidParameter, MissingValue +from 
chronify.time_series_mapper_base import apply_mapping +from chronify.time_range_generator_factory import make_time_range_generator +from chronify.sqlalchemy.functions import read_database +from chronify.time import TimeDataType, TimeType +from chronify.time_series_mapper import map_time +from chronify.time_utils import get_standard_time_zone, is_standard_time_zone + + +def localize_time_zone( + engine: Engine, + metadata: MetaData, + src_schema: TableSchema, + to_time_zone: tzinfo | None, + scratch_dir: Optional[Path] = None, + output_file: Optional[Path] = None, + check_mapped_timestamps: bool = False, +) -> TableSchema: + """Localize TIMESTAMP_NTZ time column in a table to a specified standard time zone. + Input data must be in a standard time zone (without DST) because it's ambiguous to localize + tz-naive timestamps with skips and duplicates to a prevailing time zone. + + Updates table to TIMESTAMP_TZ time column and returns a new time config. + + Parameters + ---------- + engine : sqlalchemy.Engine + SQLAlchemy engine. + metadata : sqlalchemy.MetaData + SQLAlchemy metadata. + src_schema : TableSchema + Defines the source table in the database. + to_time_zone : tzinfo or None + Standard time zone to convert to. If None, convert to tz-naive. + scratch_dir : pathlib.Path, optional + Directory to use for temporary writes. Defaults to the system's tmp filesystem. + output_file : pathlib.Path, optional + If set, write the mapped table to this Parquet file. + check_mapped_timestamps : bool, optional + Perform time checks on the result of the mapping operation. This can be slow and + is not required. + + Returns + ------- + TableSchema + Schema of output table with converted timestamps. 
+ """ + tzl = TimeZoneLocalizer(engine, metadata, src_schema, to_time_zone) + tzl.localize_time_zone( + scratch_dir=scratch_dir, + output_file=output_file, + check_mapped_timestamps=check_mapped_timestamps, + ) + + return tzl._to_schema + + +def localize_time_zone_by_column( + engine: Engine, + metadata: MetaData, + src_schema: TableSchema, + time_zone_column: Optional[str] = None, + scratch_dir: Optional[Path] = None, + output_file: Optional[Path] = None, + check_mapped_timestamps: bool = False, +) -> TableSchema: + """Localize TIMESTAMP_NTZ time column in a table to multiple time zones specified by a column. + Updates table to TIMESTAMP_TZ time column and returns a new time config. + + Parameters + ---------- + engine : sqlalchemy.Engine + SQLAlchemy engine. + metadata : sqlalchemy.MetaData + sqlalchemy metadata + src_schema : TableSchema + Defines the source table in the database. + time_zone_column : Optional[str] + Column name in the source table that contains the time zone information. + - Required if src_schema.time_config is of type DatetimeRange. + - Ignored if src_schema.time_config is of type DatetimeRangeWithTZColumn. + scratch_dir : pathlib.Path, optional + Directory to use for temporary writes. Default to the system's tmp filesystem. + output_file : pathlib.Path, optional + If set, write the mapped table to this Parquet file. + check_mapped_timestamps : bool, optional + Perform time checks on the result of the mapping operation. This can be slow and + is not required. + + Returns + ------- + dst_schema : TableSchema + schema of output table with converted timestamps + """ + if isinstance(src_schema.time_config, DatetimeRange) and time_zone_column is None: + msg = ( + "time_zone_column must be provided when localizing time zones " + "by column for DatetimeRange time config." 
+ ) + raise MissingValue(msg) + + tzl = TimeZoneLocalizerByColumn( + engine, metadata, src_schema, time_zone_column=time_zone_column + ) + tzl.localize_time_zone( + scratch_dir=scratch_dir, + output_file=output_file, + check_mapped_timestamps=check_mapped_timestamps, + ) + return tzl._to_schema + + +class TimeZoneLocalizerBase(abc.ABC): + """Base class for time zone localization of time series data.""" + + def __init__( + self, + engine: Engine, + metadata: MetaData, + from_schema: TableSchema, + ): + self._engine = engine + self._metadata = metadata + self._from_schema = from_schema + + @staticmethod + @abc.abstractmethod + def _check_from_schema(from_schema: TableSchema) -> None: + """Check that from_schema is valid for time zone localization""" + + @abc.abstractmethod + def generate_to_schema(self) -> TableSchema: + """Generate to_schema based on from_schema""" + + @abc.abstractmethod + def localize_time_zone( + self, + scratch_dir: Optional[Path] = None, + output_file: Optional[Path] = None, + check_mapped_timestamps: bool = False, + ) -> None: + """Localize tz-naive timestamps to the time zone of the from_schema""" + + +class TimeZoneLocalizer(TimeZoneLocalizerBase): + """Class for time zone localization of tz-naive time series data to a specified time zone. + + Input data table must contain tz-naive timestamps. + Input time config must be of type DatetimeRange with Timestamp_NTZ dtype and tz-naive start time. + to_time_zone must be a standard time zone (without DST) or None. + Output data table will contain tz-aware timestamps. + Output time config will be of type DatetimeRange with Timestamp_TZ dtype and tz-aware start time. 
+ """ + + def __init__( + self, + engine: Engine, + metadata: MetaData, + from_schema: TableSchema, + to_time_zone: tzinfo | None, + ): + self._check_from_schema(from_schema) + super().__init__(engine, metadata, from_schema) + self._to_time_zone = self._check_standard_time_zone(to_time_zone) + self._to_schema = self.generate_to_schema() + + @staticmethod + def _check_from_schema(from_schema: TableSchema) -> None: + msg = "" + if not isinstance(from_schema.time_config, DatetimeRange): + msg += "Source schema does not have DatetimeRange time config. " + if isinstance(from_schema.time_config, DatetimeRangeWithTZColumn): + msg += ( + "Instead, time config is of type DatetimeRangeWithTZColumn, " + f"try using TimeZoneLocalizerByColumn(). {from_schema.time_config}" + ) + raise InvalidParameter(msg) + if from_schema.time_config.dtype != TimeDataType.TIMESTAMP_NTZ: + msg += "Source schema time config dtype must be TIMESTAMP_NTZ. " + if from_schema.time_config.start_time_is_tz_naive() is False: + msg += ( + "Source schema time config start time must be tz-naive." + "To convert between time zones for tz-aware timestamps, " + "try using TimeZoneConverter() " + ) + if msg != "": + msg += f"\n{from_schema.time_config}" + raise InvalidParameter(msg) + + @staticmethod + def _check_standard_time_zone(to_time_zone: tzinfo | None) -> tzinfo | None: + if to_time_zone is None: + return None + if not is_standard_time_zone(to_time_zone): + std_tz = get_standard_time_zone(to_time_zone) + msg = ( + "TimeZoneLocalizer only supports standard time zones (without DST). " + f"{to_time_zone=} is not a standard time zone. 
Try instead: {std_tz}" + ) + raise InvalidParameter(msg) + return to_time_zone + + def generate_to_time_config(self) -> DatetimeRange: + assert isinstance(self._from_schema.time_config, DatetimeRange) # mypy + to_time_config: DatetimeRange = self._from_schema.time_config.model_copy( + update={ + "dtype": TimeDataType.TIMESTAMP_TZ + if self._to_time_zone + else TimeDataType.TIMESTAMP_NTZ, + "start": self._from_schema.time_config.start.replace(tzinfo=self._to_time_zone), + } + ) + + return to_time_config + + def generate_to_schema(self) -> TableSchema: + to_time_config = self.generate_to_time_config() + to_schema: TableSchema = self._from_schema.model_copy( + update={ + "name": f"{self._from_schema.name}_tz_converted", + "time_config": to_time_config, + } + ) + return to_schema + + def localize_time_zone( + self, + scratch_dir: Optional[Path] = None, + output_file: Optional[Path] = None, + check_mapped_timestamps: bool = False, + ) -> None: + map_time( + engine=self._engine, + metadata=self._metadata, + from_schema=self._from_schema, + to_schema=self._to_schema, + scratch_dir=scratch_dir, + output_file=output_file, + check_mapped_timestamps=check_mapped_timestamps, + ) + + +class TimeZoneLocalizerByColumn(TimeZoneLocalizerBase): + """Class for time zone localization of tz-naive time series data based on a time zone column. + + Input data table must contain tz-naive timestamps and a time zone column. + Time zones in the time zone column must be standard time zones (without DST). + Input time config must be of type DatetimeRangeWithTZColumn or DatetimeRange with Timestamp_NTZ dtype. + - If DatetimeRangeWithTZColumn is used, time_zone_column, if provided, is ignored. + - If DatetimeRange is used, time_zone_column must be provided. It is then converted to + DatetimeRangeWithTZColumn internally. + Output data table will contain tz-aware timestamps and the original time zone column. 
+ Output time config can be of type DatetimeRange or DatetimeRangeWithTZColumn with Timestamp_TZ dtype (see scenarios). + + I/O Time config scenarios: + -------------------------------- + To localize tz-naive timestamps aligned_in_local_standard_time to multiple time zones specified in a column: + - Input time config: DatetimeRangeWithTZColumn with tz-naive start time, Timestamp_NTZ dtype + - Output time config: DatetimeRangeWithTZColumn with tz-naive start time, Timestamp_TZ dtype + + To localize tz-naive timestamps aligned_in_absolute_time to multiple time zones specified in a column: + - Input time config: DatetimeRangeWithTZColumn with tz-aware start time, Timestamp_NTZ dtype + - Output time config: DatetimeRange with tz-aware start time, Timestamp_TZ dtype + Note: output time config is reduced to DatetimeRange (from DatetimeRangeWithTZColumn) + since all timestamps are tz-aware and aligned in absolute time. + -------------------------------- + """ + + def __init__( + self, + engine: Engine, + metadata: MetaData, + from_schema: TableSchema, + time_zone_column: Optional[str] = None, + ): + self._check_from_schema(from_schema) + self._check_time_zone_column(from_schema, time_zone_column) + super().__init__(engine, metadata, from_schema) + if isinstance(self._from_schema.time_config, DatetimeRange): + self.time_zone_column = time_zone_column + self._convert_from_time_config_to_datetime_range_with_tz_column() + else: + self.time_zone_column = self._from_schema.time_config.time_zone_column + self._check_standard_time_zones() + self._to_schema = self.generate_to_schema() + + @staticmethod + def _check_from_schema(from_schema: TableSchema) -> None: + msg = "" + if not isinstance(from_schema.time_config, (DatetimeRange, DatetimeRangeWithTZColumn)): + msg += ( + "Source schema must have DatetimeRange or DatetimeRangeWithTZColumn time config. 
" + ) + if from_schema.time_config.dtype != TimeDataType.TIMESTAMP_NTZ: + msg += "Source schema time config dtype must be TIMESTAMP_NTZ. " + if msg != "": + msg += f"\n{from_schema.time_config}" + raise InvalidParameter(msg) + + @staticmethod + def _check_time_zone_column(from_schema: TableSchema, time_zone_column: Optional[str]) -> None: + if ( + isinstance(from_schema.time_config, DatetimeRangeWithTZColumn) + and time_zone_column is not None + ): + msg = f"Input {time_zone_column=} will be ignored. time_zone_column is already defined in the time_config." + raise Warning(msg) + + msg = "" + if isinstance(from_schema.time_config, DatetimeRange) and time_zone_column is None: + msg += "time_zone_column must be provided when source schema time config is of type DatetimeRange. " + # if time_zone_column not in from_schema.time_array_id_columns: + # msg = f"{time_zone_column=} must be in source schema time_array_id_columns." + if msg != "": + msg += f"\n{from_schema}" + raise InvalidParameter(msg) + + def _check_standard_time_zones(self) -> None: + """Check that all time zones in the time_zone_column are valid standard time zones.""" + assert isinstance(self._from_schema.time_config, DatetimeRangeWithTZColumn) # mypy + msg = "" + time_zones = self._from_schema.time_config.time_zones + for tz in time_zones: + if tz == "None": + msg += "\nChronify does not support None time zone in time_zone_column. " + raise InvalidParameter(msg) + if not is_standard_time_zone(tz): + std_tz = get_standard_time_zone(tz) + msg = f"\n{tz} is not a standard time zone. Try instead: {std_tz}. " + if msg != "": + msg = ( + f"TimeZoneLocalizerByColumn only supports standard time zones (without DST). 
{time_zones}" + + msg + ) + raise InvalidParameter(msg) + + def _convert_from_time_config_to_datetime_range_with_tz_column(self) -> None: + """Convert DatetimeRange from_schema time config to DatetimeRangeWithTZColumn time config + for the rest of the workflow + """ + assert isinstance(self._from_schema.time_config, DatetimeRange) + time_kwargs = self._from_schema.time_config.model_dump() + time_kwargs = dict( + filter( + lambda k_v: k_v[0] in DatetimeRangeWithTZColumn.model_fields, + time_kwargs.items(), + ) + ) + time_kwargs["time_type"] = TimeType.DATETIME_TZ_COL + time_kwargs["time_zone_column"] = self.time_zone_column + time_kwargs["time_zones"] = self._get_time_zones() + + self._from_schema.time_config = DatetimeRangeWithTZColumn(**time_kwargs) + + def generate_to_time_config(self) -> DatetimeRangeBase: + assert isinstance(self._from_schema.time_config, DatetimeRangeWithTZColumn) # mypy + match self._from_schema.time_config.start_time_is_tz_naive(): + case True: + # tz-naive start, aligned_in_local_time of the time zones + to_time_config: DatetimeRangeWithTZColumn = ( + self._from_schema.time_config.model_copy( + update={ + "dtype": TimeDataType.TIMESTAMP_TZ, + } + ) + ) + return to_time_config + case False: + # tz-aware start, aligned_in_absolute_time, convert to DatetimeRange config + time_kwargs = self._from_schema.time_config.model_dump() + time_kwargs = dict( + filter( + lambda k_v: k_v[0] in DatetimeRange.model_fields, + time_kwargs.items(), + ) + ) + time_kwargs["dtype"] = TimeDataType.TIMESTAMP_TZ + time_kwargs["time_type"] = TimeType.DATETIME + return DatetimeRange(**time_kwargs) + case _: + msg = ( + "Unable to determine if start time is tz-naive or tz-aware " + f"from time config: {self._from_schema.time_config}" + ) + raise InvalidParameter(msg) + + def generate_to_schema(self) -> TableSchema: + id_cols = self._from_schema.time_array_id_columns + if "time_zone" not in id_cols: + id_cols.append("time_zone") + to_schema: TableSchema = 
self._from_schema.model_copy( + update={ + "name": f"{self._from_schema.name}_tz_converted", + "time_config": self.generate_to_time_config(), + "time_array_id_columns": id_cols, + } + ) + return to_schema + + def localize_time_zone( + self, + scratch_dir: Optional[Path] = None, + output_file: Optional[Path] = None, + check_mapped_timestamps: bool = False, + ) -> None: + df, mapping_schema = self._create_mapping() + + apply_mapping( + df, + mapping_schema, + self._from_schema, + self._to_schema, + self._engine, + self._metadata, + TimeBasedDataAdjustment(), + scratch_dir=scratch_dir, + output_file=output_file, + check_mapped_timestamps=check_mapped_timestamps, + ) + + def _get_time_zones(self) -> list[tzinfo | None]: + with self._engine.connect() as conn: + table = Table(self._from_schema.name, self._metadata) + stmt = ( + select(table.c[self.time_zone_column]) + .distinct() + .where(table.c[self.time_zone_column].is_not(None)) + ) + time_zones = read_database(stmt, conn, self._from_schema.time_config)[ + self.time_zone_column + ].to_list() + + if "None" in time_zones and len(time_zones) > 1: + msg = ( + "Chronify does not support mix of None and time zones in time_zone_column." 
+ "This is because databases do not support tz-aware and tz-naive timestamps " + f"in the same column: {time_zones}" + ) + raise InvalidParameter(msg) + + time_zones = [ZoneInfo(tz) for tz in time_zones] + return time_zones + + def _create_mapping(self) -> tuple[pd.DataFrame, MappingTableSchema]: + """Create mapping dataframe for localizing tz-naive datetime to column time zones""" + time_col = self._from_schema.time_config.time_column + from_time_col = "from_" + time_col + from_time_generator = make_time_range_generator(self._from_schema.time_config) + assert isinstance(from_time_generator, DatetimeRangeGeneratorExternalTimeZone) # mypy + from_time_data_dct = from_time_generator.list_timestamps_by_time_zone() + + to_time_generator = make_time_range_generator(self._to_schema.time_config) + match to_time_generator: + case DatetimeRangeGeneratorExternalTimeZone(): # mypy + to_time_data_dct = to_time_generator.list_timestamps_by_time_zone() + case DatetimeRangeGenerator(): # mypy + to_time_data = to_time_generator.list_timestamps() + to_time_data_dct = {tz_name: to_time_data for tz_name in from_time_data_dct.keys()} + case _: + msg = ( + "to_time_generator must be of type " + "DatetimeRangeGeneratorExternalTimeZone or DatetimeRangeGenerator. 
" + f"Got {type(to_time_generator)}" + ) + raise InvalidParameter(msg) + + from_tz_col = "from_" + self.time_zone_column + from_time_config = self._from_schema.time_config.model_copy( + update={"time_column": from_time_col} + ) + to_time_config = self._to_schema.time_config + + df_tz = [] + primary_tz = ZoneInfo(list(from_time_data_dct.keys())[0]) + for tz_name, from_time_data in from_time_data_dct.items(): + # convert tz-aware timestamps to a single time zone for mapping + # this is because pandas coerces tz-aware timestamps with mixed time zones to object dtype otherwise + to_time_data = [ts.astimezone(primary_tz) for ts in to_time_data_dct[tz_name]] + df_tz.append( + pd.DataFrame( + { + from_time_col: from_time_data, + from_tz_col: tz_name, + time_col: to_time_data, + } + ) + ) + df = pd.concat(df_tz, ignore_index=True) + if not isinstance(df[time_col].dtype, DatetimeTZDtype): + msg = ( + "Mapped time column is expected to be of " + f"DatetimeTZDtype but got {df[time_col].dtype}" + ) + raise InvalidParameter(msg) + + mapping_schema = MappingTableSchema( + name="mapping_table_gtz_conversion", + time_configs=[from_time_config, to_time_config], + ) + return df, mapping_schema diff --git a/tests/test_mapper_column_representative_to_datetime.py b/tests/test_mapper_column_representative_to_datetime.py index 110c816..53a43db 100644 --- a/tests/test_mapper_column_representative_to_datetime.py +++ b/tests/test_mapper_column_representative_to_datetime.py @@ -169,7 +169,7 @@ def test_NYMDPV_mapper(time_series_NYMDPV, iter_store: Store): mapped_table = read_database( f"SELECT * FROM {to_schema.name}", conn, to_schema.time_config ).sort_values("timestamp") - values = np.concat( + values = np.concatenate( [ np.ones(5) * 100, np.ones(7) * 200, diff --git a/tests/test_mapper_datetime_to_datetime.py b/tests/test_mapper_datetime_to_datetime.py index 3d144e9..5d969c2 100644 --- a/tests/test_mapper_datetime_to_datetime.py +++ b/tests/test_mapper_datetime_to_datetime.py @@ -22,8 
+22,8 @@ ) -def generate_datetime_data(time_config: DatetimeRange) -> pd.Series: # type: ignore - return pd.to_datetime(list(DatetimeRangeGenerator(time_config)._iter_timestamps())) +def generate_datetime_data(time_config: DatetimeRange) -> pd.DatetimeIndex: # type: ignore + return pd.to_datetime(DatetimeRangeGenerator(time_config).list_timestamps()) def generate_datetime_dataframe(schema: TableSchema) -> pd.DataFrame: diff --git a/tests/test_mapper_index_time_to_datetime.py b/tests/test_mapper_index_time_to_datetime.py index 580537e..df18a30 100644 --- a/tests/test_mapper_index_time_to_datetime.py +++ b/tests/test_mapper_index_time_to_datetime.py @@ -9,8 +9,7 @@ from chronify.time_series_mapper import map_time from chronify.time_configs import ( DatetimeRange, - IndexTimeRangeNTZ, - IndexTimeRangeTZ, + IndexTimeRange, IndexTimeRangeWithTZColumn, TimeBasedDataAdjustment, ) @@ -57,23 +56,18 @@ def data_for_simple_mapping( src_df = pd.DataFrame({"index_time": range(1, nts + 1), "value": range(1, nts + 1)}) if tz_naive: - time_config = IndexTimeRangeNTZ( - start=1, - length=nts, - start_timestamp=start_timestamp, - resolution=interval_resolution, - interval_type=TimeIntervalType.PERIOD_BEGINNING, - time_column="index_time", - ) + st_ts = start_timestamp else: - time_config = IndexTimeRangeTZ( - start=1, - length=nts, - start_timestamp=start_timestamp.tz_localize("US/Mountain"), - resolution=interval_resolution, - interval_type=TimeIntervalType.PERIOD_BEGINNING, - time_column="index_time", - ) + st_ts = start_timestamp.tz_localize("US/Mountain") + + time_config = IndexTimeRange( + start=1, + length=nts, + start_timestamp=st_ts, + resolution=interval_resolution, + interval_type=TimeIntervalType.PERIOD_BEGINNING, + time_column="index_time", + ) src_schema = TableSchema( name="input_data", time_config=time_config, diff --git a/tests/test_mapper_representative_time_to_datetime.py b/tests/test_mapper_representative_time_to_datetime.py index 06ead60..2574a8a 100644 --- 
a/tests/test_mapper_representative_time_to_datetime.py +++ b/tests/test_mapper_representative_time_to_datetime.py @@ -14,8 +14,8 @@ from chronify.datetime_range_generator import DatetimeRangeGenerator -def generate_datetime_data(time_config: DatetimeRange) -> pd.Series: - return pd.to_datetime(list(DatetimeRangeGenerator(time_config)._iter_timestamps())) +def generate_datetime_data(time_config: DatetimeRange) -> pd.DatetimeIndex: + return pd.to_datetime(DatetimeRangeGenerator(time_config).list_timestamps()) def get_datetime_schema(year: int, tzinfo: tzinfo | None) -> TableSchema: diff --git a/tests/test_store.py b/tests/test_store.py index e9204a9..11a5ced 100644 --- a/tests/test_store.py +++ b/tests/test_store.py @@ -939,3 +939,155 @@ def test_convert_time_zone_by_column( for tz, expected in expected_dct.items(): actual = sorted(df2.loc[df2["time_zone"] == tz, "timestamp"]) check_timestamp_lists(actual, expected) + + +@pytest.mark.parametrize("to_time_zone", [ZoneInfo("UTC"), ZoneInfo("EST"), None]) +def test_localize_time_zone( + tmp_path, iter_stores_by_engine_no_data_ingestion: Store, to_time_zone: tzinfo | None +): + """Test time zone localization of tz-naive timestamps to a specified time zone.""" + store = iter_stores_by_engine_no_data_ingestion + time_array_len = 8784 + year = 2020 + + # Create tz-naive source time config + src_time_config = DatetimeRange( + start=datetime(year=year, month=1, day=1, hour=0, tzinfo=None), + resolution=timedelta(hours=1), + length=time_array_len, + interval_type=TimeIntervalType.PERIOD_BEGINNING, + time_column="timestamp", + ) + + src_csv_schema = CsvTableSchema( + time_config=src_time_config, + column_dtypes=[ + ColumnDType(name="timestamp", dtype=DateTime(timezone=False)), + ColumnDType(name="gen1", dtype=Double()), + ColumnDType(name="gen2", dtype=Double()), + ColumnDType(name="gen3", dtype=Double()), + ], + value_columns=["gen1", "gen2", "gen3"], + pivoted_dimension_name="generator", + time_array_id_columns=[], + ) + rel 
= read_csv(GENERATOR_TIME_SERIES_FILE, src_csv_schema) + rel2 = unpivot(rel, ("gen1", "gen2", "gen3"), "generator", "value") # noqa: F841 + + src_schema = TableSchema( + name="generators_pb", + time_config=src_time_config, + time_array_id_columns=["generator"], + value_column="value", + ) + if store.engine.name == "hive": + out_file = tmp_path / "data.parquet" + rel2.to_df().to_parquet(out_file) + store.create_view_from_parquet(out_file, src_schema) + else: + store.ingest_table(rel2, src_schema) + + if to_time_zone is None and store.engine.name != "sqlite": + output_file = tmp_path / "mapped_data" + else: + output_file = None + + dst_schema = store.localize_time_zone( + src_schema.name, + to_time_zone, + output_file=output_file, + check_mapped_timestamps=True, + ) + if output_file is None or store.engine.name == "sqlite": + df2 = store.read_table(dst_schema.name) + else: + df2 = pd.read_parquet(output_file) + df2["timestamp"] = pd.to_datetime(df2["timestamp"]) + assert len(df2) == time_array_len * 3 + actual = sorted(df2["timestamp"].unique()) + assert isinstance(dst_schema.time_config, DatetimeRange) + if to_time_zone: + # Should be localized to the target time zone + assert dst_schema.time_config.start_time_is_tz_naive() is False + else: + # Should remain tz-naive + assert dst_schema.time_config.start_time_is_tz_naive() is True + assert pd.Timestamp(actual[0]) == dst_schema.time_config.start + expected = make_time_range_generator(dst_schema.time_config).list_timestamps() + expected = sorted(set(expected)) + check_timestamp_lists(actual, expected) + + +def test_localize_time_zone_by_column(tmp_path, iter_stores_by_engine_no_data_ingestion: Store): + """Test time zone localization based on a time zone column.""" + store = iter_stores_by_engine_no_data_ingestion + time_array_len = 8784 + year = 2020 + + # Create tz-naive source time config + src_time_config = DatetimeRange( + start=datetime(year=year, month=1, day=1, hour=0, tzinfo=None), + 
resolution=timedelta(hours=1), + length=time_array_len, + interval_type=TimeIntervalType.PERIOD_BEGINNING, + time_column="timestamp", + ) + + src_csv_schema = CsvTableSchema( + time_config=src_time_config, + column_dtypes=[ + ColumnDType(name="timestamp", dtype=DateTime(timezone=False)), + ColumnDType(name="gen1", dtype=Double()), + ColumnDType(name="gen2", dtype=Double()), + ColumnDType(name="gen3", dtype=Double()), + ], + value_columns=["gen1", "gen2", "gen3"], + pivoted_dimension_name="generator", + time_array_id_columns=[], + ) + rel = read_csv(GENERATOR_TIME_SERIES_FILE, src_csv_schema) + rel2 = unpivot(rel, ("gen1", "gen2", "gen3"), "generator", "value") # noqa: F841 + # add time_zone column with standard time zones (not DST) + stmt = ", ".join(rel2.columns) + tz_col_stmt = "CASE WHEN generator='gen1' THEN 'Etc/GMT+5' WHEN generator='gen2' THEN 'Etc/GMT+6' ELSE 'Etc/GMT+7' END AS time_zone" + stmt += f", {tz_col_stmt}" + rel2 = rel2.project(stmt) + + src_schema = TableSchema( + name="generators_pb", + time_config=src_time_config, + time_array_id_columns=["generator", "time_zone"], + value_column="value", + ) + if store.engine.name == "hive": + out_file = tmp_path / "data.parquet" + rel2.to_df().to_parquet(out_file) + store.create_view_from_parquet(out_file, src_schema) + else: + store.ingest_table(rel2, src_schema) + + if store.engine.name != "sqlite": + output_file = tmp_path / "mapped_data" + else: + output_file = None + + dst_schema = store.localize_time_zone_by_column( + src_schema.name, + "time_zone", + output_file=output_file, + check_mapped_timestamps=True, + ) + if output_file is None or store.engine.name == "sqlite": + df2 = store.read_table(dst_schema.name) + else: + df2 = pd.read_parquet(output_file) + df2["timestamp"] = pd.to_datetime(df2["timestamp"]) + df_stats = df2.groupby(["time_zone"])["timestamp"].agg(["min", "max", "count"]) + assert set(df_stats["count"]) == {time_array_len} + assert isinstance(dst_schema.time_config, 
DatetimeRangeWithTZColumn) + # Verify that each time zone has tz-aware localized timestamps + assert df2["timestamp"].dt.tz is not None + expected_dct = make_time_range_generator(dst_schema.time_config).list_timestamps_by_time_zone() + for tz, expected in expected_dct.items(): + actual = sorted(df2.loc[df2["time_zone"] == tz, "timestamp"]) + check_timestamp_lists(actual, expected) diff --git a/tests/test_time_zone_converter.py b/tests/test_time_zone_converter.py index 3b0230a..aca8cbf 100644 --- a/tests/test_time_zone_converter.py +++ b/tests/test_time_zone_converter.py @@ -16,18 +16,19 @@ ) from chronify.time_configs import DatetimeRange from chronify.models import TableSchema -from chronify.time import TimeIntervalType +from chronify.time import TimeDataType, TimeIntervalType from chronify.datetime_range_generator import DatetimeRangeGenerator from chronify.exceptions import InvalidParameter -def generate_datetime_data(time_config: DatetimeRange) -> pd.Series: # type: ignore - return pd.to_datetime(list(DatetimeRangeGenerator(time_config)._iter_timestamps())) - - def generate_datetime_dataframe(schema: TableSchema) -> pd.DataFrame: - df = pd.DataFrame({schema.time_config.time_column: generate_datetime_data(schema.time_config)}) - + df = pd.DataFrame( + { + schema.time_config.time_column: pd.to_datetime( + DatetimeRangeGenerator(schema.time_config).list_timestamps() + ) + } + ) for i, x in enumerate(schema.time_array_id_columns): df[x] = i df[schema.value_column] = np.random.rand(len(df)) @@ -70,6 +71,7 @@ def get_datetime_schema( schema = TableSchema( name=name, time_config=DatetimeRange( + dtype=TimeDataType.TIMESTAMP_TZ if tzinfo else TimeDataType.TIMESTAMP_NTZ, start=start, resolution=resolution, length=length, @@ -183,10 +185,8 @@ def run_conversion_with_error( def test_src_table_no_time_zone(iter_engines: Engine) -> None: from_schema = get_datetime_schema(2018, None, TimeIntervalType.PERIOD_BEGINNING, "base_table") df = 
generate_datetime_dataframe(from_schema) - error = (InvalidParameter, "Source schema start_time must be timezone-aware") - run_conversion_with_error( - iter_engines, df, from_schema, False, error - ) # TODO, support tz-naive to tz-aware conversion + error = (InvalidParameter, "Source schema time config start time must be timezone-aware") + run_conversion_with_error(iter_engines, df, from_schema, False, error) @pytest.mark.parametrize( diff --git a/tests/test_time_zone_localizer.py b/tests/test_time_zone_localizer.py new file mode 100644 index 0000000..bc2968d --- /dev/null +++ b/tests/test_time_zone_localizer.py @@ -0,0 +1,368 @@ +from zoneinfo import ZoneInfo +from datetime import datetime, timedelta, tzinfo +import numpy as np +import pytest +from typing import Any + +import pandas as pd +from sqlalchemy import Engine, MetaData + +from chronify.sqlalchemy.functions import read_database, write_database +from chronify.time_utils import get_standard_time_zone +from chronify.time_zone_localizer import ( + TimeZoneLocalizer, + TimeZoneLocalizerByColumn, + localize_time_zone, + localize_time_zone_by_column, +) +from chronify.time_configs import DatetimeRange, DatetimeRangeWithTZColumn, DatetimeRangeBase +from chronify.models import TableSchema +from chronify.time import TimeDataType, TimeIntervalType +from chronify.datetime_range_generator import ( + DatetimeRangeGenerator, + DatetimeRangeGeneratorExternalTimeZone, +) +from chronify.exceptions import InvalidParameter + + +def generate_datetime_dataframe(schema: TableSchema) -> pd.DataFrame: + df = pd.DataFrame( + { + schema.time_config.time_column: pd.to_datetime( + DatetimeRangeGenerator(schema.time_config).list_timestamps() + ) + } + ) + for i, x in enumerate(schema.time_array_id_columns): + df[x] = i + df[schema.value_column] = np.random.rand(len(df)) + return df + + +def generate_dataframe_with_tz_col(schema: TableSchema) -> pd.DataFrame: + time_col = schema.time_config.time_column + ts_dct = 
DatetimeRangeGeneratorExternalTimeZone( + schema.time_config + ).list_timestamps_by_time_zone() + dfo_list = [] + for i, (time_zone, data) in enumerate(ts_dct.items()): + dfo_list.append( + pd.DataFrame( + {x: i for x in schema.time_array_id_columns} + | { + time_col: pd.to_datetime(data).tz_localize(None), + "time_zone": time_zone, + schema.value_column: np.random.rand(len(data)), + } + ) + ) + dfo = pd.concat(dfo_list, ignore_index=True) + dfo = dfo.reset_index() + return dfo + + +def get_datetime_schema( + year: int, + tzinfo: tzinfo | None, + interval_type: TimeIntervalType, + name: str, +) -> TableSchema: + start = datetime(year=year, month=3, day=11, tzinfo=tzinfo) + end = datetime(year=year, month=3, day=12, tzinfo=tzinfo) + resolution = timedelta(hours=1) + length = (end - start) / resolution + 1 + cols = ["id"] + time_config = DatetimeRange( + dtype=TimeDataType.TIMESTAMP_TZ if tzinfo else TimeDataType.TIMESTAMP_NTZ, + start=start, + resolution=resolution, + length=length, + interval_type=interval_type, + time_column="timestamp", + ) + schema = TableSchema( + name=name, + time_config=time_config, + time_array_id_columns=cols, + value_column="value", + ) + return schema + + +def get_datetime_with_tz_col_schema( + year: int, + tzinfo: tzinfo | None, + interval_type: TimeIntervalType, + name: str, + standard_tz: bool = False, +) -> TableSchema: + start = datetime(year=year, month=3, day=11, tzinfo=tzinfo) + end = datetime(year=year, month=3, day=12, tzinfo=tzinfo) + resolution = timedelta(hours=1) + length = (end - start) / resolution + 1 + cols = ["id"] + time_zones = [ + ZoneInfo("US/Eastern"), + ZoneInfo("US/Central"), + ZoneInfo("US/Mountain"), + ] + if standard_tz: + time_zones = [get_standard_time_zone(tz) for tz in time_zones] + time_config: DatetimeRangeBase = DatetimeRangeWithTZColumn( + dtype=TimeDataType.TIMESTAMP_NTZ, + start=start, + resolution=resolution, + length=length, + interval_type=interval_type, + time_column="timestamp", + 
time_zone_column="time_zone", + time_zones=time_zones, + ) + schema = TableSchema( + name=name, + time_config=time_config, + time_array_id_columns=cols, + value_column="value", + ) + return schema + + +def ingest_data( + engine: Engine, + metadata: MetaData, + df: pd.DataFrame, + schema: TableSchema, +) -> None: + with engine.begin() as conn: + write_database(df, conn, schema.name, [schema.time_config], if_table_exists="replace") + metadata.reflect(engine, views=True) + + +def get_mapped_dataframe( + engine: Engine, + table_name: str, + time_config: DatetimeRangeBase, +) -> pd.DataFrame: + with engine.connect() as conn: + query = f"select * from {table_name}" + queried = read_database(query, conn, time_config) + queried = queried.sort_values(by=["id", "timestamp"]).reset_index(drop=True) + return queried + + +def run_localization( + engine: Engine, + df: pd.DataFrame, + from_schema: TableSchema, + to_time_zone: tzinfo | None, +) -> None: + metadata = MetaData() + ingest_data(engine, metadata, df, from_schema) + to_schema = localize_time_zone( + engine, metadata, from_schema, to_time_zone, check_mapped_timestamps=True + ) + dfo = get_mapped_dataframe(engine, to_schema.name, to_schema.time_config) + assert df["value"].equals(dfo["value"]) + if to_time_zone is None: + expected = df["timestamp"] + else: + std_tz = get_standard_time_zone(to_time_zone) + expected = df["timestamp"].dt.tz_localize(std_tz).dt.tz_convert(to_time_zone) + + assert (dfo["timestamp"] == expected).prod() == 1 + + +def run_localization_to_column_time_zones( + engine: Engine, + df: pd.DataFrame, + from_schema: TableSchema, +) -> None: + metadata = MetaData() + ingest_data(engine, metadata, df, from_schema) + to_schema = localize_time_zone_by_column( + engine, + metadata, + from_schema, + check_mapped_timestamps=True, + ) + dfo = get_mapped_dataframe(engine, to_schema.name, to_schema.time_config) + dfo = dfo[df.columns].sort_values(by="index").reset_index(drop=True) + dfo["timestamp"] = 
pd.to_datetime(dfo["timestamp"]) # needed for engine 2, not sure why + assert df["value"].equals(dfo["value"]) + for i in range(len(dfo)): + tzn = dfo.loc[i, "time_zone"] + if tzn == "None": + ts = dfo.loc[i, "timestamp"].replace(tzinfo=None) + else: + # source data is in local standard time + ts = dfo.loc[i, "timestamp"].tz_convert(tzn).replace(tzinfo=None) + + assert df.loc[i, "timestamp"] == ts + + +def run_localization_with_error( + engine: Engine, + df: pd.DataFrame, + from_schema: TableSchema, + error: tuple[Any, str], +) -> None: + metadata = MetaData() + ingest_data(engine, metadata, df, from_schema) + + with pytest.raises(error[0], match=error[1]): + TimeZoneLocalizer(engine, metadata, from_schema, None).localize_time_zone( + check_mapped_timestamps=True + ) + + +def run_localization_by_column_with_error( + engine: Engine, + df: pd.DataFrame, + from_schema: TableSchema, + error: tuple[Any, str], + time_zone_column: str | None = None, +) -> None: + metadata = MetaData() + ingest_data(engine, metadata, df, from_schema) + + with pytest.raises(error[0], match=error[1]): + TimeZoneLocalizerByColumn( + engine, + metadata, + from_schema, + time_zone_column=time_zone_column, + ).localize_time_zone(check_mapped_timestamps=True) + + +@pytest.mark.parametrize("to_time_zone", [None, ZoneInfo("EST")]) +def test_time_localization(iter_engines: Engine, to_time_zone: tzinfo | None) -> None: + from_schema = get_datetime_schema(2018, None, TimeIntervalType.PERIOD_BEGINNING, "base_table") + df = generate_datetime_dataframe(from_schema) + run_localization(iter_engines, df, from_schema, to_time_zone) + + +@pytest.mark.parametrize("from_time_tz", [None, ZoneInfo("US/Mountain"), ZoneInfo("MST")]) +def test_time_localization_by_column(iter_engines: Engine, from_time_tz: tzinfo | None) -> None: + from_schema = get_datetime_with_tz_col_schema( + 2018, + from_time_tz, + TimeIntervalType.PERIOD_BEGINNING, + "base_table", + standard_tz=True, + ) + df = 
generate_dataframe_with_tz_col(from_schema) + run_localization_to_column_time_zones(iter_engines, df, from_schema) + + +# Error tests for TimeZoneLocalizer +def test_time_localizer_to_dst_time_error(iter_engines: Engine) -> None: + """Test that TimeZoneLocalizer raises error when to_time_zone is a non standard time zone""" + from_schema = get_datetime_schema(2018, None, TimeIntervalType.PERIOD_BEGINNING, "base_table") + df = generate_datetime_dataframe(from_schema) + to_time_zone = ZoneInfo("US/Mountain") # has DST + metadata = MetaData() + ingest_data(iter_engines, metadata, df, from_schema) + with pytest.raises( + InvalidParameter, match="TimeZoneLocalizer only supports standard time zones" + ): + localize_time_zone( + iter_engines, metadata, from_schema, to_time_zone, check_mapped_timestamps=True + ) + + +def test_time_localizer_with_tz_aware_config_error(iter_engines: Engine) -> None: + """Test that TimeZoneLocalizer raises error when start time is tz-aware""" + from_schema = get_datetime_schema( + 2018, ZoneInfo("US/Mountain"), TimeIntervalType.PERIOD_BEGINNING, "base_table" + ) + df = generate_datetime_dataframe(from_schema) + error = (InvalidParameter, "Source schema time config start time must be tz-naive") + run_localization_with_error(iter_engines, df, from_schema, error) + + +def test_time_localizer_with_wrong_dtype_error(iter_engines: Engine) -> None: + """Test that TimeZoneLocalizer raises error when dtype is not TIMESTAMP_NTZ""" + from_schema = get_datetime_schema(2018, None, TimeIntervalType.PERIOD_BEGINNING, "base_table") + # Manually change dtype to TIMESTAMP_TZ to trigger error + from_schema.time_config = from_schema.time_config.model_copy( + update={"dtype": TimeDataType.TIMESTAMP_TZ} + ) + df = generate_datetime_dataframe(from_schema) + error = (InvalidParameter, "Source schema time config dtype must be TIMESTAMP_NTZ") + run_localization_with_error(iter_engines, df, from_schema, error) + + +def 
test_time_localizer_with_datetime_range_with_tz_col_error(iter_engines: Engine) -> None: + """Test that TimeZoneLocalizer raises error when time config is DatetimeRangeWithTZColumn""" + from_schema = get_datetime_with_tz_col_schema( + 2018, None, TimeIntervalType.PERIOD_BEGINNING, "base_table", standard_tz=True + ) + df = generate_dataframe_with_tz_col(from_schema) + error = (InvalidParameter, "try using TimeZoneLocalizerByColumn") + run_localization_with_error(iter_engines, df, from_schema, error) + + +# Error tests for TimeZoneLocalizerByColumn +def test_time_localizer_by_column_to_dst_time_error(iter_engines: Engine) -> None: + """Test that TimeZoneLocalizerByColumn raises error when to_time_zone is a non standard time zone""" + from_schema = get_datetime_with_tz_col_schema( + 2018, + None, + TimeIntervalType.PERIOD_BEGINNING, + "base_table", + standard_tz=False, + ) + df = generate_dataframe_with_tz_col(from_schema) + metadata = MetaData() + ingest_data(iter_engines, metadata, df, from_schema) + with pytest.raises( + InvalidParameter, match="TimeZoneLocalizerByColumn only supports standard time zones" + ): + localize_time_zone_by_column( + iter_engines, metadata, from_schema, check_mapped_timestamps=True + ) + + +def test_time_localizer_by_column_missing_tz_column_error(iter_engines: Engine) -> None: + """Test that TimeZoneLocalizerByColumn raises error when time_zone_column is missing for DatetimeRange""" + from_schema = get_datetime_schema(2018, None, TimeIntervalType.PERIOD_BEGINNING, "base_table") + df = generate_datetime_dataframe(from_schema) + error = (InvalidParameter, "time_zone_column must be provided") + run_localization_by_column_with_error(iter_engines, df, from_schema, error) + + +def test_time_localizer_by_column_wrong_dtype_error(iter_engines: Engine) -> None: + """Test that TimeZoneLocalizerByColumn raises error when dtype is not TIMESTAMP_NTZ""" + from_schema = get_datetime_with_tz_col_schema( + 2018, None, TimeIntervalType.PERIOD_BEGINNING, 
"base_table", standard_tz=True + ) + # Change dtype to TIMESTAMP_TZ to trigger error + from_schema.time_config = from_schema.time_config.model_copy( + update={"dtype": TimeDataType.TIMESTAMP_TZ} + ) + df = generate_dataframe_with_tz_col(from_schema) + error = (InvalidParameter, "Source schema time config dtype must be TIMESTAMP_NTZ") + run_localization_by_column_with_error(iter_engines, df, from_schema, error) + + +def test_time_localizer_by_column_non_standard_tz_error(iter_engines: Engine) -> None: + """Test that TimeZoneLocalizerByColumn raises error when time zones are not standard""" + from_schema = get_datetime_with_tz_col_schema( + 2018, None, TimeIntervalType.PERIOD_BEGINNING, "base_table", standard_tz=False + ) + df = generate_dataframe_with_tz_col(from_schema) + error = (InvalidParameter, "is not a standard time zone") + run_localization_by_column_with_error(iter_engines, df, from_schema, error) + + +def test_localize_time_zone_by_column_missing_tz_column_error(iter_engines: Engine) -> None: + """Test that localize_time_zone_by_column raises error when time_zone_column is None for DatetimeRange""" + from_schema = get_datetime_schema(2018, None, TimeIntervalType.PERIOD_BEGINNING, "base_table") + df = generate_datetime_dataframe(from_schema) + error = ( + Exception, + "time_zone_column must be provided when source schema time config is of type DatetimeRange", + ) + run_localization_by_column_with_error( + iter_engines, df, from_schema, error, time_zone_column=None + )