An operator represents a single, ideally idempotent, task. Operators determine what actually executes when your DAG runs.
See the Operators Concepts documentation and the Operators API Reference for more information.
Use the BashOperator to execute commands in a Bash shell.
from airflow.operators.bash_operator import BashOperator

run_this = BashOperator(
    task_id='run_after_loop',
    bash_command='echo 1',
    dag=dag)
You can use Jinja templates to parameterize the bash_command argument.
task = BashOperator(
    task_id='also_run_this',
    bash_command='echo "run_id={{ run_id }} | dag_run={{ dag_run }}"',
    dag=dag)
Add a space after the script name when directly calling a Bash script with the bash_command argument. This is because Airflow treats a value ending in .sh as the path to a Jinja template file and tries to load it, which fails with a `Jinja template not found` error; the trailing space prevents this.
t2 = BashOperator(
    task_id='bash_example',
    # This fails with `Jinja template not found` error
    # bash_command="/home/batcher/test.sh",
    # This works (has a space after)
    bash_command="/home/batcher/test.sh ",
    dag=dag)
Use the PythonOperator to execute Python callables.
from pprint import pprint

from airflow.operators.python_operator import PythonOperator

def print_context(ds, **kwargs):
    """Print the task context and the ds (execution date) variable."""
    pprint(kwargs)
    print(ds)
    return 'Whatever you return gets printed in the logs'

run_this = PythonOperator(
    task_id='print_the_context',
    provide_context=True,
    python_callable=print_context,
    dag=dag)
Use the op_args and op_kwargs arguments to pass additional arguments to the Python callable.
import time

def my_sleeping_function(random_base):
    """Sleep for random_base seconds within the DAG execution."""
    time.sleep(random_base)

# Generate 5 sleeping tasks, sleeping from 0.0 to 0.4 seconds respectively
for i in range(5):
    task = PythonOperator(
        task_id='sleep_for_' + str(i),
        python_callable=my_sleeping_function,
        op_kwargs={'random_base': float(i) / 10},
        dag=dag)

    task.set_upstream(run_this)
When you set the provide_context argument to True, Airflow passes in an additional set of keyword arguments: one for each of the Jinja template variables and a templates_dict argument. The templates_dict argument is templated, so each value in the dictionary is evaluated as a Jinja template.
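For example, the following sketch (assuming the same dag object as in the previous examples; the task ID, dictionary key, and callable name are illustrative) passes a templated value through templates_dict:

def print_rendered_date(**kwargs):
    # templates_dict values arrive already rendered by Jinja, so this
    # prints the execution date string rather than the literal '{{ ds }}'.
    print(kwargs['templates_dict']['exec_date'])

templated_task = PythonOperator(
    task_id='print_rendered_date',
    provide_context=True,
    python_callable=print_rendered_date,
    templates_dict={'exec_date': '{{ ds }}'},
    dag=dag)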
Use the GoogleCloudStorageToBigQueryOperator to execute a BigQuery load job.
from airflow.contrib.operators import gcs_to_bq

load_csv = gcs_to_bq.GoogleCloudStorageToBigQueryOperator(
    task_id='gcs_to_bq_example',
    bucket='cloud-samples-data',
    source_objects=['bigquery/us-states/us-states.csv'],
    destination_project_dataset_table='airflow_test.gcs_to_bq_table',
    schema_fields=[
        {'name': 'name', 'type': 'STRING', 'mode': 'NULLABLE'},
        {'name': 'post_abbr', 'type': 'STRING', 'mode': 'NULLABLE'},
    ],
    write_disposition='WRITE_TRUNCATE',
    dag=dag)