15 changes: 2 additions & 13 deletions earthmover/__init__.py
@@ -1,18 +1,7 @@
import pandas as pd

# only use upgraded dask/pandas config on later versions of python
import sys
if sys.version_info.minor >= 10:

# September 2024 - for now we need to do this in order to turn off the Dask
# query optimizer - see https://blog.dask.org/2023/08/25/dask-expr-introduction
# For reasons unknown, it doesn't yet work with Earthmover. A future Dask
# version may force us to use the query optimizer, but hopefully by then,
# the bugs that emerge when we use it with Earthmover will have been fixed.
import dask
dask.config.set({'dataframe.query-planning': False})

# performance enhancements
dask.config.set({"dataframe.convert-string": True})

import pandas as pd
pd.options.mode.copy_on_write = True
pd.options.mode.string_storage = "pyarrow"
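Aside: a minimal sketch of what the two pandas options kept above change in practice, assuming pandas 2.x with pyarrow installed; this is illustrative, not earthmover code.

```python
import pandas as pd

pd.options.mode.copy_on_write = True        # writes to derived objects no longer leak back
pd.options.mode.string_storage = "pyarrow"  # default backend for pandas' string dtype

df = pd.DataFrame({"a": [1, 2]})
col = df["a"]
col.iloc[0] = 99             # under copy-on-write this writes to a private copy...
assert df["a"].iloc[0] == 1  # ...so the parent DataFrame is unchanged
```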
1 change: 0 additions & 1 deletion earthmover/__main__.py
@@ -264,6 +264,5 @@ def main(argv=None):
logger.exception(f"unknown command '{args.command}', use -h flag for help")
raise


if __name__ == "__main__":
sys.exit(main())
75 changes: 70 additions & 5 deletions earthmover/earthmover.py
@@ -99,10 +99,7 @@ def __init__(self,

# Prepare the output directory for destinations.
self.state_configs['output_dir'] = os.path.expanduser(self.state_configs['output_dir'])

# Set the temporary directory in cases of disk-spillage.
dask.config.set({'temporary_directory': self.state_configs['tmp_dir']})


# Set a directory for installing packages.
self.packages_dir = os.path.join(os.getcwd(), 'packages')

@@ -154,6 +151,59 @@ def inject_cli_overrides(self, configs, prefix=None):
configs.set_path(key.lstrip(prefix), value)
return configs

def config_dask(self):
# Here we set some default dask configs that are needed for most earthmover runs.
# A power user can certainly override them in earthmover.yml's `config.dask` section.
# * September 2024 - for now we need to do this in order to turn off the Dask
# query optimizer - see https://blog.dask.org/2023/08/25/dask-expr-introduction
# For reasons unknown, it doesn't yet work with Earthmover. A future Dask
# version may force us to use the query optimizer, but hopefully by then,
# the bugs that emerge when we use it with Earthmover will have been fixed.
# * convert-string converts string-like data to pyarrow strings to improve performance
# see https://docs.dask.org/en/latest/configuration.html#dataframe.convert-string
default_dask_configs = {
"dataframe.query-planning": False,
"dataframe.convert-string": True
}

# Here we set up dask configs, and (perhaps) dask distributed configs.
# Dask configs are nested dicts, but must be passed to dask as a flattened dict... so
# ```yaml
# temporary_directory: /tmp
# dataframe:
# backend: pandas
# ```
# should be passed to dask as
# > dask.config.set({"temporary_directory": "/tmp", "dataframe.backend": "pandas"})
#
# The calls below to `util.flatten_dict()` convert a nested dict, like what
# comes from the earthmover YAML config, into the flattened dict that Dask needs.
dask_configs = default_dask_configs.copy()
user_dask_configs = self.user_configs.get("config",{}).get("dask", {})
# possible configs are here: https://github.com/dask/dask/blob/main/dask/dask.yaml
# and (for distributed): https://github.com/dask/distributed/blob/main/distributed/distributed.yaml
temp_dir = self.user_configs.get("config",{}).get("temp_dir", False)
if temp_dir:
user_dask_configs["temporary-directory"] = temp_dir
if user_dask_configs:
dask_configs.update(util.flatten_dict(user_dask_configs.to_dict()))
dask.config.set(dask_configs)

# distributed settings:
self.distributed = self.user_configs.get("config",{}).get("dask",{}).get("distributed",False)
if self.distributed:
self.dask_cluster_kwargs = self.user_configs.get("config",{}).get("dask_cluster_kwargs",False)
# possible configs are here: https://distributed.dask.org/en/stable/api.html?highlight=localcluster#distributed.LocalCluster
# should perhaps default to...
# {
# "processes": True,
# "n_workers": os.cpu_count() - 1,
# "threads_per_worker": 1,
# "memory_limit": str((psutil.virtual_memory().total / (1024**2)) / (os.cpu_count() - 1)) + "MB"
# }
# (but unfortunately this would introduce a dependency on psutil...)
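Aside: `util.flatten_dict()` is earthmover's own helper and its implementation is not shown in this diff. A hypothetical sketch of the behavior the comments above describe:

```python
def flatten_dict(nested: dict, prefix: str = "") -> dict:
    """Collapse nested keys into dot-separated keys, the shape dask.config.set() expects."""
    flat = {}
    for key, value in nested.items():
        path = f"{prefix}.{key}" if prefix else key
        if isinstance(value, dict):
            flat.update(flatten_dict(value, path))  # recurse into sub-dicts
        else:
            flat[path] = value
    return flat

# flatten_dict({"temporary_directory": "/tmp", "dataframe": {"backend": "pandas"}})
# == {"temporary_directory": "/tmp", "dataframe.backend": "pandas"}
```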


def compile(self, to_disk: bool = False):
"""
Parse optional packages, iterate the node configs, compile each Node, and build the graph.
@@ -170,6 +220,10 @@ def compile(self, to_disk: bool = False):
if to_disk:
self.user_configs.to_disk(self.compiled_yaml_file)
self.user_configs = self.inject_cli_overrides(self.user_configs)
self.package_graph = None # not needed anymore, and breaks dask.distributed (not serializable)

# Configure Dask:
self.config_dask()

### Compile the nodes and add to the graph type-by-type.
self.sources = self.compile_node_configs(
@@ -252,6 +306,16 @@ def execute(self, graph: Graph):
Iterate subgraphs in `Earthmover.graph` and execute each Node in order.
:return:
"""
if self.distributed:
from dask.distributed import LocalCluster, Client
self.logger.info(f"running with dask distributed")
if not self.dask_cluster_kwargs:
self.logger.error("`config.dask_cluster_kwargs` is required in `earthmover.yml` when `config.dask.distributed` is specified; see documentation for details and example configuration")
cluster = LocalCluster(**(self.dask_cluster_kwargs.to_dict()))
self.logger.info(f"view the dask profiling dashboard at {cluster.dashboard_link}")
dask_client = Client(cluster)
dask_client.get_versions(check=True)

if not os.path.isdir(self.state_configs['output_dir']):
self.logger.info(
f"creating output directory {self.state_configs['output_dir']}"
@@ -391,7 +455,8 @@ def generate(self, selector: str = "*"):


### Draw the graph again, this time add metadata about rows/cols/size at each node
if self.state_configs['show_graph']:
if self.state_configs['show_graph'] and not self.distributed:
# (don't do this if we're running distributed, as that would cause an expensive recompute)
self.logger.info("saving dataflow graph image to `graph.png` and `graph.svg`")

# Compute all row number values at once for performance, then update the nodes.
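Aside: an illustrative `earthmover.yml` fragment (not from this PR) matching the keys the code above reads; the cluster kwargs pass straight through to `LocalCluster`, and the values here are assumptions, not recommended defaults.

```yaml
config:
  dask:
    temporary_directory: /tmp   # flattened and handed to dask.config.set()
    distributed: True           # enables the dask.distributed execution path
  dask_cluster_kwargs:          # forwarded verbatim to LocalCluster(...)
    processes: True
    n_workers: 3
    threads_per_worker: 1
    memory_limit: 4GB
```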
137 changes: 89 additions & 48 deletions earthmover/nodes/destination.py
@@ -1,7 +1,10 @@
import os
import pandas as pd
import dask
import re
import csv
import warnings
from functools import partial

from earthmover.nodes.node import Node
from earthmover import util
@@ -91,8 +94,8 @@ def execute(self, **kwargs):
# Replace multiple spaces with a single space to flatten templates.
if self.linearize:
template_string = self.EXP.sub(" ", template_string)

self.jinja_template = util.build_jinja_template(template_string, macros=self.earthmover.macros)
else:
template_string = template_string.strip("\r\n") + "\n"

except OSError as err:
self.error_handler.throw(
@@ -110,7 +113,9 @@
# (meta=... below is how we prevent dask warnings that it can't infer the output data type)
self.data = (
self.upstream_sources[self.source].data
.map_partitions(lambda x: x.apply(self.render_row, jinja_template=self.jinja_template, axis=1), meta=pd.Series('str'))
.fillna("") # needed to prevent "None" from entering final values
.map_partitions(partial(self.apply_render_row, template_string, self.render_row), meta=pd.Series('str'))
.reset_index(drop=True)
)

# Repartition before writing, if specified.
@@ -119,61 +124,97 @@
# Verify the output directory exists.
os.makedirs(os.path.dirname(self.file), exist_ok=True)

# Write the optional header, each line, and the optional footer.
with open(self.file, 'w+', encoding='utf-8') as fp:

# only load the first row if header/footer contain Jinja that might need it:
if (
(self.header and util.contains_jinja(self.header))
or (self.footer and util.contains_jinja(self.footer))
):
try:
with warnings.catch_warnings():
warnings.filterwarnings("ignore", message="Insufficient elements for `head`")
# (use `npartitions=-1` because the first N partitions could be empty)
first_row = self.upstream_sources[self.source].data.head(1, npartitions=-1).reset_index(drop=True).iloc[0]

except IndexError: # If no rows are present, build a representation of the row with empty values
first_row = {col: "" for col in self.upstream_sources[self.source].data.columns}
first_row['__row_data__'] = first_row

if self.header and util.contains_jinja(self.header):
jinja_template = util.build_jinja_template(self.header, macros=self.earthmover.macros)
rendered_template = self.render_row(first_row, jinja_template=jinja_template)
fp.write(rendered_template)
elif self.header: # no jinja
fp.write(self.header)

for partition in self.data.partitions:
fp.writelines(partition.compute())
partition = None # Remove partition from memory immediately after write.

if self.footer and util.contains_jinja(self.footer):
jinja_template = util.build_jinja_template(self.footer, macros=self.earthmover.macros)
rendered_template = self.render_row(first_row, jinja_template)
fp.write(rendered_template)
elif self.footer: # no jinja
fp.write(self.footer)
if os.path.exists(self.file): os.remove(self.file) # remove file (if exists)
open(self.file, 'a').close() # touch file, so it exists

# only load the first row if header/footer contain Jinja that might need it:
if (
(self.header and util.contains_jinja(self.header))
or (self.footer and util.contains_jinja(self.footer))
):
try:
with warnings.catch_warnings():
warnings.filterwarnings("ignore", message="Insufficient elements for `head`")
# (use `npartitions=-1` because the first N partitions could be empty)
# (use self.upstream_sources because self.data is a single column, the rendered Jinja template)
first_row = self.upstream_sources[self.source].data.head(1, npartitions=-1).reset_index(drop=True).iloc[0]

except IndexError: # If no rows are present, build a representation of the row with empty values
first_row = {col: "" for col in self.upstream_sources[self.source].data.columns}
# first_row['__row_data__'] = first_row
# first_row = pd.Series(first_row)
# first_row_data = util.add_dunder_row_data(first_row)

# Write the optional header
if self.header:
with open(self.file, 'a', encoding='utf-8') as fp:
if self.header and util.contains_jinja(self.header):
header_template = util.build_jinja_template(self.earthmover.macros + self.header)
rendered_template = self.render_row(first_row, template=header_template, template_string=self.header, dunder_row_data=True)
fp.write(rendered_template.strip("\r\n") + "\n")
elif self.header: # no jinja
fp.write(self.header.strip("\r\n") + "\n")

# Append data rows to file:
# to_csv() - which is most efficient - unfortunately only works if `linearize: True`;
# otherwise, we get an error about escapechar being required (since the non-linearized
# data might contain newline chars)
if self.linearize:
self.data.to_csv(
self.file,
single_file=True,
index=False,
header=False,
encoding='utf-8',
mode='a',
quoting=csv.QUOTE_NONE,
doublequote=False,
na_rep="",
sep="~",
escapechar='')
else:
with open(self.file, 'a', encoding='utf-8') as fp:
for partition in self.data.partitions:
fp.writelines(dask.compute(partition)[0])
partition = None

# Write the optional footer
if self.footer:
with open(self.file, 'a', encoding='utf-8') as fp:
if self.footer and util.contains_jinja(self.footer):
footer_template = util.build_jinja_template(self.earthmover.macros + self.footer)
rendered_template = self.render_row(first_row, template=footer_template, template_string=self.footer, dunder_row_data=True)
fp.write(rendered_template.strip("\r\n") + "\n")
elif self.footer: # no jinja
fp.write(self.footer.strip("\r\n") + "\n")

self.logger.debug(f"output `{self.file}` written")
self.size = os.path.getsize(self.file)

def render_row(self, row: pd.Series, jinja_template):
row_data = row if isinstance(row, dict) else row.to_dict()
row_data = {
field: self.cast_output_dtype(value)
for field, value in row_data.items()
}
row_data["__row_data__"] = row_data

@staticmethod
def apply_render_row(template_string, render_row, x):
template = util.build_jinja_template(template_string)
dunder_row_data = '__row_data__' in util.get_jinja_template_params(template_string)
return x.apply(
render_row,
template=template,
template_string=template_string,
dunder_row_data=dunder_row_data,
axis=1)

def render_row(self, row: pd.Series, template, template_string, dunder_row_data=False):
try:
json_string = jinja_template.render(row_data) + "\n"
json_string = util.render_jinja_template(
row,
template,
template_string,
dunder_row_data)

except Exception as err:
self.error_handler.throw(
f"error rendering Jinja template in `template` file {self.template} ({err})"
)
raise

return json_string
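Aside on the `meta=` argument used in `execute()` above: Dask cannot infer the output schema of an arbitrary row-rendering function, so the caller supplies a prototype. A self-contained sketch with toy data and a hypothetical render function:

```python
import dask.dataframe as dd
import pandas as pd

pdf = pd.DataFrame({"id": [1, 2], "name": ["Ada", "Grace"]})
ddf = dd.from_pandas(pdf, npartitions=2)

# meta tells Dask the result is a string Series, so the task graph can be
# built without executing the rendering function up front.
rendered = ddf.map_partitions(
    lambda part: part.apply(lambda row: f"{row['id']}:{row['name']}", axis=1),
    meta=pd.Series(dtype="str"),
)
print(rendered.compute().tolist())  # ['1:Ada', '2:Grace']
```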
25 changes: 21 additions & 4 deletions earthmover/nodes/node.py
@@ -26,6 +26,13 @@ class Node:
type: str = None
allowed_configs: Tuple[str] = ('debug', 'expect', 'require_rows', 'show_progress', 'repartition')

def __getnewargs__(self):
return self.name, self.config, self.earthmover

def __dask_tokenize__(self):
from dask.base import normalize_token
return normalize_token((type(self), self.type))

def __init__(self, name: str, config: 'YamlMapping', *, earthmover: 'Earthmover'):
self.name: str = name
self.config: 'YamlMapping' = config
@@ -54,6 +61,11 @@ def __init__(self, name: str, config: 'YamlMapping', *, earthmover: 'Earthmover')
# Internal Dask configs
self.partition_size: Union[str, int] = self.config.get('repartition')

# Some nodes automatically repartition, per Dask's recommendation.
# (see https://docs.dask.org/en/stable/dataframe-best-practices.html#repartition-to-reduce-overhead)
# This is set here so it would be easy to change in the future if needed.
self.target_partition_size = "100MB"

# Optional variables for displaying progress and diagnostics.
self.show_progress: bool = self.config.get('show_progress', self.earthmover.state_configs["show_progress"])
self.progress_bar: ProgressBar = ProgressBar(minimum=10, dt=5.0) # Always instantiate, but only use if `show_progress is True`.
@@ -153,7 +165,10 @@ def display_head(self, nrows: int = 5):
warnings.filterwarnings("ignore", message="Insufficient elements for `head`")

# Complete all computes at once to reduce duplicate computation.
self.num_rows, data_head = dask.compute([self.num_rows, self.data.head(nrows)])[0]
# dask.compute(self.data)
# self.num_rows, data_head = dask.compute([self.num_rows, self.data.head(nrows)])[0]
self.num_rows = self.num_rows.compute()
data_head = dask.compute(self.data.head(nrows))[0]

self.logger.info(f"Node {self.name}: {int(self.num_rows)} rows; {self.num_cols} columns")
with pd.option_context('display.max_columns', None, 'display.width', None):
Expand All @@ -172,13 +187,15 @@ def check_expectations(self, expectations: List[str]):
result = self.data.copy()

for expectation in expectations:
template = jinja2.Template("{{" + expectation + "}}")
template_string = "{{" + expectation + "}}"
# template = util.build_jinja_template(template_string=template_string, macros="")

result[expectation_result_col] = result.apply(
util.render_jinja_template, axis=1,
meta=pd.Series(dtype='str', name=expectation_result_col),
template=template,
template_str="{{" + expectation + "}}",
jinja_environment=self.earthmover.jinja_environment,
template_string=template_string,
macros="",
error_handler=self.error_handler
)

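Aside on the `target_partition_size = "100MB"` default added above: `repartition(partition_size=...)` is the Dask call that consumes a size target like this. A minimal illustration, not earthmover code:

```python
import dask.dataframe as dd
import pandas as pd

ddf = dd.from_pandas(pd.DataFrame({"x": range(100_000)}), npartitions=200)

# Coalesce many small partitions toward ~100MB each, per Dask's
# best-practices guidance on reducing per-partition overhead.
ddf = ddf.repartition(partition_size="100MB")
print(ddf.npartitions)  # fewer partitions after coalescing
```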