An opinionated implementation that uses Airflow DockerOperators exclusively for all Operators.
from airflow_docker.operator import Operator
task = Operator(
    image='some-image:latest',
    ...
)

from airflow_docker.operator import Sensor
sensor = Sensor(
    image='some-image:latest',
    ...
)

Task Code
from airflow_docker_helper import client
client.sensor(True)

Dag Task
from airflow_docker.operator import BranchOperator
branching_task = BranchOperator(
    image='some-image:latest',
    ...
)

Task Code
from airflow_docker_helper import client
client.branch_to_tasks(['task1', 'task2'])

Dag Task
from airflow_docker.operator import ShortCircuitOperator
short_circuit = ShortCircuitOperator(
    image='some-image:latest',
    ...
)

Task Code
from airflow_docker_helper import client
client.short_circuit()  # This task will short circuit if this function gets called

Dag Task
from airflow_docker.operator import Operator
task = Operator(
    image='some-image:latest',
    provide_context=True,
    ...
)

Task Code
from airflow_docker_helper import client
context = client.context()

The following operator defaults can be set under the airflowdocker
namespace:
- force_pull (boolean true/false)
- auto_remove (boolean true/false)
- network_mode
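
Airflow reads configuration overrides from environment variables named AIRFLOW__{SECTION}__{KEY}, so each default listed above has a corresponding variable name. The following is a minimal sketch of that naming rule only; the helper name `airflow_env_var` is hypothetical and is not part of Airflow or airflow-docker.

```python
# Hypothetical helper (not part of Airflow or airflow-docker): builds the
# environment variable name Airflow consults for a given section and key.
def airflow_env_var(section: str, key: str) -> str:
    """Return the AIRFLOW__{SECTION}__{KEY} name for a config option."""
    return f"AIRFLOW__{section.upper()}__{key.upper()}"


# The operator defaults that can be set under the airflowdocker namespace.
DEFAULT_KEYS = ["force_pull", "auto_remove", "network_mode"]

ENV_VARS = [airflow_env_var("airflowdocker", key) for key in DEFAULT_KEYS]

if __name__ == "__main__":
    for name in ENV_VARS:
        print(name)
```
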
For example, to set force_pull to False by default, set the following
environment variable:

export AIRFLOW__AIRFLOWDOCKER__FORCE_PULL=false

This package also works as an Airflow plugin. When it is installed and Airflow is running, DAGs can import from it using the pattern

from airflow.{type, like "operators", "sensors"}.{name specified inside the plugin class} import *

For example:

from airflow.operators.airflow_docker import Operator

We also ship an airflowdocker/tester image to verify the integrity of
your DAG definitions before committing them.
You can run the tests against your own DAGs like so:

docker run -it -v "$(pwd)/dags:/airflow/dags" airflowdocker/tester

or else see the airflow-docker-compose project, which ships with a test subcommand for precisely this purpose.
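
If you would rather assemble the tester command programmatically, for instance in a local helper script, the docker invocation above can be built in Python. This is a minimal sketch under stated assumptions: the function name `tester_command` is hypothetical, the DAGs are assumed to live in a directory named dags relative to the current working directory, and only the flags, mount target, and image name shown above are used.

```python
import shlex
from pathlib import Path
from typing import List


def tester_command(dags_dir: str = "dags") -> List[str]:
    """Build the docker invocation that mounts dags_dir into the tester image.

    Hypothetical helper, not part of airflow-docker.
    """
    # Bind mounts need an absolute host path, so resolve it up front.
    host_path = Path(dags_dir).resolve()
    return [
        "docker", "run", "-it",
        "-v", f"{host_path}:/airflow/dags",
        "airflowdocker/tester",
    ]


if __name__ == "__main__":
    # Print the command for inspection instead of executing it.
    print(" ".join(shlex.quote(part) for part in tester_command()))
```

To actually run the tests, the returned list can be passed to `subprocess.run`. The `-it` flags expect an attached terminal, so they may need to be dropped when no TTY is available.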