diff --git a/packages/task-graph/src/common.ts b/packages/task-graph/src/common.ts index 36edb4b3..93d890ae 100644 --- a/packages/task-graph/src/common.ts +++ b/packages/task-graph/src/common.ts @@ -18,6 +18,8 @@ export * from "./task/TaskRegistry"; export * from "./task/JobQueueTask"; export * from "./task/TaskQueueRegistry"; export * from "./task/ArrayTask"; +export * from "./task/StreamingUtils"; +export * from "./task/StreamingTypes"; export * from "./task-graph/DataflowEvents"; export * from "./task-graph/Dataflow"; diff --git a/packages/task-graph/src/task-graph/Dataflow.ts b/packages/task-graph/src/task-graph/Dataflow.ts index e14e349d..eda3c654 100644 --- a/packages/task-graph/src/task-graph/Dataflow.ts +++ b/packages/task-graph/src/task-graph/Dataflow.ts @@ -53,12 +53,18 @@ export class Dataflow { public provenance: Provenance = {}; public status: TaskStatus = TaskStatus.PENDING; public error: TaskError | undefined; + /** Streaming value for incremental updates */ + public streamingValue: AsyncIterable | null = null; + /** Whether this dataflow is currently streaming */ + public isStreaming: boolean = false; public reset() { this.status = TaskStatus.PENDING; this.error = undefined; this.value = undefined; this.provenance = {}; + this.streamingValue = null; + this.isStreaming = false; this.emit("reset"); this.emit("status", this.status); } @@ -98,6 +104,64 @@ export class Dataflow { this.value = entireDataBlock[this.sourceTaskPortId]; } if (nodeProvenance) this.provenance = nodeProvenance; + // Clear streaming state when complete data is set + this.isStreaming = false; + this.streamingValue = null; + } + + /** + * Sets streaming port data for incremental updates + * @param chunk Partial data chunk from streaming task + * @param nodeProvenance Optional provenance information + */ + setStreamingPortData(chunk: any, nodeProvenance?: Provenance) { + if (!this.isStreaming) { + this.isStreaming = true; + this.status = TaskStatus.PROCESSING; + this.emit("status", this.status); + } + + if (this.sourceTaskPortId === DATAFLOW_ALL_PORTS) { + // Merge chunk into existing value or set as new + if (this.value === undefined) { + this.value = chunk; + } else if (typeof chunk === "object" && chunk !== null && !Array.isArray(chunk)) { + this.value = { ...this.value, ...chunk }; + } else { + // For non-object chunks, replace or append based on type + this.value = chunk; + } + } else if (this.sourceTaskPortId === DATAFLOW_ERROR_PORT) { + this.error = chunk; + } else { + // Merge chunk property into existing value + const chunkValue = chunk[this.sourceTaskPortId]; + if (chunkValue !== undefined) { + if (this.value === undefined) { + this.value = chunkValue; + } else if (typeof chunkValue === "object" && chunkValue !== null && !Array.isArray(chunkValue)) { + this.value = { ...this.value, ...chunkValue }; + } else { + this.value = chunkValue; + } + } + } + + if (nodeProvenance) { + this.provenance = { ...this.provenance, ...nodeProvenance }; + } + + this.emit("stream_chunk", chunk); + } + + /** + * Gets streaming port data as an async iterable + * @returns AsyncIterable of streaming chunks + */ + async *getStreamingPortData(): AsyncIterable { + if (this.streamingValue) { + yield* this.streamingValue; + } } getPortData(): TaskOutput { diff --git a/packages/task-graph/src/task-graph/DataflowEvents.ts b/packages/task-graph/src/task-graph/DataflowEvents.ts index 4c9283ca..fe0ce71b 100644 --- a/packages/task-graph/src/task-graph/DataflowEvents.ts +++ b/packages/task-graph/src/task-graph/DataflowEvents.ts @@ -34,6 +34,9 @@ export type DataflowEventListeners = { /** Fired when a dataflow status changes */ status: (status: TaskStatus) => void; + + /** Fired when a streaming chunk arrives */ + stream_chunk: (chunk: unknown) => void; }; /** Union type of all possible dataflow event names */ diff --git a/packages/task-graph/src/task-graph/TaskGraphRunner.ts b/packages/task-graph/src/task-graph/TaskGraphRunner.ts index cf7d5c81..042f13dc 100644 --- a/packages/task-graph/src/task-graph/TaskGraphRunner.ts +++ b/packages/task-graph/src/task-graph/TaskGraphRunner.ts @@ -19,6 +19,7 @@ import { Provenance, TaskInput, TaskOutput, TaskStatus } from "../task/TaskTypes import { DATAFLOW_ALL_PORTS } from "./Dataflow"; import { TaskGraph, TaskGraphRunConfig } from "./TaskGraph"; import { DependencyBasedScheduler, TopologicalScheduler } from "./TaskGraphScheduler"; +import { ensureTask } from "./Conversions"; export type GraphSingleTaskResult = { id: unknown; @@ -85,6 +86,8 @@ export class TaskGraphRunner { protected inProgressTasks: Map> = new Map(); protected inProgressFunctions: Map> = new Map(); protected failedTaskErrors: Map = new Map(); + /** Map of task IDs to their active streaming iterators */ + protected streamingTasks: Map> = new Map(); /** * Constructor for TaskGraphRunner @@ -137,17 +140,27 @@ export class TaskGraphRunner { // Only filter input for non-root tasks; root tasks get the full input const taskInput = isRootTask ? input : this.filterInputForTask(task, input); - const taskPromise = this.runTaskWithProvenance( - task, - taskInput, - config?.parentProvenance || {} - ); - this.inProgressTasks!.set(task.config.id, taskPromise); - const taskResult = await taskPromise; - - if (this.graph.getTargetDataflows(task.config.id).length === 0) { - // we save the results of all the leaves - results.push(taskResult as GraphSingleTaskResult); + // Check if task is streamable + if (task.isStreamable() && task.executeStream) { + await this.handleStreamingTask( + task, + taskInput, + config?.parentProvenance || {}, + results as GraphResultArray + ); + } else { + const taskPromise = this.runTaskWithProvenance( + task, + taskInput, + config?.parentProvenance || {} + ); + this.inProgressTasks!.set(task.config.id, taskPromise); + const taskResult = await taskPromise; + + if (this.graph.getTargetDataflows(task.config.id).length === 0) { + // we save the results of all the leaves + results.push(taskResult as GraphSingleTaskResult); + } } } catch (error) { this.failedTaskErrors.set(task.config.id, error as TaskError); @@ -155,6 +168,8 @@ export class TaskGraphRunner { this.processScheduler.onTaskCompleted(task.config.id); this.pushStatusFromNodeToEdges(this.graph, task); this.pushErrorFromNodeToEdges(this.graph, task); + // Clean up streaming iterator + this.streamingTasks.delete(task.config.id); } }; @@ -171,6 +186,18 @@ export class TaskGraphRunner { await Promise.allSettled(Array.from(this.inProgressTasks.values())); // Clean up stragglers to avoid unhandled promise rejections await Promise.allSettled(Array.from(this.inProgressFunctions.values())); + // Clean up streaming tasks + for (const [taskId, iterator] of this.streamingTasks.entries()) { + // Try to clean up the iterator if it has a return method + if (iterator.return) { + try { + await iterator.return(); + } catch { + // Ignore errors during cleanup + } + } + } + this.streamingTasks.clear(); if (this.failedTaskErrors.size > 0) { const latestError = this.failedTaskErrors.values().next().value!; @@ -405,6 +432,126 @@ export class TaskGraphRunner { } } + /** + * Pushes streaming output chunks from a task to its target dataflows + * @param node The task that produced the streaming chunk + * @param chunk The partial output chunk + * @param nodeProvenance The provenance input for the task + */ + protected async pushStreamingOutputFromNodeToEdges( + node: ITask, + chunk: Partial, + nodeProvenance?: Provenance + ) { + const dataflows = this.graph.getTargetDataflows(node.config.id); + for (const dataflow of dataflows) { + const compatibility = dataflow.semanticallyCompatible(this.graph, dataflow); + if (compatibility === "static" || compatibility === "runtime") { + dataflow.setStreamingPortData(chunk, nodeProvenance); + } + } + } + + /** + * Handles execution of a streaming task + * @param task The task to execute + * @param input The input to the task + * @param parentProvenance The provenance input for the task + * @param results Array to collect results + */ + protected async handleStreamingTask( + task: ITask, + input: TaskInput, + parentProvenance: Provenance, + results: GraphResultArray + ): Promise { + // Update provenance for the current task + const nodeProvenance = { + ...parentProvenance, + ...this.getInputProvenance(task), + ...task.getProvenance(), + }; + this.provenanceInput.set(task.config.id, nodeProvenance); + this.copyInputFromEdgesToNode(task); + + // Notify scheduler that streaming has started + if (this.processScheduler instanceof DependencyBasedScheduler) { + this.processScheduler.onTaskStreamingStart(task.config.id, task); + } + + // Create execution context + const context = { + signal: this.abortController!.signal, + nodeProvenance, + updateProgress: async (progress: number, message?: string, ...args: any[]) => + await this.handleProgress(task, progress, message, ...args), + own: (i: any) => { + const task = ensureTask(i, { isOwned: true }); + this.graph.addTask(task); + return i; + }, + onStreamChunk: async (chunk: Partial) => { + await this.pushStreamingOutputFromNodeToEdges(task, chunk, nodeProvenance); + // Notify scheduler of chunk + if (this.processScheduler instanceof DependencyBasedScheduler) { + this.processScheduler.onTaskStreamingChunk(task.config.id); + } + }, + }; + + // Execute streaming task + let finalOutput: TaskOutput | undefined; + const streamIterator = task.executeStream!(input, context); + this.streamingTasks.set(task.config.id, streamIterator); + + try { + // Iterate over stream chunks + for await (const chunk of streamIterator) { + if (this.abortController?.signal.aborted) { + break; + } + // Push chunk to dataflows + await this.pushStreamingOutputFromNodeToEdges(task, chunk, nodeProvenance); + // Notify scheduler of chunk + if (this.processScheduler instanceof DependencyBasedScheduler) { + this.processScheduler.onTaskStreamingChunk(task.config.id); + } + // Accumulate final output + if (finalOutput === undefined) { + finalOutput = chunk as TaskOutput; + } else { + // Merge chunks + finalOutput = { ...finalOutput, ...chunk }; + } + } + + // Set final output + if (finalOutput !== undefined) { + task.runOutputData = finalOutput; + await this.pushOutputFromNodeToEdges(task, finalOutput, nodeProvenance); + } + + // Add to results if leaf node + if (this.graph.getTargetDataflows(task.config.id).length === 0 && finalOutput !== undefined) { + results.push({ + id: task.config.id, + type: (task.constructor as any).runtype || (task.constructor as any).type, + data: finalOutput as T, + }); + } + } catch (error) { + // Clean up iterator on error + if (streamIterator.return) { + try { + await streamIterator.return(); + } catch { + // Ignore cleanup errors + } + } + throw error; + } + } + /** * Pushes the status of a task to its target edges * @param node The task that produced the status @@ -547,6 +694,7 @@ export class TaskGraphRunner { this.inProgressTasks.clear(); this.inProgressFunctions.clear(); this.failedTaskErrors.clear(); + this.streamingTasks.clear(); this.graph.emit("start"); } diff --git a/packages/task-graph/src/task-graph/TaskGraphScheduler.ts b/packages/task-graph/src/task-graph/TaskGraphScheduler.ts index aa30c000..b42b0287 100644 --- a/packages/task-graph/src/task-graph/TaskGraphScheduler.ts +++ b/packages/task-graph/src/task-graph/TaskGraphScheduler.ts @@ -68,6 +68,10 @@ export class DependencyBasedScheduler implements ITaskGraphScheduler { private completedTasks: Set; private pendingTasks: Set; private nextResolver: ((task: ITask | null) => void) | null = null; + /** Map of task IDs to tasks that are currently streaming */ + private streamingTasks: Map = new Map(); + /** Map of task IDs to whether they have produced at least one streaming chunk */ + private streamingTasksWithChunks: Set = new Set(); constructor(private dag: TaskGraph) { this.completedTasks = new Set(); @@ -79,7 +83,13 @@ export class DependencyBasedScheduler implements ITaskGraphScheduler { const dependencies = this.dag .getSourceDataflows(task.config.id) .map((dataflow) => dataflow.sourceTaskId); - return dependencies.every((dep) => this.completedTasks.has(dep)); + return dependencies.every((dep) => { + // Task is ready if dependency is completed OR streaming and has produced chunks + return ( + this.completedTasks.has(dep) || + (this.streamingTasks.has(dep) && this.streamingTasksWithChunks.has(dep)) + ); + }); } private async waitForNextTask(): Promise { @@ -112,10 +122,31 @@ export class DependencyBasedScheduler implements ITaskGraphScheduler { } } - onTaskCompleted(taskId: unknown): void { - this.completedTasks.add(taskId); + /** + * Notifies the scheduler that a task has started streaming + * @param taskId The ID of the task that started streaming + * @param task The task instance + */ + onTaskStreamingStart(taskId: unknown, task: ITask): void { + this.streamingTasks.set(taskId, task); + // Check if any pending tasks are now ready + this.checkForReadyTasks(); + } + /** + * Notifies the scheduler that a streaming task has produced a chunk + * @param taskId The ID of the task that produced the chunk + */ + onTaskStreamingChunk(taskId: unknown): void { + this.streamingTasksWithChunks.add(taskId); // Check if any pending tasks are now ready + this.checkForReadyTasks(); + } + + /** + * Checks for ready tasks and resolves the next resolver if found + */ + private checkForReadyTasks(): void { if (this.nextResolver) { const readyTask = Array.from(this.pendingTasks).find((task) => this.isTaskReady(task)); if (readyTask) { @@ -127,9 +158,21 @@ export class DependencyBasedScheduler implements ITaskGraphScheduler { } } + onTaskCompleted(taskId: unknown): void { + this.completedTasks.add(taskId); + // Remove from streaming tasks if it was streaming + this.streamingTasks.delete(taskId); + this.streamingTasksWithChunks.delete(taskId); + + // Check if any pending tasks are now ready + this.checkForReadyTasks(); + } + reset(): void { this.completedTasks.clear(); this.pendingTasks = new Set(this.dag.topologicallySortedNodes()); this.nextResolver = null; + this.streamingTasks.clear(); + this.streamingTasksWithChunks.clear(); } } diff --git a/packages/task-graph/src/task/ITask.ts b/packages/task-graph/src/task/ITask.ts index 85ca7da3..b79f5ba7 100644 --- a/packages/task-graph/src/task/ITask.ts +++ b/packages/task-graph/src/task/ITask.ts @@ -21,7 +21,14 @@ import type { import type { JsonTaskItem, TaskGraphItemJson } from "./TaskJSON"; import { TaskRunner } from "./TaskRunner"; import type { DataPortSchema } from "./TaskSchema"; -import type { Provenance, TaskConfig, TaskInput, TaskOutput, TaskStatus } from "./TaskTypes"; +import type { + Provenance, + StreamingMode, + TaskConfig, + TaskInput, + TaskOutput, + TaskStatus, +} from "./TaskTypes"; /** * Context for task execution @@ -31,6 +38,8 @@ export interface IExecuteContext { nodeProvenance: Provenance; updateProgress: (progress: number, message?: string, ...args: any[]) => Promise; own: (i: T) => T; + /** Optional callback for streaming chunks */ + onStreamChunk?: (chunk: Partial) => Promise; } export type IExecuteReactiveContext = Pick; @@ -60,6 +69,7 @@ export interface ITaskStaticProperties { readonly title?: string; readonly description?: string; readonly cacheable: boolean; + readonly streamable?: boolean | StreamingMode; readonly inputSchema: () => DataPortSchema; readonly outputSchema: () => DataPortSchema; } @@ -78,6 +88,14 @@ export interface ITaskExecution< output: Output, context: IExecuteReactiveContext ): Promise; + /** + * Optional streaming execution method + * Returns an async iterable iterator that yields partial outputs + */ + executeStream?( + input: Input, + context: IExecuteContext + ): AsyncIterableIterator>; } /** diff --git a/packages/task-graph/src/task/StreamingTypes.ts b/packages/task-graph/src/task/StreamingTypes.ts new file mode 100644 index 00000000..e9764ba0 --- /dev/null +++ b/packages/task-graph/src/task/StreamingTypes.ts @@ -0,0 +1,53 @@ +// ******************************************************************************* +// * PODLEY.AI: Your Agentic AI library * +// * * +// * Copyright Steven Roussey * +// * Licensed under the Apache License, Version 2.0 (the "License"); * +// ******************************************************************************* + +import type { ITask } from "./ITask"; +import { StreamingMode } from "./TaskTypes"; + +/** + * Type guards and validation functions for streaming tasks + */ + +/** + * Checks if a task is streamable + * @param task The task to check + * @returns true if the task supports streaming + */ +export function isStreamableTask(task: ITask): boolean { + return task.isStreamable(); +} + +/** + * Checks if an output value is a streaming output + * @param output The output value to check + * @returns true if the output is a streamable type + */ +export function isStreamingOutput(output: unknown): boolean { + if (typeof output === "string") { + return true; + } + if (Array.isArray(output)) { + return true; + } + if ( + output && + typeof output === "object" && + (Symbol.asyncIterator in output || "getReader" in output) + ) { + return true; + } + return false; +} + +/** + * Gets the streaming mode for a task + * @param task The task to check + * @returns The streaming mode + */ +export function getStreamingMode(task: ITask): StreamingMode { + return task.getStreamingMode(); +} diff --git a/packages/task-graph/src/task/StreamingUtils.ts b/packages/task-graph/src/task/StreamingUtils.ts new file mode 100644 index 00000000..c8f558d7 --- /dev/null +++ b/packages/task-graph/src/task/StreamingUtils.ts @@ -0,0 +1,90 @@ +// ******************************************************************************* +// * PODLEY.AI: Your Agentic AI library * +// * * +// * Copyright Steven Roussey * +// * Licensed under the Apache License, Version 2.0 (the "License"); * +// ******************************************************************************* + +/** + * Utility functions for streaming task outputs + */ + +/** + * Converts a string into a stream of chunks + * @param input The string to stream + * @param chunkSize The size of each chunk (default: 1 character) + * @returns An async iterable iterator that yields string chunks + */ +export async function* stringToStream( + input: string, + chunkSize: number = 1 +): AsyncIterableIterator { + for (let i = 0; i < input.length; i += chunkSize) { + yield input.slice(i, i + chunkSize); + } +} + +/** + * Converts an array into a stream of items + * @param input The array to stream + * @returns An async iterable iterator that yields array items + */ +export async function* arrayToStream(input: T[]): AsyncIterableIterator { + for (const item of input) { + yield item; + } +} + +/** + * Converts a progress callback into a stream + * This is a helper for tasks that use progress callbacks but want to expose streaming + * @param onProgress The progress callback function + * @returns An async iterable iterator that yields progress updates + */ +export async function* progressCallbackToStream( + onProgress: (progress: number, message?: string, details?: any) => void +): AsyncIterableIterator<{ progress: number; message?: string; details?: any }> { + // This is a placeholder - actual implementation would need to capture progress events + // For now, this serves as a type definition + yield { progress: 0 }; +} + +/** + * Merges multiple streams into a single stream + * @param streams Array of async iterable iterators to merge + * @returns An async iterable iterator that yields items from all streams + */ +export async function* mergeStreams( + streams: AsyncIterableIterator[] +): AsyncIterableIterator { + const iterators = streams.map((stream) => stream[Symbol.asyncIterator]()); + const nextPromises = iterators.map((it) => it.next()); + + while (nextPromises.length > 0) { + const results = await Promise.allSettled(nextPromises); + let hasMore = false; + + for (let i = 0; i < results.length; i++) { + const result = results[i]; + if (result.status === "fulfilled") { + const { value, done } = result.value; + if (!done) { + yield value; + nextPromises[i] = iterators[i].next(); + hasMore = true; + } else { + nextPromises[i] = Promise.resolve({ value: undefined, done: true }); + } + } else { + // Remove failed iterator + nextPromises.splice(i, 1); + iterators.splice(i, 1); + i--; + } + } + + if (!hasMore) { + break; + } + } +} diff --git a/packages/task-graph/src/task/Task.ts b/packages/task-graph/src/task/Task.ts index c58b5d15..06eca73b 100644 --- a/packages/task-graph/src/task/Task.ts +++ b/packages/task-graph/src/task/Task.ts @@ -21,6 +21,7 @@ import type { JsonTaskItem, TaskGraphItemJson } from "./TaskJSON"; import { TaskRunner } from "./TaskRunner"; import type { DataPortSchema } from "./TaskSchema"; import { + StreamingMode, TaskStatus, type Provenance, type TaskConfig, @@ -68,6 +69,11 @@ export class Task< */ public static cacheable: boolean = true; + /** + * Whether this task supports streaming + */ + public static streamable?: boolean | StreamingMode; + /** * Input schema for this task * Returns a JSONSchema7 compatible object schema @@ -120,6 +126,67 @@ export class Task< return output; } + /** + * Default implementation of executeStream that returns undefined. + * Subclasses should override this to provide streaming functionality. + * + * @param input The input to the task + * @param context The execution context + * @returns An async iterable iterator that yields partial outputs + */ + public async *executeStream( + input: Input, + context: IExecuteContext + ): AsyncIterableIterator> { + // Default implementation does nothing + // Subclasses should override this to provide streaming + return; + } + + /** + * Checks if this task is streamable + * @returns true if the task supports streaming + */ + public isStreamable(): boolean { + const staticStreamable = (this.constructor as typeof Task).streamable; + const configStreamable = this.config?.streamable; + + if (configStreamable !== undefined) { + return configStreamable === true || configStreamable !== StreamingMode.NONE; + } + + return staticStreamable === true || staticStreamable !== undefined; + } + + /** + * Gets the streaming mode for this task + * @returns The streaming mode, or StreamingMode.NONE if not streamable + */ + public getStreamingMode(): StreamingMode { + const staticStreamable = (this.constructor as typeof Task).streamable; + const configStreamable = this.config?.streamable; + + if (configStreamable !== undefined) { + if (configStreamable === true) { + return StreamingMode.CUSTOM; + } + if (typeof configStreamable === "string") { + return configStreamable; + } + } + + if (staticStreamable !== undefined) { + if (staticStreamable === true) { + return StreamingMode.CUSTOM; + } + if (typeof staticStreamable === "string") { + return staticStreamable; + } + } + + return StreamingMode.NONE; + } + // ======================================================================== // TaskRunner delegation - Executes and manages the task // ======================================================================== diff --git a/packages/task-graph/src/task/TaskRunner.ts b/packages/task-graph/src/task/TaskRunner.ts index 18e6397b..229c20ad 100644 --- a/packages/task-graph/src/task/TaskRunner.ts +++ b/packages/task-graph/src/task/TaskRunner.ts @@ -173,6 +173,11 @@ export class TaskRunner< * Protected method to execute a task by delegating back to the task itself. */ protected async executeTask(input: Input): Promise { + // Check if task supports streaming and has executeStream method + if (this.task.isStreamable() && this.task.executeStream) { + return await this.executeTaskStream(input); + } + const result = await this.task.execute(input, { signal: this.abortController!.signal, updateProgress: this.handleProgress.bind(this), @@ -182,6 +187,59 @@ export class TaskRunner< return await this.executeTaskReactive(input, result || ({} as Output)); } + /** + * Executes a task in streaming mode + * @param input The input to the task + * @returns The final accumulated output + */ + protected async executeTaskStream(input: Input): Promise { + if (!this.task.executeStream) { + // Fallback to regular execute if executeStream is not available + return await this.executeTask(input); + } + + const context = { + signal: this.abortController!.signal, + updateProgress: this.handleProgress.bind(this), + nodeProvenance: this.nodeProvenance, + own: this.own, + onStreamChunk: async (chunk: Partial) => { + // Update progress during streaming + await this.handleProgress(this.task.progress, "Streaming..."); + }, + }; + + let finalOutput: Output | undefined; + const streamIterator = this.task.executeStream(input, context); + + try { + // Iterate over stream chunks and accumulate + for await (const chunk of streamIterator) { + if (this.abortController?.signal.aborted) { + break; + } + // Merge chunks into final output + if (finalOutput === undefined) { + finalOutput = chunk as Output; + } else { + finalOutput = { ...finalOutput, ...chunk } as Output; + } + } + } catch (error) { + // Clean up iterator on error + if (streamIterator.return) { + try { + await streamIterator.return(); + } catch { + // Ignore cleanup errors + } + } + throw error; + } + + return await this.executeTaskReactive(input, finalOutput || ({} as Output)); + } + /** * Protected method for reactive execution delegation */ diff --git a/packages/task-graph/src/task/TaskTypes.ts b/packages/task-graph/src/task/TaskTypes.ts index f2b8ea1d..5c507c28 100644 --- a/packages/task-graph/src/task/TaskTypes.ts +++ b/packages/task-graph/src/task/TaskTypes.ts @@ -63,6 +63,35 @@ export type TaskTypeName = string; /** Type for task configuration */ export type TaskConfig = Partial; +// ======================================================================== +// Streaming Types +// ======================================================================== + +/** + * Streaming mode for tasks + */ +export const StreamingMode = { + /** Task does not support streaming */ + NONE: "NONE", + /** Task streams string data */ + STRING: "STRING", + /** Task streams array data */ + ARRAY: "ARRAY", + /** Task streams custom data via async iterable */ + CUSTOM: "CUSTOM", +} as const; + +export type StreamingMode = (typeof StreamingMode)[keyof typeof StreamingMode]; + +/** + * Type for streamable output values + */ +export type StreamableOutput = + | string + | Array + | AsyncIterable + | ReadableStream; + // ======================================================================== // Task Configuration Types // ======================================================================== @@ -86,6 +115,9 @@ export interface IConfig { /** Optional cacheable flag to use for this task, overriding the default static property */ cacheable?: boolean; + /** Optional streaming mode flag - true enables streaming, or specify a StreamingMode */ + streamable?: boolean | StreamingMode; + /** Optional user data to use for this task, not used by the task framework except it will be exported as part of the task JSON*/ extras?: DataPorts; }