<?xml version="1.0" encoding="utf-8"?>
<!DOCTYPE html PUBLIC "-//W3C//DTD XHTML 1.1//EN"
  "http://www.w3.org/TR/xhtml11/DTD/xhtml11.dtd">

<html xmlns="http://www.w3.org/1999/xhtml">
<head><title></title>
<link href="../style/ebook.css" type="text/css" rel="stylesheet">
</head>
<body>
<h1>Scheduling &amp; Triggers</h1>
<p>The Airflow scheduler monitors all tasks and all DAGs, and triggers the
task instances whose dependencies have been met. Behind the scenes,
it monitors and stays in sync with a folder for all DAG objects it may contain,
and periodically (every minute or so) inspects active tasks to see whether
they can be triggered.</p>
<p>The Airflow scheduler is designed to run as a persistent service in an
Airflow production environment. To kick it off, all you need to do is
execute <code class="docutils literal notranslate"><span class="pre">airflow</span> <span class="pre">scheduler</span></code>. It will use the configuration specified in
<code class="docutils literal notranslate"><span class="pre">airflow.cfg</span></code>.</p>
<p>Note that if you run a DAG on a <code class="docutils literal notranslate"><span class="pre">schedule_interval</span></code> of one day,
the run stamped <code class="docutils literal notranslate"><span class="pre">2016-01-01</span></code> will be triggered soon after <code class="docutils literal notranslate"><span class="pre">2016-01-01T23:59</span></code>.
In other words, the job instance is started once the period it covers
has ended.</p>
<p><strong>Let&#x2019;s Repeat That</strong> The scheduler runs your job one <code class="docutils literal notranslate"><span class="pre">schedule_interval</span></code> AFTER the
start date, at the END of the period.</p>
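<p>As a minimal sketch of this rule, the earliest start of a run can be computed from its stamp and a <code class="docutils literal notranslate"><span class="pre">timedelta</span></code> interval. The helper name <code class="docutils literal notranslate"><span class="pre">earliest_trigger_time</span></code> is hypothetical and not part of Airflow; the real scheduler triggers the run shortly after this moment, not exactly at it.</p>
<div class="code python highlight-default notranslate"><div class="highlight"><pre><span></span>from datetime import datetime, timedelta


def earliest_trigger_time(execution_date, schedule_interval):
    """Return the earliest moment a run stamped execution_date can start.

    The run covers the period that begins at execution_date and lasts one
    schedule_interval, so it can only start once that period has ended.
    """
    return execution_date + schedule_interval


# A daily run stamped 2016-01-01 starts at the end of that day.
stamp = datetime(2016, 1, 1)
trigger = earliest_trigger_time(stamp, timedelta(days=1))
print(trigger.isoformat())  # 2016-01-02T00:00:00
</pre>
</div>
</div>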
<p>The scheduler starts an instance of the executor specified in your
<code class="docutils literal notranslate"><span class="pre">airflow.cfg</span></code>. If it happens to be the <code class="docutils literal notranslate"><span class="pre">LocalExecutor</span></code>, tasks will be
executed as subprocesses; in the case of <code class="docutils literal notranslate"><span class="pre">CeleryExecutor</span></code> and
<code class="docutils literal notranslate"><span class="pre">MesosExecutor</span></code>, tasks are executed remotely.</p>
<p>To start a scheduler, simply run the command:</p>
<div class="code bash highlight-default notranslate"><div class="highlight"><pre><span></span><span class="n">airflow</span> <span class="n">scheduler</span>
</pre>
</div>
</div>
<div class="section" id="dag-runs">
<h2 class="sigil_not_in_toc">DAG Runs</h2>
<p>A DAG Run is an object representing an instantiation of the DAG in time.</p>
<p>Each DAG may or may not have a schedule, which informs how <code class="docutils literal notranslate"><span class="pre">DAG</span> <span class="pre">Runs</span></code> are
created. <code class="docutils literal notranslate"><span class="pre">schedule_interval</span></code> is defined as a DAG argument, and preferably receives
a
<a class="reference external" href="https://en.wikipedia.org/wiki/Cron#CRON_expression">cron expression</a> as
a <code class="docutils literal notranslate"><span class="pre">str</span></code>, or a <code class="docutils literal notranslate"><span class="pre">datetime.timedelta</span></code> object. Alternatively, you can
use one of these cron &#x201C;presets&#x201D;:</p>
<table border="1" class="docutils">
<colgroup>
<col width="15%">
<col width="69%">
<col width="16%">
</colgroup>
<thead valign="bottom">
<tr class="row-odd"><th class="head">preset</th>
<th class="head">meaning</th>
<th class="head">cron</th>
</tr>
</thead>
<tbody valign="top">
<tr class="row-even"><td><code class="docutils literal notranslate"><span class="pre">None</span></code></td>
<td>Don&#x2019;t schedule, use for exclusively &#x201C;externally triggered&#x201D;
DAGs</td>
<td>&#xA0;</td>
</tr>
<tr class="row-odd"><td><code class="docutils literal notranslate"><span class="pre">@once</span></code></td>
<td>Schedule once and only once</td>
<td>&#xA0;</td>
</tr>
<tr class="row-even"><td><code class="docutils literal notranslate"><span class="pre">@hourly</span></code></td>
<td>Run once an hour at the beginning of the hour</td>
<td><code class="docutils literal notranslate"><span class="pre">0</span> <span class="pre">*</span> <span class="pre">*</span> <span class="pre">*</span> <span class="pre">*</span></code></td>
</tr>
<tr class="row-odd"><td><code class="docutils literal notranslate"><span class="pre">@daily</span></code></td>
<td>Run once a day at midnight</td>
<td><code class="docutils literal notranslate"><span class="pre">0</span> <span class="pre">0</span> <span class="pre">*</span> <span class="pre">*</span> <span class="pre">*</span></code></td>
</tr>
<tr class="row-even"><td><code class="docutils literal notranslate"><span class="pre">@weekly</span></code></td>
<td>Run once a week at midnight on Sunday morning</td>
<td><code class="docutils literal notranslate"><span class="pre">0</span> <span class="pre">0</span> <span class="pre">*</span> <span class="pre">*</span> <span class="pre">0</span></code></td>
</tr>
<tr class="row-odd"><td><code class="docutils literal notranslate"><span class="pre">@monthly</span></code></td>
<td>Run once a month at midnight of the first day of the month</td>
<td><code class="docutils literal notranslate"><span class="pre">0</span> <span class="pre">0</span> <span class="pre">1</span> <span class="pre">*</span> <span class="pre">*</span></code></td>
</tr>
<tr class="row-even"><td><code class="docutils literal notranslate"><span class="pre">@yearly</span></code></td>
<td>Run once a year at midnight of January 1</td>
<td><code class="docutils literal notranslate"><span class="pre">0</span> <span class="pre">0</span> <span class="pre">1</span> <span class="pre">1</span> <span class="pre">*</span></code></td>
</tr>
</tbody>
</table>
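<p>The preset rows above can be read as a simple lookup. The sketch below copies the cron strings from the table; <code class="docutils literal notranslate"><span class="pre">expand_preset</span></code> is a hypothetical helper for illustration, not an Airflow function.</p>
<div class="code python highlight-default notranslate"><div class="highlight"><pre><span></span># Preset names and cron strings copied from the table above.
CRON_PRESETS = {
    "@hourly": "0 * * * *",
    "@daily": "0 0 * * *",
    "@weekly": "0 0 * * 0",
    "@monthly": "0 0 1 * *",
    "@yearly": "0 0 1 1 *",
}


def expand_preset(schedule):
    """Hypothetical helper: return the cron string for a preset.

    Plain cron expressions pass through unchanged. None and "@once" have
    no cron equivalent in the table, so they are also returned as-is.
    """
    return CRON_PRESETS.get(schedule, schedule)


print(expand_preset("@weekly"))     # 0 0 * * 0
print(expand_preset("30 6 * * 1"))  # 30 6 * * 1
</pre>
</div>
</div>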
<p>Your DAG will be instantiated
for each schedule, and a <code class="docutils literal notranslate"><span class="pre">DAG</span> <span class="pre">Run</span></code> entry is created for each one.</p>
<p>DAG runs have a state associated with them (running, failed, success), which
informs the scheduler which set of schedules should be evaluated for
task submissions. Without the metadata at the DAG run level, the Airflow
scheduler would have much more work to do to figure out which tasks
should be triggered, and would slow to a crawl. It might also create undesired
processing when you change the shape of your DAG, for example by adding new
tasks.</p>
</div>
<div class="section" id="backfill-and-catchup">
<h2 class="sigil_not_in_toc">Backfill and Catchup</h2>
<p>An Airflow DAG with a <code class="docutils literal notranslate"><span class="pre">start_date</span></code>, possibly an <code class="docutils literal notranslate"><span class="pre">end_date</span></code>, and a <code class="docutils literal notranslate"><span class="pre">schedule_interval</span></code> defines a
series of intervals which the scheduler turns into individual DAG Runs and executes. A key capability of
Airflow is that these DAG Runs are atomic, idempotent items, and the scheduler, by default, will examine
the lifetime of the DAG (from start to end/now, one interval at a time) and kick off a DAG Run for any
interval that has not been run (or has been cleared). This concept is called Catchup.</p>
<p>If your DAG is written to handle its own catchup (i.e. not limited to the interval, but instead to &#x201C;Now&#x201D;,
for instance), then you will want to turn catchup off, either on the DAG itself with <code class="docutils literal notranslate"><span class="pre">dag.catchup</span> <span class="pre">=</span>
<span class="pre">False</span></code> or by default at the configuration file level with <code class="docutils literal notranslate"><span class="pre">catchup_by_default</span> <span class="pre">=</span> <span class="pre">False</span></code>. This
instructs the scheduler to only create a DAG Run for the most current instance of the DAG
interval series.</p>
<div class="code python highlight-default notranslate"><div class="highlight"><pre><span></span><span class="sd">&quot;&quot;&quot;</span>
<span class="sd">Code that goes along with the Airflow tutorial located at:</span>
<span class="sd">https://github.com/airbnb/airflow/blob/master/airflow/example_dags/tutorial.py</span>
<span class="sd">&quot;&quot;&quot;</span>
<span class="kn">from</span> <span class="nn">airflow</span> <span class="k">import</span> <span class="n">DAG</span>
<span class="kn">from</span> <span class="nn">airflow.operators.bash_operator</span> <span class="k">import</span> <span class="n">BashOperator</span>
<span class="kn">from</span> <span class="nn">datetime</span> <span class="k">import</span> <span class="n">datetime</span><span class="p">,</span> <span class="n">timedelta</span>


<span class="n">default_args</span> <span class="o">=</span> <span class="p">{</span>
    <span class="s1">&apos;owner&apos;</span><span class="p">:</span> <span class="s1">&apos;airflow&apos;</span><span class="p">,</span>
    <span class="s1">&apos;depends_on_past&apos;</span><span class="p">:</span> <span class="kc">False</span><span class="p">,</span>
    <span class="s1">&apos;start_date&apos;</span><span class="p">:</span> <span class="n">datetime</span><span class="p">(</span><span class="mi">2015</span><span class="p">,</span> <span class="mi">12</span><span class="p">,</span> <span class="mi">1</span><span class="p">),</span>
    <span class="s1">&apos;email&apos;</span><span class="p">:</span> <span class="p">[</span><span class="s1">&apos;airflow@example.com&apos;</span><span class="p">],</span>
    <span class="s1">&apos;email_on_failure&apos;</span><span class="p">:</span> <span class="kc">False</span><span class="p">,</span>
    <span class="s1">&apos;email_on_retry&apos;</span><span class="p">:</span> <span class="kc">False</span><span class="p">,</span>
    <span class="s1">&apos;retries&apos;</span><span class="p">:</span> <span class="mi">1</span><span class="p">,</span>
    <span class="s1">&apos;retry_delay&apos;</span><span class="p">:</span> <span class="n">timedelta</span><span class="p">(</span><span class="n">minutes</span><span class="o">=</span><span class="mi">5</span><span class="p">),</span>
    <span class="s1">&apos;schedule_interval&apos;</span><span class="p">:</span> <span class="s1">&apos;@hourly&apos;</span><span class="p">,</span>
<span class="p">}</span>

<span class="n">dag</span> <span class="o">=</span> <span class="n">DAG</span><span class="p">(</span><span class="s1">&apos;tutorial&apos;</span><span class="p">,</span> <span class="n">catchup</span><span class="o">=</span><span class="kc">False</span><span class="p">,</span> <span class="n">default_args</span><span class="o">=</span><span class="n">default_args</span><span class="p">)</span>
</pre>
</div>
</div>
<p>In the example above, if the DAG is picked up by the scheduler daemon (or from the
command line) on 2016-01-02 at 6 AM, a single DAG Run will be created with an <code class="docutils literal notranslate"><span class="pre">execution_date</span></code> of 2016-01-01, and the next
one will be created just after midnight on the morning of 2016-01-03 with an execution date of 2016-01-02.</p>
<p>If the <code class="docutils literal notranslate"><span class="pre">dag.catchup</span></code> value had been True instead, the scheduler would have created a DAG Run for each
completed interval between 2015-12-01 and 2016-01-02 (but not yet one for 2016-01-02, as that interval
hasn&#x2019;t completed) and executed them sequentially. This behavior is great for atomic
datasets that can easily be split into periods. Turning catchup off is great if your DAG Runs perform
backfill internally.</p>
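<p>To make the two cases concrete, here is a simplified model of the catchup decision, assuming a daily interval as in the description above. <code class="docutils literal notranslate"><span class="pre">runs_to_create</span></code> is a hypothetical function, not the scheduler&#x2019;s actual implementation; it ignores <code class="docutils literal notranslate"><span class="pre">end_date</span></code>, cleared runs and run state.</p>
<div class="code python highlight-default notranslate"><div class="highlight"><pre><span></span>from datetime import datetime, timedelta


def runs_to_create(start_date, interval, now, catchup):
    """Return the execution dates a scheduler would create runs for.

    Simplified model: a run exists for every interval that has fully
    elapsed by now. With catchup off, only the latest one is kept.
    """
    dates = []
    execution_date = start_date
    # An interval is complete once its end is not after now.
    while now >= execution_date + interval:
        dates.append(execution_date)
        execution_date += interval
    return dates if catchup else dates[-1:]


start = datetime(2015, 12, 1)
now = datetime(2016, 1, 2, 6)  # the scheduler picks the DAG up at 6 AM
daily = timedelta(days=1)

print(runs_to_create(start, daily, now, catchup=False))
# [datetime.datetime(2016, 1, 1, 0, 0)]
print(len(runs_to_create(start, daily, now, catchup=True)))
# 32
</pre>
</div>
</div>
<p>With catchup on, the list runs from 2015-12-01 through 2016-01-01; the interval stamped 2016-01-02 is absent because it has not yet ended.</p>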
</div>
<div class="section" id="external-triggers">
<h2 class="sigil_not_in_toc">External Triggers</h2>
<p>Note that <code class="docutils literal notranslate"><span class="pre">DAG</span> <span class="pre">Runs</span></code> can also be created manually through the CLI by
running an <code class="docutils literal notranslate"><span class="pre">airflow</span> <span class="pre">trigger_dag</span></code> command, where you can define a
specific <code class="docutils literal notranslate"><span class="pre">run_id</span></code>. The <code class="docutils literal notranslate"><span class="pre">DAG</span> <span class="pre">Runs</span></code> created externally to the
scheduler are associated with the trigger&#x2019;s timestamp, and will be displayed
in the UI alongside scheduled <code class="docutils literal notranslate"><span class="pre">DAG</span> <span class="pre">Runs</span></code>.</p>
<p>You can also manually trigger a <code class="docutils literal notranslate"><span class="pre">DAG</span> <span class="pre">Run</span></code> using the web UI (tab &#x201C;DAGs&#x201D; -&gt; column &#x201C;Links&#x201D; -&gt; button &#x201C;Trigger Dag&#x201D;).</p>
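<p>If you script external triggers from Python, one possible approach is to build the CLI call as an argument list. This is only a sketch: <code class="docutils literal notranslate"><span class="pre">trigger_dag_command</span></code> and the sample <code class="docutils literal notranslate"><span class="pre">run_id</span></code> are hypothetical, and it assumes an Airflow 1.x CLI in which <code class="docutils literal notranslate"><span class="pre">trigger_dag</span></code> accepts a <code class="docutils literal notranslate"><span class="pre">--run_id</span></code> option; check <code class="docutils literal notranslate"><span class="pre">airflow</span> <span class="pre">trigger_dag</span> <span class="pre">-h</span></code> for your version.</p>
<div class="code python highlight-default notranslate"><div class="highlight"><pre><span></span>import subprocess


def trigger_dag_command(dag_id, run_id=None):
    """Build the argument list for an `airflow trigger_dag` call."""
    cmd = ["airflow", "trigger_dag"]
    if run_id is not None:
        # Assumed option name; verify it against your installed CLI.
        cmd += ["--run_id", run_id]
    cmd.append(dag_id)
    return cmd


cmd = trigger_dag_command("tutorial", run_id="manual_2016_01_02")
print(" ".join(cmd))  # airflow trigger_dag --run_id manual_2016_01_02 tutorial

# Uncomment on a machine where the airflow CLI is installed:
# subprocess.run(cmd, check=True)
</pre>
</div>
</div>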
</div>
<div class="section" id="to-keep-in-mind">
<h2 class="sigil_not_in_toc">To Keep in Mind</h2>
<ul class="simple">
<li>The first <code class="docutils literal notranslate"><span class="pre">DAG</span> <span class="pre">Run</span></code> is created based on the minimum <code class="docutils literal notranslate"><span class="pre">start_date</span></code> for the
tasks in your DAG.</li>
<li>Subsequent <code class="docutils literal notranslate"><span class="pre">DAG</span> <span class="pre">Runs</span></code> are created by the scheduler process, based on
your DAG&#x2019;s <code class="docutils literal notranslate"><span class="pre">schedule_interval</span></code>, sequentially.</li>
<li>When clearing a set of tasks&#x2019; state in hope of getting them to re-run,
it is important to keep in mind the <code class="docutils literal notranslate"><span class="pre">DAG</span> <span class="pre">Run</span></code>&#x2019;s state too as it defines
whether the scheduler should look into triggering tasks for that run.</li>
</ul>
<p>Here are some of the ways you can <strong>unblock tasks</strong>:</p>
<ul class="simple">
<li>From the UI, you can <strong>clear</strong> (as in delete the status of) individual task instances
from the task instances dialog, while defining whether you want to include the past/future
and the upstream/downstream dependencies. Note that a confirmation window comes next and
allows you to see the set you are about to clear. You can also clear all task instances
associated with the DAG.</li>
<li>The CLI command <code class="docutils literal notranslate"><span class="pre">airflow</span> <span class="pre">clear</span> <span class="pre">-h</span></code> has lots of options when it comes to clearing task instance
states, including specifying date ranges, targeting task_ids by specifying a regular expression,
flags for including upstream and downstream relatives, and targeting task instances in specific
states (<code class="docutils literal notranslate"><span class="pre">failed</span></code>, or <code class="docutils literal notranslate"><span class="pre">success</span></code>).</li>
<li>Clearing a task instance will no longer delete the task instance record. Instead it updates
max_tries and sets the current task instance state to None.</li>
<li>Marking task instances as failed can be done through the UI. This can be used to stop running task instances.</li>
<li>Marking task instances as successful can be done through the UI. This is mostly to fix false negatives,
or for instance when the fix has been applied outside of Airflow.</li>
<li>The <code class="docutils literal notranslate"><span class="pre">airflow</span> <span class="pre">backfill</span></code> CLI subcommand has a <code class="docutils literal notranslate"><span class="pre">--mark_success</span></code> flag and allows selecting
subsections of the DAG as well as specifying date ranges.</li>
</ul>
</div>
</body>
</html>