kptn is a meta orchestration framework for building R and Python data pipelines. It right-sizes tasks, tests changes, and streamlines development and operation.
Define your tasks and their dependencies in YAML and kptn will render a pipeline runnable locally or on AWS with one click.
- Testable tasks: Built-in change detection
- Right-sizing: CPU and memory requirements per task
- Branch-based development: Clone pipeline state across branches
- Update in-place: Tweak and run a subset of the pipeline
- Less boilerplate: Launch R scripts without writing any wrapper code
- Less vendor lock-in: Pipelines can be rendered to any orchestration framework, not just Prefect
Here's a toy example of a pipeline called example with 3 tasks: A, B, C. We first define the graph of tasks, where A has no dependencies, B depends on A, and C depends on A and B.
```yaml
graphs:
  example:
    tasks:
      A:
      B: A
      C: [A, B]
```

In the same YAML file, we can define the tasks (2 R tasks, 1 Python task):
```yaml
tasks:
  A:
    file: "A/run.R"
    outputs: ["A.csv"]
  B:
    file: "B/run.R"
    outputs: ["B.csv"]
  C:
    file: "C/run.py"
    outputs: ["C.csv"]
```

The file field defines the location of the script to run. In the Python case, the file must contain a function with the same name as the task (C). The outputs field lists the files generated by the script.
There are two main configuration files: pyproject.toml and kptn.yaml.
Your project is expected to use a pyproject.toml that contains a `tool.kptn` section.
| Field | Description | Required |
| --- | --- | --- |
| flows_dir | The output directory where kptn will render your flow files to | Yes |
| py_tasks_dir | The directory (or list of directories) where Python task code exists | Yes |
| tasks_conf_path | The filepath to the kptn.yaml file defining your pipeline | Yes |
| docker_image | The name of the Docker image to build and push to Prefect | Yes |
| cache_namespace | Optional cache namespace shared across graphs/pipelines; defaults to the graph name | No |
Example:
```toml
[tool.kptn]
flows_dir = "py_src/flows"
py_tasks_dir = "py_src/tasks"
tasks_conf_path = "py_src/kptn.yaml"
docker_image = "nibrs-estimation-pipeline:latest"
```

Create a file, kptn.yaml, that contains definitions of the graphs of tasks and the tasks themselves.
| Field | Value Description | Required |
| --- | --- | --- |
| graphs | A dictionary of graph IDs and graph objects | Yes |
| graphs.[id] | A dictionary representing a graph (nodes and edges) | Yes |
| graphs.[id].extends | Optional string or list of graph IDs (or objects with graph and optional per-task args) to inherit tasks from; the earliest occurrence of a task name wins (parents first, then child). If provided, `tasks` can be omitted to reuse a parent graph as-is. | No |
| graphs.[id].config | Graph-level config overrides merged on top of the root config when running this graph | No |
| graphs.[id].tasks | An ordered dictionary of task IDs (nodes) and their dependencies (edges) | Yes |
| graphs.[id].tasks.[task_id] | A list of dependency task IDs. If no dependencies, leave blank or use an empty list `[]`. | Yes |
| graphs.[id].tasks.[task_id].args | Static keyword args to supply to that task when run via this graph (overrides or supplements `tasks.[task_id].args`) | No |
| tasks | A dictionary of task names and task objects | Yes |
| tasks.[task_id] | A dictionary representing a task | Yes |
| tasks.[task_id].file | A string with the filepath to the Python or R script | Yes |
| tasks.[task_id].args | Static keyword args for Python tasks; values are passed as-is | No |
| tasks.[task_id].prefix_args | A string that will be inserted before the `Rscript` command-line call | No |
| tasks.[task_id].cli_args | A string that will be inserted at the end of the `Rscript` command-line call | No |
| tasks.[task_id].cache_result | A boolean; if `true`, the Python script's return value will be saved in the cache database, DynamoDB. If this value is a large list (e.g. 50k items), it will be sharded across DynamoDB items for scalability. | No |
| tasks.[task_id].iterable_item | If `cache_result` is true and the result is a list, `iterable_item` is a string naming each item. The iterable item can also be a combination of values delimited by commas, e.g. `US_STATE,ZIP_CODE` | No |
| tasks.[task_id].map_over | A string that corresponds to an `iterable_item` in a dependency. Setting `map_over` will call this task for each item in the dependency's result list. If this task is a Python task, the `iterable_item` will be passed as a function argument. If this task is an R task, the `iterable_item` will be passed as an environment variable to the R script. If the `iterable_item` is a comma-delimited combo, the values will be split and passed in separately. | No |
| tasks.[task_id].bundle_size | If `map_over` is set, an integer defining the number of subtasks sent to a Dask worker at one time. Default is 1. For a large number of subtasks, increasing this number can speed up the run time. | No |
| tasks.[task_id].group_size | If `map_over` is set, an integer defining the number of subtasks sent to the Dask scheduler at one time. Default is infinity. For a large number of subtasks, setting a max on this number can prevent the scheduler from getting overwhelmed. | No |
| tasks.[task_id].outputs | A list of files that the script outputs | No |
| tasks.[task_id].compute | A dictionary that can contain two fields, cpu and memory, corresponding to the Fargate task definition values | No |
| tasks.[task_id].tags | A list of strings, which will be used as the Prefect task's tags; useful for concurrency limits | No |
Example:
```yaml
graphs:
  my_graph:
    tasks:
      A:
      B: [A]
tasks:
  A:
    file: "A.py"
    cache_result: true
    iterable_item: US_STATE
    compute:
      cpu: 256
      memory: 512
  B:
    file: "B/run.R"
    map_over: US_STATE
    outputs:
      - "${US_STATE}.csv"
```

In this example, my_graph is the name of a graph (also known as a pipeline or a DAG). The graph object includes the tasks field, which is a dictionary of task names and a list of their corresponding dependencies (other tasks).
Task A will call the A() function in A.py and store the result in the cache database.
Task B will map over the result list, setting the US_STATE environment variable for every call of the R script B/run.R.
For example, if A returned the list ['NC', 'SC'], B/run.R would be called twice, once with US_STATE=NC and once with US_STATE=SC.
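To make the mapping concrete, here is a hypothetical Python-side sketch. The body of A() and the idea of a Python variant of B are invented for illustration, and the argument name used for the mapped item is an assumption based on the map_over description above.

```python
# Hypothetical sketch; contents are illustrative, not taken from a real pipeline.


def A():
    # The returned list is stored in the cache (cache_result: true); because
    # iterable_item is US_STATE, each element becomes one mapped item.
    return ["NC", "SC"]


# If B were a Python task instead of an R script, kptn would pass the mapped
# item as a function argument (assumed here to be named after the iterable_item).
def B(US_STATE):
    # Write the declared output for this state, e.g. NC.csv or SC.csv.
    with open(f"{US_STATE}.csv", "w") as f:
        f.write("placeholder\n")
```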
Projects that render AWS Step Functions flows still need a way to run pipelines locally. The `codegen` command emits a vanilla Python runner next to the Step Functions JSON files whenever the project's `flow_type` is `stepfunctions`.
Why not just use Prefect?
tl;dr: kptn simplifies pipeline configuration and improves on the base Prefect implementation
As we tried to scale with Prefect, we ran into bugs and wondered if Airflow or Dagster might work better, but we'd already written our pipeline in formulaic Prefect code of "call task A, call task B". It was simple to pull that logic out into a YAML representation of a graph. This is a portable definition, allowing us to render code for any orchestrator.
A major goal was to test our pipeline. If we change the code for task A, we need to be able to run task A by itself and check if its outputs changed compared to its previous outputs. Prefect 2 didn't support running tasks by themselves, and its caching wasn't designed for this use case. Rendering Prefect code allows us to render runnable pieces of the pipeline and use a cache accessible for snapshot testing.
Another major goal is scaling to handle a large number of mapped tasks. Mapped tasks create a single subtask for each input. Prefect has two task runners that allow running subtasks in parallel: Dask and Ray. In our testing with Dask, we've found that we can speed up runtime by batching subtasks into bundles for Dask workers, and prevent the scheduler from crashing by batching subtasks into groups that are run sequentially. kptn offers both of these features via its API. Caching is particularly important for mapped tasks, because mapped tasks can be very expensive to run and more prone to crash. If some subtasks fail, re-running the pipeline will only re-run the failed subtasks.
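As a conceptual sketch (not kptn or Dask internals), the difference between bundle_size and group_size can be pictured like this; the chunk sizes and item names are made up for illustration:

```python
# Conceptual sketch only. group_size caps how many subtasks the scheduler sees
# at once (groups run sequentially); bundle_size packs several subtasks into a
# single unit of work sent to a worker.
def chunk(items, size):
    """Split a list into consecutive chunks of at most `size` items."""
    return [items[i:i + size] for i in range(0, len(items), size)]


subtasks = [f"item-{i}" for i in range(50_000)]  # e.g. 50k mapped inputs

groups = chunk(subtasks, 5_000)          # hypothetical group_size = 5000
for group in groups:                     # groups are processed one after another
    bundles = chunk(group, 100)          # hypothetical bundle_size = 100
    # In the real system, each bundle would be submitted to a Dask worker here.
    print(f"group of {len(group)} subtasks -> {len(bundles)} bundles")
```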
Overall, kptn makes maintenance easier. Developing and testing new features, copying production state to a local environment for debugging, and running subsets of subtasks are the use cases it's designed for.
