diff --git a/zh/5.md b/zh/5.md index 5bda9652e621675529102bd742820f799a792212..cc97559aa7edf9cae9e4fe1032fad3cae0f7759f 100644 --- a/zh/5.md +++ b/zh/5.md @@ -7,97 +7,98 @@ 以下是基本管道定义的示例。 如果这看起来很复杂,请不要担心,下面将逐行说明。 ``` - """ -Code that goes along with the Airflow tutorial located at: +""" +Airflow教程与一起提供的代码位于: https://github.com/apache/incubator-airflow/blob/master/airflow/example_dags/tutorial.py """ from airflow import DAG from airflow.operators.bash_operator import BashOperator -from datetime import datetime , timedelta +from datetime import datetime, timedelta default_args = { - 'owner' : 'airflow' , - 'depends_on_past' : False , - 'start_date' : datetime ( 2015 , 6 , 1 ), - 'email' : [ 'airflow@example.com' ], - 'email_on_failure' : False , - 'email_on_retry' : False , - 'retries' : 1 , - 'retry_delay' : timedelta ( minutes = 5 ), + 'owner': 'airflow', + 'depends_on_past': False, + 'start_date': datetime(2015, 6, 1), + 'email': ['airflow@example.com'], + 'email_on_failure': False, + 'email_on_retry': False, + 'retries': 1, + 'retry_delay': timedelta(minutes=5), # 'queue': 'bash_queue', # 'pool': 'backfill', # 'priority_weight': 10, # 'end_date': datetime(2016, 1, 1), } -dag = DAG ( 'tutorial' , default_args = default_args ) +dag = DAG('tutorial', default_args=default_args) -# t1, t2 and t3 are examples of tasks created by instantiating operators -t1 = BashOperator ( - task_id = 'print_date' , - bash_command = 'date' , - dag = dag ) +# t1、t2和t3是通过实例化操作器创建的任务示例 +t1 = BashOperator( + task_id='print_date', + bash_command='date', + dag=dag) -t2 = BashOperator ( - task_id = 'sleep' , - bash_command = 'sleep 5' , - retries = 3 , - dag = dag ) +t2 = BashOperator( + task_id='sleep', + bash_command='sleep 5', + retries=3, + dag=dag) templated_command = """ - { % f or i in range(5) %} - echo "{{ ds }}" - echo "{{ macros.ds_add(ds, 7)}}" - echo "{{ params.my_param }}" - { % e ndfor %} + { % f or i in range(5) %} + echo "{{ ds }}" + echo "{{ macros.ds_add(ds, 7)}}" + echo "{{ params.my_param }}" + { % e ndfor %} """ -t3 = BashOperator ( - task_id = 'templated' , - bash_command = templated_command , - params = { 'my_param' : 'Parameter I passed in' }, - dag = dag ) +t3 = BashOperator( + task_id='templated', + bash_command=templated_command, + params={'my_param': 'Parameter I passed in'}, + dag=dag) + +t2.set_upstream(t1) +t3.set_upstream(t1) -t2 . set_upstream ( t1 ) -t3 . set_upstream ( t1 ) ``` ## 这是一个DAG定义文件 -包围你的一件事(对于每个人来说可能不是很直观)是这个Airflow Python脚本实际上只是一个配置文件,将DAG的结构指定为代码。 此处定义的实际任务将在与此脚本的上下文不同的上下文中运行。 不同的任务在不同的时间点运行在不同的工作者上,这意味着该脚本不能用于在任务之间交叉通信。 请注意,为此,我们有一个名为`XCom`的更高级功能。 +包围你的一件事(对于每个人来说可能不是很直观)是这个Airflow的Python脚本实际上只是一个将DAG的结构指定为代码的配置文件。 此处定义的实际任务将在与此脚本不同的上下文中运行。 不同的任务在不同的时间点运行在不同的工作者上,这意味着该脚本不能用于在任务之间交叉通信。 请注意,为此,我们有一个名为`XCom`的更高级功能。 -人们有时会将DAG定义文件视为可以进行实际数据处理的地方 - 事实并非如此! 该脚本的目的是定义DAG对象。 它需要快速评估(秒,而不是几分钟),因为调度程序将定期执行它以反映更改(如果有的话)。 +人们有时会将DAG定义文件视为可以进行实际数据处理的地方 - 但事实并非如此! 该脚本的目的是定义DAG对象。 它需要快速评估(秒,而不是几分钟),因为调度程序将定期执行它以反映更改(如果有的话)。 ## 导入模块 -Airflow管道只是一个Python脚本,恰好定义了Airflow DAG对象。 让我们首先导入我们需要的库。 +Airflow管道只是一个定义了Airflow DAG对象的Python脚本。 让我们首先导入我们需要的库。 ``` - # The DAG object; we'll need this to instantiate a DAG +# DAG对象; 我们将需要它来实例化一个DAG from airflow import DAG -# Operators; we need this to operate! +# 操作器;我们将需要它来操作 from airflow.operators.bash_operator import BashOperator ``` ## 默认参数 -我们即将创建一个DAG和一些任务,我们可以选择显式地将一组参数传递给每个任务的构造函数(这将变得多余),或者(更好!)我们可以定义一个默认参数的字典,我们可以可以在创建任务时使用。 +我们即将创建一个DAG和一些任务,我们可以选择显式地将一组参数传递给每个任务的构造函数(这可能变得多余),或者(更好!)我们可以定义一个默认参数的字典,在创建任务时来使用。 ``` - from datetime import datetime , timedelta +from datetime import datetime, timedelta default_args = { - 'owner' : 'airflow' , - 'depends_on_past' : False , - 'start_date' : datetime ( 2015 , 6 , 1 ), - 'email' : [ 'airflow@example.com' ], - 'email_on_failure' : False , - 'email_on_retry' : False , - 'retries' : 1 , - 'retry_delay' : timedelta ( minutes = 5 ), + 'owner': 'airflow', + 'depends_on_past': False, + 'start_date': datetime(2015, 6, 1), + 'email': ['airflow@example.com'], + 'email_on_failure': False, + 'email_on_retry': False, + 'retries': 1, + 'retry_delay': timedelta(minutes=5), # 'queue': 'bash_queue', # 'pool': 'backfill', # 'priority_weight': 10, @@ -108,76 +109,76 @@ default_args = { 有关BaseOperator参数及其功能的更多信息,请参阅[`airflow.models.BaseOperator`](code.html "airflow.models.BaseOperator")文档。 -另外,请注意,您可以轻松定义可用于不同目的的不同参数集。 一个例子是在生产和开发环境之间进行不同的设置。 +另外,请注意,您可以轻松定义可用于不同目的的不同参数集。 一个典型的例子是在生产和开发环境之间进行不同的设置。 ## 实例化DAG -我们需要一个DAG对象来嵌入我们的任务。 这里我们传递一个定义`dag_id`的字符串,它用作DAG的唯一标识符。 我们还传递我们刚刚定义的默认参数字典,并为DAG定义1天的`schedule_interval` 。 +我们需要一个DAG对象来嵌入我们的任务。 这里我们传递一个定义为`dag_id`的字符串,把它用作DAG的唯一标识符。 我们还传递我们刚刚定义的默认参数字典,并为DAG定义为1天的`schedule_interval` 。 ``` - dag = DAG ( - 'tutorial' , default_args = default_args , schedule_interval = timedelta ( 1 )) +dag = DAG( + 'tutorial', default_args=default_args, schedule_interval=timedelta(1)) ``` ## 任务 -在实例化操作员对象时生成任务。 从运算符实例化的对象称为构造函数。 第一个参数`task_id`充当任务的唯一标识符。 +在实例化操作器时生成任务,从操作器实例化的对象称为构造函数,第一个参数`task_id`充当任务的唯一标识符。 ``` - t1 = BashOperator ( - task_id = 'print_date' , - bash_command = 'date' , - dag = dag ) +t1 = BashOperator( + task_id='print_date', + bash_command='date', + dag=dag) -t2 = BashOperator ( - task_id = 'sleep' , - bash_command = 'sleep 5' , - retries = 3 , - dag = dag ) +t2 = BashOperator( + task_id='sleep', + bash_command='sleep 5', + retries=3, + dag=dag) ``` -请注意我们如何将从BaseOperator继承的所有运算符( `retries` )共同的运算符特定参数( `bash_command` )和通用参数传递给运算符的构造函数。 这比为每个构造函数调用传递每个参数更简单。 另请注意,在第二个任务中,我们使用`3`覆盖`retries`参数。 +请注意我们如何把该运算器的特定参数(`bash_command`)和所有操作器通用的参数(`retries`)传递给运算符的构造函数。 这比为每个构造函数调用传递每个参数更简单。 另请注意,在第二个任务中,我们使用`3`覆盖了默认的`retries`参数值。 -任务的优先规则如下: +任务参数的优先规则如下: -1. 明确传递参数 -2. `default_args`字典中存在的值 -3. 运算符的默认值(如果存在) +1. 明确传递参数 +2. `default_args`字典中存在的值 +3. 运算符的默认值(如果存在) -任务必须包含或继承参数`task_id`和`owner` ,否则Airflow将引发异常。 +任务必须包含或继承参数`task_id`和`owner`,否则Airflow将出现异常。 -## 与金贾一起模仿 +## 使用Jinja作为模版 Airflow充分利用了[Jinja Templating](http://jinja.pocoo.org/docs/dev/)的强大功能,并为管道作者提供了一组内置参数和宏。 Airflow还为管道作者提供了定义自己的参数,宏和模板的钩子。 本教程几乎没有涉及在Airflow中使用模板进行操作的表面,但本节的目的是让您知道此功能的存在,让您熟悉双花括号,并指向最常见的模板变量: `{{ ds }}` (今天的“日期戳”)。 ``` - templated_command = """ - { % f or i in range(5) %} - echo "{{ ds }}" - echo "{{ macros.ds_add(ds, 7) }}" - echo "{{ params.my_param }}" - { % e ndfor %} +templated_command = """ + { % f or i in range(5) %} + echo "{{ ds }}" + echo "{{ macros.ds_add(ds, 7) }}" + echo "{{ params.my_param }}" + { % e ndfor %} """ -t3 = BashOperator ( - task_id = 'templated' , - bash_command = templated_command , - params = { 'my_param' : 'Parameter I passed in' }, - dag = dag ) +t3 = BashOperator( + task_id='templated', + bash_command=templated_command, + params={'my_param': 'Parameter I passed in'}, + dag=dag) ``` 请注意, `templated_command`包含`{% %}`块中的代码逻辑,引用参数如`{{ ds }}` ,调用`{{ macros.ds_add(ds, 7)}}`的函数,并在`{{ macros.ds_add(ds, 7)}}`引用用户定义的参数`{{ params.my_param }}` 。 -`BaseOperator`的`params`钩子允许您将参数和/或对象的字典传递给模板。 请花点时间了解参数`my_param`如何通过模板。 +`BaseOperator`的`params`钩子允许您将参数和(或)对象的字典传递给模板。 请花点时间了解如何将参数`my_param`传递给模板。 -文件也可以传递给`bash_command`参数,例如`bash_command='templated_command.sh'` ,其中文件位置相对于包含管道文件的目录(在本例中为`tutorial.py` )。 这可能是出于许多原因,例如分离脚本的逻辑和管道代码,允许在使用不同语言编写的文件中进行正确的代码突出显示,以及构造管道的一般灵活性。 也可以将`template_searchpath`定义为指向DAG构造函数调用中的任何文件夹位置。 +文件也可以被传递给`bash_command`参数,例如`bash_command='templated_command.sh'` ,其中文件位置相对于包含管道文件的目录(在本例中为`tutorial.py` )。 这可能是出于许多原因,例如分离脚本的逻辑和管道代码,允许在使用不同语言编写的文件中进行正确的代码突出显示,以及构造管道的一般灵活性。 也可以将`template_searchpath`定义为指向DAG构造函数调用中的任何文件夹位置。 -使用相同的DAG构造函数调用,可以定义`user_defined_macros` ,它允许您指定自己的变量。 例如,将`dict(foo='bar')`传递给此参数允许您在模板中使用`{{ foo }}` 。 此外,指定`user_defined_filters`允许您注册自己的过滤器。 例如,将`dict(hello=lambda name: 'Hello %s' % name)`传递给此参数允许您使用`{{ 'world' | hello }}` 你的模板中的`{{ 'world' | hello }}` 有关自定义过滤器的更多信息,请查看[Jinja文档](http://jinja.pocoo.org/docs/dev/api/) +可以定义`user_defined_macros` 来使用相同的DAG构造函数调用,它允许您指定自己的变量。 例如,将`dict(foo='bar')`传递给此参数允许您在模板中使用`{{ foo }}` 。 此外,允许您指定`user_defined_filters`来注册自己的过滤器。 例如,将`dict(hello=lambda name: 'Hello %s' % name)`传递给此参数可以允许您在你的模板中使用`{{ 'world' | hello }}`。有关自定义过滤器的更多信息,请查看[Jinja文档](http://jinja.pocoo.org/docs/dev/api/) 有关可以在模板中引用的变量和宏的更多信息,请务必阅读[宏](code.html)部分 @@ -186,82 +187,81 @@ t3 = BashOperator ( 我们有两个不相互依赖的简单任务。 以下是一些可以定义它们之间依赖关系的方法: ``` - t2 . set_upstream ( t1 ) +t2.set_upstream(t1) -# This means that t2 will depend on t1 -# running successfully to run -# It is equivalent to +# 这意味着t2会在t1成功执行之后才会执行 +# 与下面这种写法相等 # t1.set_downstream(t2) -t3 . set_upstream ( t1 ) +t3.set_upstream(t1) -# all of this is equivalent to +# 上面的写法与下面这种写法相等 # dag.set_dependency('print_date', 'sleep') # dag.set_dependency('print_date', 'templated') ``` -请注意,在执行脚本时,Airflow会在DAG中找到循环或多次引用依赖项时引发异常。 +请注意,在执行脚本时,在DAG中如果找到了循环或多次引用依赖项时,那么Airflow会引发异常。 ## 概括 -好吧,所以我们有一个非常基本的DAG。 此时,您的代码应如下所示: +到此,我们有了一个非常基本的DAG。此时,您的代码应如下所示: ``` """ -Code that goes along with the Airflow located at: +Airflow教程与一起提供的代码位于: http://airflow.readthedocs.org/en/latest/tutorial.html """ from airflow import DAG from airflow.operators.bash_operator import BashOperator -from datetime import datetime , timedelta +from datetime import datetime, timedelta default_args = { - 'owner' : 'airflow' , - 'depends_on_past' : False , - 'start_date' : datetime ( 2015 , 6 , 1 ), - 'email' : [ 'airflow@example.com' ], - 'email_on_failure' : False , - 'email_on_retry' : False , - 'retries' : 1 , - 'retry_delay' : timedelta ( minutes = 5 ), + 'owner': 'airflow', + 'depends_on_past': False, + 'start_date': datetime(2015, 6, 1), + 'email': ['airflow@example.com'], + 'email_on_failure': False, + 'email_on_retry': False, + 'retries': 1, + 'retry_delay': timedelta(minutes=5), # 'queue': 'bash_queue', # 'pool': 'backfill', # 'priority_weight': 10, # 'end_date': datetime(2016, 1, 1), } -dag = DAG ( - 'tutorial' , default_args = default_args , schedule_interval = timedelta ( 1 )) +dag = DAG( + 'tutorial', default_args=default_args, schedule_interval=timedelta(1)) # t1, t2 and t3 are examples of tasks created by instantiating operators -t1 = BashOperator ( - task_id = 'print_date' , - bash_command = 'date' , - dag = dag ) +t1 = BashOperator( + task_id='print_date', + bash_command='date', + dag=dag) -t2 = BashOperator ( - task_id = 'sleep' , - bash_command = 'sleep 5' , - retries = 3 , - dag = dag ) +t2 = BashOperator( + task_id='sleep', + bash_command='sleep 5', + retries=3, + dag=dag) templated_command = """ - { % f or i in range(5) %} - echo "{{ ds }}" - echo "{{ macros.ds_add(ds, 7)}}" - echo "{{ params.my_param }}" - { % e ndfor %} + { % f or i in range(5) %} + echo "{{ ds }}" + echo "{{ macros.ds_add(ds, 7)}}" + echo "{{ params.my_param }}" + { % e ndfor %} """ -t3 = BashOperator ( - task_id = 'templated' , - bash_command = templated_command , - params = { 'my_param' : 'Parameter I passed in' }, - dag = dag ) +t3 = BashOperator( + task_id='templated', + bash_command=templated_command, + params={'my_param': 'Parameter I passed in'}, + dag=dag) -t2 . set_upstream ( t1 ) -t3 . set_upstream ( t1 ) +t2.set_upstream(t1) +t3.set_upstream(t1) ``` @@ -269,7 +269,7 @@ t3 . set_upstream ( t1 ) ### 运行脚本 -是时候进行一些测试了。 首先让我们确保管道解析。 假设我们正在保存`airflow.cfg`引用的`airflow.cfg`文件夹中`tutorial.py`中上一步的代码。 DAG的默认位置是`~/airflow/dags` 。 +是时候进行一些测试了。 首先让我们确保管道解析。 假设我们正在保存`tutorial.py`中上一步的代码到`airflow.cfg`引用的文件夹中。 DAG的默认位置是`~/airflow/dags` 。 ``` python ~/airflow/dags/tutorial.py @@ -283,13 +283,13 @@ python ~/airflow/dags/tutorial.py 让我们运行一些命令来进一步验证这个脚本。 ``` - # print the list of active DAGs +# 打印出所有正在活跃状态的DAG airflow list_dags -# prints the list of tasks the "tutorial" dag_id +# 打印出tutorial DAG中所有的任务 airflow list_tasks tutorial -# prints the hierarchy of tasks in the tutorial DAG +# 打印出tutorial DAG的任务层次结构 airflow list_tasks tutorial --tree ``` @@ -299,57 +299,56 @@ airflow list_tasks tutorial --tree 让我们通过在特定日期运行实际任务实例来进行测试。 在此上下文中指定的日期是`execution_date` ,它模拟在特定日期+时间运行任务或dag的调度程序: ``` - # command layout: command subcommand dag_id task_id date +# 命令样式: command subcommand dag_id task_id date -# testing print_date -airflow test tutorial print_date 2015 -06-01 +# 测试print_date +airflow test tutorial print_date 2015-06-01 -# testing sleep -airflow test tutorial sleep 2015 -06-01 +# 测试sleep +airflow test tutorial sleep 2015-06-01 ``` -现在还记得我们之前用模板做过的事吗? 通过运行此命令,了解如何呈现和执行此模板: +现在还记得我们之前做过的模板吗? 通过运行此命令,了解如何呈现和执行此模板: ``` - # testing templated -airflow test tutorial templated 2015 -06-01 + # 测试模版渲染 +airflow test tutorial templated 2015-06-01 ``` -这应该导致显示详细的事件日志并最终运行bash命令并打印结果。 +这应该显示详细的事件日志并最终运行bash命令并打印结果。 -请注意, `airflow test`命令在本地运行任务实例,将其日志输出到stdout(在屏幕上),不依赖于依赖项,并且不向数据库传达状态(运行,成功,失败,...)。 它只允许测试单个任务实例。 +请注意, `airflow test`命令在本地运行任务实例时,会将其日志输出到stdout(在屏幕上),不依赖于依赖项,并且不向数据库传达状态(运行,成功,失败,...)。 它只允许测试单个任务实例。 ### 回填 -一切看起来都运行良好所以让我们运行回填。 `backfill`将尊重您的依赖关系,将日志发送到文件并与数据库通信以记录状态。 如果您有网络服务器,您将能够跟踪进度。 如果您有兴趣在回填过程中直观地跟踪进度, `airflow webserver`将启动Web服务器。 +一切看起来都运行良好,所以此时让我们运行回填。 `backfill`将尊重您的依赖关系,将日志发送到文件并与数据库通信以记录状态。 如果您有网络服务器,您将能够跟踪进度。 `airflow webserver`将启动Web服务器,如果您有兴趣在回填过程中直观地跟踪进度。 -请注意,如果使用`depends_on_past=True` ,则单个任务实例将取决于前面任务实例的成功,除了指定了自身的start_date,此依赖关系将被忽略。 +请注意,如果使用`depends_on_past=True` ,则单个任务实例的执行将取决于前面任务实例是否成功,除非指定了当前日期为start_date,那么此依赖关系将被忽略。 此上下文中的日期范围是`start_date`和可选的`end_date` ,它们用于使用此dag中的任务实例填充运行计划。 ``` - # optional, start a web server in debug mode in the background +# 可选,在后台以debug模式运行web服务器 # airflow webserver --debug & -# start your backfill on a date range -airflow backfill tutorial -s 2015 -06-01 -e 2015 -06-07 +# 在时间范围内回填执行任务 +airflow backfill tutorial -s 2015-06-01 -e 2015-06-07 ``` ## 下一步是什么? -就是这样,你已经编写,测试并回填了你的第一个Airflow管道。 将代码合并到具有针对它运行的主调度程序的代码存储库中应该让它每天都被触发并运行。 +就如上面这样,你已经编写,测试并回填了你的第一个Airflow管道。 将代码合并到在后台运行了主调度程序的代码存储库中,会让它每天都被触发并运行。 以下是您可能想要做的一些事情: -* 深入了解用户界面 - 点击所有内容! - -* 继续阅读文档! 特别是以下部分: +* 深入了解用户界面 - 点击所有内容! +* 继续阅读文档! 特别是以下部分: - > * 命令行界面 - > * 运营商 - > * 宏 + * 命令行界面 + * 运营商 + * 宏 -* 写下你的第一个管道! \ No newline at end of file +* 写下你的第一个管道!