70.md 24.2 KB
Newer Older
W
wizardforcel 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 394 395 396 397 398 399 400 401 402 403 404 405 406 407 408 409 410 411 412 413 414 415 416 417 418 419 420 421 422 423 424 425 426 427 428 429 430 431 432 433 434 435 436 437 438 439 440 441 442 443 444 445 446 447 448 449 450 451 452 453 454 455 456 457 458 459 460 461 462 463 464 465 466 467 468 469 470 471 472 473 474 475 476 477 478 479 480 481 482 483 484 485 486 487 488 489 490 491 492 493 494 495 496 497 498 499 500 501 502 503 504 505 506 507 508 509 510 511 512 513 514 515 516 517 518 519 520 521 522 523 524 525 526 527 528 529 530 531 532 533 534 535 536 537 538 539 540 541 542 543 544 545 546 547 548 549 550 551 552 553 554 555 556 557 558 559 560 561 562 563 564 565 566 567 568 569 570 571 572 573 574 575 576 577 578 579 580 581 582 583 584 585 586 587 588 589 590 591 592 593 594 595 596 597 598 599 600 601 602 603 604 605 606 607 608 609 610 611 612 613 614 615 616 617 618 619 620 621 622 623 624 625 626 627 628 629 630 631 632 633 634 635 636 637 638 639 640 641 642 643 644 645 646 647 648 649 650 651 652 653 654 655 656 657 658 659 660 661 662 663 664 665 666 667 668 669 670 671 672 673 674 675 676 677 678 679 680 681 682 683 684 685 686 687 688 689 690 691 692 693 694 695 696 697 698 699 700 701 702 703 704 705 706 707 708 709 710 711 712 713 714 715 716 717 718 719 720 721 722 723 724 725 726 727 728 729 730 731 732 733 734 735 736


# 用户定义的 Sources 和 Sinks

`TableSource` 提供对存储在外部系统(数据库、键值存储、消息队列)或文件中的数据的访问。[TableSource 在 TableEnvironment 中注册后](common.html#register-a-tablesource)可以通过[Table API](tableApi.html)[SQL](sql.html)查询访问它。

`TableSink`[发送 Table](common.html#emit-a-table)到外部存储系统,如数据库、键值存储、消息队列或文件系统(以不同的编码方式,如 CSV、Parquet 或 ORC)。

`TableFactory` 允许将与外部系统的连接声明与实际实现分离。表工厂从规范化的、基于字符串的属性创建表源(sources)和表接收器(sinks)的已配置实例。可以使用`描述符(Descriptor)`以编程方式生成属性,也可以通过[SQL Client](sqlClient.html)的YAML配置文件生成属性。

查看[常见概念和API](common.html)页面,了解如何[注册TableSource](common.html#register-a-tablesource)以及如何[通过 TableSink 发出 Table](common.html#emit-a-table)的详细信息。有关如何使用工厂的示例,请参阅[内置源,接收器和格式](connect.html)页面。

## 定义一个表源(TableSource)

`TableSource` 是一个通用接口,允许 Table API 和 SQL 查询访问存储在外部系统中的数据。它提供了表的模式以及映射到具有表模式的行的记录。根据是否在流式或批量查询中使用 `TableSource`,记录将作为 `DataSet``DataStream` 生成。

如果在流查询中使用 `TableSource`,它必须实现 `StreamTableSource` 接口;如果在批处理查询中使用它,它必须实现 `BatchTableSource` 接口。`TableSource` 还可以实现这两个接口,并可用于流查询和批处理查询。

`StreamTableSource``BatchTableSource` 扩展了定义了以下方法的基本接口 `TableSource`:



```
TableSource<T> {

  public TableSchema getTableSchema();

  public TypeInformation<T> getReturnType();

  public String explainSource();
}
```





```
TableSource[T] {

  def getTableSchema: TableSchema

  def getReturnType: TypeInformation[T]

  def explainSource: String

}
```



*   `getTableSchema()`: 返回表的模式,即表中字段的名称和类型。字段类型是使用 Flink 的 `类型信息(TypeInformation)`(参见[Table API types](tableApi.html#data-types)[SQL types](sql.html#data-types))定义的。

*   `getReturnType()`: 返回 `DataStream` (`StreamTableSource`) 或 `DataSet` (`BatchTableSource`) 的物理类型和由 `TableSource` 生成的记录。

*   `explainSource()`: 返回描述 `TableSource` 的字符串。此方法是可选的,仅用于显示目的。

`TableSource` 接口将逻辑表模式与返回的 `DataStream``DataSet` 的物理类型分开。因此,表模式(`getTableSchema()`)的所有字段必须映射到物理返回类型(`getReturnType()`)的对应类型的字段。默认情况下,这个映射是基于字段名完成的。例如,定义具有两个字段`[name: String, size: Integer]`的表模式的 `TableSource` 需要一个 `TypeInformation`,其中至少有两个字段名为 `name``size` ,类型为 `String``Integer`。 这可能是一个 `PojoTypeInfo``RowTypeInfo`,它有两个名为 `name``size` 的字段匹配类型。

然而,有些类型,如 Tuple 或 CaseClass 类型,确实支持自定义字段名。如果 `TableSource` 返回具有固定字段名的类型的 `DataStream``DataSet`,它可以实现 `DefinedFieldMapping` 接口,将字段名从表模式映射到物理返回类型的字段名。

### 定义一个 BatchTableSource

`BatchTableSource` 接口扩展了 `TableSource` 接口,并定义了一个额外的方法:



```
BatchTableSource<T> implements TableSource<T> {

  public DataSet<T> getDataSet(ExecutionEnvironment execEnv);
}
```





```
BatchTableSource[T] extends TableSource[T] {

  def getDataSet(execEnv: ExecutionEnvironment): DataSet[T]
}
```



*   `getDataSet(execEnv)`: 返回包含表数据的 `DataSet``DataSet` 的类型必须与 `TableSource.getReturnType()` 方法定义的返回类型相同。`DataSet` 可以通过使用 DataSet API 的常规[数据源](//ci.apache.org/projects/flink/flink-docs-release-1.7/dev/batch/#data-sources)创建。通常,`BatchTableSource` 是通过包装 `InputFormat`[batch connector](//ci.apache.org/projects/flink/flink-docs-release-1.7/dev/batch/connectors.html)来实现的。

### 定义一个 StreamTableSource

`StreamTableSource` 接口扩展了 `TableSource` 接口,并定义了一个额外的方法:



```
StreamTableSource<T> implements TableSource<T> {

  public DataStream<T> getDataStream(StreamExecutionEnvironment execEnv);
}
```





```
StreamTableSource[T] extends TableSource[T] {

  def getDataStream(execEnv: StreamExecutionEnvironment): DataStream[T]
}
```



*   `getDataStream(execEnv)`: 返回带有表数据的`DataStream``DataStream` 的类型必须与 `TableSource.getReturnType()` 方法定义的返回类型相同。`DataStream` 可以通过使用 DataStream API 的常规[数据源(data source)](//ci.apache.org/projects/flink/flink-docs-release-1.7/dev/datastream_api.html#data-sources)创建。通常,`StreamTableSource` 是通过包装 `SourceFunction`[流连接器(stream connector)](//ci.apache.org/projects/flink/flink-docs-release-1.7/dev/connectors/)来实现的。

### 使用时间属性定义表源(TableSource)

流式[表API](tableApi.html#group-windows)[SQL](sql.html#group-windows)查询的基于时间的操作(如窗口聚合或连接)需要显式指定[时间属性](streaming/time_attributes.html)

`TableSource` 在其表模式中将 time 属性定义为类型为 `Types.SQL_TIMESTAMP` 的字段。与模式中的所有常规字段相比,时间属性不能与表源返回类型中的物理字段匹配。相反,`TableSource`通过实现某个接口来定义时间属性。

#### 定义处理时间属性(Processing Time Attribute)

[处理时间属性](streaming/time_attributes.html#processing-time)通常用于流查询。处理时间属性返回访问它的操作符的当前壁钟时间(wall-clock time)。`TableSource` 通过实现 `DefinedProctimeAttribute` 接口来定义处理时间属性。接口如下:



```
DefinedProctimeAttribute {

  public String getProctimeAttribute();
}
```





```
DefinedProctimeAttribute {

  def getProctimeAttribute: String
}
```



*   `getProctimeAttribute()`: 返回处理时间属性的名称。必须在表模式中定义指定的属性类型 `Types.SQL_TIMESTAMP`,并且可以在基于时间的操作中使用。`DefinedProctimeAttribute` 表源可以通过返回 `null` 来定义没有处理时间属性。

注意 `StreamTableSource``BatchTableSource` 都可以实现 `DefinedProctimeAttribute` 并定义处理时间属性。对于 `BatchTableSource`,处理时间字段在表扫描期间使用当前时间戳初始化。

#### 定义 Rowtime 属性(Rowtime Attribute)

[Rowtime属性](streaming/time_attributes.html#event-time)`TIMESTAMP` 类型的属性,在流和批处理查询中以统一的方式处理。

通过指定,可以将类型为 `SQL_TIMESTAMP` 的表模式字段声明为 rowtime 属性

*   字段名,
*   `TimestampExtractor` 计算属性的实际值(通常来自一个或多个其他字段),以及
*   指定如何为 rowtim e属性生成水印的 `WatermarkStrategy`

`TableSource` 通过实现 `DefinedRowtimeAttributes` 接口定义了一个 rowtime 属性。接口如下:



```
DefinedRowtimeAttribute {

  public List<RowtimeAttributeDescriptor> getRowtimeAttributeDescriptors();
}
```





```
DefinedRowtimeAttributes {

  def getRowtimeAttributeDescriptors: util.List[RowtimeAttributeDescriptor]
}
```



*   `getRowtimeAttributeDescriptors()`: 返回一个 `RowtimeAttributeDescriptor` 列表。`RowtimeAttributeDescriptor` 描述了一个具有以下属性的 rowtime 属性:
    *   `attributeName`: 表模式中 rowtime 属性的名称。必须使用类型 `Types.SQL_TIMESTAMP` 定义字段。
    *   `timestampExtractor`: 时间戳提取器(timestamp extractor)从具有返回类型的记录中提取时间戳。例如,它可以将长字段转换为时间戳,或者解析字符串编码的时间戳。Flink 附带一组内置的 `TimestampExtractor` 实现,用于常见的用例。还可以提供自定义实现。
    *   `watermarkStrategy`: 水印策略(watermark strategy)定义了如何为 rowtime 属性生成水印。Flink 为常见用例提供了一组内置的 `WatermarkStrategy` 实现。还可以提供自定义实现。

注意,虽然 `getRowtimeAttributeDescriptors()` 方法返回一个描述符列表,但目前只支持一个 rowtime 属性。我们计划在将来删除这个限制,并支持具有多个 rowtime 属性的表。

请注意,`StreamTableSource``BatchTableSource` 都可以实现 `DefinedRowtimeAttributes` 并定义 rowtime 属性。在这两种情况下,行时间字段都是使用 `TimestampExtractor` 提取的。因此,实现 `StreamTableSource``BatchTableSource` 并定义 rowtime 属性的 `TableSource` 为流处理和批处理查询提供了完全相同的数据。

##### 提供时间戳提取器(Timestamp Extractors)

Flink 为常用用例提供了 `TimestampExtractor` 实现。

以下 `TimestampExtractor` 实现目前可用:

*   `ExistingField(fieldName)`: 从现有的 `LONG`, `SQL_TIMESTAMP` 或格式化的 `STRING` 字段中提取 rowtime 属性的值。这样一个字符串的一个例子是 ‘2018-05-28 12:34:56.000’。
*   `StreamRecordTimestamp()`: 从 `DataStream` `StreamRecord` 的时间戳中提取 rowtime 属性的值。注意,此 `TimestampExtractor` 不适用于批处理表源。

自定义的 `TimestampExtractor` 可以通过实现相应的接口来定义。

##### 提供水印策略(Watermark Strategies)

Flink 为常用用例提供了`水印策略(WatermarkStrategy)`实现。

目前提供以下 `WatermarkStrategy` 实现:

*   `AscendingTimestamps`: 用于提升时间戳的水印策略。带有无序时间戳的记录将被视为迟到。
*   `BoundedOutOfOrderTimestamps(delay)`: 时间戳的水印策略,其最多在指定的延迟之外是无序的。
*   `PreserveWatermarks()`: 应该从底层的 `DataStream` 中保留指示水印的策略。

可以通过实现相应的接口来定义自定义的 `WatermarkStrategy`

### 使用投影下推定义表源 Defining a TableSource with Projection Push-Down

`TableSource` 通过实现 `ProjectableTableSource` 接口支持投影下推。该接口定义了一个单一的方法:



```
ProjectableTableSource<T> {

  public TableSource<T> projectFields(int[] fields);
}
```





```
ProjectableTableSource[T] {

  def projectFields(fields: Array[Int]): TableSource[T]
}
```



*   `projectFields(fields)`: 返回具有调整后的物理返回类型的 `TableSource` 的 _副本(copy)_。`fields` 参数提供了必须由 `TableSource` 提供的字段的索引。索引与物理返回类型的 `TypeInformation` 有关,而 _不是_ 与逻辑表模式相关。复制的 `TableSource` 必须调整其返回类型和返回的 `DataStream``DataSet`。复制的 `TableSource``TableSchema` 不能更改,即它必须与原来的 `TableSource` 相同。如果 `TableSource` 实现了 `DefinedFieldMapping` 接口,则必须将字段映射调整为新的返回类型。

`ProjectableTableSource` 为项目平面字段添加了支持。如果 `TableSource` 定义了一个带有嵌套模式的表,它可以实现 `NestedFieldsProjectableTableSource` 来将投影扩展到嵌套字段。`NestedFieldsProjectableTableSource` 的定义如下:



```
NestedFieldsProjectableTableSource<T> {

  public TableSource<T> projectNestedFields(int[] fields, String[][] nestedFields);
}
```





```
NestedFieldsProjectableTableSource[T] {

  def projectNestedFields(fields: Array[Int], nestedFields: Array[Array[String]]): TableSource[T]
}
```



*   `projectNestedField(fields, nestedFields)`: 返回 `TableSource` 的具有调整后的物理返回类型的一个 _副本(copy)_。可以删除或重新排序物理返回类型的字段,但不能更改它们的类型。该方法的契约(contract)本质上与 `ProjectableTableSource.projectFields()` 方法相同。此外,`nestedFields` 参数包含 `fields` 列表中的每个字段索引,这是查询访问的所有嵌套字段的路径列表。所有其他嵌套字段都不需要在 `TableSource` 生成的记录中读取、解析和设置。**最重要的是** 不能更改投影字段的类型,但是可以将未使用的字段设置为null或默认值。

### 使用下推过滤器定义表源(TableSource)

`FilterableTableSource` 接口增加了对 `TableSource` 下推过滤器的支持。扩展这个接口的 `TableSource` 能够过滤记录,这样返回的 `DataStream``DataSet` 返回的记录就会更少。

接口如下:



```
FilterableTableSource<T> {

  public TableSource<T> applyPredicate(List<Expression> predicates);

  public boolean isFilterPushedDown();
}
```





```
FilterableTableSource[T] {

  def applyPredicate(predicates: java.util.List[Expression]): TableSource[T]

  def isFilterPushedDown: Boolean
}
```



*   `applyPredicate(predicates)`: 返回带有添加谓词的 `TableSource` 的 _副本(copy)_。`predicates` 参数是一个可变的连接谓词列表,它们“提供(offered)”给 `TableSource``TableSource` 接受通过从列表中删除谓词来计算谓词的值。列表中剩下的谓词将由后续过滤器操作符计算。
*   `isFilterPushedDown()`: 如果之前调用了 `applyPredicate()` 方法,则返回 true。因此,对于从 `applyPredicate()` 调用返回的所有 `TableSource` 实例,`isFilterPushedDown()` 必须返回 true。

## 定义一个 TableSink

`TableSink` 指定如何将 `Table` 发送到外部系统或位置。该接口是通用的,因此它可以支持不同的存储位置和格式。对于批处理表和流表,有不同的表接收器(table sinks)。

通用接口如下:



```
TableSink<T> {

  public TypeInformation<T> getOutputType();

  public String[] getFieldNames();

  public TypeInformation[] getFieldTypes();

  public TableSink<T> configure(String[] fieldNames, TypeInformation[] fieldTypes);
}
```





```
TableSink[T] {

  def getOutputType: TypeInformation<T>

  def getFieldNames: Array[String]

  def getFieldTypes: Array[TypeInformation]

  def configure(fieldNames: Array[String], fieldTypes: Array[TypeInformation]): TableSink[T]
}
```



调用 `TableSink#configure` 方法将表的模式(字段名和类型)传递给 `TableSink`。该方法必须返回 TableSink 的一个新实例,该实例被配置为发出所提供的表模式。

### 批处理表接收器(BatchTableSink)

定义外部 `TableSink` 以发出批处理表。

接口如下:



```
BatchTableSink<T> implements TableSink<T> {

  public void emitDataSet(DataSet<T> dataSet);
}
```





```
BatchTableSink[T] extends TableSink[T] {

  def emitDataSet(dataSet: DataSet[T]): Unit
}
```



### 附加流表接收器(AppendStreamTableSink)

定义一个外部 `TableSink` 来发出只包含插入更改的流表。

接口如下:



```
AppendStreamTableSink<T> implements TableSink<T> {

  public void emitDataStream(DataStream<T> dataStream);
}
```





```
AppendStreamTableSink[T] extends TableSink[T] {

  def emitDataStream(dataStream: DataStream<T>): Unit
}
```



如果表也被 update 或 delete 修改,则会抛出一个 `TableException`

### 收回流表接收器(RetractStreamTableSink)

定义一个外部 `TableSink`,用于发出具有插入、更新和删除更改的流表。

接口如下:



```
RetractStreamTableSink<T> implements TableSink<Tuple2<Boolean, T>> {

  public TypeInformation<T> getRecordType();

  public void emitDataStream(DataStream<Tuple2<Boolean, T>> dataStream);
}
```





```
RetractStreamTableSink[T] extends TableSink[Tuple2[Boolean, T]] {

  def getRecordType: TypeInformation[T]

  def emitDataStream(dataStream: DataStream[Tuple2[Boolean, T]]): Unit
}
```



该表将被转换为一个累加和收回消息流,这些消息被编码为 Java `Tuple2`。第一个字段是一个布尔标志,用于指示消息类型(`true` 表示插入,`false` 表示删除)。第二个字段保存所请求类型 `T` 的记录。

### 维护流表接收器(UpsertStreamTableSink)

定义一个外部 `TableSink`,用于发出具有插入、更新和删除更改的流表。

接口如下:



```
UpsertStreamTableSink<T> implements TableSink<Tuple2<Boolean, T>> {

  public void setKeyFields(String[] keys);

  public void setIsAppendOnly(boolean isAppendOnly);

  public TypeInformation<T> getRecordType();

  public void emitDataStream(DataStream<Tuple2<Boolean, T>> dataStream);
}
```





```
UpsertStreamTableSink[T] extends TableSink[Tuple2[Boolean, T]] {

  def setKeyFields(keys: Array[String]): Unit

  def setIsAppendOnly(isAppendOnly: Boolean): Unit

  def getRecordType: TypeInformation[T]

  def emitDataStream(dataStream: DataStream[Tuple2[Boolean, T]]): Unit
}
```



该表必须具有唯一的 key 字段(原子或复合)或仅附加。如果表没有唯一 key 并且不是仅附加,则抛出 `TableException`。表的唯一键由 `UpsertStreamTableSink#setKeyFields()` 方法配置。

该表将被转换为 upsert 和 delete 消息流,这些消息被编码为 Java `Tuple2`。第一个字段是一个布尔标志,用于指示消息类型。第二个字段保存所请求类型 `T` 的记录。

具有 true 布尔字段的消息是已配置 key 的 upsert 消息。带有错误标志的消息是已配置 key 的删除消息。如果表是仅附加的,则所有消息都将具有 true 标志,并且必须解释为插入。

## 定义表工厂(TableFactory)

`TableFactory` 允许从基于字符串的属性创建不同的表相关实例。调用所有可用工厂以匹配给定的属性集和相应的工厂类。

工厂利用 Java 的[服务提供者接口(Service Provider Interfaces, SPI)](https://docs.oracle.com/javase/tutorial/sound/SPI-intro.html)进行发现。这意味着每个依赖项和 JAR 文件都应在 `META_INF/services` 资源目录中包含一个文件 `org.apache.flink.table.factories.TableFactory`,该文件列出了它提供的所有可用表工厂。

每个表工厂都需要实现以下接口:



```
package org.apache.flink.table.factories;

interface TableFactory {

  Map<String, String> requiredContext();

  List<String> supportedProperties();
}
```





```
package org.apache.flink.table.factories

trait TableFactory {

  def requiredContext(): util.Map[String, String]

  def supportedProperties(): util.List[String]
}
```



*   `requiredContext()`: 指定已为此工厂实现的上下文。如果满足指定的属性和值集,框架保证仅匹配此工厂。典型属性可能是 `connector.type`, `format.type``update-mode`。诸如 `connector.property-version``format.property-version` 之类的属性 keys 保留用于将来的向后兼容性情况。
*   `supportedProperties`: 此工厂可以处理的属性 keys 列表。此方法将用于验证。如果传递了该工厂无法处理的属性,则会抛出异常。该列表不得包含上下文指定的 keys。

为了创建特定实例,工厂类可以实现 `org.apache.flink.table.factories` 中提供的一个或多个接口:

*   `BatchTableSourceFactory`: 创建批处理表源。
*   `BatchTableSinkFactory`: 创建批处理表接收器。
*   `StreamTableSoureFactory`: 创建流表源。
*   `StreamTableSinkFactory`: 创建流表接收器。
*   `DeserializationSchemaFactory`: 创建反序列化架构格式。
*   `SerializationSchemaFactory`: 创建序列化架构格式。

工厂的发现经历了多个阶段:

*   发现所有可用的工厂。
*   按工厂类过滤(例如,`StreamTableSourceFactory`)。
*   通过匹配上下文过滤。
*   按支持的属性过滤。
*   验证一个工厂是否匹配,否则抛出 `AmbiguousTableFactoryException``NoMatchingTableFactoryException`

以下示例显示如何为参数化提供附加 `connector.debug` 属性标志的自定义流式源。



```
import org.apache.flink.table.sources.StreamTableSource;
import org.apache.flink.types.Row;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class MySystemTableSourceFactory implements StreamTableSourceFactory<Row> {

  @Override
  public Map<String, String> requiredContext() {
    Map<String, String> context = new HashMap<>();
    context.put("update-mode", "append");
    context.put("connector.type", "my-system");
    return context;
  }

  @Override
  public List<String> supportedProperties() {
    List<String> list = new ArrayList<>();
    list.add("connector.debug");
    return list;
  }

  @Override
  public StreamTableSource<Row> createStreamTableSource(Map<String, String> properties) {
    boolean isDebug = Boolean.valueOf(properties.get("connector.debug"));

    # additional validation of the passed properties can also happen here

    return new MySystemAppendTableSource(isDebug);
  }
}
```





```
import java.util
import org.apache.flink.table.sources.StreamTableSource
import org.apache.flink.types.Row

class MySystemTableSourceFactory extends StreamTableSourceFactory[Row] {

  override def requiredContext(): util.Map[String, String] = {
    val context = new util.HashMap[String, String]()
    context.put("update-mode", "append")
    context.put("connector.type", "my-system")
    context
  }

  override def supportedProperties(): util.List[String] = {
    val properties = new util.ArrayList[String]()
    properties.add("connector.debug")
    properties
  }

  override def createStreamTableSource(properties: util.Map[String, String]): StreamTableSource[Row] = {
    val isDebug = java.lang.Boolean.valueOf(properties.get("connector.debug"))

    # additional validation of the passed properties can also happen here

    new MySystemAppendTableSource(isDebug)
  }
}
```



### 在 SQL 客户端中使用 TableFactory

在 SQL 客户端环境文件中,先前显示的工厂可以声明为:



```
tables:
 - name: MySystemTable
   type: source
   update-mode: append
   connector:
     type: my-system
     debug: true
```



YAML 文件被转换为扁平的(flattened)字符串属性,并使用描述与外部系统的连接的属性调用表工厂:



```
update-mode=append
connector.type=my-system
connector.debug=true
```



注意,属性如 `tables.#.name``tables.#.type` 是 SQL 客户端的特定类型,不传递给任何工厂。`type` 属性根据执行环境决定是否需要发现 `BatchTableSourceFactory`/`StreamTableSourceFactory` (用于`source`)、`BatchTableSinkFactory`/`StreamTableSinkFactory`(用于 `sink`)或两者(用于 `both`)。

### 在 Table 和 SQL API 中使用 TableFactory

对于具有解释性 Scaladoc/Javadoc 的类型安全的编程方法,Table 和 SQL API 在 `org.apache.flink.table.descriptors` 中提供了转换为基于字符串的属性的描述符。有关源(sources),接收器(sinks)和格式(formats)的信息,请参阅[内置描述符(built-in descriptors)](connect.html)作为参考。

在我们的示例中,`MySystem` 的连接器可以扩展 `ConnectorDescriptor`,如下所示:



```
import org.apache.flink.table.descriptors.ConnectorDescriptor;
import java.util.HashMap;
import java.util.Map;

/**
  * Connector to MySystem with debug mode.
  */
public class MySystemConnector extends ConnectorDescriptor {

  public final boolean isDebug;

  public MySystemConnector(boolean isDebug) {
    super("my-system", 1, false);
    this.isDebug = isDebug;
  }

  @Override
  protected Map<String, String> toConnectorProperties() {
    Map<String, String> properties = new HashMap<>();
    properties.put("connector.debug", Boolean.toString(isDebug));
    return properties;
  }
}
```





```
import org.apache.flink.table.descriptors.ConnectorDescriptor
import java.util.HashMap
import java.util.Map

/**
  * Connector to MySystem with debug mode.
  */
class MySystemConnector(isDebug: Boolean) extends ConnectorDescriptor("my-system", 1, false) {

  override protected def toConnectorProperties(): Map[String, String] = {
    val properties = new HashMap[String, String]
    properties.put("connector.debug", isDebug.toString)
    properties
  }
}
```



然后可以在 API 中使用描述符,如下所示:



```
StreamTableEnvironment tableEnv = // ...

tableEnv
  .connect(new MySystemConnector(true))
  .inAppendMode()
  .registerTableSource("MySystemTable");
```





```
val tableEnv: StreamTableEnvironment = // ...
tableEnv
  .connect(new MySystemConnector(isDebug = true))
  .inAppendMode()
  .registerTableSource("MySystemTable")
```