9.html 12.9 KB
Newer Older
W
init  
wizardforcel 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153
<?xml version="1.0" encoding="utf-8"?>
<!DOCTYPE html PUBLIC "-//W3C//DTD XHTML 1.1//EN"
  "http://www.w3.org/TR/xhtml11/DTD/xhtml11.dtd">

<html xmlns="http://www.w3.org/1999/xhtml">
<head><title></title>
<link href="../style/ebook.css" type="text/css" rel="stylesheet">
</head>
<body>
<h1>Using Operators</h1>
<p>An operator represents a single, ideally idempotent, task. Operators
determine what actually executes when your DAG runs.</p>
<p>See the <a class="reference internal" href="../concepts.html#concepts-operators"><span class="std std-ref">Operators Concepts</span></a> documentation and the
<a class="reference internal" href="../code.html#api-reference-operators"><span class="std std-ref">Operators API Reference</span></a> for more
information.</p>
<div class="contents local topic" id="contents">
<ul class="simple">
<li><a class="reference internal" href="#bashoperator" id="id2">BashOperator</a><ul>
<li><a class="reference internal" href="#templating" id="id3">Templating</a></li>
<li><a class="reference internal" href="#troubleshooting" id="id4">Troubleshooting</a><ul>
<li><a class="reference internal" href="#jinja-template-not-found" id="id5">Jinja template not found</a></li>
</ul>
</li>
</ul>
</li>
<li><a class="reference internal" href="#pythonoperator" id="id6">PythonOperator</a><ul>
<li><a class="reference internal" href="#passing-in-arguments" id="id7">Passing in arguments</a></li>
<li><a class="reference internal" href="#id1" id="id8">Templating</a></li>
</ul>
</li>
<li><a class="reference internal" href="#google-cloud-platform-operators" id="id9">Google Cloud Platform Operators</a><ul>
<li><a class="reference internal" href="#googlecloudstoragetobigqueryoperator" id="id10">GoogleCloudStorageToBigQueryOperator</a></li>
</ul>
</li>
</ul>
</div>
<div class="section" id="bashoperator">
<h2 class="sigil_not_in_toc"><a class="toc-backref" href="#id2">BashOperator</a></h2>
<p>Use the <a class="reference internal" href="../code.html#airflow.operators.bash_operator.BashOperator" title="airflow.operators.bash_operator.BashOperator"><code class="xref py py-class docutils literal notranslate"><span class="pre">BashOperator</span></code></a> to execute
commands in a <a class="reference external" href="https://www.gnu.org/software/bash/">Bash</a> shell.</p>
<div class="highlight-python notranslate"><div class="highlight"><pre><span></span><span class="n">run_this</span> <span class="o">=</span> <span class="n">BashOperator</span><span class="p">(</span>
    <span class="n">task_id</span><span class="o">=</span><span class="s1">&apos;run_after_loop&apos;</span><span class="p">,</span> <span class="n">bash_command</span><span class="o">=</span><span class="s1">&apos;echo 1&apos;</span><span class="p">,</span> <span class="n">dag</span><span class="o">=</span><span class="n">dag</span><span class="p">)</span>
</pre>
</div>
</div>
<div class="section" id="templating">
<h3 class="sigil_not_in_toc"><a class="toc-backref" href="#id3">Templating</a></h3>
<p>You can use <a class="reference internal" href="../concepts.html#jinja-templating"><span class="std std-ref">Jinja templates</span></a> to parameterize the
<code class="docutils literal notranslate"><span class="pre">bash_command</span></code> argument.</p>
<div class="highlight-python notranslate"><div class="highlight"><pre><span></span><span class="n">task</span> <span class="o">=</span> <span class="n">BashOperator</span><span class="p">(</span>
    <span class="n">task_id</span><span class="o">=</span><span class="s1">&apos;also_run_this&apos;</span><span class="p">,</span>
    <span class="n">bash_command</span><span class="o">=</span><span class="s1">&apos;echo &quot;run_id={{ run_id }} | dag_run={{ dag_run }}&quot;&apos;</span><span class="p">,</span>
    <span class="n">dag</span><span class="o">=</span><span class="n">dag</span><span class="p">)</span>
</pre>
</div>
</div>
</div>
<div class="section" id="troubleshooting">
<h3 class="sigil_not_in_toc"><a class="toc-backref" href="#id4">Troubleshooting</a></h3>
<div class="section" id="jinja-template-not-found">
<h4 class="sigil_not_in_toc"><a class="toc-backref" href="#id5">Jinja template not found</a></h4>
<p>Add a space after the script name when directly calling a Bash script with
the <code class="docutils literal notranslate"><span class="pre">bash_command</span></code> argument. This is because Airflow tries to apply a Jinja
template to it, which will fail.</p>
<div class="highlight-python notranslate"><div class="highlight"><pre><span></span><span class="n">t2</span> <span class="o">=</span> <span class="n">BashOperator</span><span class="p">(</span>
    <span class="n">task_id</span><span class="o">=</span><span class="s1">&apos;bash_example&apos;</span><span class="p">,</span>

    <span class="c1"># This fails with `Jinja template not found` error</span>
    <span class="c1"># bash_command=&quot;/home/batcher/test.sh&quot;,</span>

    <span class="c1"># This works (has a space after)</span>
    <span class="n">bash_command</span><span class="o">=</span><span class="s2">&quot;/home/batcher/test.sh &quot;</span><span class="p">,</span>
    <span class="n">dag</span><span class="o">=</span><span class="n">dag</span><span class="p">)</span>
</pre>
</div>
</div>
</div>
</div>
</div>
<div class="section" id="pythonoperator">
<h2 class="sigil_not_in_toc"><a class="toc-backref" href="#id6">PythonOperator</a></h2>
<p>Use the <a class="reference internal" href="../code.html#airflow.operators.python_operator.PythonOperator" title="airflow.operators.python_operator.PythonOperator"><code class="xref py py-class docutils literal notranslate"><span class="pre">PythonOperator</span></code></a> to execute
Python callables.</p>
<div class="highlight-python notranslate"><div class="highlight"><pre><span></span><span class="k">def</span> <span class="nf">print_context</span><span class="p">(</span><span class="n">ds</span><span class="p">,</span> <span class="o">**</span><span class="n">kwargs</span><span class="p">):</span>
    <span class="n">pprint</span><span class="p">(</span><span class="n">kwargs</span><span class="p">)</span>
    <span class="k">print</span><span class="p">(</span><span class="n">ds</span><span class="p">)</span>
    <span class="k">return</span> <span class="s1">&apos;Whatever you return gets printed in the logs&apos;</span>


<span class="n">run_this</span> <span class="o">=</span> <span class="n">PythonOperator</span><span class="p">(</span>
    <span class="n">task_id</span><span class="o">=</span><span class="s1">&apos;print_the_context&apos;</span><span class="p">,</span>
    <span class="n">provide_context</span><span class="o">=</span><span class="bp">True</span><span class="p">,</span>
    <span class="n">python_callable</span><span class="o">=</span><span class="n">print_context</span><span class="p">,</span>
    <span class="n">dag</span><span class="o">=</span><span class="n">dag</span><span class="p">)</span>
</pre>
</div>
</div>
<div class="section" id="passing-in-arguments">
<h3 class="sigil_not_in_toc"><a class="toc-backref" href="#id7">Passing in arguments</a></h3>
<p>Use the <code class="docutils literal notranslate"><span class="pre">op_args</span></code> and <code class="docutils literal notranslate"><span class="pre">op_kwargs</span></code> arguments to pass additional arguments
to the Python callable.</p>
<div class="highlight-python notranslate"><div class="highlight"><pre><span></span><span class="k">def</span> <span class="nf">my_sleeping_function</span><span class="p">(</span><span class="n">random_base</span><span class="p">):</span>
    <span class="sd">&quot;&quot;&quot;This is a function that will run within the DAG execution&quot;&quot;&quot;</span>
    <span class="n">time</span><span class="o">.</span><span class="n">sleep</span><span class="p">(</span><span class="n">random_base</span><span class="p">)</span>


<span class="c1"># Generate 10 sleeping tasks, sleeping from 0 to 4 seconds respectively</span>
<span class="k">for</span> <span class="n">i</span> <span class="ow">in</span> <span class="nb">range</span><span class="p">(</span><span class="mi">5</span><span class="p">):</span>
    <span class="n">task</span> <span class="o">=</span> <span class="n">PythonOperator</span><span class="p">(</span>
        <span class="n">task_id</span><span class="o">=</span><span class="s1">&apos;sleep_for_&apos;</span> <span class="o">+</span> <span class="nb">str</span><span class="p">(</span><span class="n">i</span><span class="p">),</span>
        <span class="n">python_callable</span><span class="o">=</span><span class="n">my_sleeping_function</span><span class="p">,</span>
        <span class="n">op_kwargs</span><span class="o">=</span><span class="p">{</span><span class="s1">&apos;random_base&apos;</span><span class="p">:</span> <span class="nb">float</span><span class="p">(</span><span class="n">i</span><span class="p">)</span> <span class="o">/</span> <span class="mi">10</span><span class="p">},</span>
        <span class="n">dag</span><span class="o">=</span><span class="n">dag</span><span class="p">)</span>

    <span class="n">task</span><span class="o">.</span><span class="n">set_upstream</span><span class="p">(</span><span class="n">run_this</span><span class="p">)</span>
</pre>
</div>
</div>
</div>
<div class="section" id="id1">
<h3 class="sigil_not_in_toc"><a class="toc-backref" href="#id8">Templating</a></h3>
<p>When you set the <code class="docutils literal notranslate"><span class="pre">provide_context</span></code> argument to <code class="docutils literal notranslate"><span class="pre">True</span></code>, Airflow passes in
an additional set of keyword arguments: one for each of the <a class="reference internal" href="../code.html#macros"><span class="std std-ref">Jinja
template variables</span></a> and a <code class="docutils literal notranslate"><span class="pre">templates_dict</span></code> argument.</p>
<p>The <code class="docutils literal notranslate"><span class="pre">templates_dict</span></code> argument is templated, so each value in the dictionary
is evaluated as a <a class="reference internal" href="../concepts.html#jinja-templating"><span class="std std-ref">Jinja template</span></a>.</p>
</div>
</div>
<div class="section" id="google-cloud-platform-operators">
<h2 class="sigil_not_in_toc"><a class="toc-backref" href="#id9">Google Cloud Platform Operators</a></h2>
<div class="section" id="googlecloudstoragetobigqueryoperator">
<h3 class="sigil_not_in_toc"><a class="toc-backref" href="#id10">GoogleCloudStorageToBigQueryOperator</a></h3>
<p>Use the
<a class="reference internal" href="../integration.html#airflow.contrib.operators.gcs_to_bq.GoogleCloudStorageToBigQueryOperator" title="airflow.contrib.operators.gcs_to_bq.GoogleCloudStorageToBigQueryOperator"><code class="xref py py-class docutils literal notranslate"><span class="pre">GoogleCloudStorageToBigQueryOperator</span></code></a>
to execute a BigQuery load job.</p>
<div class="highlight-python notranslate"><div class="highlight"><pre><span></span><span class="n">load_csv</span> <span class="o">=</span> <span class="n">gcs_to_bq</span><span class="o">.</span><span class="n">GoogleCloudStorageToBigQueryOperator</span><span class="p">(</span>
    <span class="n">task_id</span><span class="o">=</span><span class="s1">&apos;gcs_to_bq_example&apos;</span><span class="p">,</span>
    <span class="n">bucket</span><span class="o">=</span><span class="s1">&apos;cloud-samples-data&apos;</span><span class="p">,</span>
    <span class="n">source_objects</span><span class="o">=</span><span class="p">[</span><span class="s1">&apos;bigquery/us-states/us-states.csv&apos;</span><span class="p">],</span>
    <span class="n">destination_project_dataset_table</span><span class="o">=</span><span class="s1">&apos;airflow_test.gcs_to_bq_table&apos;</span><span class="p">,</span>
    <span class="n">schema_fields</span><span class="o">=</span><span class="p">[</span>
        <span class="p">{</span><span class="s1">&apos;name&apos;</span><span class="p">:</span> <span class="s1">&apos;name&apos;</span><span class="p">,</span> <span class="s1">&apos;type&apos;</span><span class="p">:</span> <span class="s1">&apos;STRING&apos;</span><span class="p">,</span> <span class="s1">&apos;mode&apos;</span><span class="p">:</span> <span class="s1">&apos;NULLABLE&apos;</span><span class="p">},</span>
        <span class="p">{</span><span class="s1">&apos;name&apos;</span><span class="p">:</span> <span class="s1">&apos;post_abbr&apos;</span><span class="p">,</span> <span class="s1">&apos;type&apos;</span><span class="p">:</span> <span class="s1">&apos;STRING&apos;</span><span class="p">,</span> <span class="s1">&apos;mode&apos;</span><span class="p">:</span> <span class="s1">&apos;NULLABLE&apos;</span><span class="p">},</span>
    <span class="p">],</span>
    <span class="n">write_disposition</span><span class="o">=</span><span class="s1">&apos;WRITE_TRUNCATE&apos;</span><span class="p">,</span>
    <span class="n">dag</span><span class="o">=</span><span class="n">dag</span><span class="p">)</span>
</pre>
</div>
</div>
</div>
</div>
</body>
</html>