2 changes: 1 addition & 1 deletion dockerfile
@@ -1,4 +1,4 @@
FROM python:3.7
FROM python:3.12

RUN mkdir /app
COPY . /app
21 changes: 11 additions & 10 deletions requirements.txt
@@ -1,10 +1,11 @@
pandas==0.24.2
pyarrow==0.13.0
boto3==1.9.177
s3fs==0.2.1
dfmock==0.0.14
moto==1.3.8
SQLAlchemy==1.3.5
pytest==5.0.0
psycopg2==2.8.3
numpy==1.19.5
pandas~=2.2.2
pyarrow~=17.0.0
boto3~=1.34.160
s3fs~=0.4.2
dfmock~=0.0.14
moto[server]~=5.0.12
SQLAlchemy~=2.0.32
psycopg2~=2.9.9
pytest~=8.3.2
numpy~=2.0.1
mock~=5.1.0
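
The move from hard == pins to ~= (compatible release) pins allows patch upgrades while holding the minor series fixed: ~=2.2.2 is shorthand for >=2.2.2, ==2.2.*. A minimal sketch of the semantics, assuming the packaging library is installed (pip vendors its own copy):

    from packaging.specifiers import SpecifierSet

    # "~=2.2.2" expands to ">=2.2.2, ==2.2.*"
    spec = SpecifierSet("~=2.2.2")
    print("2.2.9" in spec)  # True  - patch upgrades stay in range
    print("2.3.0" in spec)  # False - minor bumps are excluded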
36 changes: 25 additions & 11 deletions s3parq/fetch_parq.py
@@ -1,4 +1,5 @@
import ast
from urllib import parse
import boto3
from collections import OrderedDict
import datetime
@@ -151,7 +152,7 @@ def get_max_partition_value(bucket: str, key: str, partition: str) -> any:

def fetch(bucket: str, key: str, filters: List[type(Filter)] = {}, parallel: bool = True, accept_not_s3parq: bool = True) -> pd.DataFrame:
""" S3 Parquet to Dataframe Fetcher.
This function handles the portion of work that will return a concatenated
dataframe based on the partition filters of the specified dataset.

Args:
@@ -164,7 +165,7 @@ def fetch(bucket: str, key: str, filters: List[type(Filter)] = {}, parallel: boo
The published partition to be applied to
- comparison (str):
Comparison function - one of: [ == , != , > , < , >= , <= ]
- values (List(any)):
Values to compare to - must match partition data type
May not use multiple values with the '<', '>' comparisons
parallel (bool, Optional):
@@ -231,7 +232,7 @@ def fetch(bucket: str, key: str, filters: List[type(Filter)] = {}, parallel: boo


def fetch_diff(input_bucket: str, input_key: str, comparison_bucket: str, comparison_key: str, partition: str, reverse: bool = False, parallel: bool = True) -> pd.DataFrame:
""" Returns a dataframe of whats in the input dataset but not the comparison
""" Returns a dataframe of whats in the input dataset but not the comparison
dataset by the specified partition.

Args:
@@ -241,7 +242,7 @@ def fetch_diff(input_bucket: str, input_key: str, comparison_bucket: str, compar
comparison_key (str): The key to the dataset to compare against
partition (str): The partition in the dataset to compare the values of
reverse (bool, Optional): Determines if the operation should be inverted;
if True it will look for the values in comparison that are not
in the input (basically backwards). Defaults to False
parallel (bool, Optional):
Determines if multiprocessing should be used, defaults to True.
@@ -297,8 +298,12 @@ def convert_type(val: Any, dtype: str) -> Any:
elif dtype == 'float':
return float(val)
elif dtype == 'datetime':
return datetime.datetime.strptime(
val, '%Y-%m-%d %H:%M:%S')
fmt = '%Y-%m-%d %H:%M:%S'
try:
return datetime.datetime.strptime(val, fmt)
except ValueError:
return _coerce_datetime_partition(val)
elif dtype == 'category':
return pd.Category(val)
elif dtype == 'bool' or dtype == 'boolean':
@@ -355,7 +360,7 @@ def get_all_files_list(bucket: str, key: str) -> list:


def _get_partitions_and_types(first_file_key: str, bucket: str) -> dict:
""" Fetch a list of all the partitions actually there and their
""" Fetch a list of all the partitions actually there and their
datatypes. List may be different than passed list if not being used
for filtering on.
NOTE: This is pulled from the metadata. It is assumed that this package
@@ -417,6 +422,15 @@ def _parse_partitions_and_values(file_paths: List[str], key: str) -> dict:

return parts

def _coerce_datetime_partition(partition_value: str) -> datetime.datetime:
"""Datetime hive-partition values written by newer pyarrow can arrive
URL-encoded and carrying fractional seconds; this attempts to coerce
them back into a datetime.
"""
try:
return datetime.datetime.strptime(partition_value, "%Y-%m-%d %H:%M:%S")
except ValueError:
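# undo the URL encoding and drop any fractional-second suffix, then retry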
partition_value = parse.unquote(partition_value).split('.')[0]
return datetime.datetime.strptime(partition_value, "%Y-%m-%d %H:%M:%S")

def _get_partition_value_data_types(parsed_parts: dict, part_types: dict) -> dict:
""" Uses the partitions with their known types to parse them out
Expand All @@ -438,7 +452,7 @@ def _get_partition_value_data_types(parsed_parts: dict, part_types: dict) -> dic
parsed_parts[part] = set(map(float, values))
elif part_type == 'datetime':
parsed_parts[part] = set(
map(lambda s: datetime.datetime.strptime(s, "%Y-%m-%d %H:%M:%S"), values))
map(_coerce_datetime_partition, values))
elif (part_type == 'bool') or (part_type == 'boolean'):
parsed_parts[part] = set(map(bool, values))
else:
@@ -499,7 +513,7 @@ def construct_paths(matched_parts, previous_fil_keys: List[str]) -> None:


def _get_filtered_data(bucket: str, paths: List[str], partition_metadata: dict, parallel: bool = True) -> pd.DataFrame:
""" Gets the data based on the filtered object key list. Concatenates all
""" Gets the data based on the filtered object key list. Concatenates all
the separate parquet files.

Args:
@@ -581,7 +595,7 @@ def _repopulate_partitions(partition_string: str, partition_metadata: dict) -> t
if partition_metadata:
for key, val in partitions.items():
partitions[key] = convert_type(val, partition_metadata[key])

return partitions


@@ -616,7 +630,7 @@ def _validate_filter_rules(filters: List[type(Filter)]) -> None:

def _validate_matching_filter_data_type(part_types, filters: List[type(Filter)]) -> None:
""" Validate that the filters passed are matching to the partitions'
listed datatypes, otherwise throw a ValueError.
This includes validating comparisons too.

Args:
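For context on the _coerce_datetime_partition addition: hive-style partition values written through newer pyarrow can come back URL-encoded and with a fractional-second suffix, which the bare strptime call used to choke on. A standalone sketch that mirrors (rather than imports) the helper above:

    import datetime
    from urllib import parse

    def coerce(value: str) -> datetime.datetime:
        fmt = "%Y-%m-%d %H:%M:%S"
        try:
            return datetime.datetime.strptime(value, fmt)
        except ValueError:
            # strip URL encoding and fractional seconds, then retry
            value = parse.unquote(value).split('.')[0]
            return datetime.datetime.strptime(value, fmt)

    print(coerce("2021-06-01 12:30:00"))               # parses directly
    print(coerce("2021-06-01%2012%3A30%3A00.000000"))  # encoded value gets coerced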
25 changes: 16 additions & 9 deletions s3parq/testing_helper.py
@@ -6,7 +6,7 @@
import os
import pandas as pd
import numpy as np
from pandas.util.testing import assert_frame_equal
from pandas.testing import assert_frame_equal
import pyarrow as pa
import pyarrow.parquet as pq
import random
@@ -75,7 +75,7 @@ def sorted_dfs_equal_by_pandas_testing(df1: pd.DataFrame, df2: pd.DataFrame) ->
df1 = df1.sort_values(
by=df1.columns.tolist()).reset_index(drop=True)
# The setup on this part is a pain but it's the most specific check
assert_frame_equal(df2, df1)
assert_frame_equal(df2, df1, check_dtype=False)


# START SETUP BASED
@@ -145,6 +145,12 @@ def setup_grouped_dataframe(count: int = 100, columns: Dict = None):
}
df.columns = columns
df.generate_dataframe()
frame = df.dataframe
try:
# normalize the datetime column to microsecond precision
frame['datetime_col'] = frame['datetime_col'].astype("datetime64[us]")
except KeyError:
pass
return frame


@@ -168,7 +174,7 @@ def setup_partitioned_parquet(
will be the default if not supplied
s3_client (boto3 S3 client, Optional): The started S3 client that boto
uses - NOTE: this should be made under a moto S3 mock!
If it is not provided, a session is crafted under moto.mock_s3
If it is not provided, a session is crafted under moto.mock_aws

Returns:
A tuple of the bucket and the published parquet file paths
@@ -197,10 +203,11 @@ def setup_partitioned_parquet(
table = pa.Table.from_pandas(dataframe)
pq.write_to_dataset(table,
root_path=str(tmp_dir),
coerce_timestamps='ms',
allow_truncated_timestamps=True,
partition_cols=list(
partition_data_types.keys()),
)

parquet_paths = []

# traverse the local parquet tree
@@ -236,7 +243,7 @@ def setup_nons3parq_parquet(
string if not supplied
s3_client (boto3 S3 client, Optional): The started S3 client that boto
uses - NOTE: this should be made under a moto S3 mock!
If it is not provided, a session is crafted under moto.mock_s3
If it is not provided, a session is crafted under moto.mock_aws

Returns:
A tuple of the bucket and the published parquet file paths
@@ -251,7 +258,7 @@ def setup_nons3parq_parquet(
with ExitStack() as stack:
tmp_dir = stack.enter_context(tempfile.TemporaryDirectory())
if not s3_client:
stack.enter_context(moto.mock_s3())
stack.enter_context(moto.mock_aws())
s3_client = boto3.client('s3')

s3_client.create_bucket(Bucket=bucket)
@@ -298,8 +305,8 @@ def setup_custom_redshift_columns_and_dataframe():

def setup_custom_redshift_columns_and_dataframe_with_null():
""" Create a custom_redshift_columns dictionary that contains redshift column definitions and corresponding mock dataframe """
sample_data = {'colA': [1, 2, np.NaN], 'colB': ['DDD', None, 'FFF'],
'colC': [pd.Timestamp('20131213 11:59:59.999999999'), None, pd.Timestamp('20131213 11:59:59.999999999')], 'colE': [7.5, 3.4, np.NaN]}
sample_data = {'colA': [1, 2, np.nan], 'colB': ['DDD', None, 'FFF'],
'colC': [pd.Timestamp('20131213 11:59:59.999999999'), None, pd.Timestamp('20131213 11:59:59.999999999')], 'colE': [7.5, 3.4, np.nan]}

dataframe = pd.DataFrame(data=sample_data)

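The mock_s3 to mock_aws renames in this file (and in every test module below) track moto 5, which collapsed the per-service decorators into a single mock_aws entry point. A minimal usage sketch, with a hypothetical bucket name:

    import boto3
    from moto import mock_aws

    @mock_aws
    def bucket_names():
        s3 = boto3.client("s3", region_name="us-east-1")
        s3.create_bucket(Bucket="example-bucket")  # hypothetical bucket
        return [b["Name"] for b in s3.list_buckets()["Buckets"]]

    assert bucket_names() == ["example-bucket"]

mock_aws works identically as a context manager (with mock_aws(): ...), which is how the test helpers use it.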
9 changes: 7 additions & 2 deletions setup.py
@@ -23,15 +23,20 @@
"License :: OSI Approved :: MIT License",
"Programming Language :: Python :: 3",
"Programming Language :: Python :: 3.7"
"Programming Language :: Python :: 3.8"
"Programming Language :: Python :: 3.9"
"Programming Language :: Python :: 3.10"
"Programming Language :: Python :: 3.11"
"Programming Language :: Python :: 3.12"
],
packages=find_packages(exclude=("tests",)),
include_package_data=True,
install_requires=[
"pandas>=0.24.2, <1",
"pandas>=2.2.2, <3",
"pyarrow>=0.13.0",
"boto3>=1.9",
"s3fs>=0.2",
"psycopg2==2.8.3",
"psycopg2>=2.8.3",
"SQLAlchemy>=1.3"
]
)
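
One subtlety in the classifiers list: Python fuses adjacent string literals, so a missing comma silently merges two classifiers into one invalid entry instead of raising an error. A quick illustration:

    classifiers = [
        "Programming Language :: Python :: 3.11"
        "Programming Language :: Python :: 3.12"  # no comma above: the literals fuse
    ]
    print(len(classifiers))  # 1, not 2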
14 changes: 14 additions & 0 deletions tests/conftest.py
@@ -1,5 +1,19 @@
import pytest
import os
from moto.server import ThreadedMotoServer

@pytest.fixture
def mock_boto():
server = ThreadedMotoServer(port=0)

server.start()
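# port=0 lets the OS pick a free port; recover it from the (private) server socket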
port = server._server.socket.getsockname()[1]
os.environ["AWS_ENDPOINT_URL"] = f"http://127.0.0.1:{port}"

yield

del os.environ["AWS_ENDPOINT_URL"]
server.stop()

def pytest_addoption(parser):
parser.addoption(
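The fixture leans on boto3 reading the AWS_ENDPOINT_URL environment variable (supported in botocore releases since roughly mid-2023), so clients created inside a test transparently target the threaded moto server. A hypothetical test using it:

    import boto3

    def test_bucket_roundtrip(mock_boto):
        # the client picks up AWS_ENDPOINT_URL set by the fixture
        s3 = boto3.client("s3", region_name="us-east-1")
        s3.create_bucket(Bucket="fixture-bucket")
        assert [b["Name"] for b in s3.list_buckets()["Buckets"]] == ["fixture-bucket"]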
10 changes: 5 additions & 5 deletions tests/mock_helper.py
@@ -1,7 +1,7 @@
import os
import pytest
import boto3
from moto import mock_s3
from moto import mock_aws
import pandas as pd
import pyarrow as pa
import pyarrow.parquet as pq
Expand All @@ -13,18 +13,18 @@
import warnings


@mock_s3
@mock_aws
class MockHelper:
""" creates some helpful dataset stuff.
- dataframe is the frame created
- s3_bucket is the test bucket that has been populated
be sure to wrap this with moto @mock_s3 if you want to mock it
- s3_bucket is the test bucket that has been populated
be sure to wrap this with moto @mock_aws if you want to mock it
"""

def __init__(self, count=1000000, s3=False, files=False):
""" If s3 then will populate the s3 bucket with partitioned parquet. """

warnings.warn("MockHelper is a mess and will be removed in s3parq version 2.20",
warnings.warn("MockHelper is a mess and will be removed in s3parq version 2.20",
DeprecationWarning)

self._dataframe = self.setup_grouped_dataframe(count=count)
2 changes: 1 addition & 1 deletion tests/test_fetch_parq.py
@@ -22,7 +22,7 @@

@contextlib.contextmanager
def get_s3_client():
with moto.mock_s3():
with moto.mock_aws():
yield boto3.client('s3')


6 changes: 3 additions & 3 deletions tests/test_fetch_parq_internal.py
@@ -17,7 +17,7 @@

@contextlib.contextmanager
def get_s3_client():
with moto.mock_s3():
with moto.mock_aws():
yield boto3.client('s3')


@@ -386,8 +386,8 @@ def test_s3_partitioned_parquet_to_dataframe():

full_response = pd.DataFrame()
for path in parquet_paths:
full_response = full_response.append(fetch_parq._s3_parquet_to_dataframe(
bucket=bucket, key=path, partition_metadata=partition_types))
full_response = pd.concat([full_response, fetch_parq._s3_parquet_to_dataframe(
bucket=bucket, key=path, partition_metadata=partition_types)], ignore_index=True)

assert full_response.shape == df.shape
sorted_dfs_equal_by_pandas_testing(full_response, df)
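
The append-to-concat rewrite above is forced: pandas 2.x removed DataFrame.append outright. When accumulating in a loop, the idiomatic replacement is to collect frames in a list and concatenate once, which also avoids copying the growing frame on every iteration. A minimal sketch:

    import pandas as pd

    frames = [pd.DataFrame({"n": [i]}) for i in range(3)]
    combined = pd.concat(frames, ignore_index=True)  # one concat, not n appends
    print(combined["n"].tolist())  # [0, 1, 2]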
4 changes: 2 additions & 2 deletions tests/test_integration.py
@@ -2,15 +2,15 @@
import dfmock
import moto
import pandas as pd
from pandas.util.testing import assert_frame_equal
from pandas.testing import assert_frame_equal
import pytest

from s3parq.fetch_parq import fetch
from s3parq.publish_parq import publish
from s3parq.testing_helper import df_equal_by_set, sorted_dfs_equal_by_pandas_testing


@moto.mock_s3
@moto.mock_aws
def test_end_to_end():
# make a sample DF for all the tests
df = dfmock.DFMock(count=10000)
4 changes: 2 additions & 2 deletions tests/test_integration_stress.py
@@ -2,7 +2,7 @@
import dfmock
import moto
import pandas as pd
from pandas.util.testing import assert_frame_equal
from pandas.testing import assert_frame_equal
import pytest

from s3parq.fetch_parq import fetch
@@ -11,7 +11,7 @@


@pytest.mark.slow
@moto.mock_s3
@moto.mock_aws
def test_end_to_end():
df = dfmock.DFMock(count=100000)
df.columns = {"string_options": {"option_count": 4, "option_type": "string"},
4 changes: 2 additions & 2 deletions tests/test_publish_parq.py
@@ -1,7 +1,7 @@
import boto3
from dfmock import DFMock
from mock import patch
from moto import mock_s3
from moto import mock_aws
import pandas as pd
import pyarrow as pa
import pyarrow.parquet as pq
@@ -22,7 +22,7 @@

@contextlib.contextmanager
def get_s3_client():
with moto.mock_s3():
with moto.mock_aws():
yield boto3.client('s3')

