# 用户定义的 Sources 和 Sinks `TableSource` 提供对存储在外部系统(数据库、键值存储、消息队列)或文件中的数据的访问。[TableSource 在 TableEnvironment 中注册后](common.html#register-a-tablesource)可以通过[Table API](tableApi.html)或[SQL](sql.html)查询访问它。 `TableSink`[发送 Table](common.html#emit-a-table)到外部存储系统,如数据库、键值存储、消息队列或文件系统(以不同的编码方式,如 CSV、Parquet 或 ORC)。 `TableFactory` 允许将与外部系统的连接声明与实际实现分离。表工厂从规范化的、基于字符串的属性创建表源(sources)和表接收器(sinks)的已配置实例。可以使用`描述符(Descriptor)`以编程方式生成属性,也可以通过[SQL Client](sqlClient.html)的YAML配置文件生成属性。 查看[常见概念和API](common.html)页面,了解如何[注册TableSource](common.html#register-a-tablesource)以及如何[通过 TableSink 发出 Table](common.html#emit-a-table)的详细信息。有关如何使用工厂的示例,请参阅[内置源,接收器和格式](connect.html)页面。 ## 定义一个表源(TableSource) `TableSource` 是一个通用接口,允许 Table API 和 SQL 查询访问存储在外部系统中的数据。它提供了表的模式以及映射到具有表模式的行的记录。根据是否在流式或批量查询中使用 `TableSource`,记录将作为 `DataSet` 或 `DataStream` 生成。 如果在流查询中使用 `TableSource`,它必须实现 `StreamTableSource` 接口;如果在批处理查询中使用它,它必须实现 `BatchTableSource` 接口。`TableSource` 还可以实现这两个接口,并可用于流查询和批处理查询。 `StreamTableSource` 和 `BatchTableSource` 扩展了定义了以下方法的基本接口 `TableSource`: ``` TableSource { public TableSchema getTableSchema(); public TypeInformation getReturnType(); public String explainSource(); } ``` ``` TableSource[T] { def getTableSchema: TableSchema def getReturnType: TypeInformation[T] def explainSource: String } ``` * `getTableSchema()`: 返回表的模式,即表中字段的名称和类型。字段类型是使用 Flink 的 `类型信息(TypeInformation)`(参见[Table API types](tableApi.html#data-types)和[SQL types](sql.html#data-types))定义的。 * `getReturnType()`: 返回 `DataStream` (`StreamTableSource`) 或 `DataSet` (`BatchTableSource`) 的物理类型和由 `TableSource` 生成的记录。 * `explainSource()`: 返回描述 `TableSource` 的字符串。此方法是可选的,仅用于显示目的。 `TableSource` 接口将逻辑表模式与返回的 `DataStream` 或 `DataSet` 的物理类型分开。因此,表模式(`getTableSchema()`)的所有字段必须映射到物理返回类型(`getReturnType()`)的对应类型的字段。默认情况下,这个映射是基于字段名完成的。例如,定义具有两个字段`[name: String, size: Integer]`的表模式的 `TableSource` 需要一个 `TypeInformation`,其中至少有两个字段名为 `name` 和 `size` ,类型为 `String` 和 `Integer`。 这可能是一个 `PojoTypeInfo` 或 `RowTypeInfo`,它有两个名为 `name` 和 `size` 的字段匹配类型。 然而,有些类型,如 Tuple 或 CaseClass 类型,确实支持自定义字段名。如果 `TableSource` 返回具有固定字段名的类型的 `DataStream` 或 `DataSet`,它可以实现 `DefinedFieldMapping` 接口,将字段名从表模式映射到物理返回类型的字段名。 ### 定义一个 BatchTableSource `BatchTableSource` 接口扩展了 `TableSource` 接口,并定义了一个额外的方法: ``` BatchTableSource implements TableSource { public DataSet getDataSet(ExecutionEnvironment execEnv); } ``` ``` BatchTableSource[T] extends TableSource[T] { def getDataSet(execEnv: ExecutionEnvironment): DataSet[T] } ``` * `getDataSet(execEnv)`: 返回包含表数据的 `DataSet`。`DataSet` 的类型必须与 `TableSource.getReturnType()` 方法定义的返回类型相同。`DataSet` 可以通过使用 DataSet API 的常规[数据源](//ci.apache.org/projects/flink/flink-docs-release-1.7/dev/batch/#data-sources)创建。通常,`BatchTableSource` 是通过包装 `InputFormat` 或 [batch connector](//ci.apache.org/projects/flink/flink-docs-release-1.7/dev/batch/connectors.html)来实现的。 ### 定义一个 StreamTableSource `StreamTableSource` 接口扩展了 `TableSource` 接口,并定义了一个额外的方法: ``` StreamTableSource implements TableSource { public DataStream getDataStream(StreamExecutionEnvironment execEnv); } ``` ``` StreamTableSource[T] extends TableSource[T] { def getDataStream(execEnv: StreamExecutionEnvironment): DataStream[T] } ``` * `getDataStream(execEnv)`: 返回带有表数据的`DataStream`。`DataStream` 的类型必须与 `TableSource.getReturnType()` 方法定义的返回类型相同。`DataStream` 可以通过使用 DataStream API 的常规[数据源(data source)](//ci.apache.org/projects/flink/flink-docs-release-1.7/dev/datastream_api.html#data-sources)创建。通常,`StreamTableSource` 是通过包装 `SourceFunction` 或[流连接器(stream connector)](//ci.apache.org/projects/flink/flink-docs-release-1.7/dev/connectors/)来实现的。 ### 使用时间属性定义表源(TableSource) 流式[表API](tableApi.html#group-windows)和[SQL](sql.html#group-windows)查询的基于时间的操作(如窗口聚合或连接)需要显式指定[时间属性](streaming/time_attributes.html)。 `TableSource` 在其表模式中将 time 属性定义为类型为 `Types.SQL_TIMESTAMP` 的字段。与模式中的所有常规字段相比,时间属性不能与表源返回类型中的物理字段匹配。相反,`TableSource`通过实现某个接口来定义时间属性。 #### 定义处理时间属性(Processing Time Attribute) [处理时间属性](streaming/time_attributes.html#processing-time)通常用于流查询。处理时间属性返回访问它的操作符的当前壁钟时间(wall-clock time)。`TableSource` 通过实现 `DefinedProctimeAttribute` 接口来定义处理时间属性。接口如下: ``` DefinedProctimeAttribute { public String getProctimeAttribute(); } ``` ``` DefinedProctimeAttribute { def getProctimeAttribute: String } ``` * `getProctimeAttribute()`: 返回处理时间属性的名称。必须在表模式中定义指定的属性类型 `Types.SQL_TIMESTAMP`,并且可以在基于时间的操作中使用。`DefinedProctimeAttribute` 表源可以通过返回 `null` 来定义没有处理时间属性。 注意 `StreamTableSource` 和 `BatchTableSource` 都可以实现 `DefinedProctimeAttribute` 并定义处理时间属性。对于 `BatchTableSource`,处理时间字段在表扫描期间使用当前时间戳初始化。 #### 定义 Rowtime 属性(Rowtime Attribute) [Rowtime属性](streaming/time_attributes.html#event-time)是 `TIMESTAMP` 类型的属性,在流和批处理查询中以统一的方式处理。 通过指定,可以将类型为 `SQL_TIMESTAMP` 的表模式字段声明为 rowtime 属性 * 字段名, * `TimestampExtractor` 计算属性的实际值(通常来自一个或多个其他字段),以及 * 指定如何为 rowtim e属性生成水印的 `WatermarkStrategy`。 `TableSource` 通过实现 `DefinedRowtimeAttributes` 接口定义了一个 rowtime 属性。接口如下: ``` DefinedRowtimeAttribute { public List getRowtimeAttributeDescriptors(); } ``` ``` DefinedRowtimeAttributes { def getRowtimeAttributeDescriptors: util.List[RowtimeAttributeDescriptor] } ``` * `getRowtimeAttributeDescriptors()`: 返回一个 `RowtimeAttributeDescriptor` 列表。`RowtimeAttributeDescriptor` 描述了一个具有以下属性的 rowtime 属性: * `attributeName`: 表模式中 rowtime 属性的名称。必须使用类型 `Types.SQL_TIMESTAMP` 定义字段。 * `timestampExtractor`: 时间戳提取器(timestamp extractor)从具有返回类型的记录中提取时间戳。例如,它可以将长字段转换为时间戳,或者解析字符串编码的时间戳。Flink 附带一组内置的 `TimestampExtractor` 实现,用于常见的用例。还可以提供自定义实现。 * `watermarkStrategy`: 水印策略(watermark strategy)定义了如何为 rowtime 属性生成水印。Flink 为常见用例提供了一组内置的 `WatermarkStrategy` 实现。还可以提供自定义实现。 注意,虽然 `getRowtimeAttributeDescriptors()` 方法返回一个描述符列表,但目前只支持一个 rowtime 属性。我们计划在将来删除这个限制,并支持具有多个 rowtime 属性的表。 请注意,`StreamTableSource` 和 `BatchTableSource` 都可以实现 `DefinedRowtimeAttributes` 并定义 rowtime 属性。在这两种情况下,行时间字段都是使用 `TimestampExtractor` 提取的。因此,实现 `StreamTableSource` 和 `BatchTableSource` 并定义 rowtime 属性的 `TableSource` 为流处理和批处理查询提供了完全相同的数据。 ##### 提供时间戳提取器(Timestamp Extractors) Flink 为常用用例提供了 `TimestampExtractor` 实现。 以下 `TimestampExtractor` 实现目前可用: * `ExistingField(fieldName)`: 从现有的 `LONG`, `SQL_TIMESTAMP` 或格式化的 `STRING` 字段中提取 rowtime 属性的值。这样一个字符串的一个例子是 ‘2018-05-28 12:34:56.000’。 * `StreamRecordTimestamp()`: 从 `DataStream` `StreamRecord` 的时间戳中提取 rowtime 属性的值。注意,此 `TimestampExtractor` 不适用于批处理表源。 自定义的 `TimestampExtractor` 可以通过实现相应的接口来定义。 ##### 提供水印策略(Watermark Strategies) Flink 为常用用例提供了`水印策略(WatermarkStrategy)`实现。 目前提供以下 `WatermarkStrategy` 实现: * `AscendingTimestamps`: 用于提升时间戳的水印策略。带有无序时间戳的记录将被视为迟到。 * `BoundedOutOfOrderTimestamps(delay)`: 时间戳的水印策略,其最多在指定的延迟之外是无序的。 * `PreserveWatermarks()`: 应该从底层的 `DataStream` 中保留指示水印的策略。 可以通过实现相应的接口来定义自定义的 `WatermarkStrategy`。 ### 使用投影下推定义表源 Defining a TableSource with Projection Push-Down `TableSource` 通过实现 `ProjectableTableSource` 接口支持投影下推。该接口定义了一个单一的方法: ``` ProjectableTableSource { public TableSource projectFields(int[] fields); } ``` ``` ProjectableTableSource[T] { def projectFields(fields: Array[Int]): TableSource[T] } ``` * `projectFields(fields)`: 返回具有调整后的物理返回类型的 `TableSource` 的 _副本(copy)_。`fields` 参数提供了必须由 `TableSource` 提供的字段的索引。索引与物理返回类型的 `TypeInformation` 有关,而 _不是_ 与逻辑表模式相关。复制的 `TableSource` 必须调整其返回类型和返回的 `DataStream` 或 `DataSet`。复制的 `TableSource` 的 `TableSchema` 不能更改,即它必须与原来的 `TableSource` 相同。如果 `TableSource` 实现了 `DefinedFieldMapping` 接口,则必须将字段映射调整为新的返回类型。 `ProjectableTableSource` 为项目平面字段添加了支持。如果 `TableSource` 定义了一个带有嵌套模式的表,它可以实现 `NestedFieldsProjectableTableSource` 来将投影扩展到嵌套字段。`NestedFieldsProjectableTableSource` 的定义如下: ``` NestedFieldsProjectableTableSource { public TableSource projectNestedFields(int[] fields, String[][] nestedFields); } ``` ``` NestedFieldsProjectableTableSource[T] { def projectNestedFields(fields: Array[Int], nestedFields: Array[Array[String]]): TableSource[T] } ``` * `projectNestedField(fields, nestedFields)`: 返回 `TableSource` 的具有调整后的物理返回类型的一个 _副本(copy)_。可以删除或重新排序物理返回类型的字段,但不能更改它们的类型。该方法的契约(contract)本质上与 `ProjectableTableSource.projectFields()` 方法相同。此外,`nestedFields` 参数包含 `fields` 列表中的每个字段索引,这是查询访问的所有嵌套字段的路径列表。所有其他嵌套字段都不需要在 `TableSource` 生成的记录中读取、解析和设置。**最重要的是** 不能更改投影字段的类型,但是可以将未使用的字段设置为null或默认值。 ### 使用下推过滤器定义表源(TableSource) `FilterableTableSource` 接口增加了对 `TableSource` 下推过滤器的支持。扩展这个接口的 `TableSource` 能够过滤记录,这样返回的 `DataStream` 或 `DataSet` 返回的记录就会更少。 接口如下: ``` FilterableTableSource { public TableSource applyPredicate(List predicates); public boolean isFilterPushedDown(); } ``` ``` FilterableTableSource[T] { def applyPredicate(predicates: java.util.List[Expression]): TableSource[T] def isFilterPushedDown: Boolean } ``` * `applyPredicate(predicates)`: 返回带有添加谓词的 `TableSource` 的 _副本(copy)_。`predicates` 参数是一个可变的连接谓词列表,它们“提供(offered)”给 `TableSource`。`TableSource` 接受通过从列表中删除谓词来计算谓词的值。列表中剩下的谓词将由后续过滤器操作符计算。 * `isFilterPushedDown()`: 如果之前调用了 `applyPredicate()` 方法,则返回 true。因此,对于从 `applyPredicate()` 调用返回的所有 `TableSource` 实例,`isFilterPushedDown()` 必须返回 true。 ## 定义一个 TableSink `TableSink` 指定如何将 `Table` 发送到外部系统或位置。该接口是通用的,因此它可以支持不同的存储位置和格式。对于批处理表和流表,有不同的表接收器(table sinks)。 通用接口如下: ``` TableSink { public TypeInformation getOutputType(); public String[] getFieldNames(); public TypeInformation[] getFieldTypes(); public TableSink configure(String[] fieldNames, TypeInformation[] fieldTypes); } ``` ``` TableSink[T] { def getOutputType: TypeInformation def getFieldNames: Array[String] def getFieldTypes: Array[TypeInformation] def configure(fieldNames: Array[String], fieldTypes: Array[TypeInformation]): TableSink[T] } ``` 调用 `TableSink#configure` 方法将表的模式(字段名和类型)传递给 `TableSink`。该方法必须返回 TableSink 的一个新实例,该实例被配置为发出所提供的表模式。 ### 批处理表接收器(BatchTableSink) 定义外部 `TableSink` 以发出批处理表。 接口如下: ``` BatchTableSink implements TableSink { public void emitDataSet(DataSet dataSet); } ``` ``` BatchTableSink[T] extends TableSink[T] { def emitDataSet(dataSet: DataSet[T]): Unit } ``` ### 附加流表接收器(AppendStreamTableSink) 定义一个外部 `TableSink` 来发出只包含插入更改的流表。 接口如下: ``` AppendStreamTableSink implements TableSink { public void emitDataStream(DataStream dataStream); } ``` ``` AppendStreamTableSink[T] extends TableSink[T] { def emitDataStream(dataStream: DataStream): Unit } ``` 如果表也被 update 或 delete 修改,则会抛出一个 `TableException`。 ### 收回流表接收器(RetractStreamTableSink) 定义一个外部 `TableSink`,用于发出具有插入、更新和删除更改的流表。 接口如下: ``` RetractStreamTableSink implements TableSink> { public TypeInformation getRecordType(); public void emitDataStream(DataStream> dataStream); } ``` ``` RetractStreamTableSink[T] extends TableSink[Tuple2[Boolean, T]] { def getRecordType: TypeInformation[T] def emitDataStream(dataStream: DataStream[Tuple2[Boolean, T]]): Unit } ``` 该表将被转换为一个累加和收回消息流,这些消息被编码为 Java `Tuple2`。第一个字段是一个布尔标志,用于指示消息类型(`true` 表示插入,`false` 表示删除)。第二个字段保存所请求类型 `T` 的记录。 ### 维护流表接收器(UpsertStreamTableSink) 定义一个外部 `TableSink`,用于发出具有插入、更新和删除更改的流表。 接口如下: ``` UpsertStreamTableSink implements TableSink> { public void setKeyFields(String[] keys); public void setIsAppendOnly(boolean isAppendOnly); public TypeInformation getRecordType(); public void emitDataStream(DataStream> dataStream); } ``` ``` UpsertStreamTableSink[T] extends TableSink[Tuple2[Boolean, T]] { def setKeyFields(keys: Array[String]): Unit def setIsAppendOnly(isAppendOnly: Boolean): Unit def getRecordType: TypeInformation[T] def emitDataStream(dataStream: DataStream[Tuple2[Boolean, T]]): Unit } ``` 该表必须具有唯一的 key 字段(原子或复合)或仅附加。如果表没有唯一 key 并且不是仅附加,则抛出 `TableException`。表的唯一键由 `UpsertStreamTableSink#setKeyFields()` 方法配置。 该表将被转换为 upsert 和 delete 消息流,这些消息被编码为 Java `Tuple2`。第一个字段是一个布尔标志,用于指示消息类型。第二个字段保存所请求类型 `T` 的记录。 具有 true 布尔字段的消息是已配置 key 的 upsert 消息。带有错误标志的消息是已配置 key 的删除消息。如果表是仅附加的,则所有消息都将具有 true 标志,并且必须解释为插入。 ## 定义表工厂(TableFactory) `TableFactory` 允许从基于字符串的属性创建不同的表相关实例。调用所有可用工厂以匹配给定的属性集和相应的工厂类。 工厂利用 Java 的[服务提供者接口(Service Provider Interfaces, SPI)](https://docs.oracle.com/javase/tutorial/sound/SPI-intro.html)进行发现。这意味着每个依赖项和 JAR 文件都应在 `META_INF/services` 资源目录中包含一个文件 `org.apache.flink.table.factories.TableFactory`,该文件列出了它提供的所有可用表工厂。 每个表工厂都需要实现以下接口: ``` package org.apache.flink.table.factories; interface TableFactory { Map requiredContext(); List supportedProperties(); } ``` ``` package org.apache.flink.table.factories trait TableFactory { def requiredContext(): util.Map[String, String] def supportedProperties(): util.List[String] } ``` * `requiredContext()`: 指定已为此工厂实现的上下文。如果满足指定的属性和值集,框架保证仅匹配此工厂。典型属性可能是 `connector.type`, `format.type` 或 `update-mode`。诸如 `connector.property-version` 和 `format.property-version` 之类的属性 keys 保留用于将来的向后兼容性情况。 * `supportedProperties`: 此工厂可以处理的属性 keys 列表。此方法将用于验证。如果传递了该工厂无法处理的属性,则会抛出异常。该列表不得包含上下文指定的 keys。 为了创建特定实例,工厂类可以实现 `org.apache.flink.table.factories` 中提供的一个或多个接口: * `BatchTableSourceFactory`: 创建批处理表源。 * `BatchTableSinkFactory`: 创建批处理表接收器。 * `StreamTableSoureFactory`: 创建流表源。 * `StreamTableSinkFactory`: 创建流表接收器。 * `DeserializationSchemaFactory`: 创建反序列化架构格式。 * `SerializationSchemaFactory`: 创建序列化架构格式。 工厂的发现经历了多个阶段: * 发现所有可用的工厂。 * 按工厂类过滤(例如,`StreamTableSourceFactory`)。 * 通过匹配上下文过滤。 * 按支持的属性过滤。 * 验证一个工厂是否匹配,否则抛出 `AmbiguousTableFactoryException` 或 `NoMatchingTableFactoryException`。 以下示例显示如何为参数化提供附加 `connector.debug` 属性标志的自定义流式源。 ``` import org.apache.flink.table.sources.StreamTableSource; import org.apache.flink.types.Row; import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; class MySystemTableSourceFactory implements StreamTableSourceFactory { @Override public Map requiredContext() { Map context = new HashMap<>(); context.put("update-mode", "append"); context.put("connector.type", "my-system"); return context; } @Override public List supportedProperties() { List list = new ArrayList<>(); list.add("connector.debug"); return list; } @Override public StreamTableSource createStreamTableSource(Map properties) { boolean isDebug = Boolean.valueOf(properties.get("connector.debug")); # additional validation of the passed properties can also happen here return new MySystemAppendTableSource(isDebug); } } ``` ``` import java.util import org.apache.flink.table.sources.StreamTableSource import org.apache.flink.types.Row class MySystemTableSourceFactory extends StreamTableSourceFactory[Row] { override def requiredContext(): util.Map[String, String] = { val context = new util.HashMap[String, String]() context.put("update-mode", "append") context.put("connector.type", "my-system") context } override def supportedProperties(): util.List[String] = { val properties = new util.ArrayList[String]() properties.add("connector.debug") properties } override def createStreamTableSource(properties: util.Map[String, String]): StreamTableSource[Row] = { val isDebug = java.lang.Boolean.valueOf(properties.get("connector.debug")) # additional validation of the passed properties can also happen here new MySystemAppendTableSource(isDebug) } } ``` ### 在 SQL 客户端中使用 TableFactory 在 SQL 客户端环境文件中,先前显示的工厂可以声明为: ``` tables: - name: MySystemTable type: source update-mode: append connector: type: my-system debug: true ``` YAML 文件被转换为扁平的(flattened)字符串属性,并使用描述与外部系统的连接的属性调用表工厂: ``` update-mode=append connector.type=my-system connector.debug=true ``` 注意,属性如 `tables.#.name` 或 `tables.#.type` 是 SQL 客户端的特定类型,不传递给任何工厂。`type` 属性根据执行环境决定是否需要发现 `BatchTableSourceFactory`/`StreamTableSourceFactory` (用于`source`)、`BatchTableSinkFactory`/`StreamTableSinkFactory`(用于 `sink`)或两者(用于 `both`)。 ### 在 Table 和 SQL API 中使用 TableFactory 对于具有解释性 Scaladoc/Javadoc 的类型安全的编程方法,Table 和 SQL API 在 `org.apache.flink.table.descriptors` 中提供了转换为基于字符串的属性的描述符。有关源(sources),接收器(sinks)和格式(formats)的信息,请参阅[内置描述符(built-in descriptors)](connect.html)作为参考。 在我们的示例中,`MySystem` 的连接器可以扩展 `ConnectorDescriptor`,如下所示: ``` import org.apache.flink.table.descriptors.ConnectorDescriptor; import java.util.HashMap; import java.util.Map; /** * Connector to MySystem with debug mode. */ public class MySystemConnector extends ConnectorDescriptor { public final boolean isDebug; public MySystemConnector(boolean isDebug) { super("my-system", 1, false); this.isDebug = isDebug; } @Override protected Map toConnectorProperties() { Map properties = new HashMap<>(); properties.put("connector.debug", Boolean.toString(isDebug)); return properties; } } ``` ``` import org.apache.flink.table.descriptors.ConnectorDescriptor import java.util.HashMap import java.util.Map /** * Connector to MySystem with debug mode. */ class MySystemConnector(isDebug: Boolean) extends ConnectorDescriptor("my-system", 1, false) { override protected def toConnectorProperties(): Map[String, String] = { val properties = new HashMap[String, String] properties.put("connector.debug", isDebug.toString) properties } } ``` 然后可以在 API 中使用描述符,如下所示: ``` StreamTableEnvironment tableEnv = // ... tableEnv .connect(new MySystemConnector(true)) .inAppendMode() .registerTableSource("MySystemTable"); ``` ``` val tableEnv: StreamTableEnvironment = // ... tableEnv .connect(new MySystemConnector(isDebug = true)) .inAppendMode() .registerTableSource("MySystemTable") ```