Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,12 @@ A lot of pre-filtering involves trimming down your dataset based on the values a
key,
partition)

## To Test
```
bash dev_env --build
pytest tests/
```

## Redshift Spectrum
Dataframes published to S3 can optionally be queried through AWS Redshift Spectrum. As a prerequisite, you must have an external database configured in Redshift; see the [AWS docs](https://docs.aws.amazon.com/redshift/latest/dg/c-using-spectrum.html) for help setting one up. To enable this functionality in S3parq, pass a dictionary of configurations to `publish()` via the `redshift_params` argument.

Expand Down
8 changes: 4 additions & 4 deletions requirements.txt
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
pandas==0.24.2
pyarrow==0.13.0
boto3==1.9.177
s3fs==0.2.1
pandas==1.2.4
pyarrow==4.0.0
boto3==1.17.58
s3fs==0.4.2
dfmock==0.0.14
moto==1.3.8
psycopg2==2.8.3
Expand Down
7 changes: 4 additions & 3 deletions s3parq/publish_parq.py
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,7 @@ def validate_redshift_params(redshift_params: dict) -> dict:
- port (str): Redshift Spectrum port to use
- db_name (str): Redshift Spectrum database name to use
- ec2_user (str): If on ec2, the user that should be used
- read_access_user (str): Name of user getting READ access on a schema

Returns:
The given redshift_params, with table and schema names lowercase
Expand All @@ -120,7 +121,7 @@ def validate_redshift_params(redshift_params: dict) -> dict:
ValueError: If redshift_params is missing any of the above attributes
"""
expected_params = ["schema_name", "table_name", "iam_role",
"region", "cluster_id", "host", "port", "db_name", "ec2_user"]
"region", "cluster_id", "host", "port", "db_name", "ec2_user", "read_access_user"]
logger.debug("Checking redshift params are correctly formatted")
if len(redshift_params) != len(expected_params):
params_length_message = f"Expected parameters: {len(expected_params)}. Received: {len(redshift_params)}"
Expand Down Expand Up @@ -513,7 +514,7 @@ def publish(bucket: str, key: str, partitions: List[str], dataframe: pd.DataFram

session_helper.configure_session_helper()
publish_redshift.create_schema(
redshift_params['schema_name'], redshift_params['db_name'], redshift_params['iam_role'], session_helper)
redshift_params['schema_name'], redshift_params['db_name'], redshift_params['iam_role'], session_helper, redshift_params['read_access_user'])
logger.debug(
f"Schema {redshift_params['schema_name']} created. Creating table {redshift_params['table_name']}...")

Expand Down Expand Up @@ -618,7 +619,7 @@ def custom_publish(bucket: str, key: str, partitions: List[str], dataframe: pd.D

session_helper.configure_session_helper()
publish_redshift.create_schema(
redshift_params['schema_name'], redshift_params['db_name'], redshift_params['iam_role'], session_helper)
redshift_params['schema_name'], redshift_params['db_name'], redshift_params['iam_role'], session_helper, redshift_params['read_access_user'])
logger.debug(
f"Schema {redshift_params['schema_name']} created. Creating table {redshift_params['table_name']}...")

Expand Down
9 changes: 8 additions & 1 deletion s3parq/publish_redshift.py
Original file line number Diff line number Diff line change
Expand Up @@ -178,7 +178,7 @@ def _datatype_mapper(columns: dict) -> dict:
return f"({sql_statement[:-2]})" # Slice off the last space and comma


def create_schema(schema_name: str, db_name: str, iam_role: str, session_helper: SessionHelper) -> None:
def create_schema(schema_name: str, db_name: str, iam_role: str, session_helper: SessionHelper, read_access_user=None) -> None:
""" Creates a schema in AWS redshift using a given iam_role

Args:
Expand All @@ -198,6 +198,13 @@ def create_schema(schema_name: str, db_name: str, iam_role: str, session_helper:
logger.info(f'Running query to create schema: {new_schema_query}')
scope.execute(new_schema_query)

if read_access_user:
grant_access_query = f"GRANT USAGE ON SCHEMA {schema_name} TO {read_access_user};\
GRANT SELECT ON ALL TABLES IN SCHEMA {schema_name} TO {read_access_user};\
ALTER DEFAULT PRIVILEGES IN SCHEMA {schema_name} GRANT SELECT ON TABLES TO {read_access_user};"
logger.info(f'Running query to grant access to schema: {grant_access_query}')
scope.execute(grant_access_query)


def create_table(table_name: str, schema_name: str, columns: dict, partitions: dict, path: str, session_helper: SessionHelper) -> None:
""" Creates a table in AWS redshift. The table will be named
Expand Down
7 changes: 4 additions & 3 deletions tests/test_publish_parq.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,8 @@ def setup_redshift_params(self):
'host': 'hamburger_host',
'port': '9999',
'db_name': 'hamburger_db',
'ec2_user': 'hamburger_aws'
'ec2_user': 'hamburger_aws',
'read_access_user': 'some_read_only_user'
}

return redshift_params
Expand Down Expand Up @@ -285,7 +286,7 @@ def test_schema_publish(self, mock_session_helper, mock_create_schema):
dataframe=dataframe, partitions=partitions, redshift_params=redshift_params)

mock_create_schema.assert_called_once_with(
redshift_params['schema_name'], redshift_params['db_name'], redshift_params['iam_role'], msh)
redshift_params['schema_name'], redshift_params['db_name'], redshift_params['iam_role'], msh, redshift_params['read_access_user'])

@patch('s3parq.publish_redshift.create_table')
@patch('s3parq.publish_parq.SessionHelper')
Expand Down Expand Up @@ -375,7 +376,7 @@ def test_custom_publish_schema_publish(self, mock_session_helper, mock_create_sc
custom_redshift_columns=custom_redshift_columns)

mock_create_schema.assert_called_once_with(
redshift_params['schema_name'], redshift_params['db_name'], redshift_params['iam_role'], msh)
redshift_params['schema_name'], redshift_params['db_name'], redshift_params['iam_role'], msh, redshift_params['read_access_user'])

@patch('s3parq.publish_redshift.create_custom_table')
@patch('s3parq.publish_parq.SessionHelper')
Expand Down