Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
OpenDocCN
airflow-doc-zh
提交
0213dc0a
A
airflow-doc-zh
项目概览
OpenDocCN
/
airflow-doc-zh
9 个月 前同步成功
通知
3
Star
208
Fork
63
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
0
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
A
airflow-doc-zh
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
0
Issue
0
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
前往新版Gitcode,体验更适合开发者的 AI 搜索 >>
未验证
提交
0213dc0a
编写于
3月 21, 2019
作者:
飞
飞龙
提交者:
GitHub
3月 21, 2019
浏览文件
操作
浏览文件
下载
差异文件
Merge pull request #54 from morefreeze/29
29
上级
ed9196bb
7cb5f547
变更
1
隐藏空白更改
内联
并排
Showing
1 changed file
with
39 addition
and
42 deletion
+39
-42
zh/29.md
zh/29.md
+39
-42
未找到文件。
zh/29.md
浏览文件 @
0213dc0a
#
Lineage
#
数据血缘(Lineage)
注意
> 贡献者:[@morefreeze](https://github.com/morefreeze)
Lineage 支持是非常实验性的,可能会发生变化。
> 注意:
> Lineage 支持是实验性的,可能随时会发生变化。
Airflow
可以帮助跟踪数据的来源,发生的事情以及数据随时间的
变化。 这有助于实现审计跟踪和数据治理,还可以调试数据流。
Airflow
可以帮助跟踪数据的来源,以及数据发生了什么
变化。 这有助于实现审计跟踪和数据治理,还可以调试数据流。
气流通过任务的入口和出口跟踪数据。 让我们从一个例子开始,
看看它是如何工作的。
Airflow 通过任务的 inlets 和 outlets 跟踪数据。 让我们通过一个例子
看看它是如何工作的。
```
py
from
airflow.operators.bash_operator
import
BashOperator
from
airflow.operators.bash_operator
import
BashOperator
from
airflow.operators.dummy_operator
import
DummyOperator
from
airflow.lineage.datasets
import
File
from
airflow.models
import
DAG
from
datetime
import
timedelta
FILE_CATEGORIES
=
[
"CAT1"
,
"CAT2"
,
"CAT3"
]
FILE_CATEGORIES
=
[
"CAT1"
,
"CAT2"
,
"CAT3"
]
args
=
{
'owner'
:
'airflow'
,
'start_date'
:
airflow
.
utils
.
dates
.
days_ago
(
2
)
'owner'
:
'airflow'
,
'start_date'
:
airflow
.
utils
.
dates
.
days_ago
(
2
)
}
dag
=
DAG
(
dag_id
=
'example_lineage'
,
default_args
=
args
,
schedule_interval
=
'0 0 * * *'
,
dagrun_timeout
=
timedelta
(
minutes
=
60
))
dag
=
DAG
(
dag_id
=
'example_lineage'
,
default_args
=
args
,
schedule_interval
=
'0 0 * * *'
,
dagrun_timeout
=
timedelta
(
minutes
=
60
))
f_final
=
File
(
"/tmp/final"
)
run_this_last
=
DummyOperator
(
task_id
=
'run_this_last'
,
dag
=
dag
,
inlets
=
{
"auto"
:
True
},
outlets
=
{
"datasets"
:
[
f_final
,]})
f_final
=
File
(
"/tmp/final"
)
run_this_last
=
DummyOperator
(
task_id
=
'run_this_last'
,
dag
=
dag
,
inlets
=
{
"auto"
:
True
},
outlets
=
{
"datasets"
:
[
f_final
,]})
f_in
=
File
(
"/tmp/whole_directory/"
)
f_in
=
File
(
"/tmp/whole_directory/"
)
outlets
=
[]
for
file
in
FILE_CATEGORIES
:
f_out
=
File
(
"/tmp/ {} /{{{{ execution_date }}}}"
.
format
(
file
))
outlets
.
append
(
f_out
)
run_this
=
BashOperator
(
task_id
=
'run_me_first'
,
bash_command
=
'echo 1'
,
dag
=
dag
,
inlets
=
{
"datasets"
:
[
f_in
,]},
outlets
=
{
"datasets"
:
outlets
}
for
file
in
FILE_CATEGORIES
:
f_out
=
File
(
"/tmp/{}/{{{{ execution_date }}}}"
.
format
(
file
))
outlets
.
append
(
f_out
)
run_this
=
BashOperator
(
task_id
=
'run_me_first'
,
bash_command
=
'echo 1'
,
dag
=
dag
,
inlets
=
{
"datasets"
:
[
f_in
,]},
outlets
=
{
"datasets"
:
outlets
}
)
run_this
.
set_downstream
(
run_this_last
)
run_this
.
set_downstream
(
run_this_last
)
```
任务采用参数
<
cite
>
入口
<
/cite
>
和
<
cite
>
出口
<
/cite
>
。 入口可以由数据集列表
<
cite
>
{“数据集”:[dataset1,dataset2]}
<
/cite
>
手动定义,也可以配置为从上游任务中查找出口
<
cite
>
{“task_ids”:[“task_id1”,“task_id2”]}
<
/cite
>
或者可以配置为从直接上游任务
<
cite
>
{“auto”:True}
<
/cite
>
或它们的组合中获取出口。 出口被定义为数据集列表
<
cite
>
{“数据集”:[dataset1,dataset2]}
<
/cite
>
。 在执行任务时,数据集的任何字段都使用上下文进行模板化。
注意
如果操作员支持,操作员可以自动添加入口和出口。
任务定义了参数
`inlets`
和
`outlets`
。
`inlets`
可以是一个数据集列表
`{"datesets":[dataset1,dataset2]}`
,也可以是指定的上游任务
`outlets`
像这样
`{"task_ids":["task_id1","task_id2"]}`
,或者不想指定直接用
`{"auto":True}`
也可以,甚至是前面几种的组合。
`outlets`
也是一个数据集列表
`{"datesets":[dataset1,dataset2]}`
。 在运行任务时,数据集的字段会被模板渲染。
在示例DAG任务中,
<
cite
>
run_me_first
<
/cite
>
是一个BashOperator,它接收从列表生成的3个入口:
<
cite
>
CAT1
<
/cite
>
,
<
cite
>
CAT2
<
/cite
>
,
<
cite
>
CAT3
<
/cite
>
。 请注意,
<
cite
>
execution_date
<
/cite
>
是一个模板化字段,将在任务运行时呈现。
> 注意:
> 只要 Operator 支持,它会自动地加上 inlets 和 outlets。
注意
在示例DAG任务中,
`run_me_first`
是一个 BashOperator,它接收
`CAT1`
,
`CAT2`
,
`CAT3`
作为 inlets(译注:根据代码,应为“输出 outlets”)。 其中的
`execution_date`
会在任务运行时被渲染成执行时间。
在幕后,Airflow将沿袭元数据作为任务的
<
cite
>
pre_execute
<
/cite
>
方法的一部分进行准备。 当任务完成执行
<
cite
>
时,
<
/cite
>
将调用
<
cite
>
post_execute
<
/cite
>
并将lineage元数据推送到XCOM中。 因此,如果您要创建自己的覆盖此方法的运算符,请确保分别使用
<
cite
>
prepare_lineage
<
/cite
>
和
<
cite
>
apply_lineage
<
/cite
>
修饰您的方法。
> 注意:
> 在底层,Airflow 会在`pre_execute`方法中准备 lineage 元数据。 当任务运行结束时,会调用`post_execute`将 lineage 元数据推送到 XCOM 中。 因此,如果您要创建自己的 Operator,并且需要覆写这些方法,确保分别用`prepare_lineage`和`apply_lineage`装饰这些方法。
## Apache Atlas
Airflow
可以将其沿袭元数据发送到Apache Atlas。 您需要启用
<
cite
>
atlas
<
/cite
>
后端并正确配置它,例如在
<
cite
>
airflow.cfg中
<
/cite
>
:
Airflow
可以将 lineage 元数据发送到 Apache Atlas。 您需要在
`airflow.cfg`
中配置
`atlas`
:
```
py
[
lineage
]
backend
=
airflow
.
lineage
.
backend
.
atlas
```
config
[
lineage
]
backend
=
airflow
.
lineage
.
backend
.
atlas
[
atlas
]
[
atlas
]
username
=
my_username
password
=
my_password
host
=
host
port
=
21000
```
请确保安装了
<
cite
>
atlasclient
<
/cite
>
软件包。
\ No newline at end of file
请确保已经安装了
`atlasclient`
。
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录