2222
2323
2424@execution_interceptor
25- def _timeseries_dataset_chunk (task : Task , call_next : ForwardExecution , context : ExecutionContext ) -> None :
25+ def _timeseries_dataset_chunk (task : Task , call_next : ForwardExecution , context : ExecutionContext ) -> None : # noqa: C901
2626 if not isinstance (task , TimeseriesTask ):
2727 raise TypeError ("Task is not a timeseries task. Inherit from TimeseriesTask to mark it as such." )
2828
2929 chunk : TimeseriesDatasetChunk = task .timeseries_data # type: ignore[attr-defined]
3030
31- # let's get the collection object
32- dataset = context .runner_context .datasets_client . _dataset_by_id ( str ( chunk . dataset_id )) # type: ignore[attr-defined] # noqa: SLF001
33- collection = dataset . collection ( "unknown" ) # dummy collection, we will inject the right id below:
31+ # let's get a collection client
32+ datasets_client = context .runner_context .datasets_client
33+ dataset = datasets_client . _dataset_by_id ( str ( chunk . dataset_id )) # type: ignore[attr-defined] # noqa: SLF001
3434 # we already know the collection id, so we can skip the lookup (we don't know the name, but don't need it)
35- collection ._info = CollectionInfo (Collection (chunk .collection_id , "unknown" ), None , None ) # noqa: SLF001
35+ collection_info = CollectionInfo (Collection (chunk .collection_id , "unknown" ), None , None )
36+ collection = CollectionClient (dataset , collection_info )
3637
37- # leaf case: we are already executing a specific batch of datapoints fitting in the chunk size, so let's load them and process them
38+ # leaf case: we are already executing a specific batch of datapoints fitting in the chunk size, so let's load them
3839 if chunk .datapoint_interval :
3940 datapoint_interval = (chunk .datapoint_interval .start_id , chunk .datapoint_interval .end_id )
4041 # we already are a leaf task executing for a specific datapoint interval:
@@ -44,6 +45,9 @@ def _timeseries_dataset_chunk(task: Task, call_next: ForwardExecution, context:
4445 skip_data = False ,
4546 show_progress = False ,
4647 )
48+ if not datapoints :
49+ return # no datapoints in the interval -> we are done
50+
4751 for i in range (datapoints .sizes ["time" ]):
4852 datapoint = datapoints .isel (time = i )
4953 call_next (context , datapoint ) # type: ignore[call-arg]
@@ -88,7 +92,7 @@ def _timeseries_dataset_chunk(task: Task, call_next: ForwardExecution, context:
8892
8993 subtasks = [replace (task , timeseries_data = sub_chunk ) for sub_chunk in sub_chunks ] # type: ignore[misc]
9094 if len (subtasks ) > 0 :
91- context .submit_batch (subtasks )
95+ context .submit_subtasks (subtasks )
9296
9397 return
9498
0 commit comments