提交 75c26c0c 编写于 作者: W wizardforcel

zh raw

上级 57acc7be
# 项目
## 历史
Airflow于2014年10月由Airbnb的Maxime Beauchemin开始。 它是第一次提交的开源,并在2015年6月宣布正式加入Airbnb Github。
该项目于2016年3月加入了Apache Software Foundation的孵化计划。
## 提交者
* @mistercrunch(Maxime“Max”Beauchemin)
* @ r39132(Siddharth“Sid”Anand)
* @criccomini(Chris Riccomini)
* @bolkedebruin(Bolke de Bruin)
* @artwr(亚瑟威德默)
* @jlowin(Jeremiah Lowin)
* @patrickleotardif(Patrick Leo Tardif)
* @aoen(丹·达维多夫)
* @syvineckruyk(Steven Yvinec-Kruyk)
* @msumit(Sumit Maheshwari)
* @alexvanboxel(Alex Van Boxel)
* @saguziel(Alex Guziel)
* @joygao(Joy Gao)
* @fokko(Fokko Driesprong)
* @ash(Ash Berlin-Taylor)
* @kaxilnaik(Kaxil Naik)
* @冯涛(陶峰)
有关贡献者的完整列表,请查看[Airflow的Github贡献者页面:](https://github.com/apache/incubator-airflow/graphs/contributors)
## 资源和链接
* [Airflow的官方文档](http://airflow.apache.org/)
* 邮件列表(发送电子邮件至`dev-subscribe@airflow.incubator.apache.org`和/或`commits-subscribe@airflow.incubator.apache.org`订阅每个邮件)
* [关于Apache的Jira的问题](https://issues.apache.org/jira/browse/AIRFLOW)
* [Gitter(聊天)频道](https://gitter.im/airbnb/airflow)
* [更多资源以及Wiki上与Airflow相关内容的链接](https://cwiki.apache.org/confluence/display/AIRFLOW/Airflow%2BLinks)
## 路线图
请参阅[维基](https://cwiki.apache.org/confluence/display/AIRFLOW/Airflow%2BHome)[](https://cwiki.apache.org/confluence/display/AIRFLOW/Airflow%2BHome)路线图
\ No newline at end of file
# 管理连接
Airflow需要知道如何连接到您的环境。 其他系统和服务的主机名,端口,登录名和密码等信息在UI的`Admin->Connection`部分中处理。 您将创作的管道代码将引用Connection对象的“conn_id”。
![https://airflow.apache.org/_images/connections.png](../img/b1caba93dd8fce8b3c81bfb0d58cbf95.jpg)
可以使用UI或环境变量创建和管理连接。
有关更多信息,请参阅[Connenctions Concepts](https://apachecn.github.io/airflow-doc-zh/concepts.html)文档。
## 使用UI创建连接
打开UI的`Admin->Connection`部分。 单击“ `Create`链接以创建新连接。
![https://airflow.apache.org/_images/connection_create.png](../img/635aacab53c55192ad3e31c28e65eb43.jpg)
1. 使用所需的连接ID填写`Conn Id`字段。 建议您使用小写字符和单独的带下划线的单词。
2. 使用`Conn Type`字段选择连接类型。
3. 填写其余字段。 有关属于不同连接类型的字段的说明,请参阅连接类型。
4. 单击“ `Save`按钮以创建连接。
## 使用UI编辑连接
打开UI的`Admin->Connection`部分。 单击连接列表中要编辑的连接旁边的铅笔图标。
![https://airflow.apache.org/_images/connection_edit.png](../img/08e0f3fedf871b535c850d202dda1422.jpg)
修改连接属性,然后单击“ `Save`按钮以保存更改。
## 使用环境变量创建连接
可以使用环境变量创建气流管道中的连接。 环境变量需要具有`AIRFLOW_CONN_` for Airflow的前缀, `AIRFLOW_CONN_` URI格式的值才能正确使用连接。
在引用Airflow管道中的连接时, `conn_id`应该是没有前缀的变量的名称。 例如,如果`AIRFLOW_CONN_POSTGRES_MASTER`名为`postgres_master`则环境变量应命名为`AIRFLOW_CONN_POSTGRES_MASTER` (请注意,环境变量必须全部为大写)。 Airflow假定环境变量返回的值为URI格式(例如`postgres://user:password@localhost:5432/master``s3://accesskey:secretkey@S3` )。
## 连接类型
### Google云端平台
Google Cloud Platform连接类型支持[GCP集成](https://apachecn.github.io/airflow-doc-zh/integration.html)
#### 对GCP进行身份验证
有两种方法可以使用Airflow连接到GCP。
1. 使用[应用程序默认凭据](https://google-auth.readthedocs.io/en/latest/reference/google.auth.html) ,例如在Google Compute Engine上运行时通过元数据服务器。
2. 在磁盘上使用[服务帐户](https://cloud.google.com/docs/authentication/)密钥文件(JSON格式)。
#### 默认连接ID
默认情况下使用以下连接ID。
```
bigquery_default
```
[`BigQueryHook`](https://apachecn.github.io/airflow-doc-zh/integration.html "airflow.contrib.hooks.bigquery_hook.BigQueryHook")钩子使用。
```
google_cloud_datastore_default
```
[`DatastoreHook`](https://apachecn.github.io/airflow-doc-zh/integration.html "airflow.contrib.hooks.datastore_hook.DatastoreHook")钩子使用。
```
google_cloud_default
```
[`GoogleCloudBaseHook`](https://apachecn.github.io/airflow-doc-zh/code.html "airflow.contrib.hooks.gcp_api_base_hook.GoogleCloudBaseHook")[`DataFlowHook`](https://apachecn.github.io/airflow-doc-zh/integration.html "airflow.contrib.hooks.gcp_dataflow_hook.DataFlowHook")[`DataProcHook`](https://apachecn.github.io/airflow-doc-zh/code.html "airflow.contrib.hooks.gcp_dataproc_hook.DataProcHook")[`MLEngineHook`](https://apachecn.github.io/airflow-doc-zh/integration.html "airflow.contrib.hooks.gcp_mlengine_hook.MLEngineHook")[`GoogleCloudStorageHook`](https://apachecn.github.io/airflow-doc-zh/integration.html "airflow.contrib.hooks.gcs_hook.GoogleCloudStorageHook")挂钩使用。
#### 配置连接
```
Project Id (required)
```
要连接的Google Cloud项目ID。
```
Keyfile Path
```
磁盘上[服务帐户](https://cloud.google.com/docs/authentication/)密钥文件(JSON格式)的路径。
如果使用应用程序默认凭据则不需要
```
Keyfile JSON
```
磁盘上的[服务帐户](https://cloud.google.com/docs/authentication/)密钥文件(JSON格式)的内容。 如果使用此方法进行身份验证,建议[保护您的连接](secure-connections.html)
如果使用应用程序默认凭据则不需要
```
Scopes (comma separated)
```
要通过身份验证的逗号分隔[Google云端范围](https://developers.google.com/identity/protocols/googlescopes)列表。
注意
使用应用程序默认凭据时,将忽略范围。 请参阅[AIRFLOW-2522](https://issues.apache.org/jira/browse/AIRFLOW-2522)
\ No newline at end of file
# 保护连接
默认情况下,Airflow将在元数据数据库中以纯文本格式保存连接的密码。 在安装过程中强烈建议使用`crypto`包。 `crypto`包确实要求您的操作系统安装了libffi-dev。
如果最初未安装`crypto`软件包,您仍可以通过以下步骤为连接启用加密:
1. 安装crypto包`pip install apache-airflow[crypto]`
2. 使用下面的代码片段生成fernet_key。 fernet_key必须是base64编码的32字节密钥。
```
from cryptography.fernet import Fernet
fernet_key = Fernet . generate_key ()
print ( fernet_key . decode ()) # your fernet_key, keep it in secured place!
```
3.`airflow.cfg` fernet_key值替换为步骤2中的值。或者,可以将fernet_key存储在OS环境变量中。 在这种情况下,您不需要更改`airflow.cfg` ,因为Airflow将使用环境变量而不是`airflow.cfg`中的值:
```
# Note the double underscores
EXPORT AIRFLOW__CORE__FERNET_KEY = your_fernet_key
```
1. 重启Airflow网络服务器。
2. 对于现有连接(在安装`airflow[crypto]`和创建Fernet密钥之前已定义的连接),您需要在连接管理UI中打开每个连接,重新键入密码并保存。
\ No newline at end of file
# 写日志
## 在本地编写日志
用户可以使用`base_log_folder`设置在`airflow.cfg`指定日志文件夹。 默认情况下,它位于`AIRFLOW_HOME`目录中。
此外,用户可以提供远程位置,以便在云存储中存储日志和日志备份。
在Airflow Web UI中,本地日志优先于远程日志。 如果找不到或访问本地日志,将显示远程日志。 请注意,只有在任务完成(包括失败)后才会将日志发送到远程存储。 换句话说,运行任务的远程日志不可用。 日志作为`{dag_id}/{task_id}/{execution_date}/{try_number}.log`存储在日志文件夹中。
## 将日志写入Amazon S3
### 在你开始之前
远程日志记录使用现有的Airflow连接来读取/写入日志。 如果没有正确设置连接,则会失败。
### 启用远程日志记录
要启用此功能,必须按照此示例配置`airflow.cfg`
```
[ core ]
# Airflow can store logs remotely in AWS S3\. Users must supply a remote
# location URL (starting with either 's3://...') and an Airflow connection
# id that provides access to the storage location.
remote_base_log_folder = s3://my-bucket/path/to/logs
remote_log_conn_id = MyS3Conn
# Use server-side encryption for logs stored in S3
encrypt_s3_logs = False
```
在上面的例子中,Airflow将尝试使用`S3Hook('MyS3Conn')`
## 将日志写入Azure Blob存储
可以将Airflow配置为在Azure Blob存储中读取和写入任务日志。 按照以下步骤启用Azure Blob存储日志记录。
1. Airflow的日志记录系统需要将自定义.py文件放在`PYTHONPATH` ,以便可以从Airflow导入。 首先创建一个存储配置文件的目录。 建议使用`$AIRFLOW_HOME/config`
2. 创建名为`$AIRFLOW_HOME/config/log_config.py``$AIRFLOW_HOME/config/__init__.py`空文件。
3.`airflow/config_templates/airflow_local_settings.py`的内容复制到刚刚在上面的步骤中创建的`log_config.py`文件中。
4. 自定义模板的以下部分:
> ```
> # wasb buckets should start with "wasb" just to help Airflow select correct handler
> REMOTE_BASE_LOG_FOLDER = 'wasb-<whatever you want here>'
>
> # Rename DEFAULT_LOGGING_CONFIG to LOGGING CONFIG
> LOGGING_CONFIG = ...
>
> ```
5. 确保已在Airflow中定义Azure Blob存储(Wasb)连接挂钩。 挂钩应具有对`REMOTE_BASE_LOG_FOLDER`定义的Azure Blob存储桶的读写访问权限。
6. 更新`$AIRFLOW_HOME/airflow.cfg`以包含:
> ```
> remote_logging = True
> logging_config_class = log_config.LOGGING_CONFIG
> remote_log_conn_id = <name of the Azure Blob Storage connection>
>
> ```
7. 重新启动Airflow网络服务器和调度程序,并触发(或等待)新任务执行。
8. 验证日志是否显示在您定义的存储桶中新执行的任务中。
## 将日志写入Google云端存储
请按照以下步骤启用Google云端存储日志记录。
1. Airflow的日志记录系统需要将自定义.py文件放在`PYTHONPATH` ,以便可以从Airflow导入。 首先创建一个存储配置文件的目录。 建议使用`$AIRFLOW_HOME/config`
2. 创建名为`$AIRFLOW_HOME/config/log_config.py``$AIRFLOW_HOME/config/__init__.py`空文件。
3.`airflow/config_templates/airflow_local_settings.py`的内容复制到刚刚在上面的步骤中创建的`log_config.py`文件中。
4. 自定义模板的以下部分:
> ```
> # Add this variable to the top of the file. Note the trailing slash.
> GCS_LOG_FOLDER = 'gs://<bucket where logs should be persisted>/'
>
> # Rename DEFAULT_LOGGING_CONFIG to LOGGING CONFIG
> LOGGING_CONFIG = ...
>
> # Add a GCSTaskHandler to the 'handlers' block of the LOGGING_CONFIG variable
> 'gcs.task' : {
> 'class' : 'airflow.utils.log.gcs_task_handler.GCSTaskHandler' ,
> 'formatter' : 'airflow.task' ,
> 'base_log_folder' : os.path.expanduser ( BASE_LOG_FOLDER ) ,
> 'gcs_log_folder' : GCS_LOG_FOLDER,
> 'filename_template' : FILENAME_TEMPLATE,
> } ,
>
> # Update the airflow.task and airflow.task_runner blocks to be 'gcs.task' instead of 'file.task'.
> 'loggers' : {
> 'airflow.task' : {
> 'handlers' : [ 'gcs.task' ] ,
> ...
> } ,
> 'airflow.task_runner' : {
> 'handlers' : [ 'gcs.task' ] ,
> ...
> } ,
> 'airflow' : {
> 'handlers' : [ 'console' ] ,
> ...
> } ,
> }
>
> ```
5. 确保已在Airflow中定义了Google Cloud Platform连接挂钩。 该挂钩应具有对`GCS_LOG_FOLDER`定义的Google Cloud Storage存储桶的读写访问权限。
6. 更新`$AIRFLOW_HOME/airflow.cfg`以包含:
> ```
> task_log_reader = gcs.task
> logging_config_class = log_config.LOGGING_CONFIG
> remote_log_conn_id = <name of the Google cloud platform hook>
>
> ```
7. 重新启动Airflow网络服务器和调度程序,并触发(或等待)新任务执行。
8. 验证日志是否显示在您定义的存储桶中新执行的任务中。
9. 确认Google Cloud Storage查看器在U​​I中正常运行。 拉出新执行的任务,并验证您是否看到类似的内容:
> ```
> *** Reading remote log from gs://<bucket where logs should be persisted>/example_bash_operator/run_this_last/2017-10-03T00:00:00/16.log.
> [ 2017 -10-03 21 :57:50,056 ] { cli.py:377 } INFO - Running on host chrisr-00532
> [ 2017 -10-03 21 :57:50,093 ] { base_task_runner.py:115 } INFO - Running: [ 'bash' , '-c' , u 'airflow run example_bash_operator run_this_last 2017-10-03T00:00:00 --job_id 47 --raw -sd DAGS_FOLDER/example_dags/example_bash_operator.py' ]
> [ 2017 -10-03 21 :57:51,264 ] { base_task_runner.py:98 } INFO - Subtask: [ 2017 -10-03 21 :57:51,263 ] { __init__.py:45 } INFO - Using executor SequentialExecutor
> [ 2017 -10-03 21 :57:51,306 ] { base_task_runner.py:98 } INFO - Subtask: [ 2017 -10-03 21 :57:51,306 ] { models.py:186 } INFO - Filling up the DagBag from /airflow/dags/example_dags/example_bash_operator.py
>
> ```
请注意它从远程日志文件中读取的顶行。
请注意,如果您使用旧式airflow.cfg配置方法将日志保存到Google云端存储,则旧的日志将不再在Airflow用户界面中显示,但它们仍将存在于Google云端存储中。 这是一个向后无比的变化。 如果您对此不满意,可以更改`FILENAME_TEMPLATE`以反映旧式日志文件名格式。
\ No newline at end of file
# 用Celery扩大规模
`CeleryExecutor`是您扩展工人数量的方法之一。 为此,您需要设置Celery后端( **RabbitMQ****Redis** ,...)并更改`airflow.cfg`以将执行程序参数指向`CeleryExecutor`并提供相关的Celery设置。
有关设置Celery代理的更多信息,请参阅[有关该主题](http://docs.celeryproject.org/en/latest/getting-started/brokers/index.html)的详尽[Celery文档](http://docs.celeryproject.org/en/latest/getting-started/brokers/index.html)
以下是您的员工的一些必要要求:
* 需要安装airflow,CLI需要在路径中
* 整个群集中的气流配置设置应该是同构的
* 在worker上执行的操作符需要在该上下文中满足其依赖项。 例如,如果您使用`HiveOperator` ,则需要在该框上安装hive CLI,或者如果您使用`MySqlOperator` ,则必须以某种方式在`PYTHONPATH`提供所需的Python库
* 工作人员需要访问其`DAGS_FOLDER` ,您需要通过自己的方式同步文件系统。 常见的设置是将DAGS_FOLDER存储在Git存储库中,并使用Chef,Puppet,Ansible或用于配置环境中的计算机的任何内容在计算机之间进行同步。 如果您的所有盒子都有一个共同的挂载点,那么共享您的管道文件也应该可以正常工作
要启动工作人员,您需要设置Airflow并启动worker子命令
```
airflow worker
```
你的工人一旦被解雇就应该开始接收任务。
请注意,您还可以运行“Celery Flower”,这是一个建立在Celery之上的Web UI,用于监控您的员工。 您可以使用快捷命令`airflow flower`启动Flower Web服务器。
一些警告:
* 确保使用数据库支持的结果后端
* 确保在[celery_broker_transport_options]中设置超过最长运行任务的ETA的可见性超时
* 任务可以并消耗资源,确保您的工作人员有足够的资源来运行<cite>worker_concurrency</cite>任务
\ No newline at end of file
# 用Dask扩展
`DaskExecutor`允许您在Dask分布式群集中运行Airflow任务。
Dask集群可以在单个机器上运行,也可以在远程网络上运行。 有关完整详细信息,请参阅[分布式文档](https://distributed.readthedocs.io/)
要创建集群,首先启动调度程序:
```
# default settings for a local cluster
DASK_HOST = 127 .0.0.1
DASK_PORT = 8786
dask-scheduler --host $DASK_HOST --port $DASK_PORT
```
接下来,在任何可以连接到主机的计算机上启动至少一个Worker:
```
dask-worker $DASK_HOST : $DASK_PORT
```
编辑`airflow.cfg`以将执行程序设置为`DaskExecutor`并在`[dask]`部分中提供Dask Scheduler地址。
请注意:
* 每个Dask工作者必须能够导入Airflow和您需要的任何依赖项。
* Dask不支持队列。 如果使用队列创建了Airflow任务,则会引发警告,但该任务将提交给集群。
\ No newline at end of file
# 使用Mesos扩展(社区贡献)
有两种方法可以将气流作为mesos框架运行:
1. 直接在mesos从站上运行气流任务,要求每个mesos从站安装和配置气流。
2. 在安装了气流的docker容器内运行气流任务,该容器在mesos slave上运行。
## 任务直接在mesos从站上执行
`MesosExecutor`允许您在Mesos群集上安排气流任务。 为此,您需要一个正在运行的mesos集群,并且必须执行以下步骤 -
1. 在将运行Web服务器和调度程序的mesos从站上安装气流,让我们将其称为“Airflow服务器”。
2. 在Airflow服务器上,从[mesos下载](http://open.mesosphere.com/downloads/mesos/)安装mesos python eggs。
3. 在Airflow服务器上,使用可从所有mesos从站访问的数据库(例如mysql)并在`airflow.cfg`添加配置。
4.`airflow.cfg`更改为指向`airflow.cfg`的point executor参数,并提供相关的Mesos设置。
5. 在所有mesos奴隶上,安装气流。 从Airflow服务器复制`airflow.cfg` (以便它使用相同的sql炼金术连接)。
6. 在所有mesos从服务器上,运行以下服务日志:
```
airflow serve_logs
```
1. 在Airflow服务器上,要开始在mesos上处理/调度DAG,请运行:
```
airflow scheduler -p
```
注意:我们需要-p参数来挑选DAG。
您现在可以在mesos UI中查看气流框架和相应的任务。 气流任务的日志可以像往常一样在气流UI中查看。
有关mesos的更多信息,请参阅[mesos文档](http://mesos.apache.org/documentation/latest/) 。 有关<cite>MesosExecutor的</cite>任何疑问/错误,请联系[@ kapil-malik](https://github.com/kapil-malik)
## 在mesos从站上的容器中执行的任务
[此要点](https://gist.github.com/sebradloff/f158874e615bda0005c6f4577b20036e)包含实现以下所需的所有文件和配置更改:
1. 使用安装了mesos python鸡蛋创建一个dockerized版本的气流。
> 我们建议利用docker的多阶段构建来实现这一目标。 我们有一个Dockerfile定义从源(Dockerfile-mesos)构建特定版本的mesos,以便创建python egg。 在气流Dockerfile(Dockerfile-airflow)中,我们从mesos图像中复制python eggs。
1.`airflow.cfg`创建一个mesos配置块。
> 配置块保持与默认气流配置(default_airflow.cfg)相同,但添加了一个选项`docker_image_slave` 。 这应该设置为您希望mesos在运行气流任务时使用的图像的名称。 确保您具有适用于您的mesos主服务器的DNS记录的正确配置以及任何类型的授权(如果存在)。
1. 更改`airflow.cfg`以将执行程序参数指向<cite>MesosExecutor</cite><cite>executor = SequentialExecutor</cite> )。
2. 确保您的mesos slave可以访问您用于`docker_image_slave`存储库。
> [mesos文档中提供了相关说明。](https://mesos.readthedocs.io/en/latest/docker-containerizer/)
其余部分取决于您以及您希望如何使用dockerized气流配置。
\ No newline at end of file
# 使用systemd运行Airflow
Airflow可以与基于系统的系统集成。 这使得观察您的守护进程变得容易,因为systemd可以在失败时重新启动守护进程。 在`scripts/systemd`目录中,您可以找到已在基于Redhat的系统上测试过的单元文件。 您可以将它们复制到`/usr/lib/systemd/system` 。 假设Airflow将在`airflow:airflow`下运行`airflow:airflow` 。 如果不是(或者如果您在非基于Redhat的系统上运行),则可能需要调整单元文件。
`/etc/sysconfig/airflow`获取环境配置。 提供了一个示例文件。 运行调度程序时,请确保在此文件中指定`SCHEDULER_RUNS`变量。 您也可以在此处定义,例如`AIRFLOW_HOME``AIRFLOW_CONFIG`
\ No newline at end of file
# 用upstart运行Airflow
Airflow可以与基于upstart的系统集成。 Upstart会在系统启动时自动启动`/etc/init`具有相应`*.conf`文件的所有气流服务。 失败时,upstart会自动重启进程(直到达到`*.conf`文件中设置的重新生成限制)。
您可以在`scripts/upstart`目录中找到示例新贵作业文件。 这些文件已在Ubuntu 14.04 LTS上测试过。 您可能需要调整`start on``stop on`节,以使其适用于其他新兴系统。 `scripts/upstart/README`中列出了一些可能的选项。
根据需要修改`*.conf`文件并复制到`/etc/init`目录。 假设气流将在`airflow:airflow``airflow:airflow` 。 如果您使用其他用户/组,请在`*.conf`文件中更改`setuid``setgid`
您可以使用`initctl`手动启动,停止,查看已与新贵集成的气流过程的状态
```
initctl airflow-webserver status
```
\ No newline at end of file
# 使用测试模式配置
Airflow具有一组固定的“测试模式”配置选项。 您可以随时通过调用`airflow.configuration.load_test_config()`来加载它们(注意此操作不可逆!)。 但是,在您有机会调用load_test_config()之前,会加载一些选项(如DAG_FOLDER)。 为了急切加载测试配置,请在airflow.cfg中设置test_mode:
```
[ tests ]
unit_test_mode = True
```
由于Airflow的自动环境变量扩展(请参阅[设置配置选项](set-config.html) ),您还可以设置env var `AIRFLOW__CORE__UNIT_TEST_MODE`以临时覆盖airflow.cfg。
\ No newline at end of file
# UI /截图
通过Airflow UI,您可以轻松监控数据管道并对其进行故障排除。 以下是您可以在Airflow UI中找到的一些功能和可视化的快速概述。
## DAGs查看
您环境中的DAG列表,以及一组有用页面的快捷方式。 您可以一目了然地查看成功,失败或当前正在运行的任务数量。
* * *
![https://airflow.apache.org/_images/dags.png](../img/31a64f6b60a7f97f88c4b557992d0f14.jpg)
* * *
## 树视图
跨越时间的DAG的树表示。 如果管道延迟,您可以快速查看不同步骤的位置并识别阻塞步骤。
* * *
![https://airflow.apache.org/_images/tree.png](../img/ad4ba22a6a3d5668fc19e0461f82e192.jpg)
* * *
## 图表视图
图表视图可能是最全面的。 可视化DAG的依赖关系及其特定运行的当前状态。
* * *
![https://airflow.apache.org/_images/graph.png](../img/bc05701b0ed4f5347e26c06452e8fd76.jpg)
* * *
## 变量视图
变量视图允许您列出,创建,编辑或删除作业期间使用的变量的键值对。 如果密钥默认包含('password','secret','passwd','authorization','api_key','apikey','access_token')中的任何单词,则隐藏变量的值,但可以配置以明文显示。
* * *
![https://airflow.apache.org/_images/variable_hidden.png](../img/9bf73cf3f89f830e70f800145ab51b10.jpg)
* * *
## 甘特图
甘特图可让您分析任务持续时间和重叠。 您可以快速识别瓶颈以及大部分时间用于特定DAG运行的位置。
* * *
![https://airflow.apache.org/_images/gantt.png](../img/cfaa010349b1e40164cabb36c3b7dc1b.jpg)
* * *
## 任务持续时间
过去N次运行的不同任务的持续时间。 通过此视图,您可以查找异常值并快速了解DAG在多次运行中花费的时间。
* * *
![https://airflow.apache.org/_images/duration.png](../img/f0781c3598679db6605d7dfffc65c6a9.jpg)
* * *
## 代码视图
透明度就是一切。 虽然您的管道代码在源代码管理中,但这是一种快速获取生成DAG的代码并提供更多上下文的方法。
* * *
![https://airflow.apache.org/_images/code.png](../img/b732d0bdc5c1a35f3ef34cc2d14b5199.jpg)
* * *
## 任务实例上下文菜单
从上面的页面(树视图,图形视图,甘特图......)中,始终可以单击任务实例,并进入此丰富的上下文菜单,该菜单可以将您带到更详细的元数据并执行某些操作。
* * *
![https://airflow.apache.org/_images/context.png](../img/c6288f9767ec25b7660ae86679773f69.jpg)
\ No newline at end of file
# 执照
[![https://airflow.apache.org/_images/apache.jpg](../img/499e29d5e76bf2bc6edb08291ec11080.jpg)](https://airflow.apache.org/_images/apache.jpg)
```
Apache License
Version 2.0, January 2004
http://www.apache.org/licenses/
TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION
1. Definitions.
"License" shall mean the terms and conditions for use, reproduction,
and distribution as defined by Sections 1 through 9 of this document.
"Licensor" shall mean the copyright owner or entity authorized by
the copyright owner that is granting the License.
"Legal Entity" shall mean the union of the acting entity and all
other entities that control, are controlled by, or are under common
control with that entity. For the purposes of this definition,
"control" means (i) the power, direct or indirect, to cause the
direction or management of such entity, whether by contract or
otherwise, or (ii) ownership of fifty percent (50%) or more of the
outstanding shares, or (iii) beneficial ownership of such entity.
"You" (or "Your") shall mean an individual or Legal Entity
exercising permissions granted by this License.
"Source" form shall mean the preferred form for making modifications,
including but not limited to software source code, documentation
source, and configuration files.
"Object" form shall mean any form resulting from mechanical
transformation or translation of a Source form, including but
not limited to compiled object code, generated documentation,
and conversions to other media types.
"Work" shall mean the work of authorship, whether in Source or
Object form, made available under the License, as indicated by a
copyright notice that is included in or attached to the work
(an example is provided in the Appendix below).
"Derivative Works" shall mean any work, whether in Source or Object
form, that is based on (or derived from) the Work and for which the
editorial revisions, annotations, elaborations, or other modifications
represent, as a whole, an original work of authorship. For the purposes
of this License, Derivative Works shall not include works that remain
separable from, or merely link (or bind by name) to the interfaces of,
the Work and Derivative Works thereof.
"Contribution" shall mean any work of authorship, including
the original version of the Work and any modifications or additions
to that Work or Derivative Works thereof, that is intentionally
submitted to Licensor for inclusion in the Work by the copyright owner
or by an individual or Legal Entity authorized to submit on behalf of
the copyright owner. For the purposes of this definition, "submitted"
means any form of electronic, verbal, or written communication sent
to the Licensor or its representatives, including but not limited to
communication on electronic mailing lists, source code control systems,
and issue tracking systems that are managed by, or on behalf of, the
Licensor for the purpose of discussing and improving the Work, but
excluding communication that is conspicuously marked or otherwise
designated in writing by the copyright owner as "Not a Contribution."
"Contributor" shall mean Licensor and any individual or Legal Entity
on behalf of whom a Contribution has been received by Licensor and
subsequently incorporated within the Work.
2. Grant of Copyright License. Subject to the terms and conditions of
this License, each Contributor hereby grants to You a perpetual,
worldwide, non-exclusive, no-charge, royalty-free, irrevocable
copyright license to reproduce, prepare Derivative Works of,
publicly display, publicly perform, sublicense, and distribute the
Work and such Derivative Works in Source or Object form.
3. Grant of Patent License. Subject to the terms and conditions of
this License, each Contributor hereby grants to You a perpetual,
worldwide, non-exclusive, no-charge, royalty-free, irrevocable
(except as stated in this section) patent license to make, have made,
use, offer to sell, sell, import, and otherwise transfer the Work,
where such license applies only to those patent claims licensable
by such Contributor that are necessarily infringed by their
Contribution(s) alone or by combination of their Contribution(s)
with the Work to which such Contribution(s) was submitted. If You
institute patent litigation against any entity (including a
cross-claim or counterclaim in a lawsuit) alleging that the Work
or a Contribution incorporated within the Work constitutes direct
or contributory patent infringement, then any patent licenses
granted to You under this License for that Work shall terminate
as of the date such litigation is filed.
4. Redistribution. You may reproduce and distribute copies of the
Work or Derivative Works thereof in any medium, with or without
modifications, and in Source or Object form, provided that You
meet the following conditions:
(a) You must give any other recipients of the Work or
Derivative Works a copy of this License; and
(b) You must cause any modified files to carry prominent notices
stating that You changed the files; and
(c) You must retain, in the Source form of any Derivative Works
that You distribute, all copyright, patent, trademark, and
attribution notices from the Source form of the Work,
excluding those notices that do not pertain to any part of
the Derivative Works; and
(d) If the Work includes a "NOTICE" text file as part of its
distribution, then any Derivative Works that You distribute must
include a readable copy of the attribution notices contained
within such NOTICE file, excluding those notices that do not
pertain to any part of the Derivative Works, in at least one
of the following places: within a NOTICE text file distributed
as part of the Derivative Works; within the Source form or
documentation, if provided along with the Derivative Works; or,
within a display generated by the Derivative Works, if and
wherever such third-party notices normally appear. The contents
of the NOTICE file are for informational purposes only and
do not modify the License. You may add Your own attribution
notices within Derivative Works that You distribute, alongside
or as an addendum to the NOTICE text from the Work, provided
that such additional attribution notices cannot be construed
as modifying the License.
You may add Your own copyright statement to Your modifications and
may provide additional or different license terms and conditions
for use, reproduction, or distribution of Your modifications, or
for any such Derivative Works as a whole, provided Your use,
reproduction, and distribution of the Work otherwise complies with
the conditions stated in this License.
5. Submission of Contributions. Unless You explicitly state otherwise,
any Contribution intentionally submitted for inclusion in the Work
by You to the Licensor shall be under the terms and conditions of
this License, without any additional terms or conditions.
Notwithstanding the above, nothing herein shall supersede or modify
the terms of any separate license agreement you may have executed
with Licensor regarding such Contributions.
6. Trademarks. This License does not grant permission to use the trade
names, trademarks, service marks, or product names of the Licensor,
except as required for reasonable and customary use in describing the
origin of the Work and reproducing the content of the NOTICE file.
7. Disclaimer of Warranty. Unless required by applicable law or
agreed to in writing, Licensor provides the Work (and each
Contributor provides its Contributions) on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
implied, including, without limitation, any warranties or conditions
of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A
PARTICULAR PURPOSE. You are solely responsible for determining the
appropriateness of using or redistributing the Work and assume any
risks associated with Your exercise of permissions under this License.
8. Limitation of Liability. In no event and under no legal theory,
whether in tort (including negligence), contract, or otherwise,
unless required by applicable law (such as deliberate and grossly
negligent acts) or agreed to in writing, shall any Contributor be
liable to You for damages, including any direct, indirect, special,
incidental, or consequential damages of any character arising as a
result of this License or out of the use or inability to use the
Work (including but not limited to damages for loss of goodwill,
work stoppage, computer failure or malfunction, or any and all
other commercial damages or losses), even if such Contributor
has been advised of the possibility of such damages.
9. Accepting Warranty or Additional Liability. While redistributing
the Work or Derivative Works thereof, You may choose to offer,
and charge a fee for, acceptance of support, warranty, indemnity,
or other liability obligations and/or rights consistent with this
License. However, in accepting such obligations, You may act only
on Your own behalf and on Your sole responsibility, not on behalf
of any other Contributor, and only if You agree to indemnify,
defend, and hold each Contributor harmless for any liability
incurred by, or claims asserted against, such Contributor by reason
of your accepting any such warranty or additional liability.
```
\ No newline at end of file
此差异已折叠。
# 数据分析
使用数据生产效率的一部分是拥有正确的武器来分析您正在使用的数据。 Airflow提供了一个简单的查询界面来编写SQL并快速获得结果,以及一个图表应用程序,可以让您可视化数据。
## 临时查询
adhoc查询UI允许与Airflow中注册的数据库连接进行简单的SQL交互。
![https://airflow.apache.org/_images/adhoc.png](../img/bfbf60f9689630d6aa1f46aeab1e6cf0.jpg)
## 图表
基于flask-admin和highcharts构建的简单UI允许轻松构建数据可视化和图表。 使用标签,SQL,图表类型填写表单,从环境的连接中选择源数据库,选择其他一些选项,然后保存以供以后使用。
在编写气流管道,参数化查询和直接在URL中修改参数时,您甚至可以使用相同的模板和宏。
这些图表是基本的,但它们很容易创建,修改和共享。
### 图表截图
![https://airflow.apache.org/_images/chart.png](../img/a7247daabfaa0606cbb0d05e511194db.jpg)
* * *
### 图表表格截图
![https://airflow.apache.org/_images/chart_form.png](../img/a40de0ada10bc0250de4b6c082cb7660.jpg)
\ No newline at end of file
此差异已折叠。
# 调度和触发器
Airflow调度程序监视所有任务和所有DAG,并触发已满足其依赖关系的任务实例。 在幕后,它监视并与其可能包含的所有DAG对象的文件夹保持同步,并定期(每分钟左右)检查活动任务以查看是否可以触发它们。
Airflow调度程序旨在作为Airflow生产环境中的持久服务运行。 要开始,您需要做的就是执行`airflow scheduler` 。 它将使用`airflow.cfg`指定的配置。
请注意,如果您在一天的`schedule_interval`上运行DAG,则会在`2016-01-01T23:59`之后不久触发标记为`2016-01-01`的运行。 换句话说,作业实例在其覆盖的时间段结束后启动。
**让我们重复一遍**调度`schedule_interval`在开始日期之后,在句点结束时运行您的作业一个`schedule_interval`
调度程序启动`airflow.cfg`指定的执行程序的实例。 如果碰巧是`LocalExecutor` ,任务将作为子`LocalExecutor`执行; 在`CeleryExecutor``MesosExecutor`的情况下,任务是远程执行的。
要启动调度程序,只需运行以下命令:
```
airflow scheduler
```
## DAG运行
DAG Run是一个表示DAG实例化的对象。
每个DAG可能有也可能没有时间表,通知如何创建`DAG Runs``schedule_interval`被定义为DAG参数,并且优选地接收作为`str`[cron表达式](https://en.wikipedia.org/wiki/Cron)`datetime.timedelta`对象。 或者,您也可以使用其中一个cron“预设”:
<colgroup><col width="15%"><col width="69%"><col width="16%"></colgroup>
| 预置 | 含义 | cron的 |
| --- | --- | --- |
| `None` | 不要安排,专门用于“外部触发”的DAG | |
| `@once` | 安排一次,只安排一次 | |
| `@hourly` | 在小时开始时每小时运行一次 | `0 * * * *` |
| `@daily` | 午夜一天运行一次 | `0 0 * * *` |
| `@weekly` | 周日早上每周午夜运行一次 | `0 0 * * 0` |
| `@monthly` | 每个月的第一天午夜运行一次 | `0 0 1 * *` |
| `@yearly` | 每年1月1日午夜运行一次 | `0 0 1 1 *` |
您的DAG将针对每个计划进行实例化,同时为每个计划创建`DAG Run`条目。
DAG运行具有与它们相关联的状态(运行,失败,成功),并通知调度程序应该针对任务提交评估哪组调度。 如果没有DAG运行级别的元数据,Airflow调度程序将需要做更多的工作才能确定应该触发哪些任务并进行爬行。 在更改DAG的形状时,也可能会添加新任务,从而创建不需要的处理。
## 回填和追赶
具有`start_date` (可能是`end_date` )和`schedule_interval`的Airflow DAG定义了一系列间隔,调度程序将这些间隔转换为单独的Dag运行并执行。 Airflow的一个关键功能是这些DAG运行是原子的幂等项,默认情况下,调度程序将检查DAG的生命周期(从开始到结束/现在,一次一个间隔)并启动DAG运行对于尚未运行(或已被清除)的任何间隔。 这个概念叫做Catchup。
如果你的DAG被编写来处理它自己的追赶(IE不仅限于间隔,而是改为“现在”。),那么你将需要关闭追赶(在DAG本身上使用`dag.catchup = False` )或者默认情况下在配置文件级别使用`catchup_by_default = False` 。 这样做,是指示调度程序仅为DAG间隔序列的最新实例创建DAG运行。
```
"""
Code that goes along with the Airflow tutorial located at:
https://github.com/airbnb/airflow/blob/master/airflow/example_dags/tutorial.py
"""
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from datetime import datetime , timedelta
default_args = {
'owner' : 'airflow' ,
'depends_on_past' : False ,
'start_date' : datetime ( 2015 , 12 , 1 ),
'email' : [ 'airflow@example.com' ],
'email_on_failure' : False ,
'email_on_retry' : False ,
'retries' : 1 ,
'retry_delay' : timedelta ( minutes = 5 ),
'schedule_interval' : '@hourly' ,
}
dag = DAG ( 'tutorial' , catchup = False , default_args = default_args )
```
在上面的示例中,如果调度程序守护程序在2016-01-02上午6点(或从命令行)拾取DAG,则将创建单个DAG运行,其`execution_date`为2016-01-01 ,下一个将在2016-01-03上午午夜后创建,执行日期为2016-01-02。
如果`dag.catchup`值为True,则调度程序将为2015-12-01和2016-01-02之间的每个完成时间间隔创建一个DAG Run(但不是2016-01-02中的一个,因为该时间间隔)尚未完成)并且调度程序将按顺序执行它们。 对于可以轻松拆分为句点的原子数据集,此行为非常有用。 如果您的DAG运行在内部执行回填,则关闭追赶是很好的。
## 外部触发器
请注意,在运行`airflow trigger_dag`命令时,也可以通过CLI手动创建`DAG Runs` ,您可以在其中定义特定的`run_id` 。 在调度程序外部创建的`DAG Runs`与触发器的时间戳相关联,并将与预定的`DAG runs`一起显示在UI中。
此外,您还可以使用Web UI手动触发`DAG Run` (选项卡“DAG” - &gt;列“链接” - &gt;按钮“触发器Dag”)。
## 要牢记
* 第一个`DAG Run`是基于DAG中任务的最小`start_date`创建的。
* 后续`DAG Runs`由调度程序进程根据您的DAG的`schedule_interval`顺序创建。
* 当清除一组任务的状态以期让它们重新运行时,重要的是要记住`DAG Run`的状态,因为它定义了调度程序是否应该查看该运行的触发任务。
以下是一些可以**取消阻止任务的方法**
* 在UI中,您可以从任务实例对话框中**清除** (如删除状态)各个任务实例,同时定义是否要包括过去/未来和上游/下游依赖项。 请注意,接下来会出现一个确认窗口,您可以看到要清除的设置。 您还可以清除与dag关联的所有任务实例。
* CLI命令`airflow clear -h`在清除任务实例状态时有很多选项,包括指定日期范围,通过指定正则表达式定位task_ids,包含上游和下游亲属的标志,以及特定状态下的目标任务实例( `failed``success`
* 清除任务实例将不再删除任务实例记录。 相反,它更新max_tries并将当前任务实例状态设置为None。
* 将任务实例标记为失败可以通过UI完成。 这可用于停止运行任务实例。
* 将任务实例标记为成功可以通过UI完成。 这主要是为了修复漏报,或者例如在Airflow之外应用修复时。
* `airflow backfill` CLI子命令具有`--mark_success`标志,允许选择DAG的子部分以及指定日期范围。
\ No newline at end of file
# 插件
Airflow内置了一个简单的插件管理器,可以通过简单地删除`$AIRFLOW_HOME/plugins`文件夹中的文件,将外部功能集成到其核心。
`plugins`文件夹中的python模块将被导入, **钩子****操作符****传感器****宏****执行器**和Web **视图**将集成到Airflow的主要集合中,并可供使用。
## 做什么的?
Airflow提供了一个用于处理数据的通用工具箱。 不同的组织有不同的堆栈和不同的需求。 使用Airflow插件可以让公司定制他们的Airflow安装以反映他们的生态系统。
插件可以用作编写,共享和激活新功能集的简便方法。
还需要一组更复杂的应用程序来与不同风格的数据和元数据进行交互。
例子:
* 一组用于解析Hive日志和公开Hive元数据(CPU / IO /阶段/倾斜/ ...)的工具
* 异常检测框架,允许人们收集指标,设置阈值和警报
* 审计工具,帮助了解谁访问了什么
* 配置驱动的SLA监控工具,允许您设置受监控的表以及应该在何时着陆,提醒人员并公开停机的可视化
* ...
## 为什么要建立在Airflow之上?
Airflow有许多组件可以在构建应用程序时重用:
* 可用于呈现视图的Web服务器
* 用于存储模型的元数据数据库
* 访问您的数据库,以及如何连接到它们的知识
* 应用程序可以将工作负载推送到的一组工作者
* 部署了Airflow,您可以专注于部署物流
* 基本的图表功能,底层库和抽象
## 接口
要创建插件,您需要派生`airflow.plugins_manager.AirflowPlugin`类并引用要插入Airflow的对象。 以下是您需要派生的类看起来像:
```
class AirflowPlugin ( object ):
# The name of your plugin (str)
name = None
# A list of class(es) derived from BaseOperator
operators = []
# A list of class(es) derived from BaseSensorOperator
sensors = []
# A list of class(es) derived from BaseHook
hooks = []
# A list of class(es) derived from BaseExecutor
executors = []
# A list of references to inject into the macros namespace
macros = []
# A list of objects created from a class derived
# from flask_admin.BaseView
admin_views = []
# A list of Blueprint object created from flask.Blueprint
flask_blueprints = []
# A list of menu links (flask_admin.base.MenuLink)
menu_links = []
```
您可以通过继承派生它(请参阅下面的示例)。 请注意,必须指定此类中的`name`
将插件导入Airflow后,您可以使用类似语句调用它
```
from airflow. { type , like "operators" , "sensors" } . { name specificed inside the plugin class } import *
```
当您编写自己的插件时,请确保您理解它们。 每种类型的插件都有一些基本属性。 例如,
* 对于`Operator`插件,必须使用`execute`方法。
* 对于`Sensor`插件,必须使用返回布尔值的`poke`方法。
## 例
下面的代码定义了一个插件,它在Airflow中注入一组虚拟对象定义。
```
# This is the class you derive to create a plugin
from airflow.plugins_manager import AirflowPlugin
from flask import Blueprint
from flask_admin import BaseView , expose
from flask_admin.base import MenuLink
# Importing base classes that we need to derive
from airflow.hooks.base_hook import BaseHook
from airflow.models import BaseOperator
from airflow.sensors.base_sensor_operator import BaseSensorOperator
from airflow.executors.base_executor import BaseExecutor
# Will show up under airflow.hooks.test_plugin.PluginHook
class PluginHook ( BaseHook ):
pass
# Will show up under airflow.operators.test_plugin.PluginOperator
class PluginOperator ( BaseOperator ):
pass
# Will show up under airflow.sensors.test_plugin.PluginSensorOperator
class PluginSensorOperator ( BaseSensorOperator ):
pass
# Will show up under airflow.executors.test_plugin.PluginExecutor
class PluginExecutor ( BaseExecutor ):
pass
# Will show up under airflow.macros.test_plugin.plugin_macro
def plugin_macro ():
pass
# Creating a flask admin BaseView
class TestView ( BaseView ):
@expose ( '/' )
def test ( self ):
# in this example, put your test_plugin/test.html template at airflow/plugins/templates/test_plugin/test.html
return self . render ( "test_plugin/test.html" , content = "Hello galaxy!" )
v = TestView ( category = "Test Plugin" , name = "Test View" )
# Creating a flask blueprint to integrate the templates and static folder
bp = Blueprint (
"test_plugin" , __name__ ,
template_folder = 'templates' , # registers airflow/plugins/templates as a Jinja template folder
static_folder = 'static' ,
static_url_path = '/static/test_plugin' )
ml = MenuLink (
category = 'Test Plugin' ,
name = 'Test Menu Link' ,
url = 'https://airflow.incubator.apache.org/' )
# Defining the plugin class
class AirflowTestPlugin ( AirflowPlugin ):
name = "test_plugin"
operators = [ PluginOperator ]
sensors = [ PluginSensorOperator ]
hooks = [ PluginHook ]
executors = [ PluginExecutor ]
macros = [ plugin_macro ]
admin_views = [ v ]
flask_blueprints = [ bp ]
menu_links = [ ml ]
```
\ No newline at end of file
# 安全
默认情况下,所有门都打开。 限制对Web应用程序的访问的一种简单方法是在网络级别或使用SSH隧道执行此操作。
但是,可以通过使用其中一个提供的后端或创建自己的后端来打开身份验证。
请务必查看[Experimental Rest API](api.html)以保护API。
## Web身份验证
### 密码
最简单的身份验证机制之一是要求用户在登录前指定密码。密码身份验证需要在需求文件中使用`password`子包。 密码哈希在存储密码之前使用bcrypt。
```
[ webserver ]
authenticate = True
auth_backend = airflow.contrib.auth.backends.password_auth
```
启用密码身份验证后,需要先创建初始用户凭据,然后才能登录任何人。 未在此身份验证后端的迁移中创建初始用户,以防止默认Airflow安装受到攻击。 必须通过安装Airflow的同一台机器上的Python REPL创建新用户。
```
# navigate to the airflow installation directory
$ cd ~/airflow
$ python
Python 2 .7.9 ( default, Feb 10 2015 , 03 :28:08 )
Type "help" , "copyright" , "credits" or "license" for more information.
>>> import airflow
>>> from airflow import models, settings
>>> from airflow.contrib.auth.backends.password_auth import PasswordUser
>>> user = PasswordUser ( models.User ())
>>> user.username = 'new_user_name'
>>> user.email = 'new_user_email@example.com'
>>> user.password = 'set_the_password'
>>> session = settings.Session ()
>>> session.add ( user )
>>> session.commit ()
>>> session.close ()
>>> exit ()
```
### LDAP
要打开LDAP身份验证,请按如下方式配置`airflow.cfg` 。 请注意,该示例使用与ldap服务器的加密连接,因为您可能不希望密码在网络级别上可读。 但是,如果您真的想要,可以在不加密的情况下进行配置。
此外,如果您使用的是Active Directory,并且没有明确指定用户所在的OU,则需要将`search_scope`更改为“SUBTREE”。
有效的search_scope选项可以在[ldap3文档中](http://ldap3.readthedocs.org/searches.html%3Fhighlight%3Dsearch_scope)找到
```
[ webserver ]
authenticate = True
auth_backend = airflow.contrib.auth.backends.ldap_auth
[ ldap ]
# set a connection without encryption: uri = ldap://<your.ldap.server>:<port>
uri = ldaps://<your.ldap.server>:<port>
user_filter = objectClass = *
# in case of Active Directory you would use: user_name_attr = sAMAccountName
user_name_attr = uid
# group_member_attr should be set accordingly with *_filter
# eg :
# group_member_attr = groupMembership
# superuser_filter = groupMembership=CN=airflow-super-users...
group_member_attr = memberOf
superuser_filter = memberOf = CN = airflow-super-users,OU = Groups,OU = RWC,OU = US,OU = NORAM,DC = example,DC = com
data_profiler_filter = memberOf = CN = airflow-data-profilers,OU = Groups,OU = RWC,OU = US,OU = NORAM,DC = example,DC = com
bind_user = cn = Manager,dc = example,dc = com
bind_password = insecure
basedn = dc = example,dc = com
cacert = /etc/ca/ldap_ca.crt
# Set search_scope to one of them: BASE, LEVEL , SUBTREE
# Set search_scope to SUBTREE if using Active Directory, and not specifying an Organizational Unit
search_scope = LEVEL
```
superuser_filter和data_profiler_filter是可选的。 如果已定义,则这些配置允许您指定用户必须属于的LDAP组,以便拥有超级用户(admin)和数据分析器权限。 如果未定义,则所有用户都将成为超级用户和数据分析器。
### 滚动你自己
Airflow使用`flask_login`并在`airflow.default_login`模块中公开一组挂钩。 您可以更改内容并使其成为`PYTHONPATH`一部分,并将其配置为`airflow.cfg`的后端。
```
[ webserver ]
authenticate = True
auth_backend = mypackage.auth
```
## 多租户
通过在配置中设置`webserver:filter_by_owner` ,可以在启用身份验证时按所有者名称筛选`webserver:filter_by_owner` 。 有了这个,用户将只看到它所有者的dags,除非它是超级用户。
```
[ webserver ]
filter_by_owner = True
```
## Kerberos的
Airflow最初支持Kerberos。 这意味着气流可以为自己更新kerberos票证并将其存储在票证缓存中。 钩子和匕首可以使用票证来验证kerberized服务。
### 限制
请注意,此时并未调整所有挂钩以使用此功能。 此外,它没有将kerberos集成到Web界面中,您现在必须依赖网络级安全性来确保您的服务保持安全。
Celery集成尚未经过试用和测试。 但是,如果您为每个主机生成一个密钥选项卡,并在每个工作人员旁边启动一个故障单续订器,那么它很可能会起作用。
### 启用kerberos
#### 空气流动
要启用kerberos,您需要生成(服务)密钥选项卡。
```
# in the kadmin.local or kadmin shell, create the airflow principal
kadmin: addprinc -randkey airflow/fully.qualified.domain.name@YOUR-REALM.COM
# Create the airflow keytab file that will contain the airflow principal
kadmin: xst -norandkey -k airflow.keytab airflow/fully.qualified.domain.name
```
现在将此文件存储在气流用户可以读取的位置(chmod 600)。 然后将以下内容添加到`airflow.cfg`
```
[ core ]
security = kerberos
[ kerberos ]
keytab = /etc/airflow/airflow.keytab
reinit_frequency = 3600
principal = airflow
```
启动票证续订
```
# run ticket renewer
airflow kerberos
```
#### Hadoop的
如果要使用模拟,则需要在hadoop配置的`core-site.xml`中启用。
```
<property>
<name>hadoop.proxyuser.airflow.groups</name>
<value>*</value>
</property>
<property>
<name>hadoop.proxyuser.airflow.users</name>
<value>*</value>
</property>
<property>
<name>hadoop.proxyuser.airflow.hosts</name>
<value>*</value>
</property>
```
当然,如果您需要加强安全性,请用更合适的东西替换星号。
### 使用kerberos身份验证
已更新配置单元挂钩以利用kerberos身份验证。 要允许DAG使用它,只需更新连接详细信息,例如:
```
{ "use_beeline" : true, "principal" : "hive/_HOST@EXAMPLE.COM" }
```
根据您的设置调整主体。 _HOST部分将替换为服务器的完全限定域名。
您可以指定是否要将dag所有者用作连接的用户或连接的登录部分中指定的用户。 对于登录用户,请将以下内容指定为额外:
```
{ "use_beeline" : true, "principal" : "hive/_HOST@EXAMPLE.COM" , "proxy_user" : "login" }
```
对于DAG所有者使用:
```
{ "use_beeline" : true, "principal" : "hive/_HOST@EXAMPLE.COM" , "proxy_user" : "owner" }
```
在DAG中,初始化HiveOperator时,请指定:
```
run_as_owner = True
```
## OAuth身份验证
### GitHub Enterprise(GHE)身份验证
GitHub Enterprise身份验证后端可用于对使用OAuth2安装GitHub Enterprise的用户进行身份验证。 您可以选择指定团队白名单(由slug cased团队名称组成)以限制仅登录这些团队的成员。
```
[ webserver ]
authenticate = True
auth_backend = airflow.contrib.auth.backends.github_enterprise_auth
[ github_enterprise ]
host = github.example.com
client_id = oauth_key_from_github_enterprise
client_secret = oauth_secret_from_github_enterprise
oauth_callback_route = /example/ghe_oauth/callback
allowed_teams = 1 , 345 , 23
```
注意
如果您未指定团队白名单,那么在GHE安装中拥有有效帐户的任何人都可以登录Airflow。
#### 设置GHE身份验证
必须先在GHE中设置应用程序,然后才能使用GHE身份验证后端。 要设置应用程序:
1. 导航到您的GHE配置文件
2. 从左侧导航栏中选择“应用程序”
3. 选择“开发者应用程序”选项卡
4. 点击“注册新申请”
5. 填写所需信息(“授权回调URL”必须完全合格,例如[http://airflow.example.com/example/ghe_oauth/callback](http://airflow.example.com/example/ghe_oauth/callback)
6. 点击“注册申请”
7. 根据上面的示例,将“客户端ID”,“客户端密钥”和回调路由复制到airflow.cfg
#### 在github.com上使用GHE身份验证
可以在github.com上使用GHE身份验证:
1. [创建一个Oauth应用程序](https://developer.github.com/apps/building-oauth-apps/creating-an-oauth-app/)
2. 根据上面的示例,将“客户端ID”,“客户端密钥”复制到airflow.cfg
3.`host = github.com`设置`host = github.com``oauth_callback_route = /oauth/callback`
### Google身份验证
Google身份验证后端可用于使用OAuth2对Google用户进行身份验证。 您必须指定电子邮件域以限制登录(以逗号分隔),仅限于这些域的成员。
```
[ webserver ]
authenticate = True
auth_backend = airflow.contrib.auth.backends.google_auth
[ google ]
client_id = google_client_id
client_secret = google_client_secret
oauth_callback_route = /oauth2callback
domain = "example1.com,example2.com"
```
#### 设置Google身份验证
必须先在Google API控制台中设置应用程序,然后才能使用Google身份验证后端。 要设置应用程序:
1. 导航到[https://console.developers.google.com/apis/](https://console.developers.google.com/apis/)
2. 从左侧导航栏中选择“凭据”
3. 点击“创建凭据”,然后选择“OAuth客户端ID”
4. 选择“Web应用程序”
5. 填写所需信息('授权重定向URI'必须完全合格,例如[http://airflow.example.com/oauth2callback](http://airflow.example.com/oauth2callback)
6. 点击“创建”
7. 根据上面的示例,将“客户端ID”,“客户端密钥”和重定向URI复制到airflow.cfg
## SSL
可以通过提供证书和密钥来启用SSL。 启用后,请务必在浏览器中使用“ [https://](https:) ”。
```
[ webserver ]
web_server_ssl_cert = <path to cert>
web_server_ssl_key = <path to key>
```
启用S​​SL不会自动更改Web服务器端口。 如果要使用标准端口443,则还需要配置它。 请注意,侦听端口443需要超级用户权限(或Linux上的cap_net_bind_service)。
```
# Optionally, set the server to listen on the standard SSL port.
web_server_port = 443
base_url = http://<hostname or IP>:443
```
使用SSL启用CeleryExecutor。 确保正确生成客户端和服务器证书和密钥。
```
[ celery ]
CELERY_SSL_ACTIVE = True
CELERY_SSL_KEY = <path to key>
CELERY_SSL_CERT = <path to cert>
CELERY_SSL_CACERT = <path to cacert>
```
## 模拟
Airflow能够在运行任务实例时模拟unix用户,该任务实例基于任务的`run_as_user`参数,该参数采用用户的名称。
**注意:**要模拟工作,必须使用&lt;cite&gt;sudo&lt;/cite&gt;运行Airflow,因为使用&lt;cite&gt;sudo -u&lt;/cite&gt;运行子任务并更改文件的权限。 此外,unix用户需要存在于worker上。 这是一个简单的sudoers文件条目可能看起来像这样,假设气流作为&lt;cite&gt;气流&lt;/cite&gt;用户运行。 请注意,这意味着必须以与root用户相同的方式信任和处理气流用户。
```
airflow ALL=(ALL) NOPASSWD: ALL
```
带模拟的子任务仍将记录到同一文件夹,但他们登录的文件将更改权限,只有unix用户才能写入。
### 默认模拟
要防止不使用模拟的任务以&lt;cite&gt;sudo&lt;/cite&gt;权限运行,可以设置`core:default_impersonation` config,如果未设置&lt;cite&gt;run_as_user,&lt;/cite&gt;则设置默认用户模拟。
```
[ core ]
default_impersonation = airflow
```
\ No newline at end of file
# 时区
默认情况下启用对时区的支持。 Airflow在内部和数据库中以UTC格式存储日期时间信息。 它允许您使用时区相关的计划运行DAG。 目前,Airflow不会将其转换为用户界面中的最终用户时区。 它始终以UTC显示。 此外,操作符中使用的模板也不会被转换。 时区信息是暴露出来的,由DAG的作者负责。
如果您的用户居住在多个时区,并且您希望根据每个用户的挂钟显示日期时间信息,这将非常方便。
即使您只在一个时区运行Airflow,在数据库中以UTC格式存储数据仍然是一种很好的做法(在Airflow成为时区之前也是如此,这也是建议的甚至是必需的设置)。 主要原因是夏令时(DST)。 许多国家都有DST系统,其中时钟在春季向前移动,在秋季向后移动。 如果您在当地工作,那么当转换发生时,您可能每年会遇到两次错误。 (钟摆和pytz文档更详细地讨论了这些问题。)这对于简单的DAG可能无关紧要,但如果您处于金融服务中,那么这是一个问题,在这些金融服务中您可以满足最后期限。
时区在&lt;cite&gt;airflow.cfg中&lt;/cite&gt;设置。 默认情况下,它设置为utc,但您将其更改为使用系统设置或任意IANA时区,例如&lt;cite&gt;Europe / Amsterdam&lt;/cite&gt; 。 它取决于&lt;cite&gt;钟摆&lt;/cite&gt; ,它比&lt;cite&gt;pytz&lt;/cite&gt;更准确。 安装Airflow时会安装Pendulum。
请注意,Web UI目前仅以UTC格式运行。
## 概念
### 天真并了解日期时间对象
Python的datetime.datetime对象具有tzinfo属性,可用于存储时区信息,表示为datetime.tzinfo的子类的实例。 设置此属性并描述偏移量时,可以识别日期时间对象。 否则,这是天真的。
您可以使用timezone.is_aware()和timezone.is_naive()来确定日期时间是否知晓或天真。
因为Airflow使用时区感知日期时间对象。 如果您的代码创建了datetime对象,那么他们也需要注意。
```
from airflow.utils import timezone
now = timezone . utcnow ()
a_date = timezone . datetime ( 2017 , 1 , 1 )
```
### 解释天真的日期时间对象
尽管Airflow完全可以识别时区,但它仍然可以在DAG定义中为&lt;cite&gt;start_dates&lt;/cite&gt;&lt;cite&gt;end_dates&lt;/cite&gt;接受天真的日期时间对象。 这主要是为了保持向后兼容性。 如果遇到天真的&lt;cite&gt;start_date&lt;/cite&gt;&lt;cite&gt;end_date,&lt;/cite&gt;则应用默认时区。 它以这样的方式应用,即假定天真日期时间已经在默认时区。 换句话说,如果您有&lt;cite&gt;欧洲/阿姆斯特丹&lt;/cite&gt;的默认时区设置并创建&lt;cite&gt;日期时间(2017,1,1)&lt;/cite&gt;的天真日期时间&lt;cite&gt;start_date&lt;/cite&gt; ,则假定它是2017年1月1日阿姆斯特丹时间的&lt;cite&gt;start_date&lt;/cite&gt;
```
default_args = dict (
start_date = datetime ( 2016 , 1 , 1 ),
owner = 'Airflow'
)
dag = DAG ( 'my_dag' , default_args = default_args )
op = DummyOperator ( task_id = 'dummy' , dag = dag )
print ( op . owner ) # Airflow
```
不幸的是,在DST转换期间,某些日期时间不存在或不明确。 在这种情况下,钟摆会引发异常。 这就是为什么在启用时区支持时应始终创建有意识的日期时间对象的原因。
实际上,这很少是一个问题。 Airflow可以让您了解模型和DAG中的日期时间对象,并且通常,新的日期时间对象是通过timedelta算法从现有对象创建的。 通常在应用程序代码中创建的唯一日期时间是当前时间,timezone.utcnow()自动执行正确的操作。
### 默认时区
默认时区是由&lt;cite&gt;[core]&lt;/cite&gt;下的&lt;cite&gt;default_timezone&lt;/cite&gt;设置定义的时区。 如果您刚刚安装了Airflow,它将被设置为&lt;cite&gt;utc&lt;/cite&gt; ,这是推荐的。 您还可以将其设置为&lt;cite&gt;系统&lt;/cite&gt;或IANA时区(例如“欧洲/阿姆斯特丹”)。 DAG也在Airflow工作人员上进行评估,因此确保所有Airflow节点上的此设置相同非常重要。
```
[ core ]
default_timezone = utc
```
## 时区感知DAG
创建时区感知DAG非常简单。 只需确保提供时区感知&lt;cite&gt;start_date&lt;/cite&gt; 。 建议使用&lt;cite&gt;摆锤&lt;/cite&gt; ,但也可以使用&lt;cite&gt;pytz&lt;/cite&gt; (手动安装)。
```
import pendulum
local_tz = pendulum . timezone ( "Europe/Amsterdam" )
default_args = dict (
start_date = datetime ( 2016 , 1 , 1 , tzinfo = local_tz ),
owner = 'Airflow'
)
dag = DAG ( 'my_tz_dag' , default_args = default_args )
op = DummyOperator ( task_id = 'dummy' , dag = dag )
print ( dag . timezone ) # <Timezone [Europe/Amsterdam]>
```
### 模板
Airflow在模板中返回时区感知日期时间,但不会将它们转换为本地时间,因此它们保持UTC。 由DAG来处理这个问题。
```
import pendulum
local_tz = pendulum . timezone ( "Europe/Amsterdam" )
local_tz . convert ( execution_date )
```
### Cron安排
如果您设置了cron计划,Airflow会假定您始终希望在同一时间运行。 然后它将忽略日光节省时间。 因此,如果您有一个时间表,表示每天在格林威治标准时间08:00 + 1的间隔结束时运行,它将始终在08:00格林尼治标准时间+ 1的间隔结束时运行,无论日间节电时间是否到位。
### 时间增量
对于具有时间增量的计划,Airflow假定您始终希望以指定的间隔运行。 因此,如果您指定timedelta(hours = 2),您将始终希望运行数小时。 在这种情况下,将考虑日光节省时间。
\ No newline at end of file
# 实验休息API
Airflow公开了一个实验性的Rest API。 它可以通过网络服务器获得。 端点可在/ api / experimental /获得。 请注意,我们希望端点定义发生变化。
## 端点
这是占位符,直到招摇定义处于活动状态
* / api / experimental / dags / &lt;DAG_ID&gt; / tasks / &lt;TASK_ID&gt;返回任务信息(GET)。
* / api / experimental / dags / &lt;DAG_ID&gt; / dag_runs为给定的dag id(POST)创建一个dag_run。
## CLI
对于某些功能,cli可以使用API​​。 要配置CLI以在可用时使用API​​,请按如下方式配置:
```
[ cli ]
api_client = airflow.api.client.json_client
endpoint_url = http://<WEBSERVER>:<PORT>
```
## 认证
API的身份验证与Web身份验证分开处理。 默认情况下,不需要对API进行任何身份验证 - 即默认情况下全开。 如果您的Airflow网络服务器可公开访问,则不建议这样做,您应该使用拒绝所有后端:
```
[api]
auth_backend = airflow.api.auth.backend.deny_all
```
API目前支持两种“真实”的身份验证方法。
要启用密码身份验证,请在配置中进行以下设置:
```
[ api ]
auth_backend = airflow.contrib.auth.backends.password_auth
```
它的用法类似于用于Web界面的密码验证。
要启用Kerberos身份验证,请在配置中设置以下内容:
```
[api]
auth_backend = airflow.api.auth.backend.kerberos_auth
[kerberos]
keytab = <KEYTAB>
```
Kerberos服务配置为`airflow/fully.qualified.domainname@REALM` 。 确保密钥表文件中存在此主体。
\ No newline at end of file
此差异已折叠。
# 血统
注意
血统支持是非常实验性的,可能会发生变化。
Airflow可以帮助跟踪数据的来源,发生的事情以及数据随时间的变化。 这有助于实现审计跟踪和数据治理,还可以调试数据流。
气流通过任务的入口和出口跟踪数据。 让我们从一个例子开始,看看它是如何工作的。
```
from airflow.operators.bash_operator import BashOperator
from airflow.operators.dummy_operator import DummyOperator
from airflow.lineage.datasets import File
from airflow.models import DAG
from datetime import timedelta
FILE_CATEGORIES = [ "CAT1" , "CAT2" , "CAT3" ]
args = {
'owner' : 'airflow' ,
'start_date' : airflow . utils . dates . days_ago ( 2 )
}
dag = DAG (
dag_id = 'example_lineage' , default_args = args ,
schedule_interval = '0 0 * * *' ,
dagrun_timeout = timedelta ( minutes = 60 ))
f_final = File ( "/tmp/final" )
run_this_last = DummyOperator ( task_id = 'run_this_last' , dag = dag ,
inlets = { "auto" : True },
outlets = { "datasets" : [ f_final ,]})
f_in = File ( "/tmp/whole_directory/" )
outlets = []
for file in FILE_CATEGORIES :
f_out = File ( "/tmp/ {} /{{{{ execution_date }}}}" . format ( file ))
outlets . append ( f_out )
run_this = BashOperator (
task_id = 'run_me_first' , bash_command = 'echo 1' , dag = dag ,
inlets = { "datasets" : [ f_in ,]},
outlets = { "datasets" : outlets }
)
run_this . set_downstream ( run_this_last )
```
任务采用参数&lt;cite&gt;入口&lt;/cite&gt;&lt;cite&gt;出口&lt;/cite&gt; 。 入口可以由数据集列表&lt;cite&gt;{“数据集”:[dataset1,dataset2]}&lt;/cite&gt;手动定义,也可以配置为从上游任务中查找出口&lt;cite&gt;{“task_ids”:[“task_id1”,“task_id2”]}&lt;/cite&gt;或者可以配置为从直接上游任务&lt;cite&gt;{“auto”:True}&lt;/cite&gt;或它们的组合中获取出口。 出口被定义为数据集列表&lt;cite&gt;{“数据集”:[dataset1,dataset2]}&lt;/cite&gt; 。 在执行任务时,数据集的任何字段都使用上下文进行模板化。
注意
如果操作员支持,操作员可以自动添加入口和出口。
在示例DAG任务中, &lt;cite&gt;run_me_first&lt;/cite&gt;是一个BashOperator,它接收从列表生成的3个入口: &lt;cite&gt;CAT1&lt;/cite&gt;&lt;cite&gt;CAT2&lt;/cite&gt;&lt;cite&gt;CAT3&lt;/cite&gt; 。 请注意, &lt;cite&gt;execution_date&lt;/cite&gt;是一个模板化字段,将在任务运行时呈现。
注意
在幕后,Airflow将沿袭元数据作为任务的&lt;cite&gt;pre_execute&lt;/cite&gt;方法的一部分进行准备。 当任务完成执行&lt;cite&gt;时,&lt;/cite&gt;将调用&lt;cite&gt;post_execute&lt;/cite&gt;并将lineage元数据推送到XCOM中。 因此,如果您要创建自己的覆盖此方法的运算符,请确保分别使用&lt;cite&gt;prepare_lineage&lt;/cite&gt;&lt;cite&gt;apply_lineage&lt;/cite&gt;修饰您的方法。
## Apache Atlas
Airflow可以将其沿袭元数据发送到Apache Atlas。 您需要启用&lt;cite&gt;atlas&lt;/cite&gt;后端并正确配置它,例如在&lt;cite&gt;airflow.cfg中&lt;/cite&gt;
```
[ lineage ]
backend = airflow . lineage . backend . atlas
[ atlas ]
username = my_username
password = my_password
host = host
port = 21000
```
请确保安装了&lt;cite&gt;atlasclient&lt;/cite&gt;软件包。
\ No newline at end of file
# 快速开始
安装快速而直接。
```
# airflow needs a home, ~/airflow is the default,
# but you can lay foundation somewhere else if you prefer
# (optional)
export AIRFLOW_HOME = ~/airflow
# install from pypi using pip
pip install apache-airflow
# initialize the database
airflow initdb
# start the web server, default port is 8080
airflow webserver -p 8080
# start the scheduler
airflow scheduler
# visit localhost:8080 in the browser and enable the example dag in the home page
```
运行这些命令后,Airflow将创建`$AIRFLOW_HOME`文件夹,并打下一个“airflow.cfg”文件,其默认值可以让您快速上手。 您可以在`$AIRFLOW_HOME/airflow.cfg`检查文件,也可以通过`$AIRFLOW_HOME/airflow.cfg` `Admin-&gt;Configuration`菜单中的UI检查文件。 如果由systemd启动,则Web服务器的PID文件将存储在`$AIRFLOW_HOME/airflow-webserver.pid``/run/airflow/webserver.pid`
开箱即用,Airflow使用sqlite数据库,由于使用此数据库后端无法进行并行化,因此您应该很快就会长大。 它与`SequentialExecutor`一起使用,它只能按顺序运行任务实例。 虽然这是非常有限的,但它允许您快速启动和运行并浏览UI和命令行实用程序。
以下是一些将触发一些任务实例的命令。 在运行以下命令时,您应该能够在`example1` DAG中看到作业的状态发生变化。
```
# run your first task instance
airflow run example_bash_operator runme_0 2015 -01-01
# run a backfill over 2 days
airflow backfill example_bash_operator -s 2015 -01-01 -e 2015 -01-02
```
## 下一步是什么?
从这一点开始,您可以前往“ [教程”](tutorial.html)部分获取更多示例,或者如果您已准备好弄清楚,请参阅[“操作指南”](howto/index.html)部分。
\ No newline at end of file
# 常问问题
## 为什么我的任务没有安排好?
您的任务可能无法安排的原因有很多。 以下是一些常见原因:
* 您的脚本是否“编译”,Airflow引擎是否可以解析它并找到您的DAG对象。 要对此进行测试,您可以运行`airflow list_dags`并确认您的DAG显示在列表中。 您还可以运行`airflow list_tasks foo_dag_id --tree`并确认您的任务按预期显示在列表中。 如果您使用CeleryExecutor,您可能需要确认这既适用于调度程序运行的位置,也适用于工作程序运行的位置。
* 包含DAG的文件是否在内容的某处包含字符串“airflow”和“DAG”? 在搜索DAG目录时,Airflow忽略不包含“airflow”和“DAG”的文件,以防止DagBag解析导入与用户的DAG并置的所有python文件。
* 你的`start_date`设置正确吗? 在传递`start_date + scheduler_interval`之后,Airflow调度程序会立即触发任务。
* 您的`schedule_interval`设置正确吗? 默认`schedule_interval`是一天( `datetime.timedelta(1)` )。 您必须直接为实例化的DAG对象指定不同的`schedule_interval` ,而不是`default_param` ,因为任务实例不会覆盖其父DAG的`schedule_interval`
* 您的`start_date`超出了在UI中可以看到的位置吗? 如果将`start_date`设置为3个月之前的某个时间,您将无法在UI的主视图中看到它,但您应该能够在`Menu -&gt; Browse -&gt;Task Instances`看到它。
* 是否满足任务的依赖性。 直接位于任务上游的任务实例需要处于`success`状态。 此外,如果已设置`depends_on_past=True` ,则上一个任务实例需要成功(除非它是该任务的第一次运行)。 此外,如果`wait_for_downstream=True` ,请确保您了解其含义。 您可以从`Task Instance Details`页面查看如何设置这些属性。
* 您需要创建并激活DagRuns吗? DagRun表示整个DAG的特定执行,并具有状态(运行,成功,失败,......)。 调度程序在向前移动时创建新的DagRun,但永远不会及时创建新的DagRun。 调度程序仅评估`running` DagRuns以查看它可以触发的任务实例。 请注意,清除任务实例(从UI或CLI)确实将DagRun的状态设置为恢复运行。 您可以通过单击DAG的计划标记来批量查看DagRuns列表并更改状态。
* 是否达到了DAG的`concurrency`参数? `concurrency`定义了允许DAG `running`任务实例的数量,超过这一点,事物就会排队。
* 是否达到了DAG的`max_active_runs`参数? `max_active_runs`定义允许的DAG `running`并发实例的数量。
您可能还想阅读文档的“计划程序”部分,并确保完全了解其进度。
## 如何根据其他任务的失败触发任务?
查看文档“概念`Trigger Rule`部分中的“ `Trigger Rule`部分
## 安装airflow [crypto]后,为什么连接密码仍未在元数据db中加密?
查看文档“配置”部分中的“ `Connections`部分
## 与`start_date`什么关系?
`start_date`是前DagRun时代的部分遗产,但它在很多方面仍然具有相关性。 创建新DAG时,您可能希望使用`default_args`为任务设置全局`start_date` 。 要创建的第一个DagRun将基于所有任务的`min(start_date)` 。 从那时起,调度程序根据您的schedule_interval创建新的DagRuns,并在满足您的依赖项时运行相应的任务实例。 在向DAG引入新任务时,您需要特别注意`start_date` ,并且可能希望重新激活非活动DagRuns以正确启用新任务。
我们建议不要使用动态值作为`start_date` ,尤其是`datetime.now()`因为它可能非常混乱。 一旦周期结束,任务就会被触发,理论上, `@hourly` DAG永远不会达到一小时后,因为`now()`会移动。
以前我们还建议使用与`schedule_interval`相关的舍入`start_date` 。 这意味着`@hourly`将在`00:00`分钟:秒,午夜的`@monthly`工作,在这个月的第一个月的`@monthly`工作。 这不再是必需的。 现在,Airflow将自动对齐`start_date``schedule_interval` ,方法是使用`start_date`作为开始查看的时刻。
您可以使用任何传感器或`TimeDeltaSensor`来延迟计划间隔内的任务执行。 虽然`schedule_interval`允许指定`datetime.timedelta`对象,但我们建议使用宏或cron表达式,因为它强制执行舍入计划的这种想法。
使用`depends_on_past=True`时,必须特别注意`start_date`因为过去的依赖关系不会仅针对为任务指定的`start_date`的特定计划强制执行。 除非您计划为新任务运行回填,否则在引入新的`depends_on_past=True`时及时观察DagRun活动状态也很重要。
另外需要注意的是,在回填CLI命令的上下文中,任务`start_date`会被回填命令`start_date`覆盖。 这允许对具有`depends_on_past=True`任务实际启动的回填,如果不是这样,则回填就不会启动。
## 如何动态创建DAG?
Airflow在`DAGS_FOLDER`查找其全局命名空间中包含`DAG`对象的模块,并在`DagBag`添加它找到的对象。 知道这一切我们需要的是一种在全局命名空间中动态分配变量的方法,这可以在python中使用`globals()`函数轻松完成,标准库的行为就像一个简单的字典。
```
for i in range ( 10 ):
dag_id = 'foo_ {} ' . format ( i )
globals ()[ dag_id ] = DAG ( dag_id )
# or better, call a function that returns a DAG object!
```
## 我的进程列表中的所有`airflow run`命令是什么?
`airflow run`命令有很多层,这意味着它可以调用自身。
* 基本`airflow run` :启动执行程序,并告诉它运行`airflow run --local`命令。 如果使用Celery,这意味着它会在队列中放置一个命令,使其在worker上运行远程。 如果使用LocalExecutor,则转换为在子进程池中运行它。
* 本地`airflow run --local` :启动`airflow run --raw`命令(如下所述)作为子`airflow run --raw` ,负责发出心跳,监听外部`airflow run --raw`信号,并确保在子进程失败时进行一些清理
* 原始`airflow run --raw`运行实际操作员的执行方法并执行实际工作
## 我的气流dag如何运行得更快?
我们可以控制三个变量来改善气流dag性能:
* `parallelism` :此变量控制气流工作者可以同时运行的任务实例的数量。 用户可以增加`airflow.cfg`的并行度变量。
* `concurrency` :Airflow调度程序在任何给定时间都将为您的DAG运行不超过`$concurrency`任务实例。 并发性在Airflow DAG中定义。 如果未在DAG上设置并发性,则调度程序将使用`dag_concurrency`条目的缺省值。
* `max_active_runs` :在给定时间,Airflow调度程序将运行不超过DAG的`max_active_runs` DagRuns。 如果未在DAG中设置`max_active_runs` ,则调度程序将使用`airflow.cfg` `max_active_runs_per_dag`条目的缺省值。
## 我们如何减少气流UI页面加载时间?
如果你的dag需要很长时间才能加载,你可以将`airflow.cfg``default_dag_run_display_number`配置的值`airflow.cfg`到一个较小的值。 此可配置控制在UI中显示的dag run的数量,默认值为25。
## 如何修复异常:全局变量explicit_defaults_for_timestamp需要打开(1)?
这意味着在mysql服务器中禁用了`explicit_defaults_for_timestamp` ,您需要通过以下方式启用它:
1. 在my.cnf文件的mysqld部分下设置`explicit_defaults_for_timestamp = 1`
2. 重启Mysql服务器。
## 如何减少生产中的气流dag调度延迟?
* `max_threads` :Scheduler将并行生成多个线程来安排dags。 这由`max_threads`控制,默认值为2.用户应在生产中将此值增加到更大的值(例如,调度程序运行的cpus的数量 - 1)。
* `scheduler_heartbeat_sec` :用户应考虑将`scheduler_heartbeat_sec`配置增加到更高的值(例如60秒),该值控制气流调度程序获取心跳的频率并更新作业在数据库中的条目。
\ No newline at end of file
此差异已折叠。
# 安装
## 获得气流
安装最新稳定版Airflow的最简单方法是使用`pip`
```
pip install apache-airflow
```
您还可以安装Airflow,支持`s3``postgres`等额外功能:
```
pip install apache-airflow [ postgres,s3 ]
```
注意
GPL依赖
默认情况下,Apache Airflow的一个依赖项是拉入GPL库('unidecode')。 如果这是一个问题,您可以通过发出`export SLUGIFY_USES_TEXT_UNIDECODE=yes`强制非GPL库,然后继续正常安装。 请注意,每次升级都需要指定。 另请注意,如果&lt;cite&gt;系统&lt;/cite&gt;中已存在&lt;cite&gt;unidecode,&lt;/cite&gt;则仍将使用依赖关系。
## 额外套餐
`apache-airflow` airflow PyPI基本软件包只安装入门所需的内容。 可以安装子包,具体取决于您的环境中有用的内容。 例如,如果您不需要与Postgres连接,则不必经历安装`postgres-devel` yum软件包的麻烦,或者对您正在使用的分发应用的任何等效项。
在幕后,Airflow会对需要这些额外依赖关系的运营商进行条件导入。
这是子包的列表及其启用的内容:
<colgroup><col width="14%"><col width="42%"><col width="45%"></colgroup>
| 分装 | 安装命令 | 使 |
| --- | --- | --- |
| 所有 | `pip install apache-airflow[all]` | 所有Airflow功能都为人所知 |
| all_dbs | `pip install apache-airflow[all_dbs]` | 所有数据库集成 |
| 异步 | `pip install apache-airflow[async]` | Gunicorn的异步工作者课程 |
| 芹菜 | `pip install apache-airflow[celery]` | CeleryExecutor |
| cloudant | `pip install apache-airflow[cloudant]` | Cloudant钩子 |
| 加密 | `pip install apache-airflow[crypto]` | 加密元数据db中的连接密码 |
| devel的 | `pip install apache-airflow[devel]` | 最低开发工具要求 |
| devel_hadoop | `pip install apache-airflow[devel_hadoop]` | Airflow +对Hadoop堆栈的依赖 |
| 德鲁伊 | `pip install apache-airflow[druid]` | 德鲁伊相关的操作员和钩子 |
| gcp_api | `pip install apache-airflow[gcp_api]` | Google Cloud Platform挂钩和运营商(使用`google-api-python-client` ) |
| HDFS | `pip install apache-airflow[hdfs]` | HDFS挂钩和运营商 |
| 蜂巢 | `pip install apache-airflow[hive]` | 所有Hive相关的运营商 |
| JDBC | `pip install apache-airflow[jdbc]` | JDBC钩子和运算符 |
| 克柏伯里 | `pip install apache-airflow[kerberos]` | Kerberos集成Kerberized Hadoop |
| LDAP | `pip install apache-airflow[ldap]` | 用户的LDAP身份验证 |
| MSSQL | `pip install apache-airflow[mssql]` | Microsoft SQL Server操作员和钩子,支持作为Airflow后端 |
| MySQL的 | `pip install apache-airflow[mysql]` | MySQL运营商和hook,支持作为Airflow后端。 MySQL服务器的版本必须是5.6.4+。 确切的版本上限取决于`mysqlclient`包的版本。 例如, `mysqlclient` 1.3.12只能与MySQL服务器5.6.4到5.7一起使用。 |
| 密码 | `pip install apache-airflow[password]` | 用户密码验证 |
| Postgres的 | `pip install apache-airflow[postgres]` | PostgreSQL运算符和钩子,支持作为Airflow后端 |
| 量子点 | `pip install apache-airflow[qds]` | 启用QDS(Qubole数据服务)支持 |
| 的RabbitMQ | `pip install apache-airflow[rabbitmq]` | RabbitMQ支持作为Celery后端 |
| Redis的 | `pip install apache-airflow[redis]` | Redis挂钩和传感器 |
| S3 | `pip install apache-airflow[s3]` | `S3KeySensor``S3PrefixSensor` |
| 桑巴 | `pip install apache-airflow[samba]` | `Hive2SambaOperator` |
| 松弛 | `pip install apache-airflow[slack]` | `SlackAPIPostOperator` |
| SSH | `pip install apache-airflow[ssh]` | SSH钩子和操作员 |
| Vertica的 | `pip install apache-airflow[vertica]` | Vertica挂钩支持作为Airflow后端 |
## 启动Airflow数据库
在您运行任务之前,Airflow需要启动数据库。 如果您只是在试验和学习Airflow,您可以坚持使用默认的SQLite选项。 如果您不想使用SQLite,请查看[初始化数据库后端](howto/initialize-database.html)以设置其他数据库。
配置完成后,您需要先初始化数据库,然后才能运行任务:
```
airflow initdb
```
\ No newline at end of file
# 教程
本教程将向您介绍一些基本的Airflow概念,对象及其在编写第一个管道时的用法。
## 示例管道定义
以下是基本管道定义的示例。 如果这看起来很复杂,请不要担心,下面将逐行说明。
```
"""
Code that goes along with the Airflow tutorial located at:
https://github.com/apache/incubator-airflow/blob/master/airflow/example_dags/tutorial.py
"""
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from datetime import datetime , timedelta
default_args = {
'owner' : 'airflow' ,
'depends_on_past' : False ,
'start_date' : datetime ( 2015 , 6 , 1 ),
'email' : [ 'airflow@example.com' ],
'email_on_failure' : False ,
'email_on_retry' : False ,
'retries' : 1 ,
'retry_delay' : timedelta ( minutes = 5 ),
# 'queue': 'bash_queue',
# 'pool': 'backfill',
# 'priority_weight': 10,
# 'end_date': datetime(2016, 1, 1),
}
dag = DAG ( 'tutorial' , default_args = default_args )
# t1, t2 and t3 are examples of tasks created by instantiating operators
t1 = BashOperator (
task_id = 'print_date' ,
bash_command = 'date' ,
dag = dag )
t2 = BashOperator (
task_id = 'sleep' ,
bash_command = 'sleep 5' ,
retries = 3 ,
dag = dag )
templated_command = """
{ % f or i in range(5) %}
echo "{{ ds }}"
echo "{{ macros.ds_add(ds, 7)}}"
echo "{{ params.my_param }}"
{ % e ndfor %}
"""
t3 = BashOperator (
task_id = 'templated' ,
bash_command = templated_command ,
params = { 'my_param' : 'Parameter I passed in' },
dag = dag )
t2 . set_upstream ( t1 )
t3 . set_upstream ( t1 )
```
## 这是一个DAG定义文件
包围你的一件事(对于每个人来说可能不是很直观)是这个Airflow Python脚本实际上只是一个配置文件,将DAG的结构指定为代码。 此处定义的实际任务将在与此脚本的上下文不同的上下文中运行。 不同的任务在不同的时间点运行在不同的工作者上,这意味着该脚本不能用于在任务之间交叉通信。 请注意,为此,我们有一个名为`XCom`的更高级功能。
人们有时会将DAG定义文件视为可以进行实际数据处理的地方 - 事实并非如此! 该脚本的目的是定义DAG对象。 它需要快速评估(秒,而不是几分钟),因为调度程序将定期执行它以反映更改(如果有的话)。
## 导入模块
Airflow管道只是一个Python脚本,恰好定义了Airflow DAG对象。 让我们首先导入我们需要的库。
```
# The DAG object; we'll need this to instantiate a DAG
from airflow import DAG
# Operators; we need this to operate!
from airflow.operators.bash_operator import BashOperator
```
## 默认参数
我们即将创建一个DAG和一些任务,我们可以选择显式地将一组参数传递给每个任务的构造函数(这将变得多余),或者(更好!)我们可以定义一个默认参数的字典,我们可以可以在创建任务时使用。
```
from datetime import datetime , timedelta
default_args = {
'owner' : 'airflow' ,
'depends_on_past' : False ,
'start_date' : datetime ( 2015 , 6 , 1 ),
'email' : [ 'airflow@example.com' ],
'email_on_failure' : False ,
'email_on_retry' : False ,
'retries' : 1 ,
'retry_delay' : timedelta ( minutes = 5 ),
# 'queue': 'bash_queue',
# 'pool': 'backfill',
# 'priority_weight': 10,
# 'end_date': datetime(2016, 1, 1),
}
```
有关BaseOperator参数及其功能的更多信息,请参阅[`airflow.models.BaseOperator`](code.html "airflow.models.BaseOperator")文档。
另外,请注意,您可以轻松定义可用于不同目的的不同参数集。 一个例子是在生产和开发环境之间进行不同的设置。
## 实例化DAG
我们需要一个DAG对象来嵌入我们的任务。 这里我们传递一个定义`dag_id`的字符串,它用作DAG的唯一标识符。 我们还传递我们刚刚定义的默认参数字典,并为DAG定义1天的`schedule_interval`
```
dag = DAG (
'tutorial' , default_args = default_args , schedule_interval = timedelta ( 1 ))
```
## 任务
在实例化操作员对象时生成任务。 从运算符实例化的对象称为构造函数。 第一个参数`task_id`充当任务的唯一标识符。
```
t1 = BashOperator (
task_id = 'print_date' ,
bash_command = 'date' ,
dag = dag )
t2 = BashOperator (
task_id = 'sleep' ,
bash_command = 'sleep 5' ,
retries = 3 ,
dag = dag )
```
请注意我们如何将从BaseOperator继承的所有运算符( `retries` )共同的运算符特定参数( `bash_command` )和通用参数传递给运算符的构造函数。 这比为每个构造函数调用传递每个参数更简单。 另请注意,在第二个任务中,我们使用`3`覆盖`retries`参数。
任务的优先规则如下:
1. 明确传递参数
2. `default_args`字典中存在的值
3. 运算符的默认值(如果存在)
任务必须包含或继承参数`task_id``owner` ,否则Airflow将引发异常。
## 与金贾一起模仿
Airflow充分利用了[Jinja Templating](http://jinja.pocoo.org/docs/dev/)的强大功能,并为管道作者提供了一组内置参数和宏。 Airflow还为管道作者提供了定义自己的参数,宏和模板的钩子。
本教程几乎没有涉及在Airflow中使用模板进行操作的表面,但本节的目的是让您知道此功能的存在,让您熟悉双花括号,并指向最常见的模板变量: `{{ ds }}` (今天的“日期戳”)。
```
templated_command = """
{ % f or i in range(5) %}
echo "{{ ds }}"
echo "{{ macros.ds_add(ds, 7) }}"
echo "{{ params.my_param }}"
{ % e ndfor %}
"""
t3 = BashOperator (
task_id = 'templated' ,
bash_command = templated_command ,
params = { 'my_param' : 'Parameter I passed in' },
dag = dag )
```
请注意, `templated_command`包含`{% %}`块中的代码逻辑,引用参数如`{{ ds }}` ,调用`{{ macros.ds_add(ds, 7)}}`的函数,并在`{{ macros.ds_add(ds, 7)}}`引用用户定义的参数`{{ params.my_param }}`
`BaseOperator``params`钩子允许您将参数和/或对象的字典传递给模板。 请花点时间了解参数`my_param`如何通过模板。
文件也可以传递给`bash_command`参数,例如`bash_command='templated_command.sh'` ,其中文件位置相对于包含管道文件的目录(在本例中为`tutorial.py` )。 这可能是出于许多原因,例如分离脚本的逻辑和管道代码,允许在使用不同语言编写的文件中进行正确的代码突出显示,以及构造管道的一般灵活性。 也可以将`template_searchpath`定义为指向DAG构造函数调用中的任何文件夹位置。
使用相同的DAG构造函数调用,可以定义`user_defined_macros` ,它允许您指定自己的变量。 例如,将`dict(foo='bar')`传递给此参数允许您在模板中使用`{{ foo }}` 。 此外,指定`user_defined_filters`允许您注册自己的过滤器。 例如,将`dict(hello=lambda name: 'Hello %s' % name)`传递给此参数允许您使用`{{ 'world' | hello }}` 你的模板中的`{{ 'world' | hello }}` 有关自定义过滤器的更多信息,请查看[Jinja文档](http://jinja.pocoo.org/docs/dev/api/)
有关可以在模板中引用的变量和宏的更多信息,请务必阅读[](code.html)部分
## 设置依赖关系
我们有两个不相互依赖的简单任务。 以下是一些可以定义它们之间依赖关系的方法:
```
t2 . set_upstream ( t1 )
# This means that t2 will depend on t1
# running successfully to run
# It is equivalent to
# t1.set_downstream(t2)
t3 . set_upstream ( t1 )
# all of this is equivalent to
# dag.set_dependency('print_date', 'sleep')
# dag.set_dependency('print_date', 'templated')
```
请注意,在执行脚本时,Airflow会在DAG中找到循环或多次引用依赖项时引发异常。
## 概括
好吧,所以我们有一个非常基本的DAG。 此时,您的代码应如下所示:
```
"""
Code that goes along with the Airflow located at:
http://airflow.readthedocs.org/en/latest/tutorial.html
"""
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from datetime import datetime , timedelta
default_args = {
'owner' : 'airflow' ,
'depends_on_past' : False ,
'start_date' : datetime ( 2015 , 6 , 1 ),
'email' : [ 'airflow@example.com' ],
'email_on_failure' : False ,
'email_on_retry' : False ,
'retries' : 1 ,
'retry_delay' : timedelta ( minutes = 5 ),
# 'queue': 'bash_queue',
# 'pool': 'backfill',
# 'priority_weight': 10,
# 'end_date': datetime(2016, 1, 1),
}
dag = DAG (
'tutorial' , default_args = default_args , schedule_interval = timedelta ( 1 ))
# t1, t2 and t3 are examples of tasks created by instantiating operators
t1 = BashOperator (
task_id = 'print_date' ,
bash_command = 'date' ,
dag = dag )
t2 = BashOperator (
task_id = 'sleep' ,
bash_command = 'sleep 5' ,
retries = 3 ,
dag = dag )
templated_command = """
{ % f or i in range(5) %}
echo "{{ ds }}"
echo "{{ macros.ds_add(ds, 7)}}"
echo "{{ params.my_param }}"
{ % e ndfor %}
"""
t3 = BashOperator (
task_id = 'templated' ,
bash_command = templated_command ,
params = { 'my_param' : 'Parameter I passed in' },
dag = dag )
t2 . set_upstream ( t1 )
t3 . set_upstream ( t1 )
```
## 测试
### 运行脚本
是时候进行一些测试了。 首先让我们确保管道解析。 假设我们正在保存`airflow.cfg`引用的`airflow.cfg`文件夹中`tutorial.py`中上一步的代码。 DAG的默认位置是`~/airflow/dags`
```
python ~/airflow/dags/tutorial.py
```
如果脚本没有引发异常,则意味着您没有做任何可怕的错误,并且您的Airflow环境有点健全。
### 命令行元数据验证
让我们运行一些命令来进一步验证这个脚本。
```
# print the list of active DAGs
airflow list_dags
# prints the list of tasks the "tutorial" dag_id
airflow list_tasks tutorial
# prints the hierarchy of tasks in the tutorial DAG
airflow list_tasks tutorial --tree
```
### 测试
让我们通过在特定日期运行实际任务实例来进行测试。 在此上下文中指定的日期是`execution_date` ,它模拟在特定日期+时间运行任务或dag的调度程序:
```
# command layout: command subcommand dag_id task_id date
# testing print_date
airflow test tutorial print_date 2015 -06-01
# testing sleep
airflow test tutorial sleep 2015 -06-01
```
现在还记得我们之前用模板做过的事吗? 通过运行此命令,了解如何呈现和执行此模板:
```
# testing templated
airflow test tutorial templated 2015 -06-01
```
这应该导致显示详细的事件日志并最终运行bash命令并打印结果。
请注意, `airflow test`命令在本地运行任务实例,将其日志输出到stdout(在屏幕上),不依赖于依赖项,并且不向数据库传达状态(运行,成功,失败,...)。 它只允许测试单个任务实例。
### 回填
一切看起来都运行良好所以让我们运行回填。 `backfill`将尊重您的依赖关系,将日志发送到文件并与数据库通信以记录状态。 如果您有网络服务器,您将能够跟踪进度。 如果您有兴趣在回填过程中直观地跟踪进度, `airflow webserver`将启动Web服务器。
请注意,如果使用`depends_on_past=True` ,则单个任务实例将取决于前面任务实例的成功,除了指定了自身的start_date,此依赖关系将被忽略。
此上下文中的日期范围是`start_date`和可选的`end_date` ,它们用于使用此dag中的任务实例填充运行计划。
```
# optional, start a web server in debug mode in the background
# airflow webserver --debug &
# start your backfill on a date range
airflow backfill tutorial -s 2015 -06-01 -e 2015 -06-07
```
## 下一步是什么?
就是这样,你已经编写,测试并回填了你的第一个Airflow管道。 将代码合并到具有针对它运行的主调度程序的代码存储库中应该让它每天都被触发并运行。
以下是您可能想要做的一些事情:
* 深入了解用户界面 - 点击所有内容!
* 继续阅读文档! 特别是以下部分:
&gt; * 命令行界面
&gt; * 运营商
&gt; * 宏
* 写下你的第一个管道!
\ No newline at end of file
# 操作指南
[快速入门](https://apachecn.github.io/airflow-doc-zh/start.html)部分设置沙箱很容易; 建设一个生产级环境需要更多的工作!
这些操作指南将指导您完成使用和配置Airflow环境的常见任务。
* [设置配置选项](set-config.html)
* [初始化数据库后端](initialize-database.html)
* [使用运算符](operator.html)
* [BashOperator](operator.html)
* [PythonOperator](operator.html)
* [Google云端平台运营商](operator.html)
* [管理连接](manage-connections.html)
* [使用UI创建连接](manage-connections.html)
* [使用UI编辑连接](manage-connections.html)
* [使用环境变量创建连接](manage-connections.html)
* [连接类型](manage-connections.html)
* [保护连接](secure-connections.html)
* [写日志](write-logs.html)
* [在本地编写日志](write-logs.html)
* [将日志写入Amazon S3](write-logs.html)
* [将日志写入Azure Blob存储](write-logs.html)
* [将日志写入Google云端存储](write-logs.html)
* [用Celery扩大规模](executor/use-celery.html)
* [用Dask扩展](executor/use-dask.html)
* [使用Mesos扩展(社区贡献)](executor/use-mesos.html)
* [任务直接在mesos从站上执行](executor/use-mesos.html)
* [在mesos从站上的容器中执行的任务](executor/use-mesos.html)
* [使用systemd运行Airflow](run-with-systemd.html)
* [用upstart运行Airflow](run-with-upstart.html)
* [使用测试模式配置](use-test-config.html)
\ No newline at end of file
# 设置配置选项
第一次运行Airflow时,它会在`$AIRFLOW_HOME`目录中创建一个名为`airflow.cfg`的文件(默认情况下为`~/airflow` `$AIRFLOW_HOME` )。 此文件包含Airflow的配置,您可以对其进行编辑以更改任何设置。 您还可以使用以下格式设置带有环境变量的选项: `$AIRFLOW__{SECTION}__{KEY}` (注意双下划线)。
例如,元数据库连接字符串可以在`airflow.cfg`设置,如下所示:
```
[ core ]
sql_alchemy_conn = my_conn_string
```
或者通过创建相应的环境变量:
```
AIRFLOW__CORE__SQL_ALCHEMY_CONN = my_conn_string
```
您还可以通过将`_cmd`附加到键来在运行时派生连接字符串,如下所示:
```
[ core ]
sql_alchemy_conn_cmd = bash_command_to_run
```
- 但只有三个这样的配置元素,即sql_alchemy_conn,broker_url和result_backend可以作为命令获取。 这背后的想法是不将密码存储在纯文本文件的框中。 优先顺序如下 -
1. 环境变量
2. airflow.cfg中的配置
3. airflow.cfg中的命令
4. 默认
\ No newline at end of file
# 初始化数据库后端
如果您想对Airflow进行真正的试驾,您应该考虑设置一个真正的数据库后端并切换到LocalExecutor。
由于Airflow是使用优秀的SqlAlchemy库与其元数据进行交互而构建的,因此您应该能够使用任何支持作为SqlAlchemy后端的数据库后端。 我们建议使用**MySQL****Postgres**
注意
我们依赖更严格的MySQL SQL设置来获得合理的默认值。 确保在&lt;cite&gt;[mysqld]&lt;/cite&gt;下的my.cnf中指定了&lt;cite&gt;explicit_defaults_for_timestamp = 1&lt;/cite&gt;
注意
如果您决定使用**Postgres** ,我们建议您使用`psycopg2`驱动程序并在SqlAlchemy连接字符串中指定它。 另请注意,由于SqlAlchemy没有公开在Postgres连接URI中定位特定模式的方法,因此您可能希望使用类似于`ALTER ROLE username SET search_path = airflow, foobar;`的命令为您的角色设置默认模式`ALTER ROLE username SET search_path = airflow, foobar;`
将数据库设置为托管Airflow后,您需要更改配置文件`$AIRFLOW_HOME/airflow.cfg`的SqlAlchemy连接字符串。 然后,您还应该将“executor”设置更改为使用“LocalExecutor”,这是一个可以在本地并行化任务实例的执行程序。
```
# initialize the database
airflow initdb
```
\ No newline at end of file
# 使用运算符
运算符代表一个理想情况下是幂等的任务。 操作员确定DAG运行时实际执行的内容。
有关更多信息,请参阅[Operators Concepts](https://apachecn.github.io/airflow-doc-zh/concepts.html)文档和[Operators API Reference](https://apachecn.github.io/airflow-doc-zh/code.html)
* [BashOperator](9)
* [模板](9)
* [故障排除](9)
* [找不到Jinja模板](9)
* [PythonOperator](9)
* [传递参数](9)
* [模板](9)
* [Google云端平台运营商](9)
* [GoogleCloudStorageToBigQueryOperator](9)
## [BashOperator](9)
使用[`BashOperator`](https://apachecn.github.io/airflow-doc-zh/code.html "airflow.operators.bash_operator.BashOperator")[Bash](https://www.gnu.org/software/bash/) shell中执行命令。
```
run_this = BashOperator (
task_id = 'run_after_loop' , bash_command = 'echo 1' , dag = dag )
```
### [模板](9)
您可以使用[Jinja模板](https://apachecn.github.io/airflow-doc-zh/concepts.html)来参数化`bash_command`参数。
```
task = BashOperator (
task_id = 'also_run_this' ,
bash_command = 'echo "run_id={{ run_id }} | dag_run={{ dag_run }}"' ,
dag = dag )
```
### [故障排除](9)
#### [找不到Jinja模板](9)
在使用`bash_command`参数直接调用Bash脚本时,在脚本名称后添加空格。 这是因为Airflow尝试将Jinja模板应用于它,这将失败。
```
t2 = BashOperator (
task_id = 'bash_example' ,
# This fails with `Jinja template not found` error
# bash_command="/home/batcher/test.sh",
# This works (has a space after)
bash_command = "/home/batcher/test.sh " ,
dag = dag )
```
## [PythonOperator](9)
使用[`PythonOperator`](https://apachecn.github.io/airflow-doc-zh/code.html "airflow.operators.python_operator.PythonOperator")执行Python callables。
```
def print_context ( ds , ** kwargs ):
pprint ( kwargs )
print ( ds )
return 'Whatever you return gets printed in the logs'
run_this = PythonOperator (
task_id = 'print_the_context' ,
provide_context = True ,
python_callable = print_context ,
dag = dag )
```
### [传递参数](9)
使用`op_args``op_kwargs`参数将其他参数传递给Python可调用对象。
```
def my_sleeping_function ( random_base ):
"""This is a function that will run within the DAG execution"""
time . sleep ( random_base )
# Generate 10 sleeping tasks, sleeping from 0 to 4 seconds respectively
for i in range ( 5 ):
task = PythonOperator (
task_id = 'sleep_for_' + str ( i ),
python_callable = my_sleeping_function ,
op_kwargs = { 'random_base' : float ( i ) / 10 },
dag = dag )
task . set_upstream ( run_this )
```
### [模板](9)
当您将`provide_context`参数设置为`True` ,Airflow会传入一组额外的关键字参数:一个用于每个[Jinja模板变量](https://apachecn.github.io/airflow-doc-zh/code.html)和一个`templates_dict`参数。
`templates_dict`参数是模板化的,因此字典中的每个值都被评估为[Jinja模板](https://apachecn.github.io/airflow-doc-zh/concepts.html)
## [Google云端平台运营商](9)
### [GoogleCloudStorageToBigQueryOperator](9)
使用[`GoogleCloudStorageToBigQueryOperator`](https://apachecn.github.io/airflow-doc-zh/integration.html "airflow.contrib.operators.gcs_to_bq.GoogleCloudStorageToBigQueryOperator")执行BigQuery加载作业。
```
load_csv = gcs_to_bq . GoogleCloudStorageToBigQueryOperator (
task_id = 'gcs_to_bq_example' ,
bucket = 'cloud-samples-data' ,
source_objects = [ 'bigquery/us-states/us-states.csv' ],
destination_project_dataset_table = 'airflow_test.gcs_to_bq_table' ,
schema_fields = [
{ 'name' : 'name' , 'type' : 'STRING' , 'mode' : 'NULLABLE' },
{ 'name' : 'post_abbr' , 'type' : 'STRING' , 'mode' : 'NULLABLE' },
],
write_disposition = 'WRITE_TRUNCATE' ,
dag = dag )
```
\ No newline at end of file
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册