28.md 129.2 KB
Newer Older
W
README  
wizardforcel 已提交
1
# 集成
W
zh raw  
wizardforcel 已提交
2 3 4 5 6 7 8 9 10 11 12 13 14

*   [反向代理](28)
*   [Azure:Microsoft Azure](28)
*   [AWS:亚马逊网络服务](28)
*   [Databricks](28)
*   [GCP:Google云端平台](28)

## 反向代理

可以在反向代理后面设置气流,并能够灵活地设置其端点。

例如,您可以配置反向代理以获取:

W
wizardforcel 已提交
15
```py
W
zh raw  
wizardforcel 已提交
16 17 18 19
 https : // lab . mycompany . com / myorg / airflow /

```

W
wizardforcel 已提交
20
为此,您需要在`airflow.cfg中`设置以下设置:
W
zh raw  
wizardforcel 已提交
21

W
wizardforcel 已提交
22
```py
W
zh raw  
wizardforcel 已提交
23 24 25 26
 base_url = http : // my_host / myorg / airflow

```

W
wizardforcel 已提交
27
此外,如果您使用Celery Executor,您可以使用以下命令获取`/` in `myorg / flower中的` Flower:
W
zh raw  
wizardforcel 已提交
28

W
wizardforcel 已提交
29
```py
W
zh raw  
wizardforcel 已提交
30 31 32 33 34 35 36 37
 flower_url_prefix = / myorg / flower

```

您的反向代理(例如:nginx)应配置如下:

*   传递url和http标头作为Airflow网络服务器,没有任何重写,例如:

W
wizardforcel 已提交
38
    ```py
W
zh raw  
wizardforcel 已提交
39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56
    server {
      listen 80;
      server_name lab.mycompany.com;

      location /myorg/airflow/ {
          proxy_pass http://localhost:8080;
          proxy_set_header Host $host;
          proxy_redirect off;
          proxy_http_version 1.1;
          proxy_set_header Upgrade $http_upgrade;
          proxy_set_header Connection "upgrade";
      }
    }

    ```

*   重写花端点的url:

W
wizardforcel 已提交
57
    ```py
W
zh raw  
wizardforcel 已提交
58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80
    server {
        listen 80;
        server_name lab.mycompany.com;

        location /myorg/flower/ {
            rewrite ^/myorg/flower/(.*)$ /$1 break;  # remove prefix from http header
            proxy_pass http://localhost:5555;
            proxy_set_header Host $host;
            proxy_redirect off;
            proxy_http_version 1.1;
            proxy_set_header Upgrade $http_upgrade;
            proxy_set_header Connection "upgrade";
        }
    }

    ```

## Azure:Microsoft Azure

Airflow对Microsoft Azure的支持有限:仅存在Azure Blob存储和Azure Data Lake的接口。 Blob存储的钩子,传感器和操作员以及Azure Data Lake Hook都在contrib部分。

### Azure Blob存储

W
wizardforcel 已提交
81
所有类都通过Window Azure Storage Blob协议进行通信。 确保存在类型为`wasb`的Airflow连接。 可以通过在额外字段中提供登录(=存储帐户名称)和密码(= KEY),或登录和SAS令牌来完成授权(有关`示例` ,请参阅连接`wasb_default` )。
W
zh raw  
wizardforcel 已提交
82 83 84 85 86 87 88 89

*   [WasbBlobSensor](28) :检查Azure Blob存储上是否存在blob。
*   [WasbPrefixSensor](28) :检查Azure Blob存储上是否存在与前缀匹配的blob。
*   [FileToWasbOperator](28) :将本地文件作为blob [上载到](28)容器。
*   [WasbHook](28) :与Azure Blob存储的接口。

#### WasbBlobSensor

W
wizardforcel 已提交
90
```py
W
zh raw  
wizardforcel 已提交
91 92 93
class airflow.contrib.sensors.wasb_sensor.WasbBlobSensor(container_name, blob_name, wasb_conn_id='wasb_default', check_options=None, *args, **kwargs) 
```

W
wizardforcel 已提交
94
基类: [`airflow.sensors.base_sensor_operator.BaseSensorOperator`](code.html "airflow.sensors.base_sensor_operator.BaseSensorOperator")
W
zh raw  
wizardforcel 已提交
95 96 97

等待blob到达Azure Blob存储。

W
wizardforcel 已提交
98
参数:
W
zh raw  
wizardforcel 已提交
99

W
wizardforcel 已提交
100 101 102 103
*   `container_name( str )` - 容器的名称。
*   `blob_name( str )` - blob的名称。
*   `wasb_conn_id( str )` - 对wasb连接的引用。
*   `check_options( dict )` - `WasbHook.check_for_blob()`采用的可选关键字参数。
W
zh raw  
wizardforcel 已提交
104 105


W
wizardforcel 已提交
106
```py
W
zh raw  
wizardforcel 已提交
107 108 109 110 111 112 113
poke(context) 
```

传感器在派生此类时定义的功能应该覆盖。

#### WasbPrefixSensor

W
wizardforcel 已提交
114
```py
W
zh raw  
wizardforcel 已提交
115 116 117
class airflow.contrib.sensors.wasb_sensor.WasbPrefixSensor(container_name, prefix, wasb_conn_id='wasb_default', check_options=None, *args, **kwargs) 
```

W
wizardforcel 已提交
118
基类: [`airflow.sensors.base_sensor_operator.BaseSensorOperator`](code.html "airflow.sensors.base_sensor_operator.BaseSensorOperator")
W
zh raw  
wizardforcel 已提交
119 120 121

等待与前缀匹配的blob到达Azure Blob存储。

W
wizardforcel 已提交
122
参数:
W
zh raw  
wizardforcel 已提交
123

W
wizardforcel 已提交
124 125 126 127
*   `container_name( str )` - 容器的名称。
*   `prefix( str )` - blob的前缀。
*   `wasb_conn_id( str )` - 对wasb连接的引用。
*   `check_options( dict )` - `WasbHook.check_for_prefix()`采用的可选关键字参数。
W
zh raw  
wizardforcel 已提交
128 129


W
wizardforcel 已提交
130
```py
W
zh raw  
wizardforcel 已提交
131 132 133 134 135 136 137
poke(context) 
```

传感器在派生此类时定义的功能应该覆盖。

#### FileToWasbOperator

W
wizardforcel 已提交
138
```py
W
zh raw  
wizardforcel 已提交
139 140 141
class airflow.contrib.operators.file_to_wasb.FileToWasbOperator(file_path, container_name, blob_name, wasb_conn_id='wasb_default', load_options=None, *args, **kwargs) 
```

W
wizardforcel 已提交
142
基类: [`airflow.models.BaseOperator`](code.html "airflow.models.BaseOperator")
W
zh raw  
wizardforcel 已提交
143 144 145

将文件上载到Azure Blob存储。

W
wizardforcel 已提交
146
参数:
W
zh raw  
wizardforcel 已提交
147

W
wizardforcel 已提交
148 149 150 151 152
*   `file_path( str )` - 要加载的文件的路径。 (模板)
*   `container_name( str )` - 容器的名称。 (模板)
*   `blob_name( str )` - blob的名称。 (模板)
*   `wasb_conn_id( str )` - 对wasb连接的引用。
*   `load_options( dict )` - `WasbHook.load_file()`采用的可选关键字参数。
W
zh raw  
wizardforcel 已提交
153 154


W
wizardforcel 已提交
155
```py
W
zh raw  
wizardforcel 已提交
156 157 158 159 160 161 162
execute(context) 
```

将文件上载到Azure Blob存储。

#### WasbHook

W
wizardforcel 已提交
163
```py
W
zh raw  
wizardforcel 已提交
164 165 166
class airflow.contrib.hooks.wasb_hook.WasbHook(wasb_conn_id='wasb_default') 
```

W
wizardforcel 已提交
167
基类: `airflow.hooks.base_hook.BaseHook`
W
zh raw  
wizardforcel 已提交
168 169 170

通过wasb://协议与Azure Blob存储进行交互。

W
wizardforcel 已提交
171 172
在连接的“额外”字段中传递的其他选项将传递给`BlockBlockService()`构造函数。 例如,通过添加{“sas_token”:“YOUR_TOKEN”}使用SAS令牌进行身份验证。

W
wizardforcel 已提交
173
参数:`wasb_conn_id( str )` - 对wasb连接的引用。 
W
zh raw  
wizardforcel 已提交
174 175


W
wizardforcel 已提交
176
```py
W
zh raw  
wizardforcel 已提交
177 178 179 180 181
check_for_blob(container_name, blob_name, **kwargs) 
```

检查Azure Blob存储上是否存在Blob。

W
wizardforcel 已提交
182
参数:
W
zh raw  
wizardforcel 已提交
183

W
wizardforcel 已提交
184 185 186
*   `container_name( str )` - 容器的名称。
*   `blob_name( str )` - blob的名称。
*   `kwargs( object )` - `BlockBlobService.exists()`采用的可选关键字参数。
W
zh raw  
wizardforcel 已提交
187

W
wizardforcel 已提交
188
返回:如果blob存在则为True,否则为False。
W
zh raw  
wizardforcel 已提交
189 190 191

:rtype布尔

W
wizardforcel 已提交
192
```py
W
zh raw  
wizardforcel 已提交
193 194 195 196 197
check_for_prefix(container_name, prefix, **kwargs) 
```

检查Azure Blob存储上是否存在前缀。

W
wizardforcel 已提交
198
参数:
W
zh raw  
wizardforcel 已提交
199

W
wizardforcel 已提交
200 201 202
*   `container_name( str )` - 容器的名称。
*   `prefix( str )` - blob的前缀。
*   `kwargs( object )` - `BlockBlobService.list_blobs()`采用的可选关键字参数。
W
zh raw  
wizardforcel 已提交
203

W
wizardforcel 已提交
204
返回:如果存在与前缀匹配的blob,则为True,否则为False。
W
zh raw  
wizardforcel 已提交
205 206 207

:rtype布尔

W
wizardforcel 已提交
208
```py
W
zh raw  
wizardforcel 已提交
209 210 211 212 213
get_conn() 
```

返回BlockBlobService对象。

W
wizardforcel 已提交
214
```py
W
zh raw  
wizardforcel 已提交
215 216 217 218 219
get_file(file_path, container_name, blob_name, **kwargs) 
```

从Azure Blob存储下载文件。

W
wizardforcel 已提交
220
参数:
W
zh raw  
wizardforcel 已提交
221

W
wizardforcel 已提交
222 223 224 225
*   `file_path( str )` - 要下载的文件的路径。
*   `container_name( str )` - 容器的名称。
*   `blob_name( str )` - blob的名称。
*   `kwargs( object )` - `BlockBlobService.create_blob_from_path()`采用的可选关键字参数。
W
zh raw  
wizardforcel 已提交
226 227


W
wizardforcel 已提交
228
```py
W
zh raw  
wizardforcel 已提交
229 230 231 232 233
load_file(file_path, container_name, blob_name, **kwargs) 
```

将文件上载到Azure Blob存储。

W
wizardforcel 已提交
234
参数:
W
zh raw  
wizardforcel 已提交
235

W
wizardforcel 已提交
236 237 238 239
*   `file_path( str )` - 要加载的文件的路径。
*   `container_name( str )` - 容器的名称。
*   `blob_name( str )` - blob的名称。
*   `kwargs( object )` - `BlockBlobService.create_blob_from_path()`采用的可选关键字参数。
W
zh raw  
wizardforcel 已提交
240 241


W
wizardforcel 已提交
242
```py
W
zh raw  
wizardforcel 已提交
243 244 245 246 247
load_string(string_data, container_name, blob_name, **kwargs) 
```

将字符串上载到Azure Blob存储。

W
wizardforcel 已提交
248
参数:
W
zh raw  
wizardforcel 已提交
249

W
wizardforcel 已提交
250 251 252 253
*   `string_data( str )` - 要加载的字符串。
*   `container_name( str )` - 容器的名称。
*   `blob_name( str )` - blob的名称。
*   `kwargs( object )` - `BlockBlobService.create_blob_from_text()`采用的可选关键字参数。
W
zh raw  
wizardforcel 已提交
254 255


W
wizardforcel 已提交
256
```py
W
zh raw  
wizardforcel 已提交
257 258 259 260 261
read_file(container_name, blob_name, **kwargs) 
```

从Azure Blob Storage读取文件并以字符串形式返回。

W
wizardforcel 已提交
262
参数:
W
zh raw  
wizardforcel 已提交
263

W
wizardforcel 已提交
264 265 266
*   `container_name( str )` - 容器的名称。
*   `blob_name( str )` - blob的名称。
*   `kwargs( object )` - `BlockBlobService.create_blob_from_path()`采用的可选关键字参数。
W
zh raw  
wizardforcel 已提交
267 268 269 270


### Azure文件共享

W
wizardforcel 已提交
271
SMB文件共享的云变体。 确保存在类型为`wasb`的Airflow连接。 可以通过在额外字段中提供登录(=存储帐户名称)和密码(=存储帐户密钥)或登录和SAS令牌来完成授权(有关`示例` ,请参阅连接`wasb_default` )。
W
zh raw  
wizardforcel 已提交
272 273 274

#### AzureFileShareHook

W
wizardforcel 已提交
275
```py
W
zh raw  
wizardforcel 已提交
276 277 278
class airflow.contrib.hooks.azure_fileshare_hook.AzureFileShareHook(wasb_conn_id='wasb_default') 
```

W
wizardforcel 已提交
279
基类: `airflow.hooks.base_hook.BaseHook`
W
zh raw  
wizardforcel 已提交
280 281 282

与Azure FileShare存储交互。

W
wizardforcel 已提交
283 284
在连接的“额外”字段中传递的其他选项将传递给`FileService()`构造函数。

W
wizardforcel 已提交
285
参数:`wasb_conn_id( str )` - 对wasb连接的引用。 
W
zh raw  
wizardforcel 已提交
286 287


W
wizardforcel 已提交
288
```py
W
zh raw  
wizardforcel 已提交
289 290 291 292 293
check_for_directory(share_name, directory_name, **kwargs) 
```

检查Azure文件共享上是否存在目录。

W
wizardforcel 已提交
294
参数:
W
zh raw  
wizardforcel 已提交
295

W
wizardforcel 已提交
296 297 298
*   `share_name( str )` - 共享的名称。
*   `directory_name( str )` - 目录的名称。
*   `kwargs( object )` - `FileService.exists()`采用的可选关键字参数。
W
zh raw  
wizardforcel 已提交
299

W
wizardforcel 已提交
300
返回:如果文件存在则为True,否则为False。
W
zh raw  
wizardforcel 已提交
301 302 303

:rtype布尔

W
wizardforcel 已提交
304
```py
W
zh raw  
wizardforcel 已提交
305 306 307 308 309
check_for_file(share_name, directory_name, file_name, **kwargs) 
```

检查Azure文件共享上是否存在文件。

W
wizardforcel 已提交
310
参数:
W
zh raw  
wizardforcel 已提交
311

W
wizardforcel 已提交
312 313 314 315
*   `share_name( str )` - 共享的名称。
*   `directory_name( str )` - 目录的名称。
*   `file_name( str )` - 文件名。
*   `kwargs( object )` - `FileService.exists()`采用的可选关键字参数。
W
zh raw  
wizardforcel 已提交
316

W
wizardforcel 已提交
317
返回:如果文件存在则为True,否则为False。
W
zh raw  
wizardforcel 已提交
318 319 320

:rtype布尔

W
wizardforcel 已提交
321
```py
W
zh raw  
wizardforcel 已提交
322 323 324 325 326
create_directory(share_name, directory_name, **kwargs) 
```

在Azure文件共享上创建新的目标。

W
wizardforcel 已提交
327
参数:
W
zh raw  
wizardforcel 已提交
328

W
wizardforcel 已提交
329 330 331
*   `share_name( str )` - 共享的名称。
*   `directory_name( str )` - 目录的名称。
*   `kwargs( object )` - `FileService.create_directory()`采用的可选关键字参数。
W
zh raw  
wizardforcel 已提交
332

W
wizardforcel 已提交
333
返回:文件和目录列表
W
zh raw  
wizardforcel 已提交
334 335 336

:rtype列表

W
wizardforcel 已提交
337
```py
W
zh raw  
wizardforcel 已提交
338 339 340 341 342
get_conn() 
```

返回FileService对象。

W
wizardforcel 已提交
343
```py
W
zh raw  
wizardforcel 已提交
344 345 346 347 348
get_file(file_path, share_name, directory_name, file_name, **kwargs) 
```

从Azure文件共享下载文件。

W
wizardforcel 已提交
349
参数:
W
zh raw  
wizardforcel 已提交
350

W
wizardforcel 已提交
351 352 353 354 355
*   `file_path( str )` - 存储文件的位置。
*   `share_name( str )` - 共享的名称。
*   `directory_name( str )` - 目录的名称。
*   `file_name( str )` - 文件名。
*   `kwargs( object )` - `FileService.get_file_to_path()`采用的可选关键字参数。
W
zh raw  
wizardforcel 已提交
356 357


W
wizardforcel 已提交
358
```py
W
zh raw  
wizardforcel 已提交
359 360 361 362 363
get_file_to_stream(stream, share_name, directory_name, file_name, **kwargs) 
```

从Azure文件共享下载文件。

W
wizardforcel 已提交
364
参数:
W
zh raw  
wizardforcel 已提交
365

W
wizardforcel 已提交
366 367 368 369 370
*   `stream(类文件对象 )` - 用于存储文件的文件句柄。
*   `share_name( str )` - 共享的名称。
*   `directory_name( str )` - 目录的名称。
*   `file_name( str )` - 文件名。
*   `kwargs( object )` - `FileService.get_file_to_stream()`采用的可选关键字参数。
W
zh raw  
wizardforcel 已提交
371 372


W
wizardforcel 已提交
373
```py
W
zh raw  
wizardforcel 已提交
374 375 376 377 378
list_directories_and_files(share_name, directory_name=None, **kwargs) 
```

返回存储在Azure文件共享中的目录和文件列表。

W
wizardforcel 已提交
379
参数:
W
zh raw  
wizardforcel 已提交
380

W
wizardforcel 已提交
381 382 383
*   `share_name( str )` - 共享的名称。
*   `directory_name( str )` - 目录的名称。
*   `kwargs( object )` - `FileService.list_directories_and_files()`采用的可选关键字参数。
W
zh raw  
wizardforcel 已提交
384

W
wizardforcel 已提交
385
返回:文件和目录列表
W
zh raw  
wizardforcel 已提交
386 387 388

:rtype列表

W
wizardforcel 已提交
389
```py
W
zh raw  
wizardforcel 已提交
390 391 392 393 394
load_file(file_path, share_name, directory_name, file_name, **kwargs) 
```

将文件上载到Azure文件共享。

W
wizardforcel 已提交
395
参数:
W
zh raw  
wizardforcel 已提交
396

W
wizardforcel 已提交
397 398 399 400 401
*   `file_path( str )` - 要加载的文件的路径。
*   `share_name( str )` - 共享的名称。
*   `directory_name( str )` - 目录的名称。
*   `file_name( str )` - 文件名。
*   `kwargs( object )` - `FileService.create_file_from_path()`采用的可选关键字参数。
W
zh raw  
wizardforcel 已提交
402 403


W
wizardforcel 已提交
404
```py
W
zh raw  
wizardforcel 已提交
405 406 407 408 409
load_stream(stream, share_name, directory_name, file_name, count, **kwargs) 
```

将流上载到Azure文件共享。

W
wizardforcel 已提交
410
参数:
W
zh raw  
wizardforcel 已提交
411

W
wizardforcel 已提交
412 413 414 415 416 417
*   `stream(类文件 )` - 打开的文件/流作为文件内容上传。
*   `share_name( str )` - 共享的名称。
*   `directory_name( str )` - 目录的名称。
*   `file_name( str )` - 文件名。
*   `count( int )` - 流的大小(以字节为单位)
*   `kwargs( object )` - `FileService.create_file_from_stream()`采用的可选关键字参数。
W
zh raw  
wizardforcel 已提交
418 419


W
wizardforcel 已提交
420
```py
W
zh raw  
wizardforcel 已提交
421 422 423 424 425
load_string(string_data, share_name, directory_name, file_name, **kwargs) 
```

将字符串上载到Azure文件共享。

W
wizardforcel 已提交
426
参数:
W
zh raw  
wizardforcel 已提交
427

W
wizardforcel 已提交
428 429 430 431 432
*   `string_data( str )` - 要加载的字符串。
*   `share_name( str )` - 共享的名称。
*   `directory_name( str )` - 目录的名称。
*   `file_name( str )` - 文件名。
*   `kwargs( object )` - `FileService.create_file_from_text()`采用的可选关键字参数。
W
zh raw  
wizardforcel 已提交
433 434 435 436 437 438 439 440


### 记录

可以将Airflow配置为在Azure Blob存储中读取和写入任务日志。 请参阅[将日志写入Azure Blob存储](howto/write-logs.html)

### Azure Data Lake

W
wizardforcel 已提交
441
AzureDataLakeHook通过与WebHDFS兼容的REST API进行通信。 确保存在`azure_data_lake`类型的气流连接。 可以通过提供登录(=客户端ID),密码(=客户端密钥)和额外字段租户(租户)和account_name(帐户名称)来完成授权
W
zh raw  
wizardforcel 已提交
442

W
wizardforcel 已提交
443
> (有关`示例` ,请参阅`azure_data_lake_default`连接)。
W
zh raw  
wizardforcel 已提交
444 445 446 447 448

*   [AzureDataLakeHook](28) :与Azure Data Lake的接口。

#### AzureDataLakeHook

W
wizardforcel 已提交
449
```py
W
zh raw  
wizardforcel 已提交
450 451 452
class airflow.contrib.hooks.azure_data_lake_hook.AzureDataLakeHook(azure_data_lake_conn_id='azure_data_lake_default') 
```

W
wizardforcel 已提交
453
基类: `airflow.hooks.base_hook.BaseHook`
W
zh raw  
wizardforcel 已提交
454 455 456 457 458

与Azure Data Lake进行交互。

客户端ID和客户端密钥应该在用户和密码参数中。 租户和帐户名称应为{“租户”:“<TENANT>”,“account_name”:“ACCOUNT_NAME”}的额外字段。

W
wizardforcel 已提交
459
参数:`azure_data_lake_conn_id( str )` - 对Azure Data Lake连接的引用。 
W
wizardforcel 已提交
460

W
zh raw  
wizardforcel 已提交
461

W
wizardforcel 已提交
462
```py
W
zh raw  
wizardforcel 已提交
463 464 465 466 467
check_for_file(file_path) 
```

检查Azure Data Lake上是否存在文件。

W
wizardforcel 已提交
468
参数:`file_path( str )` - 文件的路径和名称。 
W
wizardforcel 已提交
469 470

返回:如果文件存在则为True,否则为False。
W
zh raw  
wizardforcel 已提交
471 472 473

:rtype布尔

W
wizardforcel 已提交
474
```py
W
zh raw  
wizardforcel 已提交
475 476 477 478 479
download_file(local_path, remote_path, nthreads=64, overwrite=True, buffersize=4194304, blocksize=4194304) 
```

从Azure Blob存储下载文件。

W
wizardforcel 已提交
480
参数:
W
zh raw  
wizardforcel 已提交
481

W
wizardforcel 已提交
482 483 484 485 486 487
*   `local_path( str )` - 本地路径。 如果下载单个文件,将写入此特定文件,除非它是现有目录,在这种情况下,将在其中创建文件。 如果下载多个文件,这是要写入的根目录。 将根据需要创建目录。
*   `remote_path( str )` - 用于查找远程文件的远程路径/ globstring。 不支持使用`**的`递归glob模式。
*   `nthreads( int )` - 要使用的线程数。 如果为None,则使用核心数。
*   `overwrite( bool )` - 是否强制覆盖现有文件/目录。 如果False和远程路径是目录,则无论是否覆盖任何文件都将退出。 如果为True,则实际仅覆盖匹配的文件名。
*   `buffersize( int )` - int [2 ** 22]内部缓冲区的字节数。 此块不能大于块,并且不能小于块。
*   `blocksize( int )` - int [2 ** 22]块的字节数。 在每个块中,我们为每个API调用编写一个较小的块。 这个块不能大于块。
W
zh raw  
wizardforcel 已提交
488 489


W
wizardforcel 已提交
490
```py
W
zh raw  
wizardforcel 已提交
491 492 493 494 495
get_conn() 
```

返回AzureDLFileSystem对象。

W
wizardforcel 已提交
496
```py
W
zh raw  
wizardforcel 已提交
497 498 499 500 501
upload_file(local_path, remote_path, nthreads=64, overwrite=True, buffersize=4194304, blocksize=4194304) 
```

将文件上载到Azure Data Lake。

W
wizardforcel 已提交
502
参数:
W
zh raw  
wizardforcel 已提交
503

W
wizardforcel 已提交
504 505 506 507 508 509
*   `local_path( str )` - 本地路径。 可以是单个文件,目录(在这种情况下,递归上传)或glob模式。 不支持使用`**的`递归glob模式。
*   `remote_path( str )` - 要上传的远程路径; 如果有多个文件,这就是要写入的dircetory根目录。
*   `nthreads( int )` - 要使用的线程数。 如果为None,则使用核心数。
*   `overwrite( bool )` - 是否强制覆盖现有文件/目录。 如果False和远程路径是目录,则无论是否覆盖任何文件都将退出。 如果为True,则实际仅覆盖匹配的文件名。
*   `buffersize( int )` - int [2 ** 22]内部缓冲区的字节数。 此块不能大于块,并且不能小于块。
*   `blocksize( int )` - int [2 ** 22]块的字节数。 在每个块中,我们为每个API调用编写一个较小的块。 这个块不能大于块。
W
zh raw  
wizardforcel 已提交
510 511 512 513 514 515 516 517 518 519 520 521 522 523 524


## AWS:亚马逊网络服务

Airflow广泛支持Amazon Web Services。 但请注意,Hook,Sensors和Operators都在contrib部分。

### AWS EMR

*   [EmrAddStepsOperator](28) :向现有EMR JobFlow添加步骤。
*   [EmrCreateJobFlowOperator](28) :创建EMR JobFlow,从EMR连接读取配置。
*   [EmrTerminateJobFlowOperator](28) :终止EMR JobFlow。
*   [EmrHook](28) :与AWS EMR互动。

#### EmrAddStepsOperator

W
wizardforcel 已提交
525
```py
W
zh raw  
wizardforcel 已提交
526 527 528
class airflow.contrib.operators.emr_add_steps_operator.EmrAddStepsOperator(job_flow_id, aws_conn_id='s3_default', steps=None, *args, **kwargs) 
```

W
wizardforcel 已提交
529
基类: [`airflow.models.BaseOperator`](code.html "airflow.models.BaseOperator")
W
zh raw  
wizardforcel 已提交
530 531 532

向现有EMR job_flow添加步骤的运算符。

W
wizardforcel 已提交
533
参数:
W
zh raw  
wizardforcel 已提交
534 535

*   **job_flow_id** - 要添加步骤的JobFlow的ID。 (模板)
W
wizardforcel 已提交
536 537
*   `aws_conn_id( str )` - 与使用的aws连接
*   `步骤( list )` - 要添加到作业流的boto3样式步骤。 (模板)
W
zh raw  
wizardforcel 已提交
538 539 540 541


#### EmrCreateJobFlowOperator

W
wizardforcel 已提交
542
```py
W
zh raw  
wizardforcel 已提交
543 544 545
class airflow.contrib.operators.emr_create_job_flow_operator.EmrCreateJobFlowOperator(aws_conn_id='s3_default', emr_conn_id='emr_default', job_flow_overrides=None, *args, **kwargs) 
```

W
wizardforcel 已提交
546
基类: [`airflow.models.BaseOperator`](code.html "airflow.models.BaseOperator")
W
zh raw  
wizardforcel 已提交
547 548 549

创建EMR JobFlow,从EMR连接读取配置。 可以传递JobFlow覆盖的字典,覆盖连接中的配置。

W
wizardforcel 已提交
550
参数:
W
zh raw  
wizardforcel 已提交
551

W
wizardforcel 已提交
552 553
*   `aws_conn_id( str )` - 与使用的aws连接
*   `emr_conn_id( str )` - 要使用的emr连接
W
zh raw  
wizardforcel 已提交
554 555 556 557 558
*   **job_flow_overrides** - 用于覆盖emr_connection extra的boto3样式参数。 (模板)


#### EmrTerminateJobFlowOperator

W
wizardforcel 已提交
559
```py
W
zh raw  
wizardforcel 已提交
560 561 562
class airflow.contrib.operators.emr_terminate_job_flow_operator.EmrTerminateJobFlowOperator(job_flow_id, aws_conn_id='s3_default', *args, **kwargs) 
```

W
wizardforcel 已提交
563
基类: [`airflow.models.BaseOperator`](code.html "airflow.models.BaseOperator")
W
zh raw  
wizardforcel 已提交
564 565 566

运营商终止EMR JobFlows。

W
wizardforcel 已提交
567
参数:
W
zh raw  
wizardforcel 已提交
568 569

*   **job_flow_id** - 要终止的JobFlow的id。 (模板)
W
wizardforcel 已提交
570
*   `aws_conn_id( str )` - 与使用的aws连接
W
zh raw  
wizardforcel 已提交
571 572 573 574


#### EmrHook

W
wizardforcel 已提交
575
```py
W
zh raw  
wizardforcel 已提交
576 577 578
class airflow.contrib.hooks.emr_hook.EmrHook(emr_conn_id=None, *args, **kwargs) 
```

W
wizardforcel 已提交
579
基类: [`airflow.contrib.hooks.aws_hook.AwsHook`](code.html "airflow.contrib.hooks.aws_hook.AwsHook")
W
zh raw  
wizardforcel 已提交
580 581 582

与AWS EMR交互。 emr_conn_id只是使用create_job_flow方法所必需的。

W
wizardforcel 已提交
583
```py
W
zh raw  
wizardforcel 已提交
584 585 586 587 588 589 590 591 592 593 594 595 596 597 598
create_job_flow(job_flow_overrides) 
```

使用EMR连接中的配置创建作业流。 json额外哈希的键可以具有boto3 run_job_flow方法的参数。 此配置的覆盖可以作为job_flow_overrides传递。

### AWS S3

*   [S3Hook](28) :与AWS S3交互。
*   [S3FileTransformOperator](28) :将数据从源S3位置复制到本地文件系统上的临时位置。
*   [S3ListOperator](28) :列出与S3位置的键前缀匹配的文件。
*   [S3ToGoogleCloudStorageOperator](28) :将S3位置与Google云端存储[分区](28)同步。
*   [S3ToHiveTransfer](28) :将数据从S3移动到Hive。 操作员从S3下载文件,在将文件加载到Hive表之前将其存储在本地。

#### S3Hook

W
wizardforcel 已提交
599
```py
W
zh raw  
wizardforcel 已提交
600 601 602
class airflow.hooks.S3_hook.S3Hook(aws_conn_id='aws_default') 
```

W
wizardforcel 已提交
603
基类: [`airflow.contrib.hooks.aws_hook.AwsHook`](code.html "airflow.contrib.hooks.aws_hook.AwsHook")
W
zh raw  
wizardforcel 已提交
604 605 606

使用boto3库与AWS S3交互。

W
wizardforcel 已提交
607
```py
W
zh raw  
wizardforcel 已提交
608 609 610 611 612
check_for_bucket(bucket_name) 
```

检查bucket_name是否存在。

W
wizardforcel 已提交
613
参数:`bucket_name( str )` - 存储桶的名称 
W
wizardforcel 已提交
614

W
zh raw  
wizardforcel 已提交
615

W
wizardforcel 已提交
616
```py
W
zh raw  
wizardforcel 已提交
617 618 619 620 621
check_for_key(key, bucket_name=None) 
```

检查存储桶中是否存在密钥

W
wizardforcel 已提交
622
参数:
W
zh raw  
wizardforcel 已提交
623

W
wizardforcel 已提交
624 625
*   `key( str )` - 指向文件的S3键
*   `bucket_name( str )` - 存储文件的存储桶的名称
W
zh raw  
wizardforcel 已提交
626 627


W
wizardforcel 已提交
628
```py
W
zh raw  
wizardforcel 已提交
629 630 631 632 633
check_for_prefix(bucket_name, prefix, delimiter) 
```

检查存储桶中是否存在前缀

W
wizardforcel 已提交
634
```py
W
zh raw  
wizardforcel 已提交
635 636 637 638 639
check_for_wildcard_key(wildcard_key, bucket_name=None, delimiter='') 
```

检查桶中是否存在与通配符表达式匹配的密钥

W
wizardforcel 已提交
640
```py
W
zh raw  
wizardforcel 已提交
641 642 643 644 645
get_bucket(bucket_name) 
```

返回boto3.S3.Bucket对象

W
wizardforcel 已提交
646
参数:`bucket_name( str )` - 存储桶的名称 
W
wizardforcel 已提交
647

W
zh raw  
wizardforcel 已提交
648

W
wizardforcel 已提交
649
```py
W
zh raw  
wizardforcel 已提交
650 651 652 653 654
get_key(key, bucket_name=None) 
```

返回boto3.s3.Object

W
wizardforcel 已提交
655
参数:
W
zh raw  
wizardforcel 已提交
656

W
wizardforcel 已提交
657 658
*   `key( str )` - 密钥的路径
*   `bucket_name( str )` - 存储桶的名称
W
zh raw  
wizardforcel 已提交
659 660


W
wizardforcel 已提交
661
```py
W
zh raw  
wizardforcel 已提交
662 663 664 665 666
get_wildcard_key(wildcard_key, bucket_name=None, delimiter='') 
```

返回与通配符表达式匹配的boto3.s3.Object对象

W
wizardforcel 已提交
667
参数:
W
zh raw  
wizardforcel 已提交
668

W
wizardforcel 已提交
669 670
*   `wildcard_key( str )` - 密钥的路径
*   `bucket_name( str )` - 存储桶的名称
W
zh raw  
wizardforcel 已提交
671 672


W
wizardforcel 已提交
673
```py
W
zh raw  
wizardforcel 已提交
674 675 676 677 678
list_keys(bucket_name, prefix='', delimiter='', page_size=None, max_items=None) 
```

列出前缀下的存储桶中的密钥,但不包含分隔符

W
wizardforcel 已提交
679
参数:
W
zh raw  
wizardforcel 已提交
680

W
wizardforcel 已提交
681 682 683 684 685
*   `bucket_name( str )` - 存储桶的名称
*   `prefix( str )` - 一个密钥前缀
*   `delimiter( str )` - 分隔符标记键层次结构。
*   `page_size( int )` - 分页大小
*   `max_items( int )` - 要返回的最大项目数
W
zh raw  
wizardforcel 已提交
686 687


W
wizardforcel 已提交
688
```py
W
zh raw  
wizardforcel 已提交
689 690 691 692 693
list_prefixes(bucket_name, prefix='', delimiter='', page_size=None, max_items=None) 
```

列出前缀下的存储桶中的前缀

W
wizardforcel 已提交
694
参数:
W
zh raw  
wizardforcel 已提交
695

W
wizardforcel 已提交
696 697 698 699 700
*   `bucket_name( str )` - 存储桶的名称
*   `prefix( str )` - 一个密钥前缀
*   `delimiter( str )` - 分隔符标记键层次结构。
*   `page_size( int )` - 分页大小
*   `max_items( int )` - 要返回的最大项目数
W
zh raw  
wizardforcel 已提交
701 702


W
wizardforcel 已提交
703
```py
W
zh raw  
wizardforcel 已提交
704 705 706 707 708 709 710
load_bytes(bytes_data, key, bucket_name=None, replace=False, encrypt=False) 
```

将字节加载到S3

这是为了方便在S3中删除字符串。 它使用boto基础结构将文件发送到s3。

W
wizardforcel 已提交
711
参数:
W
zh raw  
wizardforcel 已提交
712

W
wizardforcel 已提交
713 714 715 716 717
*   `bytes_data( bytes )` - 设置为密钥内容的字节。
*   `key( str )` - 指向文件的S3键
*   `bucket_name( str )` - 存储文件的存储桶的名称
*   `replace( bool )` - 一个标志,用于决定是否覆盖密钥(如果已存在)
*   `encrypt( bool )` - 如果为True,则文件将在服务器端由S3加密,并在S3中静止时以加密形式存储。
W
zh raw  
wizardforcel 已提交
718 719


W
wizardforcel 已提交
720
```py
W
zh raw  
wizardforcel 已提交
721 722 723 724 725
load_file(filename, key, bucket_name=None, replace=False, encrypt=False) 
```

将本地文件加载到S3

W
wizardforcel 已提交
726
参数:
W
zh raw  
wizardforcel 已提交
727

W
wizardforcel 已提交
728 729 730 731 732
*   `filename( str )` - 要加载的文件的名称。
*   `key( str )` - 指向文件的S3键
*   `bucket_name( str )` - 存储文件的存储桶的名称
*   `replace( bool )` - 一个标志,用于决定是否覆盖密钥(如果已存在)。 如果replace为False且密钥存在,则会引发错误。
*   `encrypt( bool )` - 如果为True,则文件将在服务器端由S3加密,并在S3中静止时以加密形式存储。
W
zh raw  
wizardforcel 已提交
733 734


W
wizardforcel 已提交
735
```py
W
zh raw  
wizardforcel 已提交
736 737 738 739 740 741 742
load_string(string_data, key, bucket_name=None, replace=False, encrypt=False, encoding='utf-8') 
```

将字符串加载到S3

这是为了方便在S3中删除字符串。 它使用boto基础结构将文件发送到s3。

W
wizardforcel 已提交
743
参数:
W
zh raw  
wizardforcel 已提交
744

W
wizardforcel 已提交
745 746 747 748 749
*   `string_data( str )` - 要设置为键的内容的字符串。
*   `key( str )` - 指向文件的S3键
*   `bucket_name( str )` - 存储文件的存储桶的名称
*   `replace( bool )` - 一个标志,用于决定是否覆盖密钥(如果已存在)
*   `encrypt( bool )` - 如果为True,则文件将在服务器端由S3加密,并在S3中静止时以加密形式存储。
W
zh raw  
wizardforcel 已提交
750 751


W
wizardforcel 已提交
752
```py
W
zh raw  
wizardforcel 已提交
753 754 755 756 757
read_key(key, bucket_name=None) 
```

从S3读取密钥

W
wizardforcel 已提交
758
参数:
W
zh raw  
wizardforcel 已提交
759

W
wizardforcel 已提交
760 761
*   `key( str )` - 指向文件的S3键
*   `bucket_name( str )` - 存储文件的存储桶的名称
W
zh raw  
wizardforcel 已提交
762 763


W
wizardforcel 已提交
764
```py
W
zh raw  
wizardforcel 已提交
765 766 767 768 769
select_key(key, bucket_name=None, expression='SELECT * FROM S3Object', expression_type='SQL', input_serialization={'CSV': {}}, output_serialization={'CSV': {}}) 
```

使用S3 Select读取密钥。

W
wizardforcel 已提交
770 771
参数:

W
wizardforcel 已提交
772 773 774 775 776 777
*   `key( str )` - 指向文件的S3键
*   `bucket_name( str )` - 存储文件的存储桶的名称
*   `expression( str )` - S3选择表达式
*   `expression_type( str )` - S3选择表达式类型
*   `input_serialization( dict )` - S3选择输入数据序列化格式
*   `output_serialization( dict )` - S3选择输出数据序列化格式
W
zh raw  
wizardforcel 已提交
778

W
wizardforcel 已提交
779
返回:通过S3 Select检索原始数据的子集
W
zh raw  
wizardforcel 已提交
780

W
wizardforcel 已提交
781
返回类型:海峡
W
zh raw  
wizardforcel 已提交
782 783 784

也可以看看

W
wizardforcel 已提交
785
有关S3 Select参数的更多详细信息: [http](http://boto3.readthedocs.io/en/latest/reference/services/s3.html)[//boto3.readthedocs.io/en/latest/reference/services/s3.html#S3.Client.selectobjectcontent](http://boto3.readthedocs.io/en/latest/reference/services/s3.html)
W
zh raw  
wizardforcel 已提交
786 787 788

#### S3FileTransformOperator

W
wizardforcel 已提交
789
```py
W
zh raw  
wizardforcel 已提交
790 791 792
class airflow.operators.s3_file_transform_operator.S3FileTransformOperator(source_s3_key, dest_s3_key, transform_script=None, select_expression=None, source_aws_conn_id='aws_default', dest_aws_conn_id='aws_default', replace=False, *args, **kwargs) 
```

W
wizardforcel 已提交
793
基类: [`airflow.models.BaseOperator`](code.html "airflow.models.BaseOperator")
W
zh raw  
wizardforcel 已提交
794 795 796 797 798 799 800

将数据从源S3位置复制到本地文件系统上的临时位置。 根据转换脚本的指定对此文件运行转换,并将输出上载到目标S3位置。

本地文件系统中的源文件和目标文件的位置作为转换脚本的第一个和第二个参数提供。 转换脚本应该从源读取数据,转换它并将输出写入本地目标文件。 然后,操作员接管控制并将本地目标文件上载到S3。

S3 Select也可用于过滤源内容。 如果指定了S3 Select表达式,则用户可以省略转换脚本。

W
wizardforcel 已提交
801
参数:
W
zh raw  
wizardforcel 已提交
802

W
wizardforcel 已提交
803 804 805 806 807 808 809
*   `source_s3_key( str )` - 从S3检索的密钥。 (模板)
*   `source_aws_conn_id( str )` - 源s3连接
*   `dest_s3_key( str )` - 从S3写入的密钥。 (模板)
*   `dest_aws_conn_id( str )` - 目标s3连接
*   `replace( bool )` - 替换dest S3密钥(如果已存在)
*   `transform_script( str )` - 可执行转换脚本的位置
*   `select_expression( str )` - S3选择表达式
W
zh raw  
wizardforcel 已提交
810 811 812 813


#### S3ListOperator

W
wizardforcel 已提交
814
```py
W
wizardforcel 已提交
815
class airflow.contrib.operators.s3listoperator.S3ListOperator(bucket, prefix='', delimiter='', aws_conn_id='aws_default', *args, **kwargs) 
W
zh raw  
wizardforcel 已提交
816 817
```

W
wizardforcel 已提交
818
基类: [`airflow.models.BaseOperator`](code.html "airflow.models.BaseOperator")
W
zh raw  
wizardforcel 已提交
819 820 821

列出桶中具有名称中给定字符串前缀的所有对象。

W
wizardforcel 已提交
822
此运算符返回一个python列表,其中包含可由`xcom`在下游任务中使用的对象名称。
W
zh raw  
wizardforcel 已提交
823

W
wizardforcel 已提交
824
参数:
W
zh raw  
wizardforcel 已提交
825

W
wizardforcel 已提交
826 827 828 829
*   `bucket( str )` - S3存储桶在哪里找到对象。 (模板)
*   `prefix( str )` - 用于过滤名称以此前缀开头的对象的前缀字符串。 (模板)
*   `delimiter( str )` - 分隔符标记键层次结构。 (模板)
*   `aws_conn_id( str )` - 连接到S3存储时使用的连接ID。
W
zh raw  
wizardforcel 已提交
830 831


W
wizardforcel 已提交
832
```py
W
zh raw  
wizardforcel 已提交
833 834 835 836 837
 Example: 
```

以下运算符将列出`data`存储区中S3 `customers/2018/04/` key的所有文件(不包括子文件夹)。

W
wizardforcel 已提交
838
```py
W
zh raw  
wizardforcel 已提交
839 840 841 842 843 844 845 846 847 848 849 850
 s3_file = S3ListOperator (
    task_id = 'list_3s_files' ,
    bucket = 'data' ,
    prefix = 'customers/2018/04/' ,
    delimiter = '/' ,
    aws_conn_id = 'aws_customers_conn'
)

```

#### S3ToGoogleCloudStorageOperator

W
wizardforcel 已提交
851
```py
W
zh raw  
wizardforcel 已提交
852 853 854
class airflow.contrib.operators.s3_to_gcs_operator.S3ToGoogleCloudStorageOperator(bucket, prefix='', delimiter='', aws_conn_id='aws_default', dest_gcs_conn_id=None, dest_gcs=None, delegate_to=None, replace=False, *args, **kwargs) 
```

W
wizardforcel 已提交
855
基类: [`airflow.contrib.operators.s3listoperator.S3ListOperator`](28 "airflow.contrib.operators.s3listoperator.S3ListOperator")
W
zh raw  
wizardforcel 已提交
856 857 858

将S3密钥(可能是前缀)与Google云端存储目标路径同步。

W
wizardforcel 已提交
859
参数:
W
zh raw  
wizardforcel 已提交
860

W
wizardforcel 已提交
861 862 863 864 865 866 867 868
*   `bucket( str )` - S3存储桶在哪里找到对象。 (模板)
*   `prefix( str )` - 前缀字符串,用于过滤名称以此前缀开头的对象。 (模板)
*   `delimiter( str )` - 分隔符标记键层次结构。 (模板)
*   `aws_conn_id( str )` - 源S3连接
*   `dest_gcs_conn_id( str )` - 连接到Google云端存储时要使用的目标连接ID。
*   `dest_gcs( str )` - 要存储文件的目标Google云端存储**分区**和前缀。 (模板)
*   `delegate_to( str )` - 模拟的帐户(如果有)。 为此,发出请求的服务帐户必须启用域范围委派。
*   `replace( bool )` - 是否要替换现有目标文件。
W
zh raw  
wizardforcel 已提交
869 870 871 872


**示例** :.. code-block :: python

W
wizardforcel 已提交
873
> ```py
W
zh raw  
wizardforcel 已提交
874 875 876 877 878 879 880 881 882
>  s3_to_gcs_op = S3ToGoogleCloudStorageOperator( 
> ```
> 
> task_id ='s3_to_gcs_example',bucket ='my-s3-bucket',prefix ='data / customers-201804',dest_gcs_conn_id ='google_cloud_default',dest_gcs ='gs://my.gcs.bucket/some/customers/' ,replace = False,dag = my-dag)

请注意, `bucket``prefix``delimiter``dest_gcs`是模板化的,因此如果您愿意,可以在其中使用变量。

#### S3ToHiveTransfer

W
wizardforcel 已提交
883
```py
W
zh raw  
wizardforcel 已提交
884 885 886
class airflow.operators.s3_to_hive_operator.S3ToHiveTransfer(s3_key, field_dict, hive_table, delimiter=', ', create=True, recreate=False, partition=None, headers=False, check_headers=False, wildcard_match=False, aws_conn_id='aws_default', hive_cli_conn_id='hive_cli_default', input_compressed=False, tblproperties=None, select_expression=None, *args, **kwargs) 
```

W
wizardforcel 已提交
887
基类: [`airflow.models.BaseOperator`](code.html "airflow.models.BaseOperator")
W
zh raw  
wizardforcel 已提交
888 889 890 891 892

将数据从S3移动到Hive。 操作员从S3下载文件,在将文件加载到Hive表之前将其存储在本地。 如果`create``recreate`参数设置为`True` ,则会生成`CREATE TABLE``DROP TABLE`语句。 Hive数据类型是从游标的元数据中推断出来的。

请注意,Hive中生成的表使用`STORED AS textfile` ,这不是最有效的序列化格式。 如果加载了大量数据和/或表格被大量查询,您可能只想使用此运算符将数据暂存到临时表中,然后使用`HiveOperator`将其加载到最终目标中。

W
wizardforcel 已提交
893 894
参数:

W
wizardforcel 已提交
895 896 897 898 899 900 901 902 903 904 905 906 907 908 909
*   `s3_key( str )` - 从S3检索的密钥。 (模板)
*   `field_dict( dict )` - 字段的字典在文件中命名为键,其Hive类型为值
*   `hive_table( str )` - 目标Hive表,使用点表示法来定位特定数据库。 (模板)
*   `create( bool )` - 是否创建表,如果它不存在
*   `recreate( bool )` - 是否在每次执行时删除并重新创建表
*   `partition( dict )` - 将目标分区作为分区列和值的字典。 (模板)
*   `headers( bool )` - 文件是否包含第一行的列名
*   `check_headers( bool )` - 是否应该根据field_dict的键检查第一行的列名
*   `wildcard_match( bool )` - 是否应将s3_key解释为Unix通配符模式
*   `delimiter( str )` - 文件中的字段分隔符
*   `aws_conn_id( str )` - 源s3连接
*   `hive_cli_conn_id( str )` - 目标配置单元连接
*   `input_compressed( bool )` - 布尔值,用于确定是否需要文件解压缩来处理标头
*   `tblproperties( dict )` - 正在创建的hive表的TBLPROPERTIES
*   `select_expression( str )` - S3选择表达式
W
wizardforcel 已提交
910

W
zh raw  
wizardforcel 已提交
911 912 913 914 915 916 917

### AWS EC2容器服务

*   [ECSOperator](28) :在AWS EC2容器服务上执行任务。

#### ECSOperator

W
wizardforcel 已提交
918
```py
W
zh raw  
wizardforcel 已提交
919 920 921
class airflow.contrib.operators.ecs_operator.ECSOperator(task_definition, cluster, overrides, aws_conn_id=None, region_name=None, launch_type='EC2', **kwargs) 
```

W
wizardforcel 已提交
922
基类: [`airflow.models.BaseOperator`](code.html "airflow.models.BaseOperator")
W
zh raw  
wizardforcel 已提交
923 924 925

在AWS EC2 Container Service上执行任务

W
wizardforcel 已提交
926
参数:
W
zh raw  
wizardforcel 已提交
927

W
wizardforcel 已提交
928 929 930
*   `task_definition( str )` - EC2容器服务上的任务定义名称
*   `cluster( str )` - EC2 Container Service上的群集名称
*   `aws_conn_id( str )` - AWS凭证/区域名称的连接ID。 如果为None,将使用凭证boto3策略( [http://boto3.readthedocs.io/en/latest/guide/configuration.html](http://boto3.readthedocs.io/en/latest/guide/configuration.html) )。
W
zh raw  
wizardforcel 已提交
931 932 933
*   **region_name** - 要在AWS Hook中使用的区域名称。 覆盖连接中的region_name(如果提供)
*   **launch_type** - 运行任务的启动类型('EC2'或'FARGATE')

W
wizardforcel 已提交
934 935 936 937 938
参数:覆盖:boto3将接收的相同参数(模板化): [http](http://boto3.readthedocs.org/en/latest/reference/services/ecs.html) ://boto3.readthedocs.org/en/latest/reference/services/ecs.html#ECS.Client.run_task

类型:覆盖:dict

类型:launch_type:str
W
zh raw  
wizardforcel 已提交
939 940 941 942 943 944 945

### AWS Batch Service

*   [AWSBatchOperator](28) :在AWS Batch Service上执行任务。

#### AWSBatchOperator

W
wizardforcel 已提交
946
```py
W
zh raw  
wizardforcel 已提交
947 948 949
class airflow.contrib.operators.awsbatch_operator.AWSBatchOperator(job_name, job_definition, job_queue, overrides, max_retries=4200, aws_conn_id=None, region_name=None, **kwargs) 
```

W
wizardforcel 已提交
950
基类: [`airflow.models.BaseOperator`](code.html "airflow.models.BaseOperator")
W
zh raw  
wizardforcel 已提交
951 952 953

在AWS Batch Service上执行作业

W
wizardforcel 已提交
954
参数:
W
zh raw  
wizardforcel 已提交
955

W
wizardforcel 已提交
956 957 958 959 960
*   `job_name( str )` - 将在AWS Batch上运行的作业的名称
*   `job_definition( str )` - AWS Batch上的作业定义名称
*   `job_queue( str )` - AWS Batch上的队列名称
*   `max_retries( int )` - 服务器未合并时的指数退避重试,4200 = 48小时
*   `aws_conn_id( str )` - AWS凭证/区域名称的连接ID。 如果为None,将使用凭证boto3策略( [http://boto3.readthedocs.io/en/latest/guide/configuration.html](http://boto3.readthedocs.io/en/latest/guide/configuration.html) )。
W
zh raw  
wizardforcel 已提交
961 962
*   **region_name** - 要在AWS Hook中使用的区域名称。 覆盖连接中的region_name(如果提供)

W
wizardforcel 已提交
963 964 965
参数:覆盖:boto3将在containerOverrides上接收的相同参数(模板化): [http](http://boto3.readthedocs.io/en/latest/reference/services/batch.html) ://boto3.readthedocs.io/en/latest/reference/services/batch.html#submit_job

类型:覆盖:dict
W
zh raw  
wizardforcel 已提交
966 967 968 969 970 971 972 973 974 975

### AWS RedShift

*   [AwsRedshiftClusterSensor](28) :等待Redshift群集达到特定状态。
*   [RedshiftHook](28) :使用boto3库与AWS Redshift交互。
*   [RedshiftToS3Transfer](28) :对带有或不带标头的CSV执行卸载命令。
*   [S3ToRedshiftTransfer](28) :从S3执行复制命令为CSV,带或不带标题。

#### AwsRedshiftClusterSensor

W
wizardforcel 已提交
976
```py
W
zh raw  
wizardforcel 已提交
977 978 979
class airflow.contrib.sensors.aws_redshift_cluster_sensor.AwsRedshiftClusterSensor(cluster_identifier, target_status='available', aws_conn_id='aws_default', *args, **kwargs) 
```

W
wizardforcel 已提交
980
基类: [`airflow.sensors.base_sensor_operator.BaseSensorOperator`](code.html "airflow.sensors.base_sensor_operator.BaseSensorOperator")
W
zh raw  
wizardforcel 已提交
981 982 983

等待Redshift群集达到特定状态。

W
wizardforcel 已提交
984
参数:
W
zh raw  
wizardforcel 已提交
985

W
wizardforcel 已提交
986 987
*   `cluster_identifier( str )` - 要ping的集群的标识符。
*   `target_status( str )` - 所需的集群状态。
W
zh raw  
wizardforcel 已提交
988 989


W
wizardforcel 已提交
990
```py
W
zh raw  
wizardforcel 已提交
991 992 993 994 995 996 997
poke(context) 
```

传感器在派生此类时定义的功能应该覆盖。

#### RedshiftHook

W
wizardforcel 已提交
998
```py
W
zh raw  
wizardforcel 已提交
999 1000 1001
class airflow.contrib.hooks.redshift_hook.RedshiftHook(aws_conn_id='aws_default') 
```

W
wizardforcel 已提交
1002
基类: [`airflow.contrib.hooks.aws_hook.AwsHook`](code.html "airflow.contrib.hooks.aws_hook.AwsHook")
W
zh raw  
wizardforcel 已提交
1003 1004 1005

使用boto3库与AWS Redshift交互

W
wizardforcel 已提交
1006
```py
W
zh raw  
wizardforcel 已提交
1007 1008 1009 1010 1011
cluster_status(cluster_identifier) 
```

返回群集的状态

W
wizardforcel 已提交
1012
参数:`cluster_identifier( str )` - 集群的唯一标识符 
W
wizardforcel 已提交
1013

W
zh raw  
wizardforcel 已提交
1014

W
wizardforcel 已提交
1015
```py
W
zh raw  
wizardforcel 已提交
1016 1017 1018 1019 1020
create_cluster_snapshot(snapshot_identifier, cluster_identifier) 
```

创建群集的快照

W
wizardforcel 已提交
1021
参数:
W
zh raw  
wizardforcel 已提交
1022

W
wizardforcel 已提交
1023 1024
*   `snapshot_identifier( str )` - 群集快照的唯一标识符
*   `cluster_identifier( str )` - 集群的唯一标识符
W
zh raw  
wizardforcel 已提交
1025 1026


W
wizardforcel 已提交
1027
```py
W
zh raw  
wizardforcel 已提交
1028 1029 1030 1031 1032
delete_cluster(cluster_identifier, skip_final_cluster_snapshot=True, final_cluster_snapshot_identifier='') 
```

删除群集并可选择创建快照

W
wizardforcel 已提交
1033
参数:
W
zh raw  
wizardforcel 已提交
1034

W
wizardforcel 已提交
1035 1036 1037
*   `cluster_identifier( str )` - 集群的唯一标识符
*   `skip_final_cluster_snapshot( bool )` - 确定群集快照创建
*   `final_cluster_snapshot_identifier( str )` - 最终集群快照的名称
W
zh raw  
wizardforcel 已提交
1038 1039


W
wizardforcel 已提交
1040
```py
W
zh raw  
wizardforcel 已提交
1041 1042 1043 1044 1045
describe_cluster_snapshots(cluster_identifier) 
```

获取群集的快照列表

W
wizardforcel 已提交
1046
参数:`cluster_identifier( str )` - 集群的唯一标识符 
W
wizardforcel 已提交
1047

W
zh raw  
wizardforcel 已提交
1048

W
wizardforcel 已提交
1049
```py
W
zh raw  
wizardforcel 已提交
1050 1051 1052 1053 1054
restore_from_cluster_snapshot(cluster_identifier, snapshot_identifier) 
```

从其快照还原群集

W
wizardforcel 已提交
1055
参数:
W
zh raw  
wizardforcel 已提交
1056

W
wizardforcel 已提交
1057 1058
*   `cluster_identifier( str )` - 集群的唯一标识符
*   `snapshot_identifier(str)` - 群集快照的唯一标识符
W
zh raw  
wizardforcel 已提交
1059 1060


W
wizardforcel 已提交
1061
#### RedshiftToS3Transfer
W
zh raw  
wizardforcel 已提交
1062

W
wizardforcel 已提交
1063
```py
W
wizardforcel 已提交
1064
class airflow.operators.redshift_to_s3_operator.RedshiftToS3Transferschematables3_buckets3_keyredshift_conn_id ='redshift_default'aws_conn_id ='aws_default'unload_options =(),autocommit = Falseparameters = Noneinclude_header = False* args* * kwargs 
W
zh raw  
wizardforcel 已提交
1065 1066
```

W
wizardforcel 已提交
1067
基类: [`airflow.models.BaseOperator`](code.html "airflow.models.BaseOperator")
W
zh raw  
wizardforcel 已提交
1068

W
wizardforcel 已提交
1069
执行UNLOAD命令,将s3作为带标题的CSV
W
zh raw  
wizardforcel 已提交
1070

W
wizardforcel 已提交
1071
参数:
W
zh raw  
wizardforcel 已提交
1072

W
wizardforcel 已提交
1073 1074 1075 1076 1077 1078 1079
*   `schema(str)` - 对redshift数据库中特定模式的引用
*   `table(str)` - 对redshift数据库中特定表的引用
*   `s3_bucket(str)` - 对特定S3存储桶的引用
*   `s3_key(str)` - 对特定S3密钥的引用
*   `redshift_conn_id(str)` - 对特定redshift数据库的引用
*   `aws_conn_id(str)` - 对特定S3连接的引用
*   `unload_options(list)` - 对UNLOAD选项列表的引用
W
zh raw  
wizardforcel 已提交
1080 1081


W
wizardforcel 已提交
1082
#### S3ToRedshiftTransfer
W
zh raw  
wizardforcel 已提交
1083

W
wizardforcel 已提交
1084
```py
W
wizardforcel 已提交
1085
class airflow.operators.s3_to_redshift_operator.S3ToRedshiftTransferschematables3_buckets3_keyredshift_conn_id ='redshift_default'aws_conn_id ='aws_default'copy_options =(),autocommit = Falseparameters = None* args** kwargs 
W
zh raw  
wizardforcel 已提交
1086 1087
```

W
wizardforcel 已提交
1088
基类: [`airflow.models.BaseOperator`](code.html "airflow.models.BaseOperator")
W
zh raw  
wizardforcel 已提交
1089

W
wizardforcel 已提交
1090
执行COPY命令将文件从s3加载到Redshift
W
zh raw  
wizardforcel 已提交
1091

W
wizardforcel 已提交
1092
参数:
W
zh raw  
wizardforcel 已提交
1093

W
wizardforcel 已提交
1094 1095 1096 1097 1098 1099 1100
*   `schema(str)` - 对redshift数据库中特定模式的引用
*   `table(str)` - 对redshift数据库中特定表的引用
*   `s3_bucket(str)` - 对特定S3存储桶的引用
*   `s3_key(str)` - 对特定S3密钥的引用
*   `redshift_conn_id(str)` - 对特定redshift数据库的引用
*   `aws_conn_id(str)` - 对特定S3连接的引用
*   `copy_options(list)` - 对COPY选项列表的引用
W
zh raw  
wizardforcel 已提交
1101 1102


W
wizardforcel 已提交
1103
## Databricks
W
zh raw  
wizardforcel 已提交
1104

W
wizardforcel 已提交
1105
[Databricks](https://databricks.com/)贡献了一个Airflow运算符,可以将运行提交到Databricks平台。在运营商内部与`api/2.0/jobs/runs/submit` [端点进行通信](https://docs.databricks.com/api/latest/jobs.html)
W
zh raw  
wizardforcel 已提交
1106

W
wizardforcel 已提交
1107
### DatabricksSubmitRunOperator
W
zh raw  
wizardforcel 已提交
1108

W
wizardforcel 已提交
1109
```py
W
wizardforcel 已提交
1110
class airflow.contrib.operators.databricks_operator.DatabricksSubmitRunOperatorjson = Nonespark_jar_task = Nonenotebook_task = Nonenew_cluster = Noneexisting_cluster_id = Nonelibraries = Nonerun_name = Nonetimeout_seconds = Nonedatabricks_conn_id ='databricks_default'polling_period_seconds = 30databricks_retry_limit = 3do_xcom_push = False** kwargs 
W
zh raw  
wizardforcel 已提交
1111 1112
```

W
wizardforcel 已提交
1113
基类: [`airflow.models.BaseOperator`](code.html "airflow.models.BaseOperator")
W
zh raw  
wizardforcel 已提交
1114

W
wizardforcel 已提交
1115
使用[api / 2.0 / jobs / runs / submit](https://docs.databricks.com/api/latest/jobs.html) API端点向Databricks提交Spark作业运行。
W
zh raw  
wizardforcel 已提交
1116

W
wizardforcel 已提交
1117
有两种方法可以实例化此运算符。
W
zh raw  
wizardforcel 已提交
1118

W
wizardforcel 已提交
1119
在第一种方式,你可以把你通常用它来调用的JSON有效载荷`api/2.0/jobs/runs/submit`端点并将其直接传递到我们`DatabricksSubmitRunOperator`通过`json`参数。例如
W
zh raw  
wizardforcel 已提交
1120

W
wizardforcel 已提交
1121
```py
W
zh raw  
wizardforcel 已提交
1122 1123 1124 1125 1126 1127 1128 1129 1130 1131 1132 1133 1134
 json = {
  'new_cluster' : {
    'spark_version' : '2.1.0-db3-scala2.11' ,
    'num_workers' : 2
  },
  'notebook_task' : {
    'notebook_path' : '/Users/airflow@example.com/PrepareData' ,
  },
}
notebook_run = DatabricksSubmitRunOperator ( task_id = 'notebook_run' , json = json )

```

W
wizardforcel 已提交
1135
另一种完成同样事情的方法是直接使用命名参数`DatabricksSubmitRunOperator`。请注意,`runs/submit`端点中的每个顶级参数都只有一个命名参数。在此方法中,您的代码如下所示:
W
zh raw  
wizardforcel 已提交
1136

W
wizardforcel 已提交
1137
```py
W
zh raw  
wizardforcel 已提交
1138 1139 1140 1141 1142 1143 1144 1145 1146 1147 1148 1149 1150 1151
 new_cluster = {
  'spark_version' : '2.1.0-db3-scala2.11' ,
  'num_workers' : 2
}
notebook_task = {
  'notebook_path' : '/Users/airflow@example.com/PrepareData' ,
}
notebook_run = DatabricksSubmitRunOperator (
    task_id = 'notebook_run' ,
    new_cluster = new_cluster ,
    notebook_task = notebook_task )

```

W
wizardforcel 已提交
1152
在提供json参数**和**命名参数的情况下,它们将合并在一起。如果在合并期间存在冲突,则命名参数将优先并覆盖顶级`json`键。
W
zh raw  
wizardforcel 已提交
1153

W
wizardforcel 已提交
1154
```py
W
wizardforcel 已提交
1155
 目前DatabricksSubmitRunOperator支持的命名参数是 
W
zh raw  
wizardforcel 已提交
1156 1157 1158 1159 1160 1161 1162 1163 1164 1165
```

*   `spark_jar_task`
*   `notebook_task`
*   `new_cluster`
*   `existing_cluster_id`
*   `libraries`
*   `run_name`
*   `timeout_seconds`

W
wizardforcel 已提交
1166
参数:
W
zh raw  
wizardforcel 已提交
1167

W
wizardforcel 已提交
1168
*   `json(dict)` -
W
zh raw  
wizardforcel 已提交
1169

W
wizardforcel 已提交
1170
    包含API参数的JSON对象,将直接传递给`api/2.0/jobs/runs/submit`端点。其他命名参数(即`spark_jar_task`,`notebook_task`..)到该运营商将与此JSON字典合并如果提供他们。如果在合并期间存在冲突,则命名参数将优先并覆盖顶级json键。(模板)
W
zh raw  
wizardforcel 已提交
1171 1172 1173

    也可以看看

W
wizardforcel 已提交
1174
    有关模板的更多信息,请参阅[Jinja模板](concepts.html)。[https://docs.databricks.com/api/latest/jobs.html#runs-submit](https://docs.databricks.com/api/latest/jobs.html)
W
zh raw  
wizardforcel 已提交
1175

W
wizardforcel 已提交
1176
*   `spark_jar_task(dict)` -
W
zh raw  
wizardforcel 已提交
1177

W
wizardforcel 已提交
1178
    JAR任务的主要类和参数。请注意,实际的JAR在`libraries`。中指定。_无论是_ `spark_jar_task` _或_ `notebook_task`应符合规定。该字段将被模板化。
W
zh raw  
wizardforcel 已提交
1179 1180 1181

    也可以看看

W
wizardforcel 已提交
1182
    [https://docs.databricks.com/api/latest/jobs.html#jobssparkjartask](https://docs.databricks.com/api/latest/jobs.html)
W
zh raw  
wizardforcel 已提交
1183

W
wizardforcel 已提交
1184
*   `notebook_task(dict)` -
W
zh raw  
wizardforcel 已提交
1185

W
wizardforcel 已提交
1186
    笔记本任务的笔记本路径和参数。_无论是_ `spark_jar_task` _或_ `notebook_task`应符合规定。该字段将被模板化。
W
zh raw  
wizardforcel 已提交
1187 1188 1189

    也可以看看

W
wizardforcel 已提交
1190
    [https://docs.databricks.com/api/latest/jobs.html#jobsnotebooktask](https://docs.databricks.com/api/latest/jobs.html)
W
zh raw  
wizardforcel 已提交
1191

W
wizardforcel 已提交
1192
*   `new_cluster(dict)` -
W
zh raw  
wizardforcel 已提交
1193

W
wizardforcel 已提交
1194
    将在其上运行此任务的新群集的规范。_无论是_ `new_cluster` _或_ `existing_cluster_id`应符合规定。该字段将被模板化。
W
zh raw  
wizardforcel 已提交
1195 1196 1197

    也可以看看

W
wizardforcel 已提交
1198
    [https://docs.databricks.com/api/latest/jobs.html#jobsclusterspecnewcluster](https://docs.databricks.com/api/latest/jobs.html)
W
zh raw  
wizardforcel 已提交
1199

W
wizardforcel 已提交
1200 1201
*   `existing_cluster_id(str)` - 要运行此任务的现有集群的ID。_无论是_ `new_cluster` _或_ `existing_cluster_id`应符合规定。该字段将被模板化。
*   `图书馆(list[dict])` -
W
zh raw  
wizardforcel 已提交
1202

W
wizardforcel 已提交
1203
    这个运行的库将使用。该字段将被模板化。
W
zh raw  
wizardforcel 已提交
1204 1205 1206

    也可以看看

W
wizardforcel 已提交
1207
    [https://docs.databricks.com/api/latest/libraries.html#managedlibrarieslibrary](https://docs.databricks.com/api/latest/libraries.html)
W
zh raw  
wizardforcel 已提交
1208

W
wizardforcel 已提交
1209 1210 1211 1212 1213 1214
*   `run_name(str)` - 用于此任务的运行名称。默认情况下,这将设置为Airflow `task_id`。这`task_id`是超类的必需参数`BaseOperator`。该字段将被模板化。
*   `timeout_seconds(int32)` - 此次运行的超时。默认情况下,使用值0表示没有超时。该字段将被模板化。
*   `databricks_conn_id(str)` - 要使用的Airflow连接的名称。默认情况下,在常见情况下,这将是`databricks_default`。要使用基于令牌的身份验证,请`token`在连接的额外字段中提供密钥。
*   `polling_period_seconds(int)` - 控制我们轮询此运行结果的速率。默认情况下,操作员每30秒轮询一次。
*   `databricks_retry_limit(int)` - 如果Databricks后端无法访问,则重试的次数。其值必须大于或等于1。
*   `do_xcom_push(bool)` - 我们是否应该将run_id和run_page_url推送到xcom。
W
zh raw  
wizardforcel 已提交
1215 1216


W
wizardforcel 已提交
1217
## GCP:Google云端平台
W
zh raw  
wizardforcel 已提交
1218

W
wizardforcel 已提交
1219
Airflow广泛支持Google Cloud Platform。但请注意,大多数Hooks和Operators都在contrib部分。这意味着他们具有_beta_状态,这意味着他们可以在次要版本之间进行重大更改。
W
zh raw  
wizardforcel 已提交
1220

W
wizardforcel 已提交
1221
请参阅[GCP连接类型](howto/manage-connections.html)文档以配置与GCP的连接。
W
zh raw  
wizardforcel 已提交
1222 1223 1224

### 记录

W
wizardforcel 已提交
1225
可以将Airflow配置为在Google云端存储中读取和写入任务日志。请参阅[将日志写入Google云端存储](howto/write-logs.html)
W
zh raw  
wizardforcel 已提交
1226 1227 1228

### BigQuery的

W
wizardforcel 已提交
1229
#### BigQuery运算符
W
zh raw  
wizardforcel 已提交
1230

W
wizardforcel 已提交
1231 1232 1233 1234 1235 1236 1237 1238 1239
*   [BigQueryCheckOperator](28):对SQL查询执行检查,该查询将返回具有不同值的单行。
*   [BigQueryValueCheckOperator](28):使用SQL代码执行简单的值检查。
*   [BigQueryIntervalCheckOperator](28):检查作为SQL表达式给出的度量值是否在days_back之前的某个容差范围内。
*   [BigQueryCreateEmptyTableOperator](28):在指定的BigQuery数据集中创建一个新的空表,可选择使用模式。
*   [BigQueryCreateExternalTableOperator](28):使用Google Cloud Storage中的数据在数据集中创建新的外部表。
*   [BigQueryDeleteDatasetOperator](28):删除现有的BigQuery数据集。
*   [BigQueryOperator](28):在特定的BigQuery数据库中执行BigQuery SQL查询。
*   [BigQueryToBigQueryOperator](28):将BigQuery表复制到另一个BigQuery表。
*   [BigQueryToCloudStorageOperator](28):将BigQuery表传输到Google Cloud Storage存储桶
W
zh raw  
wizardforcel 已提交
1240

W
wizardforcel 已提交
1241
##### BigQueryCheckOperator
W
zh raw  
wizardforcel 已提交
1242

W
wizardforcel 已提交
1243
```py
W
wizardforcel 已提交
1244
class airflow.contrib.operators.bigquery_check_operator.BigQueryCheckOperatorsqlbigquery_conn_id ='bigquery_default'* args** kwargs 
W
zh raw  
wizardforcel 已提交
1245 1246
```

W
wizardforcel 已提交
1247
基类: [`airflow.operators.check_operator.CheckOperator`](code.html "airflow.operators.check_operator.CheckOperator")
W
zh raw  
wizardforcel 已提交
1248

W
wizardforcel 已提交
1249
对BigQuery执行检查。该`BigQueryCheckOperator`预期的SQL查询将返回一行。使用python `bool`强制转换评估第一行的每个值。如果任何值返回,`False`则检查失败并输出错误。
W
zh raw  
wizardforcel 已提交
1250

W
wizardforcel 已提交
1251
请注意,Python bool强制转换如下`False`
W
zh raw  
wizardforcel 已提交
1252 1253 1254

*   `False`
*   `0`
W
wizardforcel 已提交
1255 1256 1257
*   空字符串(`""`
*   空列表(`[]`
*   空字典或集(`{}`
W
zh raw  
wizardforcel 已提交
1258

W
wizardforcel 已提交
1259
给定一个查询,它只会在计数时失败。您可以制作更复杂的查询,例如,可以检查表与上游源表的行数相同,或者今天的分区计数大于昨天的分区,或者一组指标是否更少7天平均值超过3个标准差。`SELECT COUNT(*) FROM foo``== 0`
W
zh raw  
wizardforcel 已提交
1260

W
wizardforcel 已提交
1261
此运算符可用作管道中的数据质量检查,并且根据您在DAG中的位置,您可以选择停止关键路径,防止发布可疑数据,或者在旁边接收电子邮件替代品阻止DAG的进展。
W
zh raw  
wizardforcel 已提交
1262

W
wizardforcel 已提交
1263
参数:
W
zh raw  
wizardforcel 已提交
1264

W
wizardforcel 已提交
1265 1266
*   `sql(str)` - 要执行的sql
*   `bigquery_conn_id(str)` - 对BigQuery数据库的引用
W
zh raw  
wizardforcel 已提交
1267 1268


W
wizardforcel 已提交
1269
##### BigQueryValueCheckOperator
W
zh raw  
wizardforcel 已提交
1270

W
wizardforcel 已提交
1271
```py
W
wizardforcel 已提交
1272
class airflow.contrib.operators.bigquery_check_operator.BigQueryValueCheckOperatorsqlpass_valuetolerance = Nonebigquery_conn_id ='bigquery_default'* args** kwargs 
W
zh raw  
wizardforcel 已提交
1273 1274
```

W
wizardforcel 已提交
1275 1276 1277
基类: [`airflow.operators.check_operator.ValueCheckOperator`](code.html "airflow.operators.check_operator.ValueCheckOperator")

使用sql代码执行简单的值检查。
W
zh raw  
wizardforcel 已提交
1278

W
wizardforcel 已提交
1279
参数:`sql(str)` - 要执行的sql 
W
zh raw  
wizardforcel 已提交
1280 1281


W
wizardforcel 已提交
1282
##### BigQueryIntervalCheckOperator
W
zh raw  
wizardforcel 已提交
1283

W
wizardforcel 已提交
1284
```py
W
wizardforcel 已提交
1285
class airflow.contrib.operators.bigquery_check_operator.BigQueryIntervalCheckOperatortablemetrics_thresholdsdate_filter_column ='ds'days_back = -7bigquery_conn_id ='bigquery_default'* args** kwargs 
W
zh raw  
wizardforcel 已提交
1286 1287
```

W
wizardforcel 已提交
1288
基类: [`airflow.operators.check_operator.IntervalCheckOperator`](code.html "airflow.operators.check_operator.IntervalCheckOperator")
W
zh raw  
wizardforcel 已提交
1289

W
wizardforcel 已提交
1290
检查作为SQL表达式给出的度量值是否在days_back之前的某个容差范围内。
W
zh raw  
wizardforcel 已提交
1291

W
wizardforcel 已提交
1292
此方法构造一个类似的查询
W
zh raw  
wizardforcel 已提交
1293

W
wizardforcel 已提交
1294
```py
W
wizardforcel 已提交
1295
 SELECT { metrics_thresholddictkey } FROM { table }
W
zh raw  
wizardforcel 已提交
1296 1297 1298 1299
    WHERE { date_filter_column } =< date >

```

W
wizardforcel 已提交
1300
参数:
W
zh raw  
wizardforcel 已提交
1301

W
wizardforcel 已提交
1302 1303 1304
*   `table(str)` - 表名
*   `days_back(int)` - ds与我们要检查的ds之间的天数。默认为7天
*   `metrics_threshold(dict)` - 由指标索引的比率字典,例如'COUNT(*)':1.5将需要当前日和之前的days_back之间50%或更小的差异。
W
zh raw  
wizardforcel 已提交
1305 1306


W
wizardforcel 已提交
1307
##### BigQueryGetDataOperator
W
zh raw  
wizardforcel 已提交
1308

W
wizardforcel 已提交
1309
```py
W
wizardforcel 已提交
1310
class airflow.contrib.operators.bigquery_get_data.BigQueryGetDataOperatordataset_idtable_idmax_results ='100'selected_fields = Nonebigquery_conn_id ='bigquery_default'delegate_to = None* args** kwargs 
W
zh raw  
wizardforcel 已提交
1311 1312
```

W
wizardforcel 已提交
1313
基类: [`airflow.models.BaseOperator`](code.html "airflow.models.BaseOperator")
W
zh raw  
wizardforcel 已提交
1314

W
wizardforcel 已提交
1315
从BigQuery表中获取数据(或者为所选列获取数据)并在python列表中返回数据。返回列表中的元素数将等于获取的行数。列表中的每个元素将再次是一个列表,其中元素将表示该行的列值。
W
zh raw  
wizardforcel 已提交
1316

W
wizardforcel 已提交
1317
**结果示例**`[['Tony', '10'], ['Mike', '20'], ['Steve', '15']]`
W
zh raw  
wizardforcel 已提交
1318 1319 1320

注意

W
wizardforcel 已提交
1321
如果传递的字段`selected_fields`的顺序与BQ表中已有的列的顺序不同,则数据仍将按BQ表的顺序排列。例如,如果BQ表有3列,`[A,B,C]`并且您传递'B,`selected_fields`那么数据中的A' 仍然是表格`'A,B'`
W
zh raw  
wizardforcel 已提交
1322 1323 1324

**示例**

W
wizardforcel 已提交
1325
```py
W
zh raw  
wizardforcel 已提交
1326 1327 1328 1329 1330 1331 1332 1333 1334 1335 1336
 get_data = BigQueryGetDataOperator (
    task_id = 'get_data_from_bq' ,
    dataset_id = 'test_dataset' ,
    table_id = 'Transaction_partitions' ,
    max_results = '100' ,
    selected_fields = 'DATE' ,
    bigquery_conn_id = 'airflow-service-account'
)

```

W
wizardforcel 已提交
1337
参数:
W
zh raw  
wizardforcel 已提交
1338

W
wizardforcel 已提交
1339
*   **dataset_id** - 请求的表的数据集ID。(模板)
W
wizardforcel 已提交
1340 1341 1342 1343 1344
*   `table_id(str)` - 请求表的表ID。(模板)
*   `max_results(str)` - 从表中获取的最大记录数(行数)。(模板)
*   `selected_fields(str)` - 要返回的字段列表(逗号分隔)。如果未指定,则返回所有字段。
*   `bigquery_conn_id(str)` - 对特定BigQuery钩子的引用。
*   `delegate_to(str)` - 模拟的帐户(如果有)。为此,发出请求的服务帐户必须启用域范围委派。
W
zh raw  
wizardforcel 已提交
1345 1346


W
wizardforcel 已提交
1347
##### BigQueryCreateEmptyTableOperator
W
zh raw  
wizardforcel 已提交
1348

W
wizardforcel 已提交
1349
```py
W
wizardforcel 已提交
1350
class airflow.contrib.operators.bigquery_operator.BigQueryCreateEmptyTableOperatordataset_idtable_idproject_id = Noneschema_fields = Nonegcs_schema_object = Nonetime_partitioning = {}bigquery_conn_id ='bigquery_default'google_cloud_storage_conn_id ='google_cloud_default'delegate_to = None* args ** kwargs 
W
zh raw  
wizardforcel 已提交
1351 1352
```

W
wizardforcel 已提交
1353
基类: [`airflow.models.BaseOperator`](code.html "airflow.models.BaseOperator")
W
zh raw  
wizardforcel 已提交
1354

W
wizardforcel 已提交
1355
在指定的BigQuery数据集中创建一个新的空表,可选择使用模式。
W
zh raw  
wizardforcel 已提交
1356

W
wizardforcel 已提交
1357
可以用两种方法之一指定用于BigQuery表的模式。您可以直接传递架构字段,也可以将运营商指向Google云存储对象名称。Google云存储中的对象必须是包含架构字段的JSON文件。您还可以创建没有架构的表。
W
zh raw  
wizardforcel 已提交
1358

W
wizardforcel 已提交
1359
参数:
W
zh raw  
wizardforcel 已提交
1360

W
wizardforcel 已提交
1361 1362 1363 1364
*   `project_id(str)` - 将表创建的项目。(模板)
*   `dataset_id(str)` - 用于创建表的数据集。(模板)
*   `table_id(str)` - 要创建的表的名称。(模板)
*   `schema_fields(list)` -
W
zh raw  
wizardforcel 已提交
1365

W
wizardforcel 已提交
1366
    如果设置,则此处定义的架构字段列表:[https](https://cloud.google.com/bigquery/docs/reference/rest/v2/jobs):[//cloud.google.com/bigquery/docs/reference/rest/v2/jobs#configuration.load.schema](https://cloud.google.com/bigquery/docs/reference/rest/v2/jobs)
W
zh raw  
wizardforcel 已提交
1367 1368 1369

    **示例** :

W
wizardforcel 已提交
1370
    ```py
W
zh raw  
wizardforcel 已提交
1371 1372 1373 1374 1375
     schema_fields = [{ "name" : "emp_name" , "type" : "STRING" , "mode" : "REQUIRED" },
                   { "name" : "salary" , "type" : "INTEGER" , "mode" : "NULLABLE" }]

    ```

W
wizardforcel 已提交
1376 1377
*   `gcs_schema_object(str)` - 包含模式(模板化)的JSON文件的完整路径。例如:`gs://test-bucket/dir1/dir2/employee_schema.json`
*   `time_partitioning(dict)` -
W
zh raw  
wizardforcel 已提交
1378

W
wizardforcel 已提交
1379
    配置可选的时间分区字段,即按API规范按字段,类型和到期分区。
W
zh raw  
wizardforcel 已提交
1380 1381 1382

    也可以看看

W
wizardforcel 已提交
1383
    [https://cloud.google.com/bigquery/docs/reference/rest/v2/tables#timePartitioning](https://cloud.google.com/bigquery/docs/reference/rest/v2/tables)
W
zh raw  
wizardforcel 已提交
1384

W
wizardforcel 已提交
1385 1386 1387
*   `bigquery_conn_id(str)` - 对特定BigQuery挂钩的引用。
*   `google_cloud_storage_conn_id(str)` - 对特定Google云存储挂钩的引用。
*   `delegate_to(str)` - 模拟的帐户(如果有)。为此,发出请求的服务帐户必须启用域范围委派。
W
zh raw  
wizardforcel 已提交
1388 1389


W
wizardforcel 已提交
1390
**示例(在GCS中使用模式JSON)**
W
zh raw  
wizardforcel 已提交
1391

W
wizardforcel 已提交
1392
```py
W
zh raw  
wizardforcel 已提交
1393 1394 1395 1396 1397 1398 1399 1400 1401 1402 1403 1404
 CreateTable = BigQueryCreateEmptyTableOperator (
    task_id = 'BigQueryCreateEmptyTableOperator_task' ,
    dataset_id = 'ODS' ,
    table_id = 'Employees' ,
    project_id = 'internal-gcp-project' ,
    gcs_schema_object = 'gs://schema-bucket/employee_schema.json' ,
    bigquery_conn_id = 'airflow-service-account' ,
    google_cloud_storage_conn_id = 'airflow-service-account'
)

```

W
wizardforcel 已提交
1405
**对应的Schema文件**`employee_schema.json`):
W
zh raw  
wizardforcel 已提交
1406

W
wizardforcel 已提交
1407
```py
W
zh raw  
wizardforcel 已提交
1408 1409 1410 1411 1412 1413 1414 1415 1416 1417 1418 1419 1420 1421 1422
 [
  {
    "mode" : "NULLABLE" ,
    "name" : "emp_name" ,
    "type" : "STRING"
  },
  {
    "mode" : "REQUIRED" ,
    "name" : "salary" ,
    "type" : "INTEGER"
  }
]

```

W
wizardforcel 已提交
1423
**示例(在DAG中使用模式)**
W
zh raw  
wizardforcel 已提交
1424

W
wizardforcel 已提交
1425
```py
W
zh raw  
wizardforcel 已提交
1426 1427 1428 1429 1430 1431 1432 1433 1434 1435 1436 1437 1438
 CreateTable = BigQueryCreateEmptyTableOperator (
    task_id = 'BigQueryCreateEmptyTableOperator_task' ,
    dataset_id = 'ODS' ,
    table_id = 'Employees' ,
    project_id = 'internal-gcp-project' ,
    schema_fields = [{ "name" : "emp_name" , "type" : "STRING" , "mode" : "REQUIRED" },
                   { "name" : "salary" , "type" : "INTEGER" , "mode" : "NULLABLE" }],
    bigquery_conn_id = 'airflow-service-account' ,
    google_cloud_storage_conn_id = 'airflow-service-account'
)

```

W
wizardforcel 已提交
1439
##### BigQueryCreateExternalTableOperator
W
zh raw  
wizardforcel 已提交
1440

W
wizardforcel 已提交
1441
```py
W
wizardforcel 已提交
1442
class airflow.contrib.operators.bigquery_operator.BigQueryCreateExternalTableOperatorbucketsource_objectsdestination_project_dataset_tableschema_fields = Noneschema_object = Nonesource_format ='CSV'compression ='NONE'skip_leading_rows = 0field_delimiter =','max_bad_records = 0 quote_character = Noneallow_quoted_newlines = Falseallow_jagged_rows = Falsebigquery_conn_id ='bigquery_default'google_cloud_storage_conn_id ='google_cloud_default'delegate_to = Nonesrc_fmt_configs = {}* args** kwargs 
W
zh raw  
wizardforcel 已提交
1443 1444
```

W
wizardforcel 已提交
1445
基类: [`airflow.models.BaseOperator`](code.html "airflow.models.BaseOperator")
W
zh raw  
wizardforcel 已提交
1446

W
wizardforcel 已提交
1447
使用Google云端存储中的数据在数据集中创建新的外部表。
W
zh raw  
wizardforcel 已提交
1448

W
wizardforcel 已提交
1449
可以用两种方法之一指定用于BigQuery表的模式。您可以直接传递架构字段,也可以将运营商指向Google云存储对象名称。Google云存储中的对象必须是包含架构字段的JSON文件。
W
zh raw  
wizardforcel 已提交
1450

W
wizardforcel 已提交
1451
参数:
W
zh raw  
wizardforcel 已提交
1452

W
wizardforcel 已提交
1453
*   `bucket(str)` - 指向外部表的存储桶。(模板)
W
wizardforcel 已提交
1454
*   **source_objects** - 指向表格的Google云存储URI列表。(模板化)如果source_format是'DATASTORE_BACKUP',则列表必须只包含一个URI。
W
wizardforcel 已提交
1455 1456
*   `destination_project_dataset_table(str)` - 用于将数据加载到(模板化)的虚线(&lt;project&gt;。)&lt;dataset&gt;&lt;table&gt; BigQuery表。如果未包含&lt;project&gt;,则项目将是连接json中定义的项目。
*   `schema_fields(list)` -
W
zh raw  
wizardforcel 已提交
1457

W
wizardforcel 已提交
1458
    如果设置,则此处定义的架构字段列表:[https](https://cloud.google.com/bigquery/docs/reference/rest/v2/jobs):[//cloud.google.com/bigquery/docs/reference/rest/v2/jobs#configuration.load.schema](https://cloud.google.com/bigquery/docs/reference/rest/v2/jobs)
W
zh raw  
wizardforcel 已提交
1459 1460 1461

    **示例** :

W
wizardforcel 已提交
1462
    ```py
W
zh raw  
wizardforcel 已提交
1463 1464 1465 1466 1467
     schema_fields = [{ "name" : "emp_name" , "type" : "STRING" , "mode" : "REQUIRED" },
                   { "name" : "salary" , "type" : "INTEGER" , "mode" : "NULLABLE" }]

    ```

W
wizardforcel 已提交
1468
    当source_format为'DATASTORE_BACKUP'时,不应设置。
W
zh raw  
wizardforcel 已提交
1469

W
wizardforcel 已提交
1470 1471
*   **schema_object** - 如果设置,则指向包含表的架构的.json文件的GCS对象路径。(模板)
*   **schema_object** - 字符串
W
wizardforcel 已提交
1472 1473 1474 1475 1476 1477 1478 1479 1480 1481 1482 1483
*   `source_format(str)` - 数据的文件格式。
*   `compression(str)` - [可选]数据源的压缩类型。可能的值包括GZIP和NONE。默认值为NONE。Google Cloud Bigtable,Google Cloud Datastore备份和Avro格式会忽略此设置。
*   `skip_leading_rows(int)` - 从CSV加载时要跳过的行数。
*   `field_delimiter(str)` - 用于CSV的分隔符。
*   `max_bad_records(int)` - BigQuery在运行作业时可以忽略的最大错误记录数。
*   `quote_character(str)` - 用于引用CSV文件中数据部分的值。
*   `allow_quoted_newlines(bool)` - 是否允许引用的换行符(true)或不允许(false)。
*   `allow_jagged_rows(bool)` - 接受缺少尾随可选列的行。缺失值被视为空值。如果为false,则缺少尾随列的记录将被视为错误记录,如果错误记录太多,则会在作业结果中返回无效错误。仅适用于CSV,忽略其他格式。
*   `bigquery_conn_id(str)` - 对特定BigQuery挂钩的引用。
*   `google_cloud_storage_conn_id(str)` - 对特定Google云存储挂钩的引用。
*   `delegate_to(str)` - 模拟的帐户(如果有)。为此,发出请求的服务帐户必须启用域范围委派。
*   `src_fmt_configs(dict)` - 配置特定于源格式的可选字段
W
zh raw  
wizardforcel 已提交
1484 1485


W
wizardforcel 已提交
1486
##### BigQueryDeleteDatasetOperator
W
zh raw  
wizardforcel 已提交
1487

W
wizardforcel 已提交
1488
##### BigQueryOperator
W
zh raw  
wizardforcel 已提交
1489

W
wizardforcel 已提交
1490
```py
W
wizardforcel 已提交
1491
class airflow.contrib.operators.bigquery_operator.BigQueryOperatorbql = Nonesql = Nonedestination_dataset_table = Falsewrite_disposition ='WRITE_EMPTY'allow_large_results = Falseflatten_results = Falsebigquery_conn_id ='bigquery_default'delegate_to = Noneudf_config = False use_legacy_sql = Truemaximum_billing_tier = Nonemaximumbytesbilled = Nonecreate_disposition ='CREATE_IF_NEEDED'schema_update_options =(),query_params = Nonepriority ='INTERACTIVE'time_partitioning = {}* args** kwargs 
W
zh raw  
wizardforcel 已提交
1492 1493
```

W
wizardforcel 已提交
1494
基类: [`airflow.models.BaseOperator`](code.html "airflow.models.BaseOperator")
W
zh raw  
wizardforcel 已提交
1495

W
wizardforcel 已提交
1496
在特定的BigQuery数据库中执行BigQuery SQL查询
W
zh raw  
wizardforcel 已提交
1497

W
wizardforcel 已提交
1498
参数:
W
zh raw  
wizardforcel 已提交
1499

W
wizardforcel 已提交
1500 1501 1502 1503 1504 1505 1506 1507 1508 1509 1510 1511 1512 1513 1514 1515 1516
*   `BQL(可接收表示SQL语句中的海峡,海峡列表(SQL语句),或参照模板文件模板引用在“.SQL”结束海峡认可。)` - (不推荐使用。`SQL`参数代替)要执行的sql代码(模板化)
*   `SQL(可接收表示SQL语句中的海峡,海峡列表(SQL语句),或参照模板文件模板引用在“.SQL”结束海峡认可。)` - SQL代码被执行(模板)
*   `destination_dataset_table(str)` - 一个虚线(&lt;project&gt;&#124; &lt;project&gt;:)&lt;dataset&gt;&lt;table&gt;,如果设置,将存储查询结果。(模板)
*   `write_disposition(str)` - 指定目标表已存在时发生的操作。(默认:'WRITE_EMPTY')
*   `create_disposition(str)` - 指定是否允许作业创建新表。(默认值:'CREATE_IF_NEEDED')
*   `allow_large_results(bool)` - 是否允许大结果。
*   `flatten_results(bool)` - 如果为true且查询使用旧版SQL方言,则展平查询结果中的所有嵌套和重复字段。`allow_large_results`必须是`true`如果设置为`false`。对于标准SQL查询,将忽略此标志,并且结果永远不会展平。
*   `bigquery_conn_id(str)` - 对特定BigQuery钩子的引用。
*   `delegate_to(str)` - 模拟的帐户(如果有)。为此,发出请求的服务帐户必须启用域范围委派。
*   `udf_config(list)` - 查询的用户定义函数配置。有关详细信息,请参阅[https://cloud.google.com/bigquery/user-defined-functions](https://cloud.google.com/bigquery/user-defined-functions)
*   `use_legacy_sql(bool)` - 是使用旧SQL(true)还是标准SQL(false)。
*   `maximum_billing_tier(int)` - 用作基本价格乘数的正整数。默认为None,在这种情况下,它使用项目中设置的值。
*   `maximumbytesbilled(float)` - 限制为此作业计费的字节数。超出此限制的字节数的查询将失败(不会产生费用)。如果未指定,则将其设置为项目默认值。
*   `schema_update_options(tuple)` - 允许更新目标表的模式作为加载作业的副作用。
*   `query_params(dict)` - 包含查询参数类型和值的字典,传递给BigQuery。
*   `priority(str)` - 指定查询的优先级。可能的值包括INTERACTIVE和BATCH。默认值为INTERACTIVE。
*   `time_partitioning(dict)` - 配置可选的时间分区字段,即按API规范按字段,类型和到期分区。请注意,'field'不能与dataset.table $ partition一起使用。
W
zh raw  
wizardforcel 已提交
1517 1518


W
wizardforcel 已提交
1519
##### BigQueryTableDeleteOperator
W
zh raw  
wizardforcel 已提交
1520

W
wizardforcel 已提交
1521
```py
W
wizardforcel 已提交
1522
class airflow.contrib.operators.bigquery_table_delete_operator.BigQueryTableDeleteOperatordeletion_dataset_tablebigquery_conn_id ='bigquery_default'delegate_to = Noneignore_if_missing = False* args** kwargs 
W
zh raw  
wizardforcel 已提交
1523 1524
```

W
wizardforcel 已提交
1525
基类: [`airflow.models.BaseOperator`](code.html "airflow.models.BaseOperator")
W
zh raw  
wizardforcel 已提交
1526

W
wizardforcel 已提交
1527
删除BigQuery表
W
zh raw  
wizardforcel 已提交
1528

W
wizardforcel 已提交
1529
参数:
W
zh raw  
wizardforcel 已提交
1530

W
wizardforcel 已提交
1531 1532 1533 1534
*   `deletion_dataset_table(str)` - 一个虚线(&lt;project&gt;&#124; &lt;project&gt;:)&lt;dataset&gt;&lt;table&gt;,指示将删除哪个表。(模板)
*   `bigquery_conn_id(str)` - 对特定BigQuery钩子的引用。
*   `delegate_to(str)` - 模拟的帐户(如果有)。为此,发出请求的服务帐户必须启用域范围委派。
*   `ignore_if_missing(bool)` - 如果为True,则即使请求的表不存在也返回成功。
W
zh raw  
wizardforcel 已提交
1535 1536


W
wizardforcel 已提交
1537
##### BigQueryToBigQueryOperator
W
zh raw  
wizardforcel 已提交
1538

W
wizardforcel 已提交
1539
```py
W
wizardforcel 已提交
1540
class airflow.contrib.operators.bigquery_to_bigquery.BigQueryToBigQueryOperatorsource_project_dataset_tablesdestination_project_dataset_tablewrite_disposition ='WRITE_EMPTY'create_disposition ='CREATE_IF_NEEDED'bigquery_conn_id ='bigquery_default'delegate_to = None* args** kwargs 
W
zh raw  
wizardforcel 已提交
1541 1542
```

W
wizardforcel 已提交
1543
基类: [`airflow.models.BaseOperator`](code.html "airflow.models.BaseOperator")
W
zh raw  
wizardforcel 已提交
1544

W
wizardforcel 已提交
1545
将数据从一个BigQuery表复制到另一个。
W
zh raw  
wizardforcel 已提交
1546 1547 1548

也可以看看

W
wizardforcel 已提交
1549
有关这些参数的详细信息,请访问:[https](https://cloud.google.com/bigquery/docs/reference/v2/jobs)[//cloud.google.com/bigquery/docs/reference/v2/jobs#configuration.copy](https://cloud.google.com/bigquery/docs/reference/v2/jobs)
W
zh raw  
wizardforcel 已提交
1550

W
wizardforcel 已提交
1551
参数:
W
zh raw  
wizardforcel 已提交
1552

W
wizardforcel 已提交
1553 1554 1555 1556 1557 1558
*   `source_project_dataset_tables(list[str])` - 一个或多个点(项目:[&#124;](28)项目。)&lt;dataset&gt;&lt;table&gt;用作源数据的BigQuery表。如果未包含&lt;project&gt;,则项目将是连接json中定义的项目。如果有多个源表,请使用列表。(模板)
*   `destination_project_dataset_table(str)` - 目标BigQuery表。格式为:(project:[&#124;](28) project。)&lt;dataset&gt;&lt;table&gt;(模板化)
*   `write_disposition(str)` - 表已存在时的写处置。
*   `create_disposition(str)` - 如果表不存在,则创建处置。
*   `bigquery_conn_id(str)` - 对特定BigQuery钩子的引用。
*   `delegate_to(str)` - 模拟的帐户(如果有)。为此,发出请求的服务帐户必须启用域范围委派。
W
zh raw  
wizardforcel 已提交
1559 1560


W
wizardforcel 已提交
1561
##### BigQueryToCloudStorageOperator
W
zh raw  
wizardforcel 已提交
1562

W
wizardforcel 已提交
1563
```py
W
wizardforcel 已提交
1564
class airflow.contrib.operators.bigquery_to_gcs.BigQueryToCloudStorageOperatorsource_project_dataset_tabledestination_cloud_storage_uriscompression ='NONE'export_format ='CSV'field_delimiter =','print_header = Truebigquery_conn_id ='bigquery_default'delegate_to = None* args ** kwargs 
W
zh raw  
wizardforcel 已提交
1565 1566
```

W
wizardforcel 已提交
1567
基类: [`airflow.models.BaseOperator`](code.html "airflow.models.BaseOperator")
W
zh raw  
wizardforcel 已提交
1568

W
wizardforcel 已提交
1569
将BigQuery表传输到Google Cloud Storage存储桶。
W
zh raw  
wizardforcel 已提交
1570 1571 1572

也可以看看

W
wizardforcel 已提交
1573
有关这些参数的详细信息,请访问:[https](https://cloud.google.com/bigquery/docs/reference/v2/jobs)[//cloud.google.com/bigquery/docs/reference/v2/jobs](https://cloud.google.com/bigquery/docs/reference/v2/jobs)
W
zh raw  
wizardforcel 已提交
1574

W
wizardforcel 已提交
1575
参数:
W
zh raw  
wizardforcel 已提交
1576

W
wizardforcel 已提交
1577 1578 1579
*   `source_project_dataset_table(str)` - 用作源数据的虚线(&lt;project&gt;&#124; &lt;project&gt;:)&lt;dataset&gt;&lt;table&gt; BigQuery表。如果未包含&lt;project&gt;,则项目将是连接json中定义的项目。(模板)
*   `destination_cloud_storage_uris(list)` - 目标Google云端存储URI(例如gs://some-bucket/some-file.txt)。(模板化)遵循此处定义的惯例:https://cloud.google.com/bigquery/exporting-data-from-bigquery#exportingmultiple
*   `compression(str)` - 要使用的压缩类型。
W
wizardforcel 已提交
1580
*   **export_format** - 要导出的文件格式。
W
wizardforcel 已提交
1581 1582 1583 1584
*   `field_delimiter(str)` - 提取到CSV时使用的分隔符。
*   `print_header(bool)` - 是否打印CSV文件提取的标头。
*   `bigquery_conn_id(str)` - 对特定BigQuery钩子的引用。
*   `delegate_to(str)` - 模拟的帐户(如果有)。为此,发出请求的服务帐户必须启用域范围委派。
W
zh raw  
wizardforcel 已提交
1585 1586


W
wizardforcel 已提交
1587
#### BigQueryHook
W
zh raw  
wizardforcel 已提交
1588

W
wizardforcel 已提交
1589
```py
W
wizardforcel 已提交
1590
class airflow.contrib.hooks.bigquery_hook.BigQueryHookbigquery_conn_id ='bigquery_default'delegate_to = Noneuse_legacy_sql = True 
W
zh raw  
wizardforcel 已提交
1591 1592
```

W
wizardforcel 已提交
1593
基类:[`airflow.contrib.hooks.gcp_api_base_hook.GoogleCloudBaseHook`](code.html "airflow.contrib.hooks.gcp_api_base_hook.GoogleCloudBaseHook")[`airflow.hooks.dbapi_hook.DbApiHook`](code.html "airflow.hooks.dbapi_hook.DbApiHook")`airflow.utils.log.logging_mixin.LoggingMixin`
W
zh raw  
wizardforcel 已提交
1594

W
wizardforcel 已提交
1595
与BigQuery交互。此挂钩使用Google Cloud Platform连接。
W
zh raw  
wizardforcel 已提交
1596

W
wizardforcel 已提交
1597
```py
W
wizardforcel 已提交
1598
get_conn() 
W
zh raw  
wizardforcel 已提交
1599 1600
```

W
wizardforcel 已提交
1601
返回BigQuery PEP 249连接对象。
W
zh raw  
wizardforcel 已提交
1602

W
wizardforcel 已提交
1603
```py
W
wizardforcel 已提交
1604
get_pandas_dfsqlparameters = Nonedialect = None 
W
zh raw  
wizardforcel 已提交
1605 1606
```

W
wizardforcel 已提交
1607
返回BigQuery查询生成的结果的Pandas DataFrame。必须重写DbApiHook方法,因为Pandas不支持PEP 249连接,但SQLite除外。看到:
W
zh raw  
wizardforcel 已提交
1608

W
wizardforcel 已提交
1609
[https://github.com/pydata/pandas/blob/master/pandas/io/sql.py#L447 ](https://github.com/pydata/pandas/blob/master/pandas/io/sql.py)[https://github.com/pydata/pandas/issues/6900](https://github.com/pydata/pandas/issues/6900)
W
zh raw  
wizardforcel 已提交
1610

W
wizardforcel 已提交
1611
参数:
W
zh raw  
wizardforcel 已提交
1612

W
wizardforcel 已提交
1613 1614 1615
*   `sql(str)` - 要执行的BigQuery SQL。
*   `参数(map 或 iterable)` - 用于呈现SQL查询的参数(未使用,请保留覆盖超类方法)
*   `dialect({'legacy', 'standard'})` - BigQuery SQL的方言 - 遗留SQL或标准SQL默认使用`self.use_legacy_sql(`如果未指定)
W
zh raw  
wizardforcel 已提交
1616 1617


W
wizardforcel 已提交
1618
```py
W
wizardforcel 已提交
1619
get_service() 
W
zh raw  
wizardforcel 已提交
1620 1621
```

W
wizardforcel 已提交
1622
返回一个BigQuery服务对象。
W
zh raw  
wizardforcel 已提交
1623

W
wizardforcel 已提交
1624
```py
W
wizardforcel 已提交
1625
insert_rowstablerowstarget_fields = Nonecommit_every = 1000 
W
zh raw  
wizardforcel 已提交
1626 1627
```

W
wizardforcel 已提交
1628
目前不支持插入。从理论上讲,您可以使用BigQuery的流API将行插入表中,但这尚未实现。
W
zh raw  
wizardforcel 已提交
1629

W
wizardforcel 已提交
1630
```py
W
wizardforcel 已提交
1631
table_existsproject_iddataset_idtable_id 
W
zh raw  
wizardforcel 已提交
1632 1633
```

W
wizardforcel 已提交
1634
检查Google BigQuery中是否存在表格。
W
zh raw  
wizardforcel 已提交
1635

W
wizardforcel 已提交
1636
参数:
W
zh raw  
wizardforcel 已提交
1637

W
wizardforcel 已提交
1638 1639 1640
*   `project_id(str)` - 要在其中查找表的Google云项目。提供给钩子的连接必须提供对指定项目的访问。
*   `dataset_id(str)` - 要在其中查找表的数据集的名称。
*   `table_id(str)` - 要检查的表的名称。
W
zh raw  
wizardforcel 已提交
1641 1642


W
wizardforcel 已提交
1643
### 云DataFlow
W
zh raw  
wizardforcel 已提交
1644

W
wizardforcel 已提交
1645
#### DataFlow运算符
W
zh raw  
wizardforcel 已提交
1646

W
wizardforcel 已提交
1647 1648 1649
*   [DataFlowJavaOperator](28):启动用Java编写的Cloud Dataflow作业。
*   [DataflowTemplateOperator](28):启动模板化的Cloud DataFlow批处理作业。
*   [DataFlowPythonOperator](28):启动用python编写的Cloud Dataflow作业。
W
zh raw  
wizardforcel 已提交
1650

W
wizardforcel 已提交
1651
##### DataFlowJavaOperator
W
zh raw  
wizardforcel 已提交
1652

W
wizardforcel 已提交
1653
```py
W
wizardforcel 已提交
1654
class airflow.contrib.operators.dataflow_operator.DataFlowJavaOperatorjardataflow_default_options = Noneoptions = Nonegcp_conn_id ='google_cloud_default'delegate_to = Nonepoll_sleep = 10job_class = None* args** kwargs 
W
zh raw  
wizardforcel 已提交
1655 1656
```

W
wizardforcel 已提交
1657
基类: [`airflow.models.BaseOperator`](code.html "airflow.models.BaseOperator")
W
zh raw  
wizardforcel 已提交
1658

W
wizardforcel 已提交
1659
启动Java Cloud DataFlow批处理作业。操作的参数将传递给作业。
W
zh raw  
wizardforcel 已提交
1660

W
wizardforcel 已提交
1661
在dag的default_args中定义dataflow_ *参数是一个很好的做法,例如项目,区域和分段位置。
W
zh raw  
wizardforcel 已提交
1662

W
wizardforcel 已提交
1663
```py
W
zh raw  
wizardforcel 已提交
1664 1665 1666 1667 1668 1669 1670 1671 1672 1673
 default_args = {
    'dataflow_default_options' : {
        'project' : 'my-gcp-project' ,
        'zone' : 'europe-west1-d' ,
        'stagingLocation' : 'gs://my-staging-bucket/staging/'
    }
}

```

W
wizardforcel 已提交
1674
您需要使用`jar`参数将路径作为文件引用传递给数据流,jar需要是一个自动执行的jar(请参阅以下文档:[https](https://beam.apache.org/documentation/runners/dataflow/)[//beam.apache.org/documentation/runners/dataflow/#self-执行jar](https://beam.apache.org/documentation/runners/dataflow/)。使用`options`转嫁选项你的工作。
W
zh raw  
wizardforcel 已提交
1675

W
wizardforcel 已提交
1676
```py
W
zh raw  
wizardforcel 已提交
1677 1678 1679 1680 1681 1682 1683 1684 1685 1686 1687 1688 1689 1690 1691
 t1 = DataFlowOperation (
    task_id = 'datapflow_example' ,
    jar = '{{var.value.gcp_dataflow_base}}pipeline/build/libs/pipeline-example-1.0.jar' ,
    options = {
        'autoscalingAlgorithm' : 'BASIC' ,
        'maxNumWorkers' : '50' ,
        'start' : '{{ds}}' ,
        'partitionType' : 'DAY' ,
        'labels' : { 'foo' : 'bar' }
    },
    gcp_conn_id = 'gcp-airflow-service-account' ,
    dag = my - dag )

```

W
wizardforcel 已提交
1692
这两个`jar``options`模板化,所以你可以在其中使用变量。
W
zh raw  
wizardforcel 已提交
1693

W
wizardforcel 已提交
1694
```py
W
zh raw  
wizardforcel 已提交
1695 1696 1697 1698 1699 1700 1701 1702 1703 1704 1705 1706 1707 1708 1709 1710 1711 1712 1713 1714 1715 1716 1717 1718 1719 1720 1721 1722 1723 1724 1725 1726 1727 1728
 default_args = {
    'owner' : 'airflow' ,
    'depends_on_past' : False ,
    'start_date' :
        ( 2016 , 8 , 1 ),
    'email' : [ 'alex@vanboxel.be' ],
    'email_on_failure' : False ,
    'email_on_retry' : False ,
    'retries' : 1 ,
    'retry_delay' : timedelta ( minutes = 30 ),
    'dataflow_default_options' : {
        'project' : 'my-gcp-project' ,
        'zone' : 'us-central1-f' ,
        'stagingLocation' : 'gs://bucket/tmp/dataflow/staging/' ,
    }
}

dag = DAG ( 'test-dag' , default_args = default_args )

task = DataFlowJavaOperator (
    gcp_conn_id = 'gcp_default' ,
    task_id = 'normalize-cal' ,
    jar = '{{var.value.gcp_dataflow_base}}pipeline-ingress-cal-normalize-1.0.jar' ,
    options = {
        'autoscalingAlgorithm' : 'BASIC' ,
        'maxNumWorkers' : '50' ,
        'start' : '{{ds}}' ,
        'partitionType' : 'DAY'

    },
    dag = dag )

```

W
wizardforcel 已提交
1729
##### DataflowTemplateOperator
W
zh raw  
wizardforcel 已提交
1730

W
wizardforcel 已提交
1731
```py
W
wizardforcel 已提交
1732
class airflow.contrib.operators.dataflow_operator.DataflowTemplateOperatortemplatedataflow_default_options = Noneparameters = Nonegcp_conn_id ='google_cloud_default'delegate_to = Nonepoll_sleep = 10* args** kwargs 
W
zh raw  
wizardforcel 已提交
1733 1734
```

W
wizardforcel 已提交
1735
基类: [`airflow.models.BaseOperator`](code.html "airflow.models.BaseOperator")
W
zh raw  
wizardforcel 已提交
1736

W
wizardforcel 已提交
1737
启动模板化云DataFlow批处理作业。操作的参数将传递给作业。在dag的default_args中定义dataflow_ *参数是一个很好的做法,例如项目,区域和分段位置。
W
zh raw  
wizardforcel 已提交
1738 1739 1740

也可以看看

W
wizardforcel 已提交
1741
[https://cloud.google.com/dataflow/docs/reference/rest/v1b3/LaunchTemplateParameters ](https://cloud.google.com/dataflow/docs/reference/rest/v1b3/LaunchTemplateParameters)[https://cloud.google.com/dataflow/docs/reference/rest/v1b3/RuntimeEnvironment](https://cloud.google.com/dataflow/docs/reference/rest/v1b3/RuntimeEnvironment)
W
zh raw  
wizardforcel 已提交
1742

W
wizardforcel 已提交
1743
```py
W
zh raw  
wizardforcel 已提交
1744 1745 1746 1747 1748 1749 1750 1751 1752 1753 1754
 default_args = {
    'dataflow_default_options' : {
        'project' : 'my-gcp-project'
        'zone' : 'europe-west1-d' ,
        'tempLocation' : 'gs://my-staging-bucket/staging/'
        }
    }
}

```

W
wizardforcel 已提交
1755
您需要将路径作为带`template`参数的文件引用传递给数据流模板。使用`parameters`来传递参数给你的工作。使用`environment`对运行环境变量传递给你的工作。
W
zh raw  
wizardforcel 已提交
1756

W
wizardforcel 已提交
1757
```py
W
zh raw  
wizardforcel 已提交
1758 1759 1760 1761 1762 1763 1764 1765 1766 1767 1768 1769
 t1 = DataflowTemplateOperator (
    task_id = 'datapflow_example' ,
    template = '{{var.value.gcp_dataflow_base}}' ,
    parameters = {
        'inputFile' : "gs://bucket/input/my_input.txt" ,
        'outputFile' : "gs://bucket/output/my_output.txt"
    },
    gcp_conn_id = 'gcp-airflow-service-account' ,
    dag = my - dag )

```

W
wizardforcel 已提交
1770
`template``dataflow_default_options`并且`parameters`是模板化的,因此您可以在其中使用变量。
W
zh raw  
wizardforcel 已提交
1771

W
wizardforcel 已提交
1772
##### DataFlowPythonOperator
W
zh raw  
wizardforcel 已提交
1773

W
wizardforcel 已提交
1774
```py
W
wizardforcel 已提交
1775
class airflow.contrib.operators.dataflow_operator.DataFlowPythonOperatorpy_filepy_options = Nonedataflow_default_options = Noneoptions = Nonegcp_conn_id ='google_cloud_default'delegate_to = Nonepoll_sleep = 10* args** kwargs 
W
zh raw  
wizardforcel 已提交
1776 1777
```

W
wizardforcel 已提交
1778
基类: [`airflow.models.BaseOperator`](code.html "airflow.models.BaseOperator")
W
zh raw  
wizardforcel 已提交
1779

W
wizardforcel 已提交
1780
```py
W
wizardforcel 已提交
1781
执行上下文 
W
zh raw  
wizardforcel 已提交
1782 1783
```

W
wizardforcel 已提交
1784
执行python数据流作业。
W
zh raw  
wizardforcel 已提交
1785

W
wizardforcel 已提交
1786
#### DataFlowHook
W
zh raw  
wizardforcel 已提交
1787

W
wizardforcel 已提交
1788
```py
W
wizardforcel 已提交
1789
class airflow.contrib.hooks.gcp_dataflow_hook.DataFlowHookgcp_conn_id ='google_cloud_default'delegate_to = Nonepoll_sleep = 10 
W
zh raw  
wizardforcel 已提交
1790 1791
```

W
wizardforcel 已提交
1792
基类: [`airflow.contrib.hooks.gcp_api_base_hook.GoogleCloudBaseHook`](code.html "airflow.contrib.hooks.gcp_api_base_hook.GoogleCloudBaseHook")
W
zh raw  
wizardforcel 已提交
1793

W
wizardforcel 已提交
1794
```py
W
wizardforcel 已提交
1795
get_conn() 
W
zh raw  
wizardforcel 已提交
1796 1797
```

W
wizardforcel 已提交
1798
返回Google云端存储服务对象。
W
zh raw  
wizardforcel 已提交
1799

W
wizardforcel 已提交
1800
### Cloud DataProc
W
zh raw  
wizardforcel 已提交
1801

W
wizardforcel 已提交
1802
#### DataProc运算符
W
zh raw  
wizardforcel 已提交
1803

W
wizardforcel 已提交
1804 1805 1806 1807 1808 1809 1810 1811 1812 1813 1814
*   [DataprocClusterCreateOperator](28):在Google Cloud Dataproc上创建新群集。
*   [DataprocClusterDeleteOperator](28):删除Google Cloud Dataproc上的群集。
*   [DataprocClusterScaleOperator](28):在Google Cloud Dataproc上向上或向下扩展群集。
*   [DataProcPigOperator](28):在Cloud DataProc集群上启动Pig查询作业。
*   [DataProcHiveOperator](28):在Cloud DataProc群集上启动Hive查询作业。
*   [DataProcSparkSqlOperator](28):在Cloud DataProc集群上启动Spark SQL查询作业。
*   [DataProcSparkOperator](28):在Cloud DataProc集群上启动Spark作业。
*   [DataProcHadoopOperator](28):在Cloud DataProc集群上启动Hadoop作业。
*   [DataProcPySparkOperator](28):在Cloud DataProc群集上启动PySpark作业。
*   [DataprocWorkflowTemplateInstantiateOperator](28):在Google Cloud Dataproc上实例化WorkflowTemplate。
*   [DataprocWorkflowTemplateInstantiateInlineOperator](28):在Google Cloud Dataproc上实例化WorkflowTemplate内联。
W
zh raw  
wizardforcel 已提交
1815

W
wizardforcel 已提交
1816
##### DataprocClusterCreateOperator
W
zh raw  
wizardforcel 已提交
1817

W
wizardforcel 已提交
1818
```py
W
wizardforcel 已提交
1819
class airflow.contrib.operators.dataproc_operator.DataprocClusterCreateOperatorcluster_nameproject_idnum_workerszonenetwork_uri = Nonesubnetwork_uri = Noneinternal_ip_only = Nonetags = Nonestorage_bucket = Noneinit_actions_uris = Noneinit_action_timeout ='10m'metadata =image_version =属性=master_machine_type ='n1-standard-4'master_disk_size = 500worker_machine_type ='n1-standard-4'worker_disk_size = 500num_preemptible_workers = 0labels = Noneregion =' global'gcp_conn_id ='google_cloud_default'delegate_to = Noneservice_account = Noneservice_account_scopes = Noneidle_delete_ttl = Noneauto_delete_time = Noneauto_delete_ttl = None* args** kwargs 
W
zh raw  
wizardforcel 已提交
1820 1821
```

W
wizardforcel 已提交
1822
基类: [`airflow.models.BaseOperator`](code.html "airflow.models.BaseOperator")
W
zh raw  
wizardforcel 已提交
1823

W
wizardforcel 已提交
1824
在Google Cloud Dataproc上创建新群集。操作员将等待创建成功或创建过程中发生错误。
W
zh raw  
wizardforcel 已提交
1825

W
wizardforcel 已提交
1826
参数允许配置群集。请参阅
W
zh raw  
wizardforcel 已提交
1827

W
wizardforcel 已提交
1828
[https://cloud.google.com/dataproc/docs/reference/rest/v1/projects.regions.clusters](https://cloud.google.com/dataproc/docs/reference/rest/v1/projects.regions.clusters)
W
zh raw  
wizardforcel 已提交
1829

W
wizardforcel 已提交
1830
有关不同参数的详细说明。链接中详述的大多数配置参数都可作为此运算符的参数。
W
zh raw  
wizardforcel 已提交
1831

W
wizardforcel 已提交
1832
参数:
W
zh raw  
wizardforcel 已提交
1833

W
wizardforcel 已提交
1834 1835 1836 1837 1838 1839 1840 1841 1842 1843 1844 1845 1846 1847 1848 1849 1850 1851 1852 1853
*   `cluster_name(str)` - 要创建的DataProc集群的名称。(模板)
*   `project_id(str)` - 用于创建集群的Google云项目的ID。(模板)
*   `num_workers(int)` - 旋转的工人数量
*   `storage_bucket(str)` - 要使用的存储桶,设置为None允许dataproc为您生成自定义存储桶
*   `init_actions_uris(list[str])` - 包含数据空间初始化脚本的GCS uri列表
*   `init_action_timeout(str)` - init_actions_uris中可执行脚本必须完成的时间
*   `元数据(dict)` - 要添加到所有实例的键值google计算引擎元数据条目的字典
*   `image_version(str)` - Dataproc集群内的软件版本
*   `属性(dict)` -性能上的配置文件设置的字典(如火花defaults.conf),见[https://cloud.google.com/dataproc/docs/reference/rest/v1/](https://cloud.google.com/dataproc/docs/reference/rest/v1/) projects.regions.clusters#SoftwareConfig
*   `master_machine_type(str)` - 计算要用于主节点的引擎机器类型
*   `master_disk_size(int)` - 主节点的磁盘大小
*   `worker_machine_type(str)` - 计算要用于工作节点的引擎计算机类型
*   `worker_disk_size(int)` - 工作节点的磁盘大小
*   `num_preemptible_workers(int)` - 要旋转的可抢占工作节点数
*   `labels(dict)` - 要添加到集群的标签的字典
*   `zone(str)` - 群集所在的区域。(模板)
*   `network_uri(str)` - 用于机器通信的网络uri,不能用subnetwork_uri指定
*   `subnetwork_uri(str)` - 无法使用network_uri指定要用于机器通信的子网uri
*   `internal_ip_only(bool)` - 如果为true,则群集中的所有实例将只具有内部IP地址。这只能为启用子网的网络启用
*   `tags(list[str])` - 要添加到所有实例的GCE标记
W
wizardforcel 已提交
1854
*   **地区** - 作为'全球'留下,可能在未来变得相关。(模板)
W
wizardforcel 已提交
1855 1856 1857 1858 1859 1860 1861
*   `gcp_conn_id(str)` - 用于连接到Google Cloud Platform的连接ID。
*   `delegate_to(str)` - 模拟的帐户(如果有)。为此,发出请求的服务帐户必须启用域范围委派。
*   `service_account(str)` - dataproc实例的服务帐户。
*   `service_account_scopes(list[str])` - 要包含的服务帐户范围的URI。
*   `idle_delete_ttl(int)` - 群集在保持空闲状态时保持活动状态的最长持续时间。通过此阈值将导致群集被自动删除。持续时间(秒)。
*   `auto_delete_time(datetime.datetime)` - 自动删除群集的时间。
*   `auto_delete_ttl(int)` - 群集的生命周期,群集将在此持续时间结束时自动删除。持续时间(秒)。(如果设置了auto_delete_time,则将忽略此参数)
W
zh raw  
wizardforcel 已提交
1862 1863


W
wizardforcel 已提交
1864
##### DataprocClusterScaleOperator
W
zh raw  
wizardforcel 已提交
1865

W
wizardforcel 已提交
1866
```py
W
wizardforcel 已提交
1867
class airflow.contrib.operators.dataproc_operator.DataprocClusterScaleOperatorcluster_nameproject_idregion ='global'gcp_conn_id ='google_cloud_default'delegate_to = Nonenum_workers = 2num_preemptible_workers = 0graceful_decommission_timeout = None* args** kwargs 
W
zh raw  
wizardforcel 已提交
1868 1869
```

W
wizardforcel 已提交
1870
基类: [`airflow.models.BaseOperator`](code.html "airflow.models.BaseOperator")
W
zh raw  
wizardforcel 已提交
1871

W
wizardforcel 已提交
1872
在Google Cloud Dataproc上进行扩展,向上或向下扩展。操作员将等待,直到重新调整群集。
W
zh raw  
wizardforcel 已提交
1873 1874 1875

**示例**

W
wizardforcel 已提交
1876
```py
W
wizardforcel 已提交
1877
 t1 = DataprocClusterScaleOperator 
W
zh raw  
wizardforcel 已提交
1878 1879
```

W
wizardforcel 已提交
1880
task_id ='dataproc_scale',project_id ='my-project',cluster_name ='cluster-1',num_workers = 10,num_preemptible_workers = 10,graceful_decommission_timeout ='1h'dag = dag)
W
zh raw  
wizardforcel 已提交
1881 1882 1883

也可以看看

W
wizardforcel 已提交
1884
有关扩展群集的更多详细信息,请参阅以下参考:[https](https://cloud.google.com/dataproc/docs/concepts/configuring-clusters/scaling-clusters)[//cloud.google.com/dataproc/docs/concepts/configuring-clusters/scaling-clusters](https://cloud.google.com/dataproc/docs/concepts/configuring-clusters/scaling-clusters)
W
zh raw  
wizardforcel 已提交
1885

W
wizardforcel 已提交
1886
参数:
W
zh raw  
wizardforcel 已提交
1887

W
wizardforcel 已提交
1888 1889 1890 1891 1892 1893 1894 1895
*   `cluster_name(str)` - 要扩展的集群的名称。(模板)
*   `project_id(str)` - 群集运行的Google云项目的ID。(模板)
*   `region(str)` - 数据通路簇的区域。(模板)
*   `gcp_conn_id(str)` - 用于连接到Google Cloud Platform的连接ID。
*   `num_workers(int)` - 新的工人数量
*   `num_preemptible_workers(int)` - 新的可抢占工人数量
*   `graceful_decommission_timeout(str)` - 优雅的YARN decomissioning超时。最大值为1d
*   `delegate_to(str)` - 模拟的帐户(如果有)。为此,发出请求的服务帐户必须启用域范围委派。
W
zh raw  
wizardforcel 已提交
1896 1897


W
wizardforcel 已提交
1898
##### DataprocClusterDeleteOperator
W
zh raw  
wizardforcel 已提交
1899

W
wizardforcel 已提交
1900
```py
W
wizardforcel 已提交
1901
class airflow.contrib.operators.dataproc_operator.DataprocClusterDeleteOperatorcluster_nameproject_idregion ='global'gcp_conn_id ='google_cloud_default'delegate_to = None* args** kwargs 
W
zh raw  
wizardforcel 已提交
1902 1903
```

W
wizardforcel 已提交
1904
基类: [`airflow.models.BaseOperator`](code.html "airflow.models.BaseOperator")
W
zh raw  
wizardforcel 已提交
1905

W
wizardforcel 已提交
1906
删除Google Cloud Dataproc上的群集。操作员将等待,直到群集被销毁。
W
zh raw  
wizardforcel 已提交
1907

W
wizardforcel 已提交
1908
参数:
W
zh raw  
wizardforcel 已提交
1909

W
wizardforcel 已提交
1910 1911 1912 1913 1914
*   `cluster_name(str)` - 要创建的集群的名称。(模板)
*   `project_id(str)` - 群集运行的Google云项目的ID。(模板)
*   `region(str)` - 保留为“全局”,将来可能会变得相关。(模板)
*   `gcp_conn_id(str)` - 用于连接到Google Cloud Platform的连接ID。
*   `delegate_to(str)` - 模拟的帐户(如果有)。为此,发出请求的服务帐户必须启用域范围委派。
W
zh raw  
wizardforcel 已提交
1915 1916


W
wizardforcel 已提交
1917
##### DataProcPigOperator
W
zh raw  
wizardforcel 已提交
1918

W
wizardforcel 已提交
1919
```py
W
wizardforcel 已提交
1920
class airflow.contrib.operators.dataproc_operator.DataProcPigOperatorquery = Nonequery_uri = Nonevariables = Nonejob_name ='{{task.task_id}} _ {{ds_nodash}}'cluster_name ='cluster-1'dataproc_pig_properties =dataproc_pig_jars =gcp_conn_id ='google_cloud_default'delegate_to =region ='全局'* args** kwargs 
W
zh raw  
wizardforcel 已提交
1921 1922
```

W
wizardforcel 已提交
1923
基类: [`airflow.models.BaseOperator`](code.html "airflow.models.BaseOperator")
W
zh raw  
wizardforcel 已提交
1924

W
wizardforcel 已提交
1925
在Cloud DataProc群集上启动Pig查询作业。操作的参数将传递给集群。
W
zh raw  
wizardforcel 已提交
1926

W
wizardforcel 已提交
1927
在dag的default_args中定义dataproc_ *参数是一种很好的做法,比如集群名称和UDF。
W
zh raw  
wizardforcel 已提交
1928

W
wizardforcel 已提交
1929
```py
W
zh raw  
wizardforcel 已提交
1930 1931 1932 1933 1934 1935 1936 1937 1938 1939
 default_args = {
    'cluster_name' : 'cluster-1' ,
    'dataproc_pig_jars' : [
        'gs://example/udf/jar/datafu/1.2.0/datafu.jar' ,
        'gs://example/udf/jar/gpig/1.2/gpig.jar'
    ]
}

```

W
wizardforcel 已提交
1940
您可以将pig脚本作为字符串或文件引用传递。使用变量传递要在群集上解析的pig脚本的变量,或者使用要在脚本中解析的参数作为模板参数。
W
zh raw  
wizardforcel 已提交
1941 1942 1943

**示例**

W
wizardforcel 已提交
1944
```py
W
zh raw  
wizardforcel 已提交
1945 1946 1947 1948 1949 1950 1951 1952 1953 1954
 t1 = DataProcPigOperator (
        task_id = 'dataproc_pig' ,
        query = 'a_pig_script.pig' ,
        variables = { 'out' : 'gs://example/output/{{ds}}' },
        dag = dag )

```

也可以看看

W
wizardforcel 已提交
1955
有关工作提交的更多详细信息,请参阅以下参考:[https](https://cloud.google.com/dataproc/reference/rest/v1/projects.regions.jobs)[//cloud.google.com/dataproc/reference/rest/v1/projects.regions.jobs](https://cloud.google.com/dataproc/reference/rest/v1/projects.regions.jobs)
W
zh raw  
wizardforcel 已提交
1956

W
wizardforcel 已提交
1957
参数:
W
zh raw  
wizardforcel 已提交
1958

W
wizardforcel 已提交
1959 1960 1961 1962 1963 1964 1965 1966 1967 1968
*   `query(str)` - 对查询文件的查询或引用(pg或pig扩展)。(模板)
*   `query_uri(str)` - 云存储上的猪脚本的uri。
*   `variables(dict)` - 查询的命名参数的映射。(模板)
*   `job_name(str)` - DataProc集群中使用的作业名称。默认情况下,此名称是附加执行数据的task_id,但可以进行模板化。该名称将始终附加一个随机数,以避免名称冲突。(模板)
*   `cluster_name(str)` - DataProc集群的名称。(模板)
*   `dataproc_pig_properties(dict)` - Pig属性的映射。非常适合放入默认参数
*   `dataproc_pig_jars(list)` - 在云存储中配置的jars的URI(例如:用于UDF和lib),非常适合放入默认参数。
*   `gcp_conn_id(str)` - 用于连接到Google Cloud Platform的连接ID。
*   `delegate_to(str)` - 模拟的帐户(如果有)。为此,发出请求的服务帐户必须启用域范围委派。
*   `region(str)` - 创建数据加载集群的指定区域。
W
zh raw  
wizardforcel 已提交
1969 1970


W
wizardforcel 已提交
1971
##### DataProcHiveOperator
W
zh raw  
wizardforcel 已提交
1972

W
wizardforcel 已提交
1973
```py
W
wizardforcel 已提交
1974
class airflow.contrib.operators.dataproc_operator.DataProcHiveOperatorquery = Nonequery_uri = Nonevariables = Nonejob_name ='{{task.task_id}} _ {{ds_nodash}}'cluster_name ='cluster-1'dataproc_hive_properties =dataproc_hive_jars =gcp_conn_id ='google_cloud_default'delegate_to =region ='全局'* args** kwargs 
W
zh raw  
wizardforcel 已提交
1975 1976
```

W
wizardforcel 已提交
1977
基类: [`airflow.models.BaseOperator`](code.html "airflow.models.BaseOperator")
W
zh raw  
wizardforcel 已提交
1978

W
wizardforcel 已提交
1979
在Cloud DataProc群集上启动Hive查询作业。
W
zh raw  
wizardforcel 已提交
1980

W
wizardforcel 已提交
1981
参数:
W
zh raw  
wizardforcel 已提交
1982

W
wizardforcel 已提交
1983 1984 1985 1986 1987 1988 1989 1990 1991 1992
*   `query(str)` - 查询或对查询文件的引用(q扩展名)。
*   `query_uri(str)` - 云存储上的hive脚本的uri。
*   `variables(dict)` - 查询的命名参数的映射。
*   `job_name(str)` - DataProc集群中使用的作业名称。默认情况下,此名称是附加执行数据的task_id,但可以进行模板化。该名称将始终附加一个随机数,以避免名称冲突。
*   `cluster_name(str)` - DataProc集群的名称。
*   `dataproc_hive_properties(dict)` - Pig属性的映射。非常适合放入默认参数
*   `dataproc_hive_jars(list)` - 在云存储中配置的jars的URI(例如:用于UDF和lib),非常适合放入默认参数。
*   `gcp_conn_id(str)` - 用于连接到Google Cloud Platform的连接ID。
*   `delegate_to(str)` - 模拟的帐户(如果有)。为此,发出请求的服务帐户必须启用域范围委派。
*   `region(str)` - 创建数据加载集群的指定区域。
W
zh raw  
wizardforcel 已提交
1993 1994


W
wizardforcel 已提交
1995
##### DataProcSparkSqlOperator
W
zh raw  
wizardforcel 已提交
1996

W
wizardforcel 已提交
1997
```py
W
wizardforcel 已提交
1998
class airflow.contrib.operators.dataproc_operator.DataProcSparkSqlOperatorquery = Nonequery_uri = Nonevariables = Nonejob_name ='{{task.task_id}} _ {{ds_nodash}}'cluster_name ='cluster-1'dataproc_spark_properties =dataproc_spark_jars =gcp_conn_id ='google_cloud_default'delegate_to =region ='全局'* args** kwargs 
W
zh raw  
wizardforcel 已提交
1999 2000
```

W
wizardforcel 已提交
2001
基类: [`airflow.models.BaseOperator`](code.html "airflow.models.BaseOperator")
W
zh raw  
wizardforcel 已提交
2002

W
wizardforcel 已提交
2003
在Cloud DataProc集群上启动Spark SQL查询作业。
W
zh raw  
wizardforcel 已提交
2004

W
wizardforcel 已提交
2005
参数:
W
zh raw  
wizardforcel 已提交
2006

W
wizardforcel 已提交
2007 2008 2009 2010 2011 2012 2013 2014 2015 2016
*   `query(str)` - 查询或对查询文件的引用(q扩展名)。(模板)
*   `query_uri(str)` - 云存储上的一个spark sql脚本的uri。
*   `variables(dict)` - 查询的命名参数的映射。(模板)
*   `job_name(str)` - DataProc集群中使用的作业名称。默认情况下,此名称是附加执行数据的task_id,但可以进行模板化。该名称将始终附加一个随机数,以避免名称冲突。(模板)
*   `cluster_name(str)` - DataProc集群的名称。(模板)
*   `dataproc_spark_properties(dict)` - Pig属性的映射。非常适合放入默认参数
*   `dataproc_spark_jars(list)` - 在云存储中配置的jars的URI(例如:用于UDF和lib),非常适合放入默认参数。
*   `gcp_conn_id(str)` - 用于连接到Google Cloud Platform的连接ID。
*   `delegate_to(str)` - 模拟的帐户(如果有)。为此,发出请求的服务帐户必须启用域范围委派。
*   `region(str)` - 创建数据加载集群的指定区域。
W
zh raw  
wizardforcel 已提交
2017 2018


W
wizardforcel 已提交
2019
##### DataProcSparkOperator
W
zh raw  
wizardforcel 已提交
2020

W
wizardforcel 已提交
2021
```py
W
wizardforcel 已提交
2022
class airflow.contrib.operators.dataproc_operator.DataProcSparkOperatormain_jar = Nonemain_class = Nonearguments = Nonearchives = Nonefiles = Nonejob_name ='{{task.task_id}} _ {{ds_nodash}}'cluster_name ='cluster-1'dataproc_spark_properties =dataproc_spark_jars =gcp_conn_id ='google_cloud_default'delegate_to =region ='全局'* args** kwargs 
W
zh raw  
wizardforcel 已提交
2023 2024
```

W
wizardforcel 已提交
2025
基类: [`airflow.models.BaseOperator`](code.html "airflow.models.BaseOperator")
W
zh raw  
wizardforcel 已提交
2026

W
wizardforcel 已提交
2027
在Cloud DataProc群集上启动Spark作业。
W
zh raw  
wizardforcel 已提交
2028

W
wizardforcel 已提交
2029
参数:
W
zh raw  
wizardforcel 已提交
2030

W
wizardforcel 已提交
2031 2032 2033 2034 2035 2036 2037 2038 2039 2040 2041 2042
*   `main_jar(str)` - 在云存储上配置的作业jar的URI。(使用this或main_class,而不是两者一起)。
*   `main_class(str)` - 作业类的名称。(使用this或main_jar,而不是两者一起)。
*   `arguments(list)` - 作业的参数。(模板)
*   `archives(list)` - 将在工作目录中解压缩的已归档文件列表。应存储在云存储中。
*   `files(list)` - 要复制到工作目录的文件列表
*   `job_name(str)` - DataProc集群中使用的作业名称。默认情况下,此名称是附加执行数据的task_id,但可以进行模板化。该名称将始终附加一个随机数,以避免名称冲突。(模板)
*   `cluster_name(str)` - DataProc集群的名称。(模板)
*   `dataproc_spark_properties(dict)` - Pig属性的映射。非常适合放入默认参数
*   `dataproc_spark_jars(list)` - 在云存储中配置的jars的URI(例如:用于UDF和lib),非常适合放入默认参数。
*   `gcp_conn_id(str)` - 用于连接到Google Cloud Platform的连接ID。
*   `delegate_to(str)` - 模拟的帐户(如果有)。为此,发出请求的服务帐户必须启用域范围委派。
*   `region(str)` - 创建数据加载集群的指定区域。
W
zh raw  
wizardforcel 已提交
2043 2044


W
wizardforcel 已提交
2045
##### DataProcHadoopOperator
W
zh raw  
wizardforcel 已提交
2046

W
wizardforcel 已提交
2047
```py
W
wizardforcel 已提交
2048
class airflow.contrib.operators.dataproc_operator.DataProcHadoopOperatormain_jar = Nonemain_class = Nonearguments = Nonearchives = Nonefiles = Nonejob_name ='{{task.task_id}} _ {{ds_nodash}}'cluster_name ='cluster-1'dataproc_hadoop_properties =dataproc_hadoop_jars =gcp_conn_id ='google_cloud_default'delegate_to =region ='全局'* args** kwargs 
W
zh raw  
wizardforcel 已提交
2049 2050
```

W
wizardforcel 已提交
2051
基类: [`airflow.models.BaseOperator`](code.html "airflow.models.BaseOperator")
W
zh raw  
wizardforcel 已提交
2052

W
wizardforcel 已提交
2053
在Cloud DataProc群集上启动Hadoop作业。
W
zh raw  
wizardforcel 已提交
2054

W
wizardforcel 已提交
2055
参数:
W
zh raw  
wizardforcel 已提交
2056

W
wizardforcel 已提交
2057 2058 2059 2060 2061 2062 2063 2064 2065 2066 2067 2068
*   `main_jar(str)` - 在云存储上配置的作业jar的URI。(使用this或main_class,而不是两者一起)。
*   `main_class(str)` - 作业类的名称。(使用this或main_jar,而不是两者一起)。
*   `arguments(list)` - 作业的参数。(模板)
*   `archives(list)` - 将在工作目录中解压缩的已归档文件列表。应存储在云存储中。
*   `files(list)` - 要复制到工作目录的文件列表
*   `job_name(str)` - DataProc集群中使用的作业名称。默认情况下,此名称是附加执行数据的task_id,但可以进行模板化。该名称将始终附加一个随机数,以避免名称冲突。(模板)
*   `cluster_name(str)` - DataProc集群的名称。(模板)
*   `dataproc_hadoop_properties(dict)` - Pig属性的映射。非常适合放入默认参数
*   `dataproc_hadoop_jars(list)` - 在云存储中配置的jars的URI(例如:用于UDF和lib),非常适合放入默认参数。
*   `gcp_conn_id(str)` - 用于连接到Google Cloud Platform的连接ID。
*   `delegate_to(str)` - 模拟的帐户(如果有)。为此,发出请求的服务帐户必须启用域范围委派。
*   `region(str)` - 创建数据加载集群的指定区域。
W
zh raw  
wizardforcel 已提交
2069 2070


W
wizardforcel 已提交
2071
##### DataProcPySparkOperator
W
zh raw  
wizardforcel 已提交
2072

W
wizardforcel 已提交
2073
```py
W
wizardforcel 已提交
2074
class airflow.contrib.operators.dataproc_operator.DataProcPySparkOperatormainarguments = Nonearchives = Nonepyfiles = Nonefiles = Nonejob_name ='{{task.task_id}} _ {{ds_nodash}}'cluster_name =' cluster-1'dataproc_pyspark_properties = Nonedataproc_pyspark_jars = Nonegcp_conn_id ='google_cloud_default'delegate_to = Noneregion ='global'* args** kwargs 
W
zh raw  
wizardforcel 已提交
2075 2076
```

W
wizardforcel 已提交
2077
基类: [`airflow.models.BaseOperator`](code.html "airflow.models.BaseOperator")
W
zh raw  
wizardforcel 已提交
2078

W
wizardforcel 已提交
2079
在Cloud DataProc群集上启动PySpark作业。
W
zh raw  
wizardforcel 已提交
2080

W
wizardforcel 已提交
2081
参数:
W
zh raw  
wizardforcel 已提交
2082

W
wizardforcel 已提交
2083 2084 2085 2086 2087 2088 2089 2090 2091 2092 2093 2094
*   `main(str)` - [必需]用作驱动程序的主Python文件的Hadoop兼容文件系统(HCFS)URI。必须是.py文件。
*   `arguments(list)` - 作业的参数。(模板)
*   `archives(list)` - 将在工作目录中解压缩的已归档文件列表。应存储在云存储中。
*   `files(list)` - 要复制到工作目录的文件列表
*   `pyfiles(list)` - 要传递给PySpark框架的Python文件列表。支持的文件类型:.py,.egg和.zip
*   `job_name(str)` - DataProc集群中使用的作业名称。默认情况下,此名称是附加执行数据的task_id,但可以进行模板化。该名称将始终附加一个随机数,以避免名称冲突。(模板)
*   `cluster_name(str)` - DataProc集群的名称。
*   `dataproc_pyspark_properties(dict)` - Pig属性的映射。非常适合放入默认参数
*   `dataproc_pyspark_jars(list)` - 在云存储中配置的jars的URI(例如:用于UDF和lib),非常适合放入默认参数。
*   `gcp_conn_id(str)` - 用于连接到Google Cloud Platform的连接ID。
*   `delegate_to(str)` - 模拟的帐户(如果有)。为此,发出请求的服务帐户必须启用域范围委派。
*   `region(str)` - 创建数据加载集群的指定区域。
W
zh raw  
wizardforcel 已提交
2095 2096


W
wizardforcel 已提交
2097
##### DataprocWorkflowTemplateInstantiateOperator
W
zh raw  
wizardforcel 已提交
2098

W
wizardforcel 已提交
2099
```py
W
wizardforcel 已提交
2100
class airflow.contrib.operators.dataproc_operator.DataprocWorkflowTemplateInstantiateOperatortemplate_id* args** kwargs 
W
zh raw  
wizardforcel 已提交
2101 2102
```

W
wizardforcel 已提交
2103
基类: [`airflow.contrib.operators.dataproc_operator.DataprocWorkflowTemplateBaseOperator`](code.html "airflow.contrib.operators.dataproc_operator.DataprocWorkflowTemplateBaseOperator")
W
zh raw  
wizardforcel 已提交
2104

W
wizardforcel 已提交
2105
在Google Cloud Dataproc上实例化WorkflowTemplate。操作员将等待WorkflowTemplate完成执行。
W
zh raw  
wizardforcel 已提交
2106 2107 2108

也可以看看

W
wizardforcel 已提交
2109
请参阅:[https](https://cloud.google.com/dataproc/docs/reference/rest/v1beta2/projects.regions.workflowTemplates/instantiate)[//cloud.google.com/dataproc/docs/reference/rest/v1beta2/projects.regions.workflowTemplates/instantiate](https://cloud.google.com/dataproc/docs/reference/rest/v1beta2/projects.regions.workflowTemplates/instantiate)
W
zh raw  
wizardforcel 已提交
2110

W
wizardforcel 已提交
2111
参数:
W
zh raw  
wizardforcel 已提交
2112

W
wizardforcel 已提交
2113 2114 2115 2116 2117
*   `template_id(str)` - 模板的id。(模板)
*   `project_id(str)` - 模板运行所在的Google云项目的ID
*   `region(str)` - 保留为“全局”,将来可能会变得相关
*   `gcp_conn_id(str)` - 用于连接到Google Cloud Platform的连接ID。
*   `delegate_to(str)` - 模拟的帐户(如果有)。为此,发出请求的服务帐户必须启用域范围委派。
W
zh raw  
wizardforcel 已提交
2118 2119


W
wizardforcel 已提交
2120
##### DataprocWorkflowTemplateInstantiateInlineOperator
W
zh raw  
wizardforcel 已提交
2121

W
wizardforcel 已提交
2122
```py
W
wizardforcel 已提交
2123
class airflow.contrib.operators.dataproc_operator.DataprocWorkflowTemplateInstantiateInlineOperatortemplate* args** kwargs 
W
zh raw  
wizardforcel 已提交
2124 2125
```

W
wizardforcel 已提交
2126
基类: [`airflow.contrib.operators.dataproc_operator.DataprocWorkflowTemplateBaseOperator`](code.html "airflow.contrib.operators.dataproc_operator.DataprocWorkflowTemplateBaseOperator")
W
zh raw  
wizardforcel 已提交
2127

W
wizardforcel 已提交
2128
在Google Cloud Dataproc上实例化WorkflowTemplate内联。操作员将等待WorkflowTemplate完成执行。
W
zh raw  
wizardforcel 已提交
2129 2130 2131

也可以看看

W
wizardforcel 已提交
2132
请参阅:[https](https://cloud.google.com/dataproc/docs/reference/rest/v1beta2/projects.regions.workflowTemplates/instantiateInline)[//cloud.google.com/dataproc/docs/reference/rest/v1beta2/projects.regions.workflowTemplates/instantiateInline](https://cloud.google.com/dataproc/docs/reference/rest/v1beta2/projects.regions.workflowTemplates/instantiateInline)
W
zh raw  
wizardforcel 已提交
2133

W
wizardforcel 已提交
2134
参数:
W
zh raw  
wizardforcel 已提交
2135

W
wizardforcel 已提交
2136 2137 2138 2139 2140
*   `template(map)` - 模板内容。(模板)
*   `project_id(str)` - 模板运行所在的Google云项目的ID
*   `region(str)` - 保留为“全局”,将来可能会变得相关
*   `gcp_conn_id(str)` - 用于连接到Google Cloud Platform的连接ID。
*   `delegate_to(str)` - 模拟的帐户(如果有)。为此,发出请求的服务帐户必须启用域范围委派。
W
zh raw  
wizardforcel 已提交
2141 2142


W
wizardforcel 已提交
2143
### 云数据存储区
W
zh raw  
wizardforcel 已提交
2144

W
wizardforcel 已提交
2145
#### 数据存储区运营商
W
zh raw  
wizardforcel 已提交
2146

W
wizardforcel 已提交
2147 2148
*   [DatastoreExportOperator](28):将实体从Google Cloud Datastore导出到云存储。
*   [DatastoreImportOperator](28):将实体从云存储导入Google Cloud Datastore。
W
zh raw  
wizardforcel 已提交
2149

W
wizardforcel 已提交
2150
##### DatastoreExportOperator
W
zh raw  
wizardforcel 已提交
2151

W
wizardforcel 已提交
2152
```py
W
wizardforcel 已提交
2153
class airflow.contrib.operators.datastore_export_operator.DatastoreExportOperatorbucketnamespace = Nonedatastore_conn_id ='google_cloud_default'cloud_storage_conn_id ='google_cloud_default'delegate_to = Noneentity_filter = Nonelabels = Nonepolling_interval_in_seconds = 10overwrite_existing = Falsexcom_push =* args** kwargs 
W
zh raw  
wizardforcel 已提交
2154 2155
```

W
wizardforcel 已提交
2156
基类: [`airflow.models.BaseOperator`](code.html "airflow.models.BaseOperator")
W
zh raw  
wizardforcel 已提交
2157

W
wizardforcel 已提交
2158
将实体从Google Cloud Datastore导出到云存储
W
zh raw  
wizardforcel 已提交
2159

W
wizardforcel 已提交
2160
参数:
W
zh raw  
wizardforcel 已提交
2161

W
wizardforcel 已提交
2162 2163 2164 2165 2166 2167 2168 2169 2170 2171
*   `bucket(str)` - 要备份数据的云存储桶的名称
*   `namespace(str)` - 指定云存储桶中用于备份数据的可选命名空间路径。如果GCS中不存在此命名空间,则将创建该命名空间。
*   `datastore_conn_id(str)` - 要使用的数据存储区连接ID的名称
*   `cloud_storage_conn_id(str)` - 强制写入备份的云存储连接ID的名称
*   `delegate_to(str)` - 模拟的帐户(如果有)。为此,发出请求的服务帐户必须启用域范围委派。
*   `entity_filter(dict)` - 导出中包含项目中哪些数据的说明,请参阅[https://cloud.google.com/datastore/docs/reference/rest/Shared.Types/EntityFilter](https://cloud.google.com/datastore/docs/reference/rest/Shared.Types/EntityFilter)
*   `labels(dict)` - 客户端分配的云存储标签
*   `polling_interval_in_seconds(int)` - 再次轮询执行状态之前等待的秒数
*   `overwrite_existing(bool)` - 如果存储桶+命名空间不为空,则在导出之前将清空它。这样可以覆盖现有备份。
*   `xcom_push(bool)` - 将操作名称推送到xcom以供参考
W
zh raw  
wizardforcel 已提交
2172 2173


W
wizardforcel 已提交
2174
##### DatastoreImportOperator
W
zh raw  
wizardforcel 已提交
2175

W
wizardforcel 已提交
2176
```py
W
wizardforcel 已提交
2177
class airflow.contrib.operators.datastore_import_operator.DatastoreImportOperatorbucketfilenamespace = Noneentity_filter = Nonelabels = Nonedatastore_conn_id ='google_cloud_default'delegate_to = Nonepolling_interval_in_seconds = 10xcom_push = False* args** kwargs 
W
zh raw  
wizardforcel 已提交
2178 2179
```

W
wizardforcel 已提交
2180
基类: [`airflow.models.BaseOperator`](code.html "airflow.models.BaseOperator")
W
zh raw  
wizardforcel 已提交
2181

W
wizardforcel 已提交
2182
将实体从云存储导入Google Cloud Datastore
W
zh raw  
wizardforcel 已提交
2183

W
wizardforcel 已提交
2184
参数:
W
zh raw  
wizardforcel 已提交
2185

W
wizardforcel 已提交
2186 2187 2188 2189 2190 2191 2192 2193 2194
*   `bucket(str)` - 云存储中用于存储数据的容器
*   `file(str)` - 指定云存储桶中备份元数据文件的路径。它应该具有扩展名.overall_export_metadata
*   `namespace(str)` - 指定云存储桶中备份元数据文件的可选命名空间。
*   `entity_filter(dict)` - 导出中包含项目中哪些数据的说明,请参阅[https://cloud.google.com/datastore/docs/reference/rest/Shared.Types/EntityFilter](https://cloud.google.com/datastore/docs/reference/rest/Shared.Types/EntityFilter)
*   `labels(dict)` - 客户端分配的云存储标签
*   `datastore_conn_id(str)` - 要使用的连接ID的名称
*   `delegate_to(str)` - 模拟的帐户(如果有)。为此,发出请求的服务帐户必须启用域范围委派。
*   `polling_interval_in_seconds(int)` - 再次轮询执行状态之前等待的秒数
*   `xcom_push(bool)` - 将操作名称推送到xcom以供参考
W
zh raw  
wizardforcel 已提交
2195 2196


W
wizardforcel 已提交
2197
#### DatastoreHook
W
zh raw  
wizardforcel 已提交
2198

W
wizardforcel 已提交
2199
```py
W
wizardforcel 已提交
2200
class airflow.contrib.hooks.datastore_hook.DatastoreHookdatastore_conn_id ='google_cloud_datastore_default'delegate_to = None 
W
zh raw  
wizardforcel 已提交
2201 2202
```

W
wizardforcel 已提交
2203
基类: [`airflow.contrib.hooks.gcp_api_base_hook.GoogleCloudBaseHook`](code.html "airflow.contrib.hooks.gcp_api_base_hook.GoogleCloudBaseHook")
W
zh raw  
wizardforcel 已提交
2204

W
wizardforcel 已提交
2205
与Google Cloud Datastore互动。此挂钩使用Google Cloud Platform连接。
W
zh raw  
wizardforcel 已提交
2206

W
wizardforcel 已提交
2207
此对象不是线程安全的。如果要同时发出多个请求,则需要为每个线程创建一个钩子。
W
zh raw  
wizardforcel 已提交
2208

W
wizardforcel 已提交
2209
```py
W
wizardforcel 已提交
2210
allocate_idspartialKeys 
W
zh raw  
wizardforcel 已提交
2211 2212
```

W
wizardforcel 已提交
2213
为不完整的密钥分配ID。请参阅[https://cloud.google.com/datastore/docs/reference/rest/v1/projects/allocateIds](https://cloud.google.com/datastore/docs/reference/rest/v1/projects/allocateIds)
W
zh raw  
wizardforcel 已提交
2214

W
wizardforcel 已提交
2215 2216 2217
参数:**partialKeys** - 部分键列表 

返回:完整密钥列表。
W
zh raw  
wizardforcel 已提交
2218

W
wizardforcel 已提交
2219
```py
W
wizardforcel 已提交
2220
begin_transaction() 
W
zh raw  
wizardforcel 已提交
2221 2222
```

W
wizardforcel 已提交
2223
获取新的事务处理
W
zh raw  
wizardforcel 已提交
2224 2225 2226

> 也可以看看
> 
W
wizardforcel 已提交
2227
> [https://cloud.google.com/datastore/docs/reference/rest/v1/projects/beginTransaction](https://cloud.google.com/datastore/docs/reference/rest/v1/projects/beginTransaction)
W
zh raw  
wizardforcel 已提交
2228

W
wizardforcel 已提交
2229
返回:交易句柄
W
zh raw  
wizardforcel 已提交
2230

W
wizardforcel 已提交
2231
```py
W
wizardforcel 已提交
2232
提交 
W
zh raw  
wizardforcel 已提交
2233 2234
```

W
wizardforcel 已提交
2235
提交事务,可选地创建,删除或修改某些实体。
W
zh raw  
wizardforcel 已提交
2236 2237 2238

也可以看看

W
wizardforcel 已提交
2239 2240 2241
[https://cloud.google.com/datastore/docs/reference/rest/v1/projects/commit](https://cloud.google.com/datastore/docs/reference/rest/v1/projects/commit)

参数:**body** - 提交请求的主体 
W
zh raw  
wizardforcel 已提交
2242

W
wizardforcel 已提交
2243
返回:提交请求的响应主体
W
zh raw  
wizardforcel 已提交
2244

W
wizardforcel 已提交
2245
```py
W
wizardforcel 已提交
2246
delete_operation名称 
W
zh raw  
wizardforcel 已提交
2247 2248
```

W
wizardforcel 已提交
2249 2250 2251
删除长时间运行的操作

参数:**name** - 操作资源的名称 
W
zh raw  
wizardforcel 已提交
2252 2253


W
wizardforcel 已提交
2254
```py
W
wizardforcel 已提交
2255
export_to_storage_bucketbucketnamespace = Noneentity_filter = Nonelabels = None 
W
zh raw  
wizardforcel 已提交
2256 2257
```

W
wizardforcel 已提交
2258
将实体从Cloud Datastore导出到Cloud Storage进行备份
W
zh raw  
wizardforcel 已提交
2259

W
wizardforcel 已提交
2260
```py
W
wizardforcel 已提交
2261
get_conn版本= 'V1' 
W
zh raw  
wizardforcel 已提交
2262 2263
```

W
wizardforcel 已提交
2264
返回Google云端存储服务对象。
W
zh raw  
wizardforcel 已提交
2265

W
wizardforcel 已提交
2266
```py
W
wizardforcel 已提交
2267
GET_OPERATION名称 
W
zh raw  
wizardforcel 已提交
2268 2269
```

W
wizardforcel 已提交
2270 2271 2272
获取长时间运行的最新状态

参数:**name** - 操作资源的名称 
W
zh raw  
wizardforcel 已提交
2273 2274


W
wizardforcel 已提交
2275
```py
W
wizardforcel 已提交
2276
import_from_storage_bucketbucketfilenamespace = Noneentity_filter = Nonelabels = None 
W
zh raw  
wizardforcel 已提交
2277 2278
```

W
wizardforcel 已提交
2279
将备份从云存储导入云数据存储
W
zh raw  
wizardforcel 已提交
2280

W
wizardforcel 已提交
2281
```py
W
wizardforcel 已提交
2282
lookupkeysread_consistency = Nonetransaction = None 
W
zh raw  
wizardforcel 已提交
2283 2284
```

W
wizardforcel 已提交
2285
按键查找一些实体
W
zh raw  
wizardforcel 已提交
2286 2287 2288

也可以看看

W
wizardforcel 已提交
2289
[https://cloud.google.com/datastore/docs/reference/rest/v1/projects/lookup](https://cloud.google.com/datastore/docs/reference/rest/v1/projects/lookup)
W
zh raw  
wizardforcel 已提交
2290

W
wizardforcel 已提交
2291
参数:
W
zh raw  
wizardforcel 已提交
2292

W
wizardforcel 已提交
2293 2294 2295
*   **keys** - 要查找的键
*   **read_consistency** - 要使用的读取一致性。默认,强或最终。不能与事务一起使用。
*   **transaction** - 要使用的事务,如果有的话。
W
zh raw  
wizardforcel 已提交
2296

W
wizardforcel 已提交
2297
返回:查找请求的响应主体。
W
zh raw  
wizardforcel 已提交
2298

W
wizardforcel 已提交
2299
```py
W
wizardforcel 已提交
2300
poll_operation_until_donenamepolling_interval_in_seconds 
W
zh raw  
wizardforcel 已提交
2301 2302
```

W
wizardforcel 已提交
2303
轮询备份操作状态直到完成
W
zh raw  
wizardforcel 已提交
2304

W
wizardforcel 已提交
2305
```py
W
wizardforcel 已提交
2306
回滚事务 
W
zh raw  
wizardforcel 已提交
2307 2308
```

W
wizardforcel 已提交
2309
回滚交易
W
zh raw  
wizardforcel 已提交
2310 2311 2312

也可以看看

W
wizardforcel 已提交
2313 2314 2315
[https://cloud.google.com/datastore/docs/reference/rest/v1/projects/rollback](https://cloud.google.com/datastore/docs/reference/rest/v1/projects/rollback)

参数:**transaction** - 要回滚的事务 
W
zh raw  
wizardforcel 已提交
2316 2317


W
wizardforcel 已提交
2318
```py
W
wizardforcel 已提交
2319
run_query 
W
zh raw  
wizardforcel 已提交
2320 2321
```

W
wizardforcel 已提交
2322
运行实体查询。
W
zh raw  
wizardforcel 已提交
2323 2324 2325

也可以看看

W
wizardforcel 已提交
2326 2327 2328
[https://cloud.google.com/datastore/docs/reference/rest/v1/projects/runQuery](https://cloud.google.com/datastore/docs/reference/rest/v1/projects/runQuery)

参数:**body** - 查询请求的主体 
W
zh raw  
wizardforcel 已提交
2329

W
wizardforcel 已提交
2330
返回:批量查询结果。
W
zh raw  
wizardforcel 已提交
2331 2332 2333

### 云ML引擎

W
wizardforcel 已提交
2334
#### 云ML引擎运营商
W
zh raw  
wizardforcel 已提交
2335

W
wizardforcel 已提交
2336 2337 2338 2339
*   [MLEngineBatchPredictionOperator](28):启动Cloud ML Engine批量预测作业。
*   [MLEngineModelOperator](28):管理Cloud ML Engine模型。
*   [MLEngineTrainingOperator](28):启动Cloud ML Engine培训工作。
*   [MLEngineVersionOperator](28):管理Cloud ML Engine模型版本。
W
zh raw  
wizardforcel 已提交
2340

W
wizardforcel 已提交
2341
##### MLEngineBatchPredictionOperator
W
zh raw  
wizardforcel 已提交
2342

W
wizardforcel 已提交
2343
```py
W
wizardforcel 已提交
2344
class airflow.contrib.operators.mlengine_operator.MLEngineBatchPredictionOperatorproject_idjob_idregiondata_formatinput_pathsoutput_pathmodel_name = Noneversion_name = Noneuri = Nonemax_worker_count = Noneruntime_version = Nonegcp_conn_id ='google_cloud_default'delegate_to =* args** kwargs 
W
zh raw  
wizardforcel 已提交
2345 2346
```

W
wizardforcel 已提交
2347
基类: [`airflow.models.BaseOperator`](code.html "airflow.models.BaseOperator")
W
zh raw  
wizardforcel 已提交
2348

W
wizardforcel 已提交
2349
启动Google Cloud ML Engine预测作业。
W
zh raw  
wizardforcel 已提交
2350

W
wizardforcel 已提交
2351
注意:对于模型原点,用户应该考虑以下三个选项中的一个:1。仅填充“uri”字段,该字段应该是指向tensorflow savedModel目录的GCS位置。2.仅填充'model_name'字段,该字段引用现有模型,并将使用模型的默认版本。3.填充“model_name”和“version_name”字段,这些字段指特定模型的特定版本。
W
zh raw  
wizardforcel 已提交
2352

W
wizardforcel 已提交
2353
在选项2和3中,模型和版本名称都应包含最小标识符。例如,打电话
W
zh raw  
wizardforcel 已提交
2354

W
wizardforcel 已提交
2355
```py
W
zh raw  
wizardforcel 已提交
2356 2357 2358 2359 2360 2361 2362 2363
 MLEngineBatchPredictionOperator (
    ... ,
    model_name = 'my_model' ,
    version_name = 'my_version' ,
    ... )

```

W
wizardforcel 已提交
2364
如果所需的型号版本是“projects / my_project / models / my_model / versions / my_version”。
W
zh raw  
wizardforcel 已提交
2365

W
wizardforcel 已提交
2366
有关参数的更多文档,请参阅[https://cloud.google.com/ml-engine/reference/rest/v1/projects.jobs](https://cloud.google.com/ml-engine/reference/rest/v1/projects.jobs)
W
zh raw  
wizardforcel 已提交
2367

W
wizardforcel 已提交
2368
参数:
W
zh raw  
wizardforcel 已提交
2369

W
wizardforcel 已提交
2370 2371 2372 2373 2374 2375 2376 2377 2378 2379 2380 2381 2382
*   `project_id(str)` - 提交预测作业的Google Cloud项目名称。(模板)
*   `job_id(str)` - Google Cloud ML Engine上预测作业的唯一ID。(模板)
*   `data_format(str)` - 输入数据的格式。如果未提供或者不是[“TEXT”,“TF_RECORD”,“TF_RECORD_GZIP”]之一,它将默认为“DATA_FORMAT_UNSPECIFIED”。
*   `input_paths(list[str])` - 批量预测的输入数据的GCS路径列表。接受通配符运算符[*](28),但仅限于结尾处。(模板)
*   `output_path(str)` - 写入预测结果的GCS路径。(模板)
*   `region(str)` - 用于运行预测作业的Google Compute Engine区域。(模板化)
*   `model_name(str)` - 用于预测的Google Cloud ML Engine模型。如果未提供version_name,则将使用此模型的默认版本。如果提供了version_name,则不应为None。如果提供uri,则应为None。(模板)
*   `version_name(str)` - 用于预测的Google Cloud ML Engine模型版本。如果提供uri,则应为None。(模板)
*   `uri(str)` - 用于预测的已保存模型的GCS路径。如果提供了model_name,则应为None。它应该是指向张量流SavedModel的GCS路径。(模板)
*   `max_worker_count(int)` - 用于并行处理的最大worker数。如果未指定,则默认为10。
*   `runtime_version(str)` - 用于批量预测的Google Cloud ML Engine运行时版本。
*   `gcp_conn_id(str)` - 用于连接到Google Cloud Platform的连接ID。
*   `delegate_to(str)` - 模拟的帐户(如果有)。为此,发出请求的服务帐户必须启用doamin范围的委派。
W
zh raw  
wizardforcel 已提交
2383 2384


W
wizardforcel 已提交
2385
```py
W
zh raw  
wizardforcel 已提交
2386 2387 2388
 Raises: 
```

W
wizardforcel 已提交
2389
`ValueError` :如果无法确定唯一的模型/版本来源。
W
zh raw  
wizardforcel 已提交
2390

W
wizardforcel 已提交
2391
##### MLEngineModelOperator
W
zh raw  
wizardforcel 已提交
2392

W
wizardforcel 已提交
2393
```py
W
wizardforcel 已提交
2394
class airflow.contrib.operators.mlengine_operator.MLEngineModelOperatorproject_idmodeloperation ='create'gcp_conn_id ='google_cloud_default'delegate_to = None* args** kwargs 
W
zh raw  
wizardforcel 已提交
2395 2396
```

W
wizardforcel 已提交
2397
基类: [`airflow.models.BaseOperator`](code.html "airflow.models.BaseOperator")
W
zh raw  
wizardforcel 已提交
2398

W
wizardforcel 已提交
2399
管理Google Cloud ML Engine模型的运营商。
W
zh raw  
wizardforcel 已提交
2400

W
wizardforcel 已提交
2401
参数:
W
zh raw  
wizardforcel 已提交
2402

W
wizardforcel 已提交
2403 2404
*   `project_id(str)` - MLEngine模型所属的Google Cloud项目名称。(模板)
*   `型号(dict)` -
W
zh raw  
wizardforcel 已提交
2405

W
wizardforcel 已提交
2406
    包含有关模型信息的字典。如果`操作`是`create`,则`model`参数应包含有关此模型的所有信息,例如`name`。
W
zh raw  
wizardforcel 已提交
2407

W
wizardforcel 已提交
2408
    如果`操作`是`get`,则`model`参数应包含`模型`的`名称`。
W
zh raw  
wizardforcel 已提交
2409

W
wizardforcel 已提交
2410
*   **操作** -
W
zh raw  
wizardforcel 已提交
2411

W
wizardforcel 已提交
2412
    执行的操作。可用的操作是:
W
zh raw  
wizardforcel 已提交
2413

W
wizardforcel 已提交
2414 2415
    *   `create`:创建`model`参数提供的新模型。
    *   `get`:获取在模型中指定名称的特定`模型`。
W
wizardforcel 已提交
2416 2417
*   `gcp_conn_id(str)` - 获取连接信息时使用的连接ID。
*   `delegate_to(str)` - 模拟的帐户(如果有)。为此,发出请求的服务帐户必须启用域范围委派。
W
zh raw  
wizardforcel 已提交
2418 2419


W
wizardforcel 已提交
2420
##### MLEngineTrainingOperator
W
zh raw  
wizardforcel 已提交
2421

W
wizardforcel 已提交
2422
```py
W
wizardforcel 已提交
2423
class airflow.contrib.operators.mlengine_operator.MLEngineTrainingOperatorproject_idjob_idpackage_uristraining_python_moduletraining_argsregionscale_tier = Noneruntime_version = Nonepython_version = Nonejob_dir = Nonegcp_conn_id ='google_cloud_default'delegate_to = Nonemode ='生产'* args** kwargs 
W
zh raw  
wizardforcel 已提交
2424 2425
```

W
wizardforcel 已提交
2426
基类: [`airflow.models.BaseOperator`](code.html "airflow.models.BaseOperator")
W
zh raw  
wizardforcel 已提交
2427

W
wizardforcel 已提交
2428
启动MLEngine培训工作的操作员。
W
zh raw  
wizardforcel 已提交
2429

W
wizardforcel 已提交
2430
参数:
W
zh raw  
wizardforcel 已提交
2431

W
wizardforcel 已提交
2432 2433 2434 2435 2436 2437 2438 2439 2440 2441 2442 2443 2444
*   `project_id(str)` - 应在其中运行MLEngine培训作业的Google Cloud项目名称(模板化)。
*   `job_id(str)` - 提交的Google MLEngine培训作业的唯一模板化ID。(模板)
*   `package_uris(str)` - MLEngine培训作业的包位置列表,其中应包括主要培训计划+任何其他依赖项。(模板)
*   `training_python_module(str)` - 安装'package_uris'软件包后,在MLEngine培训作业中运行的Python模块名称。(模板)
*   `training_args(str)` - 传递给MLEngine训练程序的模板化命令行参数列表。(模板)
*   `region(str)` - 用于运行MLEngine培训作业的Google Compute Engine区域(模板化)。
*   `scale_tier(str)` - MLEngine培训作业的资源层。(模板)
*   `runtime_version(str)` - 用于培训的Google Cloud ML运行时版本。(模板)
*   `python_version(str)` - 训练中使用的Python版本。(模板)
*   `job_dir(str)` - 用于存储培训输出和培训所需的其他数据的Google云端存储路径。(模板)
*   `gcp_conn_id(str)` - 获取连接信息时使用的连接ID。
*   `delegate_to(str)` - 模拟的帐户(如果有)。为此,发出请求的服务帐户必须启用域范围委派。
*   `mode(str)` - 可以是'DRY_RUN'/'CLOUD'之一。在“DRY_RUN”模式下,不会启动真正的培训作业,但会打印出MLEngine培训作业请求。在“CLOUD”模式下,将发出真正的MLEngine培训作业创建请求。
W
zh raw  
wizardforcel 已提交
2445 2446


W
wizardforcel 已提交
2447
##### MLEngineVersionOperator
W
zh raw  
wizardforcel 已提交
2448

W
wizardforcel 已提交
2449
```py
W
wizardforcel 已提交
2450
class airflow.contrib.operators.mlengine_operator.MLEngineVersionOperatorproject_idmodel_nameversion_name = Noneversion = Noneoperation ='create'gcp_conn_id ='google_cloud_default'delegate_to = None* args** kwargs 
W
zh raw  
wizardforcel 已提交
2451 2452
```

W
wizardforcel 已提交
2453
基类: [`airflow.models.BaseOperator`](code.html "airflow.models.BaseOperator")
W
zh raw  
wizardforcel 已提交
2454

W
wizardforcel 已提交
2455
管理Google Cloud ML Engine版本的运营商。
W
zh raw  
wizardforcel 已提交
2456

W
wizardforcel 已提交
2457
参数:
W
zh raw  
wizardforcel 已提交
2458

W
wizardforcel 已提交
2459 2460 2461 2462 2463
*   `project_id(str)` - MLEngine模型所属的Google Cloud项目名称。
*   `model_name(str)` - 版本所属的Google Cloud ML Engine模型的名称。(模板)
*   `version_name(str)` - 用于正在操作的版本的名称。如果没有人及`版本`的说法是没有或不具备的值`名称`键,那么这将是有效载荷中用于填充`名称`键。(模板)
*   `version(dict)` - 包含版本信息的字典。如果`操作``create`,则`version`应包含有关此版本的所有信息,例如name和deploymentUrl。如果`操作``get``delete`,则`version`参数应包含`版本``名称`。如果是None,则唯一可能的`操作``list`。(模板)
*   `操作(str)` -
W
zh raw  
wizardforcel 已提交
2464

W
wizardforcel 已提交
2465
    执行的操作。可用的操作是:
W
zh raw  
wizardforcel 已提交
2466

W
wizardforcel 已提交
2467 2468 2469 2470
    *   `create`:在`model_name`指定的`模型中`创建新版本,在这种情况下,`version`参数应包含创建该版本的所有信息(例如`name`,`deploymentUrl`)。
    *   `get`:获取`model_name`指定的`模型中`特定版本的完整信息。应在`version`参数中指定版本的名称。
    *   `list`:列出`model_name`指定的`模型的`所有可用版本。
    *   `delete`:从`model_name`指定的`模型中`删除`version`参数中指定的`版本`。应在`version`参数中指定版本的名称。
W
wizardforcel 已提交
2471 2472
*   `gcp_conn_id(str)` - 获取连接信息时使用的连接ID。
*   `delegate_to(str)` - 模拟的帐户(如果有)。为此,发出请求的服务帐户必须启用域范围委派。
W
zh raw  
wizardforcel 已提交
2473 2474


W
wizardforcel 已提交
2475
#### Cloud ML Engine Hook
W
zh raw  
wizardforcel 已提交
2476

W
wizardforcel 已提交
2477
##### MLEngineHook
W
zh raw  
wizardforcel 已提交
2478

W
wizardforcel 已提交
2479
```py
W
wizardforcel 已提交
2480
class airflow.contrib.hooks.gcp_mlengine_hook.MLEngineHookgcp_conn_id ='google_cloud_default'delegate_to = None 
W
zh raw  
wizardforcel 已提交
2481 2482
```

W
wizardforcel 已提交
2483
基类: [`airflow.contrib.hooks.gcp_api_base_hook.GoogleCloudBaseHook`](code.html "airflow.contrib.hooks.gcp_api_base_hook.GoogleCloudBaseHook")
W
zh raw  
wizardforcel 已提交
2484

W
wizardforcel 已提交
2485
```py
W
wizardforcel 已提交
2486
create_jobproject_idjobuse_existing_job_fn = None 
W
zh raw  
wizardforcel 已提交
2487 2488
```

W
wizardforcel 已提交
2489
启动MLEngine作业并等待它达到终端状态。
W
zh raw  
wizardforcel 已提交
2490

W
wizardforcel 已提交
2491
参数:
W
zh raw  
wizardforcel 已提交
2492

W
wizardforcel 已提交
2493 2494
*   `project_id(str)` - 将在其中启动MLEngine作业的Google Cloud项目ID。
*   `工作(dict)` -
W
zh raw  
wizardforcel 已提交
2495

W
wizardforcel 已提交
2496
    应该提供给MLEngine API的MLEngine Job对象,例如:
W
zh raw  
wizardforcel 已提交
2497

W
wizardforcel 已提交
2498
    ```py
W
zh raw  
wizardforcel 已提交
2499 2500 2501 2502 2503 2504 2505 2506 2507 2508
     {
      'jobId' : 'my_job_id' ,
      'trainingInput' : {
        'scaleTier' : 'STANDARD_1' ,
        ...
      }
    }

    ```

W
wizardforcel 已提交
2509
*   `use_existing_job_fn(function)` - 如果已存在具有相同job_id的MLEngine作业,则此方法(如果提供)将决定是否应使用此现有作业,继续等待它完成并返回作业对象。它应该接受MLEngine作业对象,并返回一个布尔值,指示是否可以重用现有作业。如果未提供“use_existing_job_fn”,我们默认重用现有的MLEngine作业。
W
wizardforcel 已提交
2510 2511

返回:如果作业成功到达终端状态(可能是FAILED或CANCELED状态),则为MLEngine作业对象。
W
zh raw  
wizardforcel 已提交
2512

W
wizardforcel 已提交
2513
返回类型:字典
W
zh raw  
wizardforcel 已提交
2514

W
wizardforcel 已提交
2515
```py
W
wizardforcel 已提交
2516
create_modelproject_idmodel 
W
zh raw  
wizardforcel 已提交
2517 2518
```

W
wizardforcel 已提交
2519
创建一个模型。阻止直到完成。
W
zh raw  
wizardforcel 已提交
2520

W
wizardforcel 已提交
2521
```py
W
wizardforcel 已提交
2522
create_versionproject_idmodel_nameversion_spec 
W
zh raw  
wizardforcel 已提交
2523 2524
```

W
wizardforcel 已提交
2525
在Google Cloud ML Engine上创建版本。
W
zh raw  
wizardforcel 已提交
2526

W
wizardforcel 已提交
2527
如果版本创建成功则返回操作,否则引发错误。
W
zh raw  
wizardforcel 已提交
2528

W
wizardforcel 已提交
2529
```py
W
wizardforcel 已提交
2530
delete_versionproject_idmodel_nameversion_name 
W
zh raw  
wizardforcel 已提交
2531 2532
```

W
wizardforcel 已提交
2533
删除给定版本的模型。阻止直到完成。
W
zh raw  
wizardforcel 已提交
2534

W
wizardforcel 已提交
2535
```py
W
wizardforcel 已提交
2536
get_conn() 
W
zh raw  
wizardforcel 已提交
2537 2538
```

W
wizardforcel 已提交
2539
返回Google MLEngine服务对象。
W
zh raw  
wizardforcel 已提交
2540

W
wizardforcel 已提交
2541
```py
W
wizardforcel 已提交
2542
get_modelproject_idmodel_name 
W
zh raw  
wizardforcel 已提交
2543 2544
```

W
wizardforcel 已提交
2545
获取一个模型。阻止直到完成。
W
zh raw  
wizardforcel 已提交
2546

W
wizardforcel 已提交
2547
```py
W
wizardforcel 已提交
2548
list_versionsproject_idmodel_name 
W
zh raw  
wizardforcel 已提交
2549 2550
```

W
wizardforcel 已提交
2551
列出模型的所有可用版本。阻止直到完成。
W
zh raw  
wizardforcel 已提交
2552

W
wizardforcel 已提交
2553
```py
W
wizardforcel 已提交
2554
set_default_versionproject_idmodel_nameversion_name 
W
zh raw  
wizardforcel 已提交
2555 2556
```

W
wizardforcel 已提交
2557
将版本设置为默认值。阻止直到完成。
W
zh raw  
wizardforcel 已提交
2558 2559 2560

### 云储存

W
wizardforcel 已提交
2561
#### 存储运营商
W
zh raw  
wizardforcel 已提交
2562

W
wizardforcel 已提交
2563 2564 2565 2566 2567 2568
*   [FileToGoogleCloudStorageOperator](28):将文件上传到Google云端存储。
*   [GoogleCloudStorageCreateBucketOperator](28):创建新的云存储桶。
*   [GoogleCloudStorageListOperator](28):列出存储桶中的所有对象,并在名称中添加字符串前缀和分隔符。
*   [GoogleCloudStorageDownloadOperator](28):从Google云端存储中下载文件。
*   [GoogleCloudStorageToBigQueryOperator](28):将Google云存储中的文件加载到BigQuery中。
*   [GoogleCloudStorageToGoogleCloudStorageOperator](28):将对象从存储桶复制到另一个存储桶,并在需要时重命名。
W
zh raw  
wizardforcel 已提交
2569

W
wizardforcel 已提交
2570
##### FileToGoogleCloudStorageOperator
W
zh raw  
wizardforcel 已提交
2571

W
wizardforcel 已提交
2572
```py
W
wizardforcel 已提交
2573
class airflow.contrib.operators.file_to_gcs.FileToGoogleCloudStorageOperatorsrcdstbucketgoogle_cloud_storage_conn_id ='google_cloud_default'mime_type ='application / octet-stream'delegate_to = None* args** kwargs 
W
zh raw  
wizardforcel 已提交
2574 2575
```

W
wizardforcel 已提交
2576
基类: [`airflow.models.BaseOperator`](code.html "airflow.models.BaseOperator")
W
zh raw  
wizardforcel 已提交
2577

W
wizardforcel 已提交
2578
将文件上传到Google云端存储
W
zh raw  
wizardforcel 已提交
2579

W
wizardforcel 已提交
2580
参数:
W
zh raw  
wizardforcel 已提交
2581

W
wizardforcel 已提交
2582 2583 2584 2585 2586 2587
*   `src(str)` - 本地文件的路径。(模板)
*   `dst(str)` - 指定存储桶中的目标路径。(模板)
*   `bucket(str)` - 要上传的存储桶。(模板)
*   `google_cloud_storage_conn_id(str)` - 要上传的Airflow连接ID
*   `mime_type(str)` - mime类型字符串
*   `delegate_to(str)` - 模拟的帐户(如果有)
W
zh raw  
wizardforcel 已提交
2588 2589


W
wizardforcel 已提交
2590
```py
W
wizardforcel 已提交
2591
执行上下文 
W
zh raw  
wizardforcel 已提交
2592 2593
```

W
wizardforcel 已提交
2594
将文件上传到Google云端存储
W
zh raw  
wizardforcel 已提交
2595

W
wizardforcel 已提交
2596
##### GoogleCloudStorageCreateBucketOperator
W
zh raw  
wizardforcel 已提交
2597

W
wizardforcel 已提交
2598
```py
W
wizardforcel 已提交
2599
class airflow.contrib.operators.gcs_operator.GoogleCloudStorageCreateBucketOperatorbucket_namestorage_class ='MULTI_REGIONAL'location ='US'project_id = Nonelabels = Nonegoogle_cloud_storage_conn_id ='google_cloud_default'delegate_to = None* args** kwargs 
W
zh raw  
wizardforcel 已提交
2600 2601
```

W
wizardforcel 已提交
2602
基类: [`airflow.models.BaseOperator`](code.html "airflow.models.BaseOperator")
W
zh raw  
wizardforcel 已提交
2603

W
wizardforcel 已提交
2604
创建一个新存储桶。Google云端存储使用平面命名空间,因此您无法创建名称已在使用中的存储桶。
W
zh raw  
wizardforcel 已提交
2605 2606 2607

> 也可以看看
> 
W
wizardforcel 已提交
2608
> 有关详细信息,请参阅存储桶命名指南:[https](https://cloud.google.com/storage/docs/bucketnaming.html):[//cloud.google.com/storage/docs/bucketnaming.html#requirements](https://cloud.google.com/storage/docs/bucketnaming.html)
W
zh raw  
wizardforcel 已提交
2609

W
wizardforcel 已提交
2610
参数:
W
zh raw  
wizardforcel 已提交
2611

W
wizardforcel 已提交
2612 2613
*   `bucket_name(str)` - 存储桶的名称。(模板)
*   `storage_class(str)` -
W
zh raw  
wizardforcel 已提交
2614

W
wizardforcel 已提交
2615
    这定义了存储桶中对象的存储方式,并确定了SLA和存储成本(模板化)。价值包括
W
zh raw  
wizardforcel 已提交
2616 2617 2618 2619 2620

    *   `MULTI_REGIONAL`
    *   `REGIONAL`
    *   `STANDARD`
    *   `NEARLINE`
W
wizardforcel 已提交
2621
    *   `COLDLINE` 。
W
zh raw  
wizardforcel 已提交
2622

W
wizardforcel 已提交
2623
    如果在创建存储桶时未指定此值,则默认为STANDARD。
W
zh raw  
wizardforcel 已提交
2624

W
wizardforcel 已提交
2625
*   `位置(str)` -
W
zh raw  
wizardforcel 已提交
2626

W
wizardforcel 已提交
2627
    水桶的位置。(模板化)存储桶中对象的对象数据驻留在此区域内的物理存储中。默认为美国。
W
zh raw  
wizardforcel 已提交
2628 2629 2630

    也可以看看

W
wizardforcel 已提交
2631
    [https://developers.google.com/storage/docs/bucket-locations](https://developers.google.com/storage/docs/bucket-locations)
W
zh raw  
wizardforcel 已提交
2632

W
wizardforcel 已提交
2633 2634 2635 2636
*   `project_id(str)` - GCP项目的ID。(模板)
*   `labels(dict)` - 用户提供的键/值对标签。
*   `google_cloud_storage_conn_id(str)` - 连接到Google云端存储时使用的连接ID。
*   `delegate_to(str)` - 模拟的帐户(如果有)。为此,发出请求的服务帐户必须启用域范围委派。
W
zh raw  
wizardforcel 已提交
2637 2638


W
wizardforcel 已提交
2639
```py
W
zh raw  
wizardforcel 已提交
2640 2641 2642
 Example: 
```

W
wizardforcel 已提交
2643
以下运算符将在区域中创建`test-bucket`具有`MULTI_REGIONAL`存储类的新存储桶`EU`
W
zh raw  
wizardforcel 已提交
2644

W
wizardforcel 已提交
2645
```py
W
zh raw  
wizardforcel 已提交
2646 2647 2648 2649 2650 2651 2652 2653 2654 2655 2656
 CreateBucket = GoogleCloudStorageCreateBucketOperator (
    task_id = 'CreateNewBucket' ,
    bucket_name = 'test-bucket' ,
    storage_class = 'MULTI_REGIONAL' ,
    location = 'EU' ,
    labels = { 'env' : 'dev' , 'team' : 'airflow' },
    google_cloud_storage_conn_id = 'airflow-service-account'
)

```

W
wizardforcel 已提交
2657
##### GoogleCloudStorageDownloadOperator
W
zh raw  
wizardforcel 已提交
2658

W
wizardforcel 已提交
2659
```py
W
wizardforcel 已提交
2660
class airflow.contrib.operators.gcs_download_operator.GoogleCloudStorageDownloadOperatorbucketobjectfilename = Nonestore_to_xcom_key = Nonegoogle_cloud_storage_conn_id ='google_cloud_default'delegate_to = None* args** kwargs 
W
zh raw  
wizardforcel 已提交
2661 2662
```

W
wizardforcel 已提交
2663
基类: [`airflow.models.BaseOperator`](code.html "airflow.models.BaseOperator")
W
zh raw  
wizardforcel 已提交
2664

W
wizardforcel 已提交
2665
从Google云端存储下载文件。
W
zh raw  
wizardforcel 已提交
2666

W
wizardforcel 已提交
2667
参数:
W
zh raw  
wizardforcel 已提交
2668

W
wizardforcel 已提交
2669 2670 2671 2672 2673 2674
*   `bucket(str)` - 对象所在的Google云存储桶。(模板)
*   `object(str)` - 要在Google云存储桶中下载的对象的名称。(模板)
*   `filename(str)` - 应将文件下载到的本地文件系统(正在执行操作符的位置)上的文件路径。(模板化)如果未传递文件名,则下载的数据将不会存储在本地文件系统中。
*   `store_to_xcom_key(str)` - 如果设置了此参数,操作员将使用此参数中设置的键将下载文件的内容推送到XCom。如果未设置,则下载的数据不会被推送到XCom。(模板)
*   `google_cloud_storage_conn_id(str)` - 连接到Google云端存储时使用的连接ID。
*   `delegate_to(str)` - 模拟的帐户(如果有)。为此,发出请求的服务帐户必须启用域范围委派。
W
zh raw  
wizardforcel 已提交
2675 2676


W
wizardforcel 已提交
2677
##### GoogleCloudStorageListOperator
W
zh raw  
wizardforcel 已提交
2678

W
wizardforcel 已提交
2679
```py
W
wizardforcel 已提交
2680
class airflow.contrib.operators.gcslistoperator.GoogleCloudStorageListOperatorbucketprefix = Nonedelimiter = Nonegoogle_cloud_storage_conn_id ='google_cloud_default'delegate_to = None* args** kwargs 
W
zh raw  
wizardforcel 已提交
2681 2682
```

W
wizardforcel 已提交
2683
基类: [`airflow.models.BaseOperator`](code.html "airflow.models.BaseOperator")
W
zh raw  
wizardforcel 已提交
2684

W
wizardforcel 已提交
2685
使用名称中的给定字符串前缀和分隔符列出存储桶中的所有对象。
W
zh raw  
wizardforcel 已提交
2686

W
wizardforcel 已提交
2687
```py
W
wizardforcel 已提交
2688
 此运算符返回一个python列表其中包含可供其使用的对象的名称 
W
zh raw  
wizardforcel 已提交
2689 2690
```

W
wizardforcel 已提交
2691
<cite>xcom</cite>在下游任务中。
W
zh raw  
wizardforcel 已提交
2692

W
wizardforcel 已提交
2693
参数:
W
zh raw  
wizardforcel 已提交
2694

W
wizardforcel 已提交
2695 2696 2697 2698 2699
*   `bucket(str)` - 用于查找对象的Google云存储桶。(模板)
*   `prefix(str)` - 前缀字符串,用于过滤名称以此前缀开头的对象。(模板)
*   `delimiter(str)` - 要过滤对象的分隔符。(模板化)例如,要列出GCS目录中的CSV文件,您可以使用delimiter ='。csv'。
*   `google_cloud_storage_conn_id(str)` - 连接到Google云端存储时使用的连接ID。
*   `delegate_to(str)` - 模拟的帐户(如果有)。为此,发出请求的服务帐户必须启用域范围委派。
W
zh raw  
wizardforcel 已提交
2700 2701


W
wizardforcel 已提交
2702
```py
W
zh raw  
wizardforcel 已提交
2703 2704 2705
 Example: 
```

W
wizardforcel 已提交
2706
以下运算符将列出存储桶中文件`sales/sales-2017`夹中的所有Avro文件`data`
W
zh raw  
wizardforcel 已提交
2707

W
wizardforcel 已提交
2708
```py
W
zh raw  
wizardforcel 已提交
2709 2710 2711 2712 2713 2714 2715 2716 2717 2718
 GCS_Files = GoogleCloudStorageListOperator (
    task_id = 'GCS_Files' ,
    bucket = 'data' ,
    prefix = 'sales/sales-2017/' ,
    delimiter = '.avro' ,
    google_cloud_storage_conn_id = google_cloud_conn_id
)

```

W
wizardforcel 已提交
2719
##### GoogleCloudStorageToBigQueryOperator
W
zh raw  
wizardforcel 已提交
2720

W
wizardforcel 已提交
2721
```py
W
wizardforcel 已提交
2722
class airflow.contrib.operators.gcs_to_bq.GoogleCloudStorageToBigQueryOperatorbucketsource_objectsdestination_project_dataset_tableschema_fields = Noneschema_object = Nonesource_format ='CSV'compression ='NONE'create_disposition ='CREATE_IF_NEEDED'skip_leading_rows = 0write_disposition =' WRITE_EMPTY'field_delimiter =','max_bad_records = 0quote_character = Noneignore_unknown_values = Falseallow_quoted_newlines = Falseallow_jagged_rows = Falsemax_id_key = Nonebigquery_conn_id ='bigquery_default'google_cloud_storage_conn_id ='google_cloud_default'delegate_to = Noneschema_update_options =(),src_fmt_configs = {}external_table = Falsetime_partitioning = {}* args** kwargs 
W
zh raw  
wizardforcel 已提交
2723 2724
```

W
wizardforcel 已提交
2725
基类: [`airflow.models.BaseOperator`](code.html "airflow.models.BaseOperator")
W
zh raw  
wizardforcel 已提交
2726

W
wizardforcel 已提交
2727
将文件从Google云存储加载到BigQuery中。
W
zh raw  
wizardforcel 已提交
2728

W
wizardforcel 已提交
2729
可以用两种方法之一指定用于BigQuery表的模式。您可以直接传递架构字段,也可以将运营商指向Google云存储对象名称。Google云存储中的对象必须是包含架构字段的JSON文件。
W
zh raw  
wizardforcel 已提交
2730

W
wizardforcel 已提交
2731
参数:
W
zh raw  
wizardforcel 已提交
2732

W
wizardforcel 已提交
2733
*   `bucket(str)` - 要加载的桶。(模板)
W
wizardforcel 已提交
2734
*   **source_objects** - 要加载的Google云存储URI列表。(模板化)如果source_format是'DATASTORE_BACKUP',则列表必须只包含一个URI。
W
wizardforcel 已提交
2735 2736
*   `destination_project_dataset_table(str)` - 用于加载数据的虚线(&lt;project&gt;。)&lt;dataset&gt;&lt;table&gt; BigQuery表。如果未包含&lt;project&gt;,则项目将是连接json中定义的项目。(模板)
*   `schema_fields(list)` - 如果设置,则此处定义的架构字段列表:[https](https://cloud.google.com/bigquery/docs/reference/v2/jobs)**//cloud.google.com/bigquery/docs/reference/v2/jobs#configuration.load**当source_format为'DATASTORE_BACKUP'时,不应设置。
W
wizardforcel 已提交
2737 2738
*   **schema_object** - 如果设置,则指向包含表的架构的.json文件的GCS对象路径。(模板)
*   **schema_object** - 字符串
W
wizardforcel 已提交
2739 2740 2741 2742 2743 2744 2745 2746 2747 2748 2749 2750 2751 2752 2753 2754 2755 2756 2757
*   `source_format(str)` - 要导出的文件格式。
*   `compression(str)` - [可选]数据源的压缩类型。可能的值包括GZIP和NONE。默认值为NONE。Google Cloud Bigtable,Google Cloud Datastore备份和Avro格式会忽略此设置。
*   `create_disposition(str)` - 如果表不存在,则创建处置。
*   `skip_leading_rows(int)` - 从CSV加载时要跳过的行数。
*   `write_disposition(str)` - 表已存在时的写处置。
*   `field_delimiter(str)` - 从CSV加载时使用的分隔符。
*   `max_bad_records(int)` - BigQuery在运行作业时可以忽略的最大错误记录数。
*   `quote_character(str)` - 用于引用CSV文件中数据部分的值。
*   `ignore_unknown_values(bool)` - [可选]指示BigQuery是否应允许表模式中未表示的额外值。如果为true,则忽略额外值。如果为false,则将具有额外列的记录视为错误记录,如果错误记录太多,则在作业结果中返回无效错误。
*   `allow_quoted_newlines(bool)` - 是否允许引用的换行符(true)或不允许(false)。
*   `allow_jagged_rows(bool)` - 接受缺少尾随可选列的行。缺失值被视为空值。如果为false,则缺少尾随列的记录将被视为错误记录,如果错误记录太多,则会在作业结果中返回无效错误。仅适用于CSV,忽略其他格式。
*   `max_id_key(str)` - 如果设置,则是BigQuery表中要加载的列的名称。在加载发生后,Thsi将用于从BigQuery中选择MAX值。结果将由execute()命令返回,该命令又存储在XCom中供将来的操作员使用。这对增量加载很有帮助 - 在将来的执行过程中,您可以从最大ID中获取。
*   `bigquery_conn_id(str)` - 对特定BigQuery挂钩的引用。
*   `google_cloud_storage_conn_id(str)` - 对特定Google云存储挂钩的引用。
*   `delegate_to(str)` - 模拟的帐户(如果有)。为此,发出请求的服务帐户必须启用域范围委派。
*   `schema_update_options(list)` - 允许更新目标表的模式作为加载作业的副作用。
*   `src_fmt_configs(dict)` - 配置特定于源格式的可选字段
*   `external_table(bool)` - 用于指定目标表是否应为BigQuery外部表的标志。默认值为False。
*   `time_partitioning(dict)` - 配置可选的时间分区字段,即按API规范按字段,类型和到期分区。请注意,“field”在dataset.table $ partition的并发中不可用。
W
zh raw  
wizardforcel 已提交
2758 2759


W
wizardforcel 已提交
2760
##### GoogleCloudStorageToGoogleCloudStorageOperator
W
zh raw  
wizardforcel 已提交
2761

W
wizardforcel 已提交
2762
```py
W
wizardforcel 已提交
2763
class airflow.contrib.operators.gcs_to_gcs.GoogleCloudStorageToGoogleCloudStorageOperatorsource_bucketsource_objectdestination_bucket = Nonedestination_object = Nonemove_object = Falsegoogle_cloud_storage_conn_id ='google_cloud_default'delegate_to = None* args** kwargs 
W
zh raw  
wizardforcel 已提交
2764 2765
```

W
wizardforcel 已提交
2766
基类: [`airflow.models.BaseOperator`](code.html "airflow.models.BaseOperator")
W
zh raw  
wizardforcel 已提交
2767

W
wizardforcel 已提交
2768
将对象从存储桶复制到另一个存储桶,并在需要时重命名。
W
zh raw  
wizardforcel 已提交
2769

W
wizardforcel 已提交
2770
参数:
W
zh raw  
wizardforcel 已提交
2771

W
wizardforcel 已提交
2772 2773
*   `source_bucket(str)` - 对象所在的源Google云存储桶。(模板)
*   `source_object(str)` -
W
zh raw  
wizardforcel 已提交
2774

W
wizardforcel 已提交
2775
    要在Google云存储分区中复制的对象的源名称。(模板化)如果在此参数中使用通配符:
W
zh raw  
wizardforcel 已提交
2776

W
wizardforcel 已提交
2777
    &gt; 您只能在存储桶中使用一个通配符作为对象(文件名)。通配符可以出现在对象名称内或对象名称的末尾。不支持在存储桶名称中附加通配符。
W
zh raw  
wizardforcel 已提交
2778

W
wizardforcel 已提交
2779
*   **destination_bucket** - 目标Google云端存储分区
W
zh raw  
wizardforcel 已提交
2780 2781


W
wizardforcel 已提交
2782
对象应该在哪里。(模板化):type destination_bucket:string:param destination_object:对象的目标名称
W
zh raw  
wizardforcel 已提交
2783

W
wizardforcel 已提交
2784 2785 2786
> 目标Google云存储桶。(模板化)如果在source_object参数中提供了通配符,则这是将添加到最终目标对象路径的前缀。请注意,将删除通配符之前的源路径部分; 如果需要保留,则应将其附加到destination_object。例如,使用prefix `foo/*`和destination_object'blah `/``,文件`foo/baz`将被复制到`blah/baz`; 保留前缀写入destination_object,例如`blah/foo`,在这种情况下,复制的文件将被命名`blah/foo/baz`。

参数:**move_object** - 当移动对象为True时,移动对象 
W
zh raw  
wizardforcel 已提交
2787 2788


W
wizardforcel 已提交
2789
```py
W
wizardforcel 已提交
2790
 复制到新位置 
W
zh raw  
wizardforcel 已提交
2791 2792
```

W
wizardforcel 已提交
2793
这相当于mv命令而不是cp命令。
W
zh raw  
wizardforcel 已提交
2794

W
wizardforcel 已提交
2795
参数:
W
zh raw  
wizardforcel 已提交
2796

W
wizardforcel 已提交
2797 2798
*   `google_cloud_storage_conn_id(str)` - 连接到Google云端存储时使用的连接ID。
*   `delegate_to(str)` - 模拟的帐户(如果有)。为此,发出请求的服务帐户必须启用域范围委派。
W
zh raw  
wizardforcel 已提交
2799 2800


W
wizardforcel 已提交
2801
```py
W
zh raw  
wizardforcel 已提交
2802 2803 2804
 Examples: 
```

W
wizardforcel 已提交
2805
下面的操作将命名一个文件复制`sales/sales-2017/january.avro``data`桶的文件和名为斗`copied_sales/2017/january-backup.avro` in the ``data_backup`
W
zh raw  
wizardforcel 已提交
2806

W
wizardforcel 已提交
2807
```py
W
zh raw  
wizardforcel 已提交
2808 2809 2810 2811 2812 2813 2814 2815 2816 2817 2818
 copy_single_file = GoogleCloudStorageToGoogleCloudStorageOperator (
    task_id = 'copy_single_file' ,
    source_bucket = 'data' ,
    source_object = 'sales/sales-2017/january.avro' ,
    destination_bucket = 'data_backup' ,
    destination_object = 'copied_sales/2017/january-backup.avro' ,
    google_cloud_storage_conn_id = google_cloud_conn_id
)

```

W
wizardforcel 已提交
2819
以下运算符会将文件`sales/sales-2017`夹中的所有Avro文件(即名称以该前缀开头)复制到存储`data`桶中的`copied_sales/2017`文件夹中`data_backup`。
W
zh raw  
wizardforcel 已提交
2820

W
wizardforcel 已提交
2821
```py
W
zh raw  
wizardforcel 已提交
2822 2823 2824 2825 2826 2827 2828 2829 2830 2831 2832
 copy_files = GoogleCloudStorageToGoogleCloudStorageOperator (
    task_id = 'copy_files' ,
    source_bucket = 'data' ,
    source_object = 'sales/sales-2017/*.avro' ,
    destination_bucket = 'data_backup' ,
    destination_object = 'copied_sales/2017/' ,
    google_cloud_storage_conn_id = google_cloud_conn_id
)

```

W
wizardforcel 已提交
2833
以下运算符会将文件`sales/sales-2017`夹中的所有Avro文件(即名称以该前缀开头)移动到`data`存储桶中的同一文件夹`data_backup`,删除过程中的原始文件。
W
zh raw  
wizardforcel 已提交
2834

W
wizardforcel 已提交
2835
```py
W
zh raw  
wizardforcel 已提交
2836 2837 2838 2839 2840 2841 2842 2843 2844 2845 2846
 move_files = GoogleCloudStorageToGoogleCloudStorageOperator (
    task_id = 'move_files' ,
    source_bucket = 'data' ,
    source_object = 'sales/sales-2017/*.avro' ,
    destination_bucket = 'data_backup' ,
    move_object = True ,
    google_cloud_storage_conn_id = google_cloud_conn_id
)

```

W
wizardforcel 已提交
2847
#### GoogleCloudStorageHook
W
zh raw  
wizardforcel 已提交
2848

W
wizardforcel 已提交
2849
```py
W
wizardforcel 已提交
2850
class airflow.contrib.hooks.gcs_hook.GoogleCloudStorageHook(google_cloud_storage_conn_id ='google_cloud_default',delegate_to = None) 
W
zh raw  
wizardforcel 已提交
2851 2852
```

W
wizardforcel 已提交
2853
基类: [`airflow.contrib.hooks.gcp_api_base_hook.GoogleCloudBaseHook`](code.html "airflow.contrib.hooks.gcp_api_base_hook.GoogleCloudBaseHook")
W
zh raw  
wizardforcel 已提交
2854

W
wizardforcel 已提交
2855
与Google云端存储互动。此挂钩使用Google Cloud Platform连接。
W
zh raw  
wizardforcel 已提交
2856

W
wizardforcel 已提交
2857
```py
W
wizardforcel 已提交
2858
copy(source_bucket,source_object,destination_bucket = None,destination_object = None) 
W
zh raw  
wizardforcel 已提交
2859 2860
```

W
wizardforcel 已提交
2861
将对象从存储桶复制到另一个存储桶,并在需要时重命名。
W
zh raw  
wizardforcel 已提交
2862

W
wizardforcel 已提交
2863
destination_bucket或destination_object可以省略,在这种情况下使用源桶/对象,但不能同时使用两者。
W
zh raw  
wizardforcel 已提交
2864

W
wizardforcel 已提交
2865
参数:
W
zh raw  
wizardforcel 已提交
2866

W
wizardforcel 已提交
2867 2868 2869
*   `source_bucket(str)` - 要从中复制的对象的存储桶。
*   `source_object(str)` - 要复制的对象。
*   `destination_bucket(str)` - 要复制到的对象的目标。可以省略; 然后使用相同的桶。
W
wizardforcel 已提交
2870
*   **destination_object** - 给定对象的(重命名)路径。可以省略; 然后使用相同的名称。
W
zh raw  
wizardforcel 已提交
2871 2872


W
wizardforcel 已提交
2873
```py
W
wizardforcel 已提交
2874
create_bucket(bucket_name,storage_class ='MULTI_REGIONAL',location ='US',project_id = None,labels = None) 
W
zh raw  
wizardforcel 已提交
2875 2876
```

W
wizardforcel 已提交
2877
创建一个新存储桶。Google云端存储使用平面命名空间,因此您无法创建名称已在使用中的存储桶。
W
zh raw  
wizardforcel 已提交
2878 2879 2880

也可以看看

W
wizardforcel 已提交
2881
有关详细信息,请参阅存储桶命名指南:[https](https://cloud.google.com/storage/docs/bucketnaming.html):[//cloud.google.com/storage/docs/bucketnaming.html#requirements](https://cloud.google.com/storage/docs/bucketnaming.html)
W
zh raw  
wizardforcel 已提交
2882

W
wizardforcel 已提交
2883
参数:
W
zh raw  
wizardforcel 已提交
2884

W
wizardforcel 已提交
2885 2886
*   `bucket_name(str)` - 存储桶的名称。
*   `storage_class(str)` -
W
zh raw  
wizardforcel 已提交
2887

W
wizardforcel 已提交
2888
    这定义了存储桶中对象的存储方式,并确定了SLA和存储成本。价值包括
W
zh raw  
wizardforcel 已提交
2889 2890 2891 2892 2893

    *   `MULTI_REGIONAL`
    *   `REGIONAL`
    *   `STANDARD`
    *   `NEARLINE`
W
wizardforcel 已提交
2894
    *   `COLDLINE` 。
W
zh raw  
wizardforcel 已提交
2895

W
wizardforcel 已提交
2896
    如果在创建存储桶时未指定此值,则默认为STANDARD。
W
zh raw  
wizardforcel 已提交
2897

W
wizardforcel 已提交
2898
*   `位置(str)` -
W
zh raw  
wizardforcel 已提交
2899

W
wizardforcel 已提交
2900
    水桶的位置。存储桶中对象的对象数据驻留在此区域内的物理存储中。默认为美国。
W
zh raw  
wizardforcel 已提交
2901 2902 2903

    也可以看看

W
wizardforcel 已提交
2904
    [https://developers.google.com/storage/docs/bucket-locations](https://developers.google.com/storage/docs/bucket-locations)
W
zh raw  
wizardforcel 已提交
2905

W
wizardforcel 已提交
2906 2907
*   `project_id(str)` - GCP项目的ID。
*   `labels(dict)` - 用户提供的键/值对标签。
W
zh raw  
wizardforcel 已提交
2908

W
wizardforcel 已提交
2909
返回:如果成功,则返回`id`桶的内容。
W
zh raw  
wizardforcel 已提交
2910

W
wizardforcel 已提交
2911
```py
W
wizardforcel 已提交
2912
删除(桶,对象,生成=无) 
W
zh raw  
wizardforcel 已提交
2913 2914
```

W
wizardforcel 已提交
2915
如果未对存储桶启用版本控制,或者使用了生成参数,则删除对象。
W
zh raw  
wizardforcel 已提交
2916

W
wizardforcel 已提交
2917
参数:
W
zh raw  
wizardforcel 已提交
2918

W
wizardforcel 已提交
2919 2920 2921
*   `bucket(str)` - 对象所在的存储桶的名称
*   `object(str)` - 要删除的对象的名称
*   `generation(str)` - 如果存在,则永久删除该代的对象
W
zh raw  
wizardforcel 已提交
2922

W
wizardforcel 已提交
2923
返回:如果成功则为真
W
zh raw  
wizardforcel 已提交
2924

W
wizardforcel 已提交
2925
```py
W
wizardforcel 已提交
2926
下载(bucket,object,filename = None) 
W
zh raw  
wizardforcel 已提交
2927 2928
```

W
wizardforcel 已提交
2929
从Google云端存储中获取文件。
W
zh raw  
wizardforcel 已提交
2930

W
wizardforcel 已提交
2931
参数:
W
zh raw  
wizardforcel 已提交
2932

W
wizardforcel 已提交
2933 2934 2935
*   `bucket(str)` - 要获取的存储桶。
*   `object(str)` - 要获取的对象。
*   `filename(str)` - 如果设置,则应写入文件的本地文件路径。
W
zh raw  
wizardforcel 已提交
2936 2937


W
wizardforcel 已提交
2938
```py
W
wizardforcel 已提交
2939
存在(桶,对象) 
W
zh raw  
wizardforcel 已提交
2940 2941
```

W
wizardforcel 已提交
2942
检查Google云端存储中是否存在文件。
W
zh raw  
wizardforcel 已提交
2943

W
wizardforcel 已提交
2944
参数:
W
zh raw  
wizardforcel 已提交
2945

W
wizardforcel 已提交
2946 2947
*   `bucket(str)` - 对象所在的Google云存储桶。
*   `object(str)` - 要在Google云存储分区中检查的对象的名称。
W
zh raw  
wizardforcel 已提交
2948 2949


W
wizardforcel 已提交
2950
```py
W
wizardforcel 已提交
2951
get_conn() 
W
zh raw  
wizardforcel 已提交
2952 2953
```

W
wizardforcel 已提交
2954
返回Google云端存储服务对象。
W
zh raw  
wizardforcel 已提交
2955

W
wizardforcel 已提交
2956
```py
W
wizardforcel 已提交
2957
get_crc32c(bucket,object) 
W
zh raw  
wizardforcel 已提交
2958 2959
```

W
wizardforcel 已提交
2960
获取Google Cloud Storage中对象的CRC32c校验和。
W
zh raw  
wizardforcel 已提交
2961

W
wizardforcel 已提交
2962
参数:
W
zh raw  
wizardforcel 已提交
2963

W
wizardforcel 已提交
2964 2965
*   `bucket(str)` - 对象所在的Google云存储桶。
*   `object(str)` - 要在Google云存储分区中检查的对象的名称。
W
zh raw  
wizardforcel 已提交
2966 2967


W
wizardforcel 已提交
2968
```py
W
wizardforcel 已提交
2969
get_md5hash(bucket,object) 
W
zh raw  
wizardforcel 已提交
2970 2971
```

W
wizardforcel 已提交
2972
获取Google云端存储中对象的MD5哈希值。
W
zh raw  
wizardforcel 已提交
2973

W
wizardforcel 已提交
2974
参数:
W
zh raw  
wizardforcel 已提交
2975

W
wizardforcel 已提交
2976 2977
*   `bucket(str)` - 对象所在的Google云存储桶。
*   `object(str)` - 要在Google云存储分区中检查的对象的名称。
W
zh raw  
wizardforcel 已提交
2978 2979


W
wizardforcel 已提交
2980
```py
W
wizardforcel 已提交
2981
get_size(bucket,object) 
W
zh raw  
wizardforcel 已提交
2982 2983
```

W
wizardforcel 已提交
2984
获取Google云端存储中文件的大小。
W
zh raw  
wizardforcel 已提交
2985

W
wizardforcel 已提交
2986
参数:
W
zh raw  
wizardforcel 已提交
2987

W
wizardforcel 已提交
2988 2989
*   `bucket(str)` - 对象所在的Google云存储桶。
*   `object(str)` - 要在Google云存储分区中检查的对象的名称。
W
zh raw  
wizardforcel 已提交
2990 2991


W
wizardforcel 已提交
2992
```py
W
wizardforcel 已提交
2993
is_updated_after(bucket,object,ts) 
W
zh raw  
wizardforcel 已提交
2994 2995
```

W
wizardforcel 已提交
2996
检查Google Cloud Storage中是否更新了对象。
W
zh raw  
wizardforcel 已提交
2997

W
wizardforcel 已提交
2998
参数:
W
zh raw  
wizardforcel 已提交
2999

W
wizardforcel 已提交
3000 3001 3002
*   `bucket(str)` - 对象所在的Google云存储桶。
*   `object(str)` - 要在Google云存储分区中检查的对象的名称。
*   `ts(datetime)` - 要检查的时间戳。
W
zh raw  
wizardforcel 已提交
3003 3004


W
wizardforcel 已提交
3005
```py
W
wizardforcel 已提交
3006
list(bucket,versions = None,maxResults = None,prefix = None,delimiter = None) 
W
zh raw  
wizardforcel 已提交
3007 3008
```

W
wizardforcel 已提交
3009
使用名称中的给定字符串前缀列出存储桶中的所有对象
W
zh raw  
wizardforcel 已提交
3010

W
wizardforcel 已提交
3011
参数:
W
zh raw  
wizardforcel 已提交
3012

W
wizardforcel 已提交
3013 3014 3015 3016 3017
*   `bucket(str)` - 存储桶名称
*   `versions(bool)` - 如果为true,则列出对象的所有版本
*   `maxResults(int)` - 在单个响应页面中返回的最大项目数
*   `prefix(str)` - 前缀字符串,用于过滤名称以此前缀开头的对象
*   `delimiter(str)` - 根据分隔符过滤对象(例如'.csv')
W
zh raw  
wizardforcel 已提交
3018

W
wizardforcel 已提交
3019
返回:与过滤条件匹配的对象名称流
W
zh raw  
wizardforcel 已提交
3020

W
wizardforcel 已提交
3021
```py
W
wizardforcel 已提交
3022
重写(source_bucket,source_object,destination_bucket,destination_object = None) 
W
zh raw  
wizardforcel 已提交
3023 3024
```

W
wizardforcel 已提交
3025
具有与复制相同的功能,除了可以处理超过5 TB的文件,以及在位置和/或存储类之间复制时。
W
zh raw  
wizardforcel 已提交
3026

W
wizardforcel 已提交
3027
destination_object可以省略,在这种情况下使用source_object。
W
zh raw  
wizardforcel 已提交
3028

W
wizardforcel 已提交
3029
参数:
W
zh raw  
wizardforcel 已提交
3030

W
wizardforcel 已提交
3031 3032 3033
*   `source_bucket(str)` - 要从中复制的对象的存储桶。
*   `source_object(str)` - 要复制的对象。
*   `destination_bucket(str)` - 要复制到的对象的目标。
W
wizardforcel 已提交
3034
*   **destination_object** - 给定对象的(重命名)路径。可以省略; 然后使用相同的名称。
W
zh raw  
wizardforcel 已提交
3035 3036


W
wizardforcel 已提交
3037
```py
W
wizardforcel 已提交
3038
upload(bucket,object,filename,mime_type ='application / octet-stream') 
W
zh raw  
wizardforcel 已提交
3039 3040
```

W
wizardforcel 已提交
3041
将本地文件上传到Google云端存储。
W
zh raw  
wizardforcel 已提交
3042

W
wizardforcel 已提交
3043
参数:
W
zh raw  
wizardforcel 已提交
3044

W
wizardforcel 已提交
3045 3046 3047 3048
*   `bucket(str)` - 要上传的存储桶。
*   `object(str)` - 上载本地文件时要设置的对象名称。
*   `filename(str)` - 要上载的文件的本地文件路径。
*   `mime_type(str)` - 上载文件时要设置的MIME类型。
W
zh raw  
wizardforcel 已提交
3049 3050


W
wizardforcel 已提交
3051
### 谷歌Kubernetes引擎
W
zh raw  
wizardforcel 已提交
3052

W
wizardforcel 已提交
3053
#### Google Kubernetes引擎集群运营商
W
zh raw  
wizardforcel 已提交
3054

W
wizardforcel 已提交
3055 3056
*   [GKEClusterDeleteOperator](28):在Google Cloud Platform中创建Kubernetes群集
*   [GKEPodOperator](28):删除Google Cloud Platform中的Kubernetes群集
W
zh raw  
wizardforcel 已提交
3057

W
wizardforcel 已提交
3058
##### GKEClusterCreateOperator
W
zh raw  
wizardforcel 已提交
3059

W
wizardforcel 已提交
3060
```py
W
wizardforcel 已提交
3061
class airflow.contrib.operators.gcp_container_operator.GKEClusterCreateOperator(project_id,location,body = {},gcp_conn_id ='google_cloud_default',api_version ='v2',* args,** kwargs) 
W
zh raw  
wizardforcel 已提交
3062 3063
```

W
wizardforcel 已提交
3064
基类: [`airflow.models.BaseOperator`](code.html "airflow.models.BaseOperator")
W
zh raw  
wizardforcel 已提交
3065

W
wizardforcel 已提交
3066
##### GKEClusterDeleteOperator
W
zh raw  
wizardforcel 已提交
3067

W
wizardforcel 已提交
3068
```py
W
wizardforcel 已提交
3069
class airflow.contrib.operators.gcp_container_operator.GKEClusterDeleteOperator(project_id,name,location,gcp_conn_id ='google_cloud_default',api_version ='v2',* args,** kwargs) 
W
zh raw  
wizardforcel 已提交
3070 3071
```

W
wizardforcel 已提交
3072
基类: [`airflow.models.BaseOperator`](code.html "airflow.models.BaseOperator")
W
zh raw  
wizardforcel 已提交
3073

W
wizardforcel 已提交
3074
##### GKEPodOperator
W
zh raw  
wizardforcel 已提交
3075

W
wizardforcel 已提交
3076
#### Google Kubernetes Engine Hook
W
zh raw  
wizardforcel 已提交
3077

W
wizardforcel 已提交
3078
```py
W
wizardforcel 已提交
3079
class airflow.contrib.hooks.gcp_container_hook.GKEClusterHook(project_id,location) 
W
zh raw  
wizardforcel 已提交
3080 3081
```

W
wizardforcel 已提交
3082
基类: `airflow.hooks.base_hook.BaseHook`
W
zh raw  
wizardforcel 已提交
3083

W
wizardforcel 已提交
3084
```py
W
wizardforcel 已提交
3085
create_cluster(cluster,retry = <object object>,timeout = <object object>) 
W
zh raw  
wizardforcel 已提交
3086 3087
```

W
wizardforcel 已提交
3088
创建一个群集,由指定数量和类型的Google Compute Engine实例组成。
W
zh raw  
wizardforcel 已提交
3089

W
wizardforcel 已提交
3090
参数:
W
zh raw  
wizardforcel 已提交
3091

W
wizardforcel 已提交
3092 3093 3094
*   `cluster(dict 或 google.cloud.container_v1.types.Cluster)` - 群集protobuf或dict。如果提供了dict,它必须与protobuf消息的格式相同google.cloud.container_v1.types.Cluster
*   `重试(google.api_core.retry.Retry)` - 用于重试请求的重试对象(google.api_core.retry.Retry)。如果指定None,则不会重试请求。
*   `timeout(float)` - 等待请求完成的时间(以秒为单位)。请注意,如果指定了重试,则超时适用于每次单独尝试。
W
zh raw  
wizardforcel 已提交
3095

W
wizardforcel 已提交
3096
返回:新集群或现有集群的完整URL
W
zh raw  
wizardforcel 已提交
3097

W
wizardforcel 已提交
3098
```py
W
wizardforcel 已提交
3099
 :加薪 
W
zh raw  
wizardforcel 已提交
3100 3101
```

W
wizardforcel 已提交
3102
ParseError:在尝试转换dict时出现JSON解析问题AirflowException:cluster不是dict类型也不是Cluster proto类型
W
zh raw  
wizardforcel 已提交
3103

W
wizardforcel 已提交
3104
```py
W
wizardforcel 已提交
3105
delete_cluster(name,retry = <object object>,timeout = <object object>) 
W
zh raw  
wizardforcel 已提交
3106 3107
```

W
wizardforcel 已提交
3108
删除集群,包括Kubernetes端点和所有工作节点。在群集创建期间配置的防火墙和路由也将被删除。群集可能正在使用的其他Google Compute Engine资源(例如,负载均衡器资源)如果在初始创建时不存在,则不会被删除。
W
zh raw  
wizardforcel 已提交
3109

W
wizardforcel 已提交
3110
参数:
W
zh raw  
wizardforcel 已提交
3111

W
wizardforcel 已提交
3112 3113 3114
*   `name(str)` - 要删除的集群的名称
*   `重试(google.api_core.retry.Retry)` - 重_试用_于确定何时/是否重试请求的对象。如果指定None,则不会重试请求。
*   `timeout(float)` - 等待请求完成的时间(以秒为单位)。请注意,如果指定了重试,则超时适用于每次单独尝试。
W
zh raw  
wizardforcel 已提交
3115

W
wizardforcel 已提交
3116
返回:如果成功则删除操作的完整URL,否则为None
W
zh raw  
wizardforcel 已提交
3117

W
wizardforcel 已提交
3118
```py
W
wizardforcel 已提交
3119
get_cluster(name,retry = <object object>,timeout = <object object>) 
W
zh raw  
wizardforcel 已提交
3120 3121
```

W
wizardforcel 已提交
3122 3123 3124
获取指定集群的详细信息:param name:要检索的集群的名称:type name:str:param retry:用于重试请求的重试对象。如果指定了None,

> 请求不会被重试。
W
zh raw  
wizardforcel 已提交
3125

W
wizardforcel 已提交
3126
参数:`timeout(float)` - 等待请求完成的时间(以秒为单位)。请注意,如果指定了重试,则超时适用于每次单独尝试。 
W
zh raw  
wizardforcel 已提交
3127

W
wizardforcel 已提交
3128
返回:一个google.cloud.container_v1.types.Cluster实例
W
zh raw  
wizardforcel 已提交
3129

W
wizardforcel 已提交
3130
```py
W
wizardforcel 已提交
3131
GET_OPERATION(OPERATION_NAME) 
W
zh raw  
wizardforcel 已提交
3132 3133
```

W
wizardforcel 已提交
3134
从Google Cloud获取操作:param operation_name:要获取的操作的名称:type operation_name:str:return:来自Google Cloud的新的更新操作
W
zh raw  
wizardforcel 已提交
3135

W
wizardforcel 已提交
3136
```py
W
wizardforcel 已提交
3137
wait_for_operation(操作) 
W
zh raw  
wizardforcel 已提交
3138 3139
```

W
wizardforcel 已提交
3140
给定操作,持续从Google Cloud获取状态,直到完成或发生错误:param操作:等待的操作:键入操作:google.cloud.container_V1.gapic.enums.Operator:return:a new,updated从Google Cloud获取的操作