84 changes: 82 additions & 2 deletions earthmover/nodes/destination.py
@@ -1,4 +1,5 @@
 import csv
+import dask.dataframe as dd
 import jinja2
 import os
 import pandas as pd
@@ -18,8 +19,11 @@ class Destination(Node):
     mode: str = None  # Documents which class was chosen.
     allowed_configs: Tuple[str] = ('debug', 'expect', 'show_progress', 'repartition', 'source',)

-    def __new__(cls, *args, **kwargs):
-        return object.__new__(FileDestination)
+    def __new__(cls, name: str, config: 'YamlMapping', *, earthmover: 'Earthmover'):
+        if config.get('extension') == 'csv' or config.get('extension') == 'tsv':
+            return object.__new__(CsvDestination)
+        else:
+            return object.__new__(FileDestination)

     def __init__(self, *args, **kwargs):
         super().__init__(*args, **kwargs)
@@ -132,3 +136,79 @@ def render_row(self, row: pd.Series):
             raise

         return json_string
+
+
+class CsvDestination(Destination):
+    """
+    Writes the source dataframe to a CSV or TSV file.
+    """
+    mode: str = 'csv'  # Documents which class was chosen.
+    allowed_configs: Tuple[str] = (
+        'debug', 'expect', 'show_progress', 'repartition', 'source',
+        'extension', 'header', 'separator', 'limit', 'keep_columns'
+    )

Comment (Collaborator):
I want to reiterate my opinion that limit and keep_columns should not be part of destination configs. These are data transformations, which should be done separately. We already have a keep_columns transformation operation, and adding a limit operation would be simple (and, for performance reasons, it should be applied as far upstream as possible, not at the final destination).
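For reference, here is a sketch of the alternative that comment describes, with the subsetting moved upstream into a transformation. keep_columns is an existing earthmover operation; the limit operation, its count parameter, and the node names are hypothetical:

transformations:
  attendance_trimmed:
    source: $sources.attendance
    operations:
      - operation: keep_columns
        columns:
          - day
          - student_id
      - operation: limit   # hypothetical operation, proposed but not yet implemented
        count: 5

destinations:
  attendance:
    source: $transformations.attendance_trimmed
    extension: tsv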


+    def __init__(self, *args, **kwargs):
+        super().__init__(*args, **kwargs)
+        self.header = self.error_handler.assert_get_key(self.config, 'header', dtype=bool, required=False, default=True)
+        self.separator = self.error_handler.assert_get_key(self.config, 'separator', dtype=str, required=False, default=",")
+        self.limit = self.error_handler.assert_get_key(self.config, 'limit', dtype=int, required=False, default=None)
+        self.extension = self.error_handler.assert_get_key(self.config, 'extension', dtype=str, required=False, default="csv")
Comment (Collaborator, author):
This field is technically required, since it has to be populated to initialize the CsvDestination.
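If that's right, the presumable follow-up is a one-line change to the hunk above (a sketch of the commenter's implication, not code from this PR):

-        self.extension = self.error_handler.assert_get_key(self.config, 'extension', dtype=str, required=False, default="csv")
+        self.extension = self.error_handler.assert_get_key(self.config, 'extension', dtype=str, required=True)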

Comment (Collaborator):
I used a different approach here, adding another kind property which is used to select the destination class. I envision that kind could have values like:

file.jsonl
file.csv
file.tsv
file.parquet
file.xml
...
database.mysql
database.postgres
database.snowflake
...

This does perhaps make extension superfluous, except that if someone insists on using .ndjson for JSONL, or even really wants to put TSV data in a file with a .xml extension, I guess that should be possible. Maybe eventually extension becomes optional, with a default value of "infer."

Eventually we may add an (optional) location indicating where to materialize files or execute database.* SQL, with values like:

local # (default)
s3://bucket_name/path/to/dir/
sftp://user:pass@domain.com/path/to/dir/
postgres://user:pass@domain.com:123/database_name?currentSchema=schema_name # (a SQLAlchemy connection string)
snowflake://username:password@account_id/db_name/schema_name?warehouse=wh_name&role=role_name # (a SQLAlchemy connection string)
...

(earthmover could parse the location and figure out what connector/library to use internally.)

extension also doesn't really make sense for database.* destination kinds, unless location is a file system, in which case it would probably be .sql.

Eventually we may add an optional mode: overwrite # or append. For file materialization this is self-explanatory; for database.* materialization, overwrite could trigger a TRUNCATE statement before INSERTs begin.

One other, relatively unrelated comment: if/when we support writing to databases, the order in which destinations are processed will become important (if there are primary/foreign key references in the data). Currently there's no way in earthmover to control the order in which destinations are processed; we'd have to figure out how to handle that... maybe (like dbt does) an optional depends_on property in each destination.
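Putting those ideas together, a destinations section might eventually look something like the sketch below. Every property shown (kind, location, mode, depends_on) is speculative, and all names and credentials are placeholders:

destinations:
  schools:
    kind: file.csv
    source: $transformations.schools
    location: s3://bucket_name/path/to/dir/
    mode: overwrite
  students:
    kind: database.postgres
    source: $transformations.students
    location: postgres://user:pass@domain.com:123/database_name?currentSchema=schema_name
    mode: append          # rows INSERTed without a preceding TRUNCATE
    depends_on:           # dbt-style ordering, for foreign-key references
      - schools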

+        self.keep_columns = self.error_handler.assert_get_key(self.config, 'keep_columns', required=False, default=None)
+
+        self.file = os.path.join(
+            self.earthmover.state_configs['output_dir'],
+            f"{self.name}.{self.extension}"
+        )

+    def execute(self, **kwargs):
+        """
+        Write the source dataframe to a CSV or TSV file in the output directory.
+        """
+        super().execute(**kwargs)
+
+        self.data = self.upstream_sources[self.source].data
+
+        # Apply limit to dataframe if specified.
+        if self.limit:
Comment (Collaborator, author):
I'm not sure if raising an error is the right choice here. If the user specifies more rows than exist in the dataframe, we should just return all rows.

+            if self.limit > len(self.data):
+                self.error_handler.throw(
+                    "Limit value exceeds the number of rows in the data"
+                )
+                raise
+
+            self.data = dd.from_pandas(self.data.head(n=self.limit), npartitions=1)
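A minimal sketch of the clamping behavior the author's comment above suggests, using the same dask calls as this hunk (the min() guard and npartitions=-1, which lets head() look beyond the first partition, are my additions, not code from the PR):

if self.limit:
    # Return at most `limit` rows; clamp to the row count instead of raising.
    n_rows = len(self.data)
    self.data = dd.from_pandas(
        self.data.head(n=min(self.limit, n_rows), npartitions=-1),
        npartitions=1,
    )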

+        # Verify the output directory exists.
+        os.makedirs(os.path.dirname(self.file), exist_ok=True)
+        self.logger.info(f"Ensured output directory exists: {os.path.dirname(self.file)}")

+        # Subset dataframe columns if specified.
+        try:
+            if self.keep_columns:
+                self.data = self.data[self.keep_columns]
+
+        except KeyError as e:
+            self.error_handler.throw(
+                f"Error occurred while subsetting the data: {e.args[0]}"
+            )
+            raise
+
+        # Change separator to tab if extension is tsv.
+        if self.extension == 'tsv':
+            self.separator = '\t'
+
+        try:
+            self.data.to_csv(
+                filename=self.file, single_file=True, index=False,
+                sep=self.separator, header=self.header
+            )
+            self.logger.info(f"Output `{self.file}` written")
+
+        except Exception as err:
+            self.error_handler.throw(
+                f"Error writing data to {self.extension} file: ({err})"
+            )
+            raise
12 changes: 11 additions & 1 deletion example_projects/01_simple/earthmover.yaml
@@ -50,4 +50,14 @@ destinations:
     source: $sources.testing_source
     template: ./templates/studentSchoolAttendanceEvent.jsont
     extension: jsonl
-    linearize: True
+    linearize: True
+
+  attendance:
+    source: $sources.attendance
+    extension: tsv
+    debug: True
+    header: True
+    limit: 5
+    keep_columns:
+      - day
+      - student_id
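With the CsvDestination logic above, this example should write attendance.tsv into the project's output_dir: a tab-separated header line (from header: True and keep_columns) followed by at most five data rows (limit: 5). The shape, with placeholder values since real rows come from sources.attendance, would be:

day	student_id
<day>	<student_id>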