From 3d35c7d6ee13c681f44d83966295457d53fdb24b Mon Sep 17 00:00:00 2001 From: Bo Ding Date: Tue, 31 May 2022 09:50:52 +0800 Subject: [PATCH] docs: enhance kafka connector tutorial (#13273) --- docs-cn/20-third-party/11-kafka.md | 101 ++++++++++++++++++++++++----- 1 file changed, 85 insertions(+), 16 deletions(-) diff --git a/docs-cn/20-third-party/11-kafka.md b/docs-cn/20-third-party/11-kafka.md index 0de5b43a39..e674784054 100644 --- a/docs-cn/20-third-party/11-kafka.md +++ b/docs-cn/20-third-party/11-kafka.md @@ -7,7 +7,7 @@ TDengine Kafka Connector 包含两个插件: TDengine Source Connector 和 TDeng ## 什么是 Kafka Connect? -Kafka Connect 是 Apache Kafka 的一个组件,用于使其它系统,比如数据库、云服务、文件系统等能方便地连接到 Kafka。数据既可以通过 Kafka Connect 从其它系统流向 Kafka, 也可以通过 Kafka Connect 从 Kafka 流向其它系统。从其它系统读数据的插件称为 Source Connector, 写数据到其它系统的插件称为 Sink Connector。Source Connector 和 Sink Connector 都不会直接连接 Kafka Broker,Source Connector 把数据转交给 Kafka Connect。Sink Connector 从 Kafka Connect 接收数据。 +Kafka Connect 是 [Apache Kafka](https://kafka.apache.org/) 的一个组件,用于使其它系统,比如数据库、云服务、文件系统等能方便地连接到 Kafka。数据既可以通过 Kafka Connect 从其它系统流向 Kafka, 也可以通过 Kafka Connect 从 Kafka 流向其它系统。从其它系统读数据的插件称为 Source Connector, 写数据到其它系统的插件称为 Sink Connector。Source Connector 和 Sink Connector 都不会直接连接 Kafka Broker,Source Connector 把数据转交给 Kafka Connect。Sink Connector 从 Kafka Connect 接收数据。 ![TDengine Database Kafka Connector -- Kafka Connect structure](kafka/Kafka_Connect.webp) @@ -17,7 +17,7 @@ TDengine Source Connector 用于把数据实时地从 TDengine 读出来发送 ## 什么是 Confluent? -Confluent 在 Kafka 的基础上增加很多扩展功能。包括: +[Confluent](https://www.confluent.io/) 在 Kafka 的基础上增加很多扩展功能。包括: 1. Schema Registry 2. REST 代理 @@ -81,10 +81,10 @@ Development: false git clone https://github.com:taosdata/kafka-connect-tdengine.git cd kafka-connect-tdengine mvn clean package -unzip -d $CONFLUENT_HOME/share/confluent-hub-components/ target/components/packages/taosdata-kafka-connect-tdengine-0.1.0.zip +unzip -d $CONFLUENT_HOME/share/java/ target/components/packages/taosdata-kafka-connect-tdengine-*.zip ``` -以上脚本先 clone 项目源码,然后用 Maven 编译打包。打包完成后在 `target/components/packages/` 目录生成了插件的 zip 包。把这个 zip 包解压到安装插件的路径即可。安装插件的路径在配置文件 `$CONFLUENT_HOME/etc/kafka/connect-standalone.properties` 中。默认的路径为 `$CONFLUENT_HOME/share/confluent-hub-components/`。 +以上脚本先 clone 项目源码,然后用 Maven 编译打包。打包完成后在 `target/components/packages/` 目录生成了插件的 zip 包。把这个 zip 包解压到安装插件的路径即可。安装插件的路径在配置文件 `$CONFLUENT_HOME/etc/kafka/connect-standalone.properties` 中。上面的示例中使用了内置的插件路径: `$CONFLUENT_HOME/share/java/`。 ### 用 confluent-hub 安装 @@ -98,7 +98,7 @@ confluent local services start ``` :::note -一定要先安装插件再启动 Confluent, 否则会出现找不到类的错误。Kafka Connect 的日志(默认路径: /tmp/confluent.xxxx/connect/logs/connect.log)中会输出成功安装的插件,据此可判断插件是否安装成功。 +一定要先安装插件再启动 Confluent, 否则加载插件会失败。 ::: :::tip @@ -125,6 +125,61 @@ Control Center is [UP] 清空数据可执行 `rm -rf /tmp/confluent.106668`。 ::: +### 验证各个组件是否启动成功 + +输入命令: + +``` +confluent local services status +``` + +如果各组件都启动成功,会得到如下输出: + +``` +Connect is [UP] +Control Center is [UP] +Kafka is [UP] +Kafka REST is [UP] +ksqlDB Server is [UP] +Schema Registry is [UP] +ZooKeeper is [UP] +``` + +### 验证插件是否安装成功 + +在 Kafka Connect 组件完全启动后,可用以下命令列出成功加载的插件: + +``` +confluent local services connect plugin list +``` + +如果成功安装,会输出如下: + +```txt {4,9} +Available Connect Plugins: +[ + { + "class": "com.taosdata.kafka.connect.sink.TDengineSinkConnector", + "type": "sink", + "version": "1.0.0" + }, + { + "class": "com.taosdata.kafka.connect.source.TDengineSourceConnector", + "type": "source", + "version": "1.0.0" + }, +...... +``` + +如果插件安装失败,请检查 Kafka Connect 的启动日志是否有异常信息,用以下命令输出日志路径: +``` +echo `cat /tmp/confluent.current`/connect/connect.stdout +``` +该命令的输出类似: `/tmp/confluent.104086/connect/connect.stdout`。 + +与日志文件 `connect.stdout` 同一目录,还有一个文件名为: `connect.properties`。在这个文件的末尾,可以看到最终生效的 `plugin.path`, 它是一系列用逗号分割的路径。如果插件安装失败,很可能是因为实际的安装路径不包含在 `plugin.path` 中。 + + ## TDengine Sink Connector 的使用 TDengine Sink Connector 的作用是同步指定 topic 的数据到 TDengine。用户无需提前创建数据库和超级表。可手动指定目标数据库的名字(见配置参数 connection.database), 也可按一定规则生成(见配置参数 connection.database.prefix)。 @@ -144,7 +199,7 @@ vi sink-demo.properties sink-demo.properties 内容如下: ```ini title="sink-demo.properties" -name=tdengine-sink-demo +name=TDengineSinkConnector connector.class=com.taosdata.kafka.connect.sink.TDengineSinkConnector tasks.max=1 topics=meters @@ -153,6 +208,7 @@ connection.user=root connection.password=taosdata connection.database=power db.schemaless=line +data.precision=ns key.converter=org.apache.kafka.connect.storage.StringConverter value.converter=org.apache.kafka.connect.storage.StringConverter ``` @@ -179,6 +235,7 @@ confluent local services connect connector load TDengineSinkConnector --config . "connection.url": "jdbc:TAOS://127.0.0.1:6030", "connection.user": "root", "connector.class": "com.taosdata.kafka.connect.sink.TDengineSinkConnector", + "data.precision": "ns", "db.schemaless": "line", "key.converter": "org.apache.kafka.connect.storage.StringConverter", "tasks.max": "1", @@ -223,10 +280,10 @@ Database changed. taos> select * from meters; ts | current | voltage | phase | groupid | location | =============================================================================================================================================================== - 2022-03-28 09:56:51.249000000 | 11.800000000 | 221.000000000 | 0.280000000 | 2 | California.LosAngeles | - 2022-03-28 09:56:51.250000000 | 13.400000000 | 223.000000000 | 0.290000000 | 2 | California.LosAngeles | - 2022-03-28 09:56:51.249000000 | 10.800000000 | 223.000000000 | 0.290000000 | 3 | California.LosAngeles | - 2022-03-28 09:56:51.250000000 | 11.300000000 | 221.000000000 | 0.350000000 | 3 | California.LosAngeles | + 2022-03-28 09:56:51.249000000 | 11.800000000 | 221.000000000 | 0.280000000 | 2 | California.LosAngeles | + 2022-03-28 09:56:51.250000000 | 13.400000000 | 223.000000000 | 0.290000000 | 2 | California.LosAngeles | + 2022-03-28 09:56:51.249000000 | 10.800000000 | 223.000000000 | 0.290000000 | 3 | California.LosAngeles | + 2022-03-28 09:56:51.250000000 | 11.300000000 | 221.000000000 | 0.350000000 | 3 | California.LosAngeles | Query OK, 4 row(s) in set (0.004208s) ``` @@ -356,21 +413,33 @@ confluent local services connect connector unload TDengineSourceConnector 2. `connection.database.prefix`: 当 connection.database 为 null 时, 目标数据库的前缀。可以包含占位符 '${topic}'。 比如 kafka_${topic}, 对于主题 'orders' 将写入数据库 'kafka_orders'。 默认 null。当为 null 时,目标数据库的名字和主题的名字是一致的。 3. `batch.size`: 分批写入每批记录数。当 Sink Connector 一次接收到的数据大于这个值时将分批写入。 4. `max.retries`: 发生错误时的最大重试次数。默认为 1。 -5. `retry.backoff.ms`: 发送错误时重试的时间间隔。单位毫秒,默认 3000。 -6. `db.schemaless`: 数据格式,必须指定为: line、json、telnet 中的一个。分别代表 InfluxDB 行协议格式、 OpenTSDB JSON 格式、 OpenTSDB Telnet 行协议格式。 +5. `retry.backoff.ms`: 发送错误时重试的时间间隔。单位毫秒,默认为 3000。 +6. `db.schemaless`: 数据格式,可选值为: + 1. line :代表 InfluxDB 行协议格式 + 2. json : 代表 OpenTSDB JSON 格式 + 3. telnet :代表 OpenTSDB Telnet 行协议格式 +7. `data.precision`: 使用 InfluxDB 行协议格式时,时间戳的精度。可选值为: + 1. ms : 表示毫秒 + 2. us : 表示微秒 + 3. ns : 表示纳秒。默认为纳秒。 ### TDengine Source Connector 特有的配置 1. `connection.database`: 源数据库名称,无缺省值。 2. `topic.prefix`: 数据导入 kafka 后 topic 名称前缀。 使用 `topic.prefix` + `connection.database` 名称作为完整 topic 名。默认为空字符串 ""。 -3. `timestamp.initial`: 数据同步起始时间。格式为'yyyy-MM-dd HH:mm:ss'。默认 "1970-01-01 00:00:00"。 -4. `poll.interval.ms`: 拉取数据间隔,单位为 ms。默认 1000。 +3. `timestamp.initial`: 数据同步起始时间。格式为'yyyy-MM-dd HH:mm:ss'。默认为 "1970-01-01 00:00:00"。 +4. `poll.interval.ms`: 拉取数据间隔,单位为 ms。默认为 1000。 5. `fetch.max.rows` : 检索数据库时最大检索条数。 默认为 100。 -6. `out.format`: 数据格式。取值 line 或 json。line 表示 InfluxDB Line 协议格式, json 表示 OpenTSDB JSON 格式。默认 line。 +6. `out.format`: 数据格式。取值 line 或 json。line 表示 InfluxDB Line 协议格式, json 表示 OpenTSDB JSON 格式。默认为 line。 + +## 其他说明 + +1. 插件的安装位置可以自定义,请参考官方文档:https://docs.confluent.io/home/connect/self-managed/install.html#install-connector-manually。 +2. 本教程的示例程序使用了 Confluent 平台,但是 TDengine Kafka Connector 本身同样适用于独立安装的 Kafka, 且配置方法相同。关于如何在独立安装的 Kafka 环境使用 Kafka Connect 插件, 请参考官方文档: https://kafka.apache.org/documentation/#connect。 ## 问题反馈 -https://github.com/taosdata/kafka-connect-tdengine/issues +无论遇到任何问题,都欢迎在本项目的 Github 仓库反馈: https://github.com/taosdata/kafka-connect-tdengine/issues。 ## 参考 -- GitLab