diff --git a/dockerfile b/dockerfile
index 2308e02..20d545a 100644
--- a/dockerfile
+++ b/dockerfile
@@ -1,4 +1,4 @@
-FROM python:3.7
+FROM python:3.12
 RUN mkdir /app
 COPY . /app
 
diff --git a/requirements.txt b/requirements.txt
index cd5f567..6639f55 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -1,10 +1,11 @@
-pandas==0.24.2
-pyarrow==0.13.0
-boto3==1.9.177
-s3fs==0.2.1
-dfmock==0.0.14
-moto==1.3.8
-SQLAlchemy==1.3.5
-pytest==5.0.0
-psycopg2==2.8.3
-numpy==1.19.5
+pandas~=2.2.2
+pyarrow~=17.0.0
+boto3~=1.34.160
+s3fs~=0.4.2
+dfmock~=0.0.14
+moto[server]~=5.0.12
+SQLAlchemy~=2.0.32
+psycopg2~=2.9.9
+pytest~=8.3.2
+numpy~=2.0.1
+mock~=5.1.0
\ No newline at end of file
diff --git a/s3parq/fetch_parq.py b/s3parq/fetch_parq.py
index b88d4d8..3ceb8a2 100644
--- a/s3parq/fetch_parq.py
+++ b/s3parq/fetch_parq.py
@@ -1,4 +1,5 @@
 import ast
+from urllib import parse
 import boto3
 from collections import OrderedDict
 import datetime
@@ -151,7 +152,7 @@ def get_max_partition_value(bucket: str, key: str, partition: str) -> any:
 
 def fetch(bucket: str, key: str, filters: List[type(Filter)] = {}, parallel: bool = True, accept_not_s3parq: bool = True) -> pd.DataFrame:
     """ S3 Parquet to Dataframe Fetcher.
-    This function handles the portion of work that will return a concatenated 
+    This function handles the portion of work that will return a concatenated
     dataframe based on the partition filters of the specified dataset.
 
     Args:
@@ -164,7 +165,7 @@ def fetch(bucket: str, key: str, filters: List[type(Filter)] = {}, parallel: boo
                 The published partition to be applied to
             - comparison (str):
                 Comparison function - one of: [ == , != , > , < , >= , <= ]
-            - values (List(any)): 
+            - values (List(any)):
                 Values to compare to - must match partition data type
                 May not use multiple values with the '<', '>' comparisons
         parallel (bool, Optional):
@@ -231,7 +232,7 @@ def fetch(bucket: str, key: str, filters: List[type(Filter)] = {}, parallel: boo
 
 
 def fetch_diff(input_bucket: str, input_key: str, comparison_bucket: str, comparison_key: str, partition: str, reverse: bool = False, parallel: bool = True) -> pd.DataFrame:
-    """ Returns a dataframe of whats in the input dataset but not the comparison 
+    """ Returns a dataframe of what's in the input dataset but not the comparison
     dataset by the specified partition.
 
     Args:
@@ -241,7 +242,7 @@ def fetch_diff(input_bucket: str, input_key: str, comparison_bucket: str, compar
         comparison_key (str): The key to the dataset to compare against
         partition (str): The partition in the dataset to compare the values of
         reverse (bool, Optional): Determines if the operation should be inversed,
-            if True it will look for the values in comparison that are not 
+            if True it will look for the values in comparison that are not
             in the input (basically backwards). Defaults to False
         parallel (bool, Optional): Determines if multiprocessing should be used,
             defaults to True.
@@ -297,8 +298,12 @@ def convert_type(val: Any, dtype: str) -> Any:
     elif dtype == 'float':
         return float(val)
     elif dtype == 'datetime':
-        return datetime.datetime.strptime(
-            val, '%Y-%m-%d %H:%M:%S')
+        format = '%Y-%m-%d %H:%M:%S'
+        try:
+            return datetime.datetime.strptime(
+                val, format)
+        except ValueError:
+            return _coerce_datetime_partition(val)
     elif dtype == 'category':
         return pd.Category(val)
     elif dtype == 'bool' or dtype == 'boolean':
@@ -355,7 +360,7 @@ def get_all_files_list(bucket: str, key: str) -> list:
 
 
 def _get_partitions_and_types(first_file_key: str, bucket: str) -> dict:
-    """ Fetch a list of all the partitions actually there and their 
+    """ Fetch a list of all the partitions actually there and their
     datatypes. List may be different than passed list if not being used
     for filtering on.
     NOTE: This is pulled from the metadata. It is assumed that this package
@@ -417,6 +422,15 @@ def _parse_partitions_and_values(file_paths: List[str], key: str) -> dict:
 
     return parts
 
+def _coerce_datetime_partition(partition_value: str) -> datetime.datetime:
+    """ Datetime hive-partition values written by newer pyarrow versions can be
+    URL-encoded and carry fractional seconds; coerce them back to a datetime.
+    """
+    try:
+        return datetime.datetime.strptime(partition_value, "%Y-%m-%d %H:%M:%S")
+    except ValueError:
+        partition_value = parse.unquote(partition_value).split('.')[0]
+        return datetime.datetime.strptime(partition_value, "%Y-%m-%d %H:%M:%S")
 
 def _get_partition_value_data_types(parsed_parts: dict, part_types: dict) -> dict:
     """ Uses the partitions with their known types to parse them out
@@ -438,7 +452,7 @@ def _get_partition_value_data_types(parsed_parts: dict, part_types: dict) -> dic
             parsed_parts[part] = set(map(float, values))
         elif part_type == 'datetime':
             parsed_parts[part] = set(
-                map(lambda s: datetime.datetime.strptime(s, "%Y-%m-%d %H:%M:%S"), values))
+                map(_coerce_datetime_partition, values))
         elif (part_type == 'bool') or (part_type == 'boolean'):
             parsed_parts[part] = set(map(bool, values))
         else:
@@ -499,7 +513,7 @@ def construct_paths(matched_parts, previous_fil_keys: List[str]) -> None:
 
 
 def _get_filtered_data(bucket: str, paths: List[str], partition_metadata: dict, parallel: bool = True) -> pd.DataFrame:
-    """ Gets the data based on the filtered object key list. Concatenates all 
+    """ Gets the data based on the filtered object key list. Concatenates all
     the separate parquet files.
 
     Args:
@@ -581,7 +595,7 @@ def _repopulate_partitions(partition_string: str, partition_metadata: dict) -> t
     if partition_metadata:
         for key, val in partitions.items():
            partitions[key] = convert_type(val, partition_metadata[key])
-    
+
     return partitions
 
 
@@ -616,7 +630,7 @@ def _validate_filter_rules(filters: List[type(Filter)]) -> None:
 
 def _validate_matching_filter_data_type(part_types, filters: List[type(Filter)]) -> None:
     """ Validate that the filters passed are matching to the partitions'
-        listed datatypes, otherwise throw a ValueError. 
+        listed datatypes, otherwise throw a ValueError.
         This includes validating comparisons too.
 
     Args:
diff --git a/s3parq/testing_helper.py b/s3parq/testing_helper.py
index f233c6f..c77a2b9 100644
--- a/s3parq/testing_helper.py
+++ b/s3parq/testing_helper.py
@@ -6,7 +6,7 @@
 import os
 import pandas as pd
 import numpy as np
-from pandas.util.testing import assert_frame_equal
+from pandas.testing import assert_frame_equal
 import pyarrow as pa
 import pyarrow.parquet as pq
 import random
@@ -75,7 +75,7 @@ def sorted_dfs_equal_by_pandas_testing(df1: pd.DataFrame, df2: pd.DataFrame) ->
     df1 = df1.sort_values(
         by=df1.columns.tolist()).reset_index(drop=True)
     # The setup on this part is a pain but it's the most specific check
-    assert_frame_equal(df2, df1)
+    assert_frame_equal(df2, df1, check_dtype=False)
 
 
 # START SETUP BASED
@@ -145,6 +145,12 @@ def setup_grouped_dataframe(count: int = 100, columns: Dict = None):
         }
     df.columns = columns
     df.generate_dataframe()
+    frame = df.dataframe
+    try:
+        # coerce to microsecond precision so parquet timestamps round-trip cleanly
+        frame['datetime_col'] = frame['datetime_col'].astype("datetime64[us]")
+    except KeyError:
+        pass
     return df.dataframe
 
 
@@ -168,7 +174,7 @@ def setup_partitioned_parquet(
             will be the default if not supplied
         s3_client (boto3 S3 client, Optional): The started S3 client that boto uses
             - NOTE: this should be made under a moto S3 mock!
-            If it is not provided, a session is crafted under moto.mock_s3
+            If it is not provided, a session is crafted under moto.mock_aws
 
     Returns:
         A tuple of the bucket and the published parquet file paths
@@ -197,10 +203,11 @@ def setup_partitioned_parquet(
             table = pa.Table.from_pandas(dataframe)
             pq.write_to_dataset(table,
                                 root_path=str(tmp_dir),
+                                coerce_timestamps='ms',
+                                allow_truncated_timestamps=True,
                                 partition_cols=list(
-                                    partition_data_types.keys())
+                                    partition_data_types.keys()),
                                 )
-
         parquet_paths = []
 
         # traverse the local parquet tree
@@ -236,7 +243,7 @@ def setup_nons3parq_parquet(
             string if not supplied
         s3_client (boto3 S3 client, Optional): The started S3 client that boto uses
             - NOTE: this should be made under a moto S3 mock!
-            If it is not provided, a session is crafted under moto.mock_s3
+            If it is not provided, a session is crafted under moto.mock_aws
 
     Returns:
         A tuple of the bucket and the published parquet file paths
@@ -251,7 +258,7 @@ def setup_nons3parq_parquet(
     with ExitStack() as stack:
         tmp_dir = stack.enter_context(tempfile.TemporaryDirectory())
         if not s3_client:
-            stack.enter_context(moto.mock_s3())
+            stack.enter_context(moto.mock_aws())
             s3_client = boto3.client('s3')
 
         s3_client.create_bucket(Bucket=bucket)
@@ -298,8 +305,8 @@ def setup_custom_redshift_columns_and_dataframe():
 
 def setup_custom_redshift_columns_and_dataframe_with_null():
     """ Create a custom_redshift_columns dictionary that contains redshift column
    definitions and corresponding mock dataframe """
-    sample_data = {'colA': [1, 2, np.NaN], 'colB': ['DDD', None, 'FFF'],
-                   'colC': [pd.Timestamp('20131213 11:59:59.999999999'), None, pd.Timestamp('20131213 11:59:59.999999999')], 'colE': [7.5, 3.4, np.NaN]}
+    sample_data = {'colA': [1, 2, np.nan], 'colB': ['DDD', None, 'FFF'],
+                   'colC': [pd.Timestamp('20131213 11:59:59.999999999'), None, pd.Timestamp('20131213 11:59:59.999999999')], 'colE': [7.5, 3.4, np.nan]}
 
     dataframe = pd.DataFrame(data=sample_data)
diff --git a/setup.py b/setup.py
index dd1f765..6a5872e 100644
--- a/setup.py
+++ b/setup.py
@@ -23,15 +23,20 @@
         "License :: OSI Approved :: MIT License",
         "Programming Language :: Python :: 3",
-        "Programming Language :: Python :: 3.7"
+        "Programming Language :: Python :: 3.7",
+        "Programming Language :: Python :: 3.8",
+        "Programming Language :: Python :: 3.9",
+        "Programming Language :: Python :: 3.10",
+        "Programming Language :: Python :: 3.11",
+        "Programming Language :: Python :: 3.12"
     ],
     packages=find_packages(exclude=("tests",)),
     include_package_data=True,
     install_requires=[
-        "pandas>=0.24.2, <1",
+        "pandas>=2.2.2, <3",
         "pyarrow>=0.13.0",
         "boto3>=1.9",
         "s3fs>=0.2",
-        "psycopg2==2.8.3",
+        "psycopg2>=2.8.3",
         "SQLAlchemy>=1.3"
     ]
 )
diff --git a/tests/conftest.py b/tests/conftest.py
index 59a80d5..a4fa1f9 100644
--- a/tests/conftest.py
+++ b/tests/conftest.py
@@ -1,5 +1,19 @@
 import pytest
+import os
+from moto.server import ThreadedMotoServer
+@pytest.fixture
+def mock_boto():
+    server = ThreadedMotoServer(port=0)
+
+    server.start()
+    port = server._server.socket.getsockname()[1]
+    os.environ["AWS_ENDPOINT_URL"] = f"http://127.0.0.1:{port}"
+
+    yield
+
+    del os.environ["AWS_ENDPOINT_URL"]
+    server.stop()
 
 
 def pytest_addoption(parser):
     parser.addoption(
diff --git a/tests/mock_helper.py b/tests/mock_helper.py
index 04a2e2d..cbc45f2 100644
--- a/tests/mock_helper.py
+++ b/tests/mock_helper.py
@@ -1,7 +1,7 @@
 import os
 import pytest
 import boto3
-from moto import mock_s3
+from moto import mock_aws
 import pandas as pd
 import pyarrow as pa
 import pyarrow.parquet as pq
@@ -13,18 +13,18 @@
 import warnings
 
 
-@mock_s3
+@mock_aws
 class MockHelper:
     """ creates some helpful dataset stuff.
        - dataframe is the frame created
-       - s3_bucket is the test bucket that has been populated 
-       be sure to wrap this with moto @mock_s3 if you want to mock it
+       - s3_bucket is the test bucket that has been populated
+       be sure to wrap this with moto @mock_aws if you want to mock it
    """
 
    def __init__(self, count=1000000, s3=False, files=False):
        """ If s3 then will populate the s3 bucket with partitioned parquet.
""" - warnings.warn("MockHelper is a mess and will be removed in s3parq version 2.20", + warnings.warn("MockHelper is a mess and will be removed in s3parq version 2.20", DeprecationWarning) self._dataframe = self.setup_grouped_dataframe(count=count) diff --git a/tests/test_fetch_parq.py b/tests/test_fetch_parq.py index dc2caf0..282f45d 100644 --- a/tests/test_fetch_parq.py +++ b/tests/test_fetch_parq.py @@ -22,7 +22,7 @@ @contextlib.contextmanager def get_s3_client(): - with moto.mock_s3(): + with moto.mock_aws(): yield boto3.client('s3') diff --git a/tests/test_fetch_parq_internal.py b/tests/test_fetch_parq_internal.py index b26b9e1..494ef46 100644 --- a/tests/test_fetch_parq_internal.py +++ b/tests/test_fetch_parq_internal.py @@ -17,7 +17,7 @@ @contextlib.contextmanager def get_s3_client(): - with moto.mock_s3(): + with moto.mock_aws(): yield boto3.client('s3') @@ -386,8 +386,8 @@ def test_s3_partitioned_parquet_to_dataframe(): full_response = pd.DataFrame() for path in parquet_paths: - full_response = full_response.append(fetch_parq._s3_parquet_to_dataframe( - bucket=bucket, key=path, partition_metadata=partition_types)) + full_response = pd.concat([full_response, fetch_parq._s3_parquet_to_dataframe( + bucket=bucket, key=path, partition_metadata=partition_types)], ignore_index=True) assert full_response.shape == df.shape sorted_dfs_equal_by_pandas_testing(full_response, df) diff --git a/tests/test_integration.py b/tests/test_integration.py index f4fff65..2fa4de7 100644 --- a/tests/test_integration.py +++ b/tests/test_integration.py @@ -2,7 +2,7 @@ import dfmock import moto import pandas as pd -from pandas.util.testing import assert_frame_equal +from pandas.testing import assert_frame_equal import pytest from s3parq.fetch_parq import fetch @@ -10,7 +10,7 @@ from s3parq.testing_helper import df_equal_by_set, sorted_dfs_equal_by_pandas_testing -@moto.mock_s3 +@moto.mock_aws def test_end_to_end(): # make a sample DF for all the tests df = dfmock.DFMock(count=10000) diff --git a/tests/test_integration_stress.py b/tests/test_integration_stress.py index c248211..e7f3d70 100644 --- a/tests/test_integration_stress.py +++ b/tests/test_integration_stress.py @@ -2,7 +2,7 @@ import dfmock import moto import pandas as pd -from pandas.util.testing import assert_frame_equal +from pandas.testing import assert_frame_equal import pytest from s3parq.fetch_parq import fetch @@ -11,7 +11,7 @@ @pytest.mark.slow -@moto.mock_s3 +@moto.mock_aws def test_end_to_end(): df = dfmock.DFMock(count=100000) df.columns = {"string_options": {"option_count": 4, "option_type": "string"}, diff --git a/tests/test_publish_parq.py b/tests/test_publish_parq.py index ec340f2..c874ad9 100644 --- a/tests/test_publish_parq.py +++ b/tests/test_publish_parq.py @@ -1,7 +1,7 @@ import boto3 from dfmock import DFMock from mock import patch -from moto import mock_s3 +from moto import mock_aws import pandas as pd import pyarrow as pa import pyarrow.parquet as pq @@ -22,7 +22,7 @@ @contextlib.contextmanager def get_s3_client(): - with moto.mock_s3(): + with moto.mock_aws(): yield boto3.client('s3') diff --git a/tests/test_publish_parq_stress.py b/tests/test_publish_parq_stress.py index d00682f..ff70199 100644 --- a/tests/test_publish_parq_stress.py +++ b/tests/test_publish_parq_stress.py @@ -1,13 +1,13 @@ import boto3 from dfmock import DFMock -from moto import mock_s3 +from moto import mock_aws import pytest import s3parq.publish_parq as pub_parq @pytest.mark.slow -@mock_s3 +@mock_aws def test_parquet_sizes(): """ Running two tests - this 
     bucket = "testbucket"
@@ -28,7 +28,7 @@ def test_parquet_sizes():
 
 
 @pytest.mark.slow
-@mock_s3
+@mock_aws
 def test_parquet_sizes_with_partition():
     """ Running two tests - this needs to work with and without partitions """
     bucket = "testbucket"
diff --git a/tests/test_publish_redshift.py b/tests/test_publish_redshift.py
index d63b8ee..ed926bb 100644
--- a/tests/test_publish_redshift.py
+++ b/tests/test_publish_redshift.py
@@ -96,7 +96,7 @@ def test_create_table(self, mock_session_helper, mock_execute):
         with mock_session_helper.db_session_scope() as mock_scope:
             publish_redshift.create_table(table_name, schema_name, columns,
                                           partitions, path, mock_session_helper)
-        assert mock_scope.execute.called_once_with(expected_sql)
+        mock_scope.execute.assert_called_once_with(expected_sql)
 
     # Test that the function is called with the table name without partitions
     @patch('s3parq.publish_redshift.SessionHelper')
@@ -119,7 +119,7 @@ def test_create_table_without_partitions(self, mock_session_helper, mock_execute
         with mock_session_helper.db_session_scope() as mock_scope:
             publish_redshift.create_table(table_name, schema_name, columns,
                                           partitions, path, mock_session_helper)
-        assert mock_scope.execute.called_once_with(expected_sql)
+        mock_scope.execute.assert_called_once_with(expected_sql)
 
     def test_gets_proper_partitions(self):
         test_str = '/some/path/to/data/banana=33/orange=65/apple=abcd/xyz.parquet'
@@ -198,7 +198,7 @@ def test_create_partitions(self, mock_session_helper, mock_execute):
         with mock_session_helper.db_session_scope() as mock_scope:
             publish_redshift.create_partitions(
                 bucket, schema_name, table_name, filepath, mock_session_helper)
-        assert mock_scope.execute.called_once_with(expected_sql)
+        mock_scope.execute.assert_called_once_with(expected_sql)
 
     # Test to check that the passed in datatype maps correctly
     def test_datatype_mapper(self):
@@ -237,7 +237,7 @@ def test_create_custom_table(self, mock_session_helper, mock_execute):
         with mock_session_helper.db_session_scope() as mock_scope:
             publish_redshift.create_custom_table(table_name, schema_name, partitions,
                                                  path, custom_redshift_columns, mock_session_helper)
-        assert mock_scope.execute.called_once_with(expected_sql)
+        mock_scope.execute.assert_called_once_with(expected_sql)
 
     # Verify function call for custom create table, no partitions
     @patch('s3parq.publish_redshift.SessionHelper')
@@ -263,4 +263,4 @@ def test_create_custom_table_without_partitions(self, mock_session_helper, mock_
         with mock_session_helper.db_session_scope() as mock_scope:
             publish_redshift.create_custom_table(table_name, schema_name, partitions,
                                                  path, custom_redshift_columns, mock_session_helper)
-        assert mock_scope.execute.called_once_with(expected_sql)
+        mock_scope.execute.assert_called_once_with(expected_sql)
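Note on the datetime partition handling in fetch_parq.py: `convert_type` and `_get_partition_value_data_types` now route datetime partition strings through `_coerce_datetime_partition`, which tries the plain `%Y-%m-%d %H:%M:%S` format first and only then URL-decodes the value and drops any fractional-seconds suffix. A minimal sketch of that behaviour; the sample partition value below is hypothetical, not taken from the change:

```python
import datetime
from urllib import parse


def coerce_datetime_partition(partition_value: str) -> datetime.datetime:
    # Same strategy as fetch_parq._coerce_datetime_partition: plain parse first,
    # then unquote and strip any fractional seconds before retrying.
    try:
        return datetime.datetime.strptime(partition_value, "%Y-%m-%d %H:%M:%S")
    except ValueError:
        partition_value = parse.unquote(partition_value).split('.')[0]
        return datetime.datetime.strptime(partition_value, "%Y-%m-%d %H:%M:%S")


# Hypothetical hive-partition value as it might appear in an S3 key:
# colons URL-encoded and a microsecond suffix attached.
print(coerce_datetime_partition("2023-05-01 12%3A30%3A00.000000"))  # 2023-05-01 12:30:00
print(coerce_datetime_partition("2023-05-01 12:30:00"))             # fast path, unchanged
```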
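The testing helper now casts the mock `datetime_col` to microsecond precision and passes `coerce_timestamps='ms'` / `allow_truncated_timestamps=True` when writing the partitioned dataset. A standalone sketch of what that coercion does, using `pq.write_table` for brevity rather than the helper's `write_to_dataset` call (file name is illustrative):

```python
import pandas as pd
import pyarrow as pa
import pyarrow.parquet as pq

df = pd.DataFrame({
    "datetime_col": pd.to_datetime(["2023-05-01 12:30:00.123456"]).astype("datetime64[us]"),
})

table = pa.Table.from_pandas(df)
# Truncate sub-millisecond precision on write, mirroring the kwargs now passed
# to pq.write_to_dataset in setup_partitioned_parquet; without
# allow_truncated_timestamps the lossy truncation would raise.
pq.write_table(table, "example.parquet",
               coerce_timestamps="ms",
               allow_truncated_timestamps=True)

print(pq.read_table("example.parquet").to_pandas()["datetime_col"][0])
# -> 2023-05-01 12:30:00.123000
```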
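The new `mock_boto` fixture in tests/conftest.py starts a `ThreadedMotoServer` and exports `AWS_ENDPOINT_URL`, which recent boto3 releases read when constructing clients. A rough usage sketch under those assumptions; the test name, bucket name, and dummy credentials are illustrative and not part of the change:

```python
import os

import boto3

# Moto's server accepts any credentials; set dummies so boto3 does not fail
# credential resolution when none are configured in the environment.
os.environ.setdefault("AWS_ACCESS_KEY_ID", "testing")
os.environ.setdefault("AWS_SECRET_ACCESS_KEY", "testing")


def test_bucket_roundtrip(mock_boto):
    # Clients created while the fixture is active pick up AWS_ENDPOINT_URL
    # and talk to the ThreadedMotoServer instead of real AWS.
    s3 = boto3.client("s3", region_name="us-east-1")
    s3.create_bucket(Bucket="some-test-bucket")
    assert "some-test-bucket" in [b["Name"] for b in s3.list_buckets()["Buckets"]]
```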