Using Operators

An operator represents a single, ideally idempotent, task. Operators determine what actually executes when your DAG runs.

See the Operators Concepts documentation and the Operators API Reference for more information.

BashOperator

Use the BashOperator to execute commands in a Bash shell.

run_this = BashOperator(
    task_id='run_after_loop', bash_command='echo 1', dag=dag)

Templating

You can use Jinja templates to parameterize the bash_command argument.

task = BashOperator(
    task_id='also_run_this',
    bash_command='echo "run_id={{ run_id }} | dag_run={{ dag_run }}"',
    dag=dag)

Troubleshooting

Jinja template not found

Add a space after the script name when directly calling a Bash script with the bash_command argument. This is because Airflow tries to apply a Jinja template to it, which will fail.

t2 = BashOperator(
    task_id='bash_example',

    # This fails with `Jinja template not found` error
    # bash_command="/home/batcher/test.sh",

    # This works (has a space after)
    bash_command="/home/batcher/test.sh ",
    dag=dag)

PythonOperator

Use the PythonOperator to execute Python callables.

def print_context(ds, **kwargs):
    pprint(kwargs)
    print(ds)
    return 'Whatever you return gets printed in the logs'


run_this = PythonOperator(
    task_id='print_the_context',
    provide_context=True,
    python_callable=print_context,
    dag=dag)

Passing in arguments

Use the op_args and op_kwargs arguments to pass additional arguments to the Python callable.

def my_sleeping_function(random_base):
    """This is a function that will run within the DAG execution"""
    time.sleep(random_base)


# Generate 10 sleeping tasks, sleeping from 0 to 4 seconds respectively
for i in range(5):
    task = PythonOperator(
        task_id='sleep_for_' + str(i),
        python_callable=my_sleeping_function,
        op_kwargs={'random_base': float(i) / 10},
        dag=dag)

    task.set_upstream(run_this)

Templating

When you set the provide_context argument to True, Airflow passes in an additional set of keyword arguments: one for each of the Jinja template variables and a templates_dict argument.

The templates_dict argument is templated, so each value in the dictionary is evaluated as a Jinja template.

Google Cloud Platform Operators

GoogleCloudStorageToBigQueryOperator

Use the GoogleCloudStorageToBigQueryOperator to execute a BigQuery load job.

load_csv = gcs_to_bq.GoogleCloudStorageToBigQueryOperator(
    task_id='gcs_to_bq_example',
    bucket='cloud-samples-data',
    source_objects=['bigquery/us-states/us-states.csv'],
    destination_project_dataset_table='airflow_test.gcs_to_bq_table',
    schema_fields=[
        {'name': 'name', 'type': 'STRING', 'mode': 'NULLABLE'},
        {'name': 'post_abbr', 'type': 'STRING', 'mode': 'NULLABLE'},
    ],
    write_disposition='WRITE_TRUNCATE',
    dag=dag)