Description
In the `_node_flow.py` file, we have the following logic to handle the non-dask dataframe output:

```python
output_df[oport] = [iout.compute() for iout in outputs_dly[oport]][0]
```

We want to generalize this in the same way that a normal dask dataframe handles delayed objects.
`outputs_dly[oport]` is a list of delayed objects, which is itself just another delayed collection. A dask-cudf or dask dataframe is simply a collection of dataframes, and it is itself a delayed-like object, since you can call `compute()` on it.
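To make this concrete, here is a minimal stand-alone illustration of why a list of delayed objects behaves like a delayed collection. It uses a tiny `FakeDelayed` stand-in (an assumption for illustration, not a dask or gQuant class) so it runs without dask installed:

```python
class FakeDelayed:
    """Minimal stand-in for a dask Delayed object: holds a zero-argument
    callable and materializes it on .compute()."""
    def __init__(self, func):
        self._func = func

    def compute(self):
        return self._func()


# A "collection" is just a list of delayed partitions.
outputs_dly = {"out": [FakeDelayed(lambda: 1), FakeDelayed(lambda: 2)]}

# The current single-partition logic: compute every element, keep the first.
first_partition = [iout.compute() for iout in outputs_dly["out"]][0]
print(first_partition)  # -> 1

# Computing the whole list yields all partitions, just as calling
# .compute() on a dask dataframe materializes all of its partitions.
all_partitions = [iout.compute() for iout in outputs_dly["out"]]
print(all_partitions)  # -> [1, 2]
```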
So the generalization would be to return the list of delayed objects even when they do not form a dask dataframe. We would make a class such as `gQuantDaskData` to use as a port type; then we can handle such a delayed collection as well. Based on the port type, we could return something like:

```python
output_df[oport] = gQuantDaskData(outputs_dly[oport])
```

This idea would generalize our ability to handle dask distributed processing.
The `npartitions` is just the length of the list, i.e. `len(outputs_dly[oport])`.
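A minimal sketch of such a wrapper could look like the following. The class name comes from this proposal, but every method here is an assumption for illustration, not existing gQuant API:

```python
class gQuantDaskData:
    """Hypothetical port-type wrapper around a list of dask delayed
    objects. A sketch of the proposal, not the real gQuant code."""

    def __init__(self, delayed_list):
        self._delayed = list(delayed_list)

    @property
    def npartitions(self):
        # npartitions is just the length of the delayed list
        return len(self._delayed)

    def to_delayed(self):
        # expose the underlying partitions, mirroring the spirit of
        # dask.dataframe.DataFrame.to_delayed()
        return list(self._delayed)

    def compute(self):
        # materialize every partition; real code might use dask.compute
        # so the partitions share one task graph
        return [d.compute() for d in self._delayed]
```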
Users could inherit from `gQuantDaskData` and set port types for their particular data. Something like:

```python
class DistributedModel(gQuantDaskData):
    pass  # nothing in particular; just indicates this in/out port type can be a distributed model
```

We check for `gQuantDaskData` in the delayed processing call, enforce that `npartitions` match, and add it as a delayed input.
On output, we find the port type derived from `gQuantDaskData` and return it. Using the example above:

```python
output_df[oport] = DistributedModel(outputs_dly[oport])
```
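The matching-partitions check could look roughly like this. It is a sketch under the assumptions of this proposal: minimal versions of `gQuantDaskData` and `DistributedModel` are defined inline so the snippet is self-contained, and `check_npartitions` is a hypothetical helper name:

```python
class gQuantDaskData:
    """Minimal hypothetical wrapper around a list of delayed objects."""
    def __init__(self, delayed_list):
        self._delayed = list(delayed_list)

    @property
    def npartitions(self):
        return len(self._delayed)


class DistributedModel(gQuantDaskData):
    pass  # just marks the port type as a distributed model


def check_npartitions(inputs):
    """Enforce that all gQuantDaskData inputs have the same npartitions,
    so partitions can be zipped together for per-partition delayed calls.
    Returns the common npartitions (0 if no gQuantDaskData inputs)."""
    counts = {name: val.npartitions
              for name, val in inputs.items()
              if isinstance(val, gQuantDaskData)}
    if len(set(counts.values())) > 1:
        raise ValueError("npartitions mismatch across ports: %r" % counts)
    return next(iter(counts.values()), 0)
```

Because `DistributedModel` inherits from `gQuantDaskData`, a single `isinstance` check covers both the base port type and any user-defined subtypes.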