未验证 提交 53541f7e 编写于 作者: B Bo Ding 提交者: GitHub

docs: enhance kafka connector tutorial (#13280)

* docs: enhance kafka connector tutorial

* docs: ehance kafka connector tutorial

* docs: typo
上级 2fc2381a
......@@ -7,7 +7,7 @@ TDengine Kafka Connector 包含两个插件: TDengine Source Connector 和 TDeng
## 什么是 Kafka Connect?
Kafka Connect 是 Apache Kafka 的一个组件,用于使其它系统,比如数据库、云服务、文件系统等能方便地连接到 Kafka。数据既可以通过 Kafka Connect 从其它系统流向 Kafka, 也可以通过 Kafka Connect 从 Kafka 流向其它系统。从其它系统读数据的插件称为 Source Connector, 写数据到其它系统的插件称为 Sink Connector。Source Connector 和 Sink Connector 都不会直接连接 Kafka Broker,Source Connector 把数据转交给 Kafka Connect。Sink Connector 从 Kafka Connect 接收数据。
Kafka Connect 是 [Apache Kafka](https://kafka.apache.org/) 的一个组件,用于使其它系统,比如数据库、云服务、文件系统等能方便地连接到 Kafka。数据既可以通过 Kafka Connect 从其它系统流向 Kafka, 也可以通过 Kafka Connect 从 Kafka 流向其它系统。从其它系统读数据的插件称为 Source Connector, 写数据到其它系统的插件称为 Sink Connector。Source Connector 和 Sink Connector 都不会直接连接 Kafka Broker,Source Connector 把数据转交给 Kafka Connect。Sink Connector 从 Kafka Connect 接收数据。
![TDengine Database Kafka Connector -- Kafka Connect structure](kafka/Kafka_Connect.webp)
......@@ -17,7 +17,7 @@ TDengine Source Connector 用于把数据实时地从 TDengine 读出来发送
## 什么是 Confluent?
Confluent 在 Kafka 的基础上增加很多扩展功能。包括:
[Confluent](https://www.confluent.io/) 在 Kafka 的基础上增加很多扩展功能。包括:
1. Schema Registry
2. REST 代理
......@@ -81,10 +81,10 @@ Development: false
git clone https://github.com:taosdata/kafka-connect-tdengine.git
cd kafka-connect-tdengine
mvn clean package
unzip -d $CONFLUENT_HOME/share/confluent-hub-components/ target/components/packages/taosdata-kafka-connect-tdengine-0.1.0.zip
unzip -d $CONFLUENT_HOME/share/java/ target/components/packages/taosdata-kafka-connect-tdengine-*.zip
```
以上脚本先 clone 项目源码,然后用 Maven 编译打包。打包完成后在 `target/components/packages/` 目录生成了插件的 zip 包。把这个 zip 包解压到安装插件的路径即可。安装插件的路径在配置文件 `$CONFLUENT_HOME/etc/kafka/connect-standalone.properties` 中。默认的路径为 `$CONFLUENT_HOME/share/confluent-hub-components/`
以上脚本先 clone 项目源码,然后用 Maven 编译打包。打包完成后在 `target/components/packages/` 目录生成了插件的 zip 包。把这个 zip 包解压到安装插件的路径即可。上面的示例中使用了内置的插件安装路径: `$CONFLUENT_HOME/share/java/`
### 用 confluent-hub 安装
......@@ -98,7 +98,7 @@ confluent local services start
```
:::note
一定要先安装插件再启动 Confluent, 否则会出现找不到类的错误。Kafka Connect 的日志(默认路径: /tmp/confluent.xxxx/connect/logs/connect.log)中会输出成功安装的插件,据此可判断插件是否安装成功
一定要先安装插件再启动 Confluent, 否则加载插件会失败
:::
:::tip
......@@ -125,6 +125,61 @@ Control Center is [UP]
清空数据可执行 `rm -rf /tmp/confluent.106668`
:::
### 验证各个组件是否启动成功
输入命令:
```
confluent local services status
```
如果各组件都启动成功,会得到如下输出:
```
Connect is [UP]
Control Center is [UP]
Kafka is [UP]
Kafka REST is [UP]
ksqlDB Server is [UP]
Schema Registry is [UP]
ZooKeeper is [UP]
```
### 验证插件是否安装成功
在 Kafka Connect 组件完全启动后,可用以下命令列出成功加载的插件:
```
confluent local services connect plugin list
```
如果成功安装,会输出如下:
```txt {4,9}
Available Connect Plugins:
[
{
"class": "com.taosdata.kafka.connect.sink.TDengineSinkConnector",
"type": "sink",
"version": "1.0.0"
},
{
"class": "com.taosdata.kafka.connect.source.TDengineSourceConnector",
"type": "source",
"version": "1.0.0"
},
......
```
如果插件安装失败,请检查 Kafka Connect 的启动日志是否有异常信息,用以下命令输出日志路径:
```
echo `cat /tmp/confluent.current`/connect/connect.stdout
```
该命令的输出类似: `/tmp/confluent.104086/connect/connect.stdout`
与日志文件 `connect.stdout` 同一目录,还有一个文件名为: `connect.properties`。在这个文件的末尾,可以看到最终生效的 `plugin.path`, 它是一系列用逗号分割的路径。如果插件安装失败,很可能是因为实际的安装路径不包含在 `plugin.path` 中。
## TDengine Sink Connector 的使用
TDengine Sink Connector 的作用是同步指定 topic 的数据到 TDengine。用户无需提前创建数据库和超级表。可手动指定目标数据库的名字(见配置参数 connection.database), 也可按一定规则生成(见配置参数 connection.database.prefix)。
......@@ -144,7 +199,7 @@ vi sink-demo.properties
sink-demo.properties 内容如下:
```ini title="sink-demo.properties"
name=tdengine-sink-demo
name=TDengineSinkConnector
connector.class=com.taosdata.kafka.connect.sink.TDengineSinkConnector
tasks.max=1
topics=meters
......@@ -153,6 +208,7 @@ connection.user=root
connection.password=taosdata
connection.database=power
db.schemaless=line
data.precision=ns
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.storage.StringConverter
```
......@@ -179,6 +235,7 @@ confluent local services connect connector load TDengineSinkConnector --config .
"connection.url": "jdbc:TAOS://127.0.0.1:6030",
"connection.user": "root",
"connector.class": "com.taosdata.kafka.connect.sink.TDengineSinkConnector",
"data.precision": "ns",
"db.schemaless": "line",
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"tasks.max": "1",
......@@ -223,10 +280,10 @@ Database changed.
taos> select * from meters;
ts | current | voltage | phase | groupid | location |
===============================================================================================================================================================
2022-03-28 09:56:51.249000000 | 11.800000000 | 221.000000000 | 0.280000000 | 2 | California.LosAngeles |
2022-03-28 09:56:51.250000000 | 13.400000000 | 223.000000000 | 0.290000000 | 2 | California.LosAngeles |
2022-03-28 09:56:51.249000000 | 10.800000000 | 223.000000000 | 0.290000000 | 3 | California.LosAngeles |
2022-03-28 09:56:51.250000000 | 11.300000000 | 221.000000000 | 0.350000000 | 3 | California.LosAngeles |
2022-03-28 09:56:51.249000000 | 11.800000000 | 221.000000000 | 0.280000000 | 2 | California.LosAngeles |
2022-03-28 09:56:51.250000000 | 13.400000000 | 223.000000000 | 0.290000000 | 2 | California.LosAngeles |
2022-03-28 09:56:51.249000000 | 10.800000000 | 223.000000000 | 0.290000000 | 3 | California.LosAngeles |
2022-03-28 09:56:51.250000000 | 11.300000000 | 221.000000000 | 0.350000000 | 3 | California.LosAngeles |
Query OK, 4 row(s) in set (0.004208s)
```
......@@ -356,21 +413,33 @@ confluent local services connect connector unload TDengineSourceConnector
2. `connection.database.prefix`: 当 connection.database 为 null 时, 目标数据库的前缀。可以包含占位符 '${topic}'。 比如 kafka_${topic}, 对于主题 'orders' 将写入数据库 'kafka_orders'。 默认 null。当为 null 时,目标数据库的名字和主题的名字是一致的。
3. `batch.size`: 分批写入每批记录数。当 Sink Connector 一次接收到的数据大于这个值时将分批写入。
4. `max.retries`: 发生错误时的最大重试次数。默认为 1。
5. `retry.backoff.ms`: 发送错误时重试的时间间隔。单位毫秒,默认 3000。
6. `db.schemaless`: 数据格式,必须指定为: line、json、telnet 中的一个。分别代表 InfluxDB 行协议格式、 OpenTSDB JSON 格式、 OpenTSDB Telnet 行协议格式。
5. `retry.backoff.ms`: 发送错误时重试的时间间隔。单位毫秒,默认为 3000。
6. `db.schemaless`: 数据格式,可选值为:
1. line :代表 InfluxDB 行协议格式
2. json : 代表 OpenTSDB JSON 格式
3. telnet :代表 OpenTSDB Telnet 行协议格式
7. `data.precision`: 使用 InfluxDB 行协议格式时,时间戳的精度。可选值为:
1. ms : 表示毫秒
2. us : 表示微秒
3. ns : 表示纳秒。默认为纳秒。
### TDengine Source Connector 特有的配置
1. `connection.database`: 源数据库名称,无缺省值。
2. `topic.prefix`: 数据导入 kafka 后 topic 名称前缀。 使用 `topic.prefix` + `connection.database` 名称作为完整 topic 名。默认为空字符串 ""。
3. `timestamp.initial`: 数据同步起始时间。格式为'yyyy-MM-dd HH:mm:ss'。默认 "1970-01-01 00:00:00"。
4. `poll.interval.ms`: 拉取数据间隔,单位为 ms。默认 1000。
3. `timestamp.initial`: 数据同步起始时间。格式为'yyyy-MM-dd HH:mm:ss'。默认 "1970-01-01 00:00:00"。
4. `poll.interval.ms`: 拉取数据间隔,单位为 ms。默认 1000。
5. `fetch.max.rows` : 检索数据库时最大检索条数。 默认为 100。
6. `out.format`: 数据格式。取值 line 或 json。line 表示 InfluxDB Line 协议格式, json 表示 OpenTSDB JSON 格式。默认 line。
6. `out.format`: 数据格式。取值 line 或 json。line 表示 InfluxDB Line 协议格式, json 表示 OpenTSDB JSON 格式。默认为 line。
## 其他说明
1. 插件的安装位置可以自定义,请参考官方文档:https://docs.confluent.io/home/connect/self-managed/install.html#install-connector-manually。
2. 本教程的示例程序使用了 Confluent 平台,但是 TDengine Kafka Connector 本身同样适用于独立安装的 Kafka, 且配置方法相同。关于如何在独立安装的 Kafka 环境使用 Kafka Connect 插件, 请参考官方文档: https://kafka.apache.org/documentation/#connect。
## 问题反馈
https://github.com/taosdata/kafka-connect-tdengine/issues
无论遇到任何问题,都欢迎在本项目的 Github 仓库反馈: https://github.com/taosdata/kafka-connect-tdengine/issues。
## 参考
......
......@@ -12,6 +12,6 @@ Between two major release versions, some beta versions may be delivered for user
For the details please refer to [Install and Uninstall](/operation/pkg-install)。
To see the details of versions, please refer to [Download List](https://www.taosdata.com/all-downloads) and [Release Notes](https://github.com/taosdata/TDengine/releases).
To see the details of versions, please refer to [Download List](https://tdengine.com/all-downloads) and [Release Notes](https://github.com/taosdata/TDengine/releases).
......@@ -7,7 +7,7 @@ TDengine Kafka Connector contains two plugins: TDengine Source Connector and TDe
## What is Kafka Connect?
Kafka Connect is a component of Apache Kafka that enables other systems, such as databases, cloud services, file systems, etc., to connect to Kafka easily. Data can flow from other software to Kafka via Kafka Connect and Kafka to other systems via Kafka Connect. Plugins that read data from other software are called Source Connectors, and plugins that write data to other software are called Sink Connectors. Neither Source Connector nor Sink Connector will directly connect to Kafka Broker, and Source Connector transfers data to Kafka Connect. Sink Connector receives data from Kafka Connect.
Kafka Connect is a component of [Apache Kafka](https://kafka.apache.org/) that enables other systems, such as databases, cloud services, file systems, etc., to connect to Kafka easily. Data can flow from other software to Kafka via Kafka Connect and Kafka to other systems via Kafka Connect. Plugins that read data from other software are called Source Connectors, and plugins that write data to other software are called Sink Connectors. Neither Source Connector nor Sink Connector will directly connect to Kafka Broker, and Source Connector transfers data to Kafka Connect. Sink Connector receives data from Kafka Connect.
![TDengine Database Kafka Connector -- Kafka Connect](kafka/Kafka_Connect.webp)
......@@ -17,7 +17,7 @@ TDengine Source Connector is used to read data from TDengine in real-time and se
## What is Confluent?
Confluent adds many extensions to Kafka. include:
[Confluent](https://www.confluent.io/) adds many extensions to Kafka. include:
1. Schema Registry
2. REST Proxy
......@@ -79,10 +79,10 @@ Development: false
git clone https://github.com:taosdata/kafka-connect-tdengine.git
cd kafka-connect-tdengine
mvn clean package
unzip -d $CONFLUENT_HOME/share/confluent-hub-components/ target/components/packages/taosdata-kafka-connect-tdengine-0.1.0.zip
unzip -d $CONFLUENT_HOME/share/java/ target/components/packages/taosdata-kafka-connect-tdengine-*.zip
```
The above script first clones the project source code and then compiles and packages it with Maven. After the package is complete, the zip package of the plugin is generated in the `target/components/packages/` directory. Unzip this zip package to the path where the plugin is installed. The path to install the plugin is in the configuration file `$CONFLUENT_HOME/etc/kafka/connect-standalone.properties`. The default path is `$CONFLUENT_HOME/share/confluent-hub-components/`.
The above script first clones the project source code and then compiles and packages it with Maven. After the package is complete, the zip package of the plugin is generated in the `target/components/packages/` directory. Unzip this zip package to plugin path. We used `$CONFLUENT_HOME/share/java/` above because it's a build in plugin path.
### Install with confluent-hub
......@@ -96,7 +96,7 @@ confluent local services start
```
:::note
Be sure to install the plugin before starting Confluent. Otherwise, there will be a class not found error. The log of Kafka Connect (default path: /tmp/confluent.xxxx/connect/logs/connect.log) will output the successfully installed plugin, which users can use to determine whether the plugin is installed successfully.
Be sure to install the plugin before starting Confluent. Otherwise, Kafka Connect will fail to discover the plugins.
:::
:::tip
......@@ -123,6 +123,59 @@ Control Center is [UP]
To clear data, execute `rm -rf /tmp/confluent.106668`.
:::
### Check Confluent Services Status
Use command bellow to check the status of all service:
```
confluent local services status
```
The expected output is:
```
Connect is [UP]
Control Center is [UP]
Kafka is [UP]
Kafka REST is [UP]
ksqlDB Server is [UP]
Schema Registry is [UP]
ZooKeeper is [UP]
```
### Check Successfully Loaded Plugin
After Kafka Connect was completely started, you can use bellow command to check if our plugins are installed successfully:
```
confluent local services connect plugin list
```
The output should contains `TDengineSinkConnector` and `TDengineSourceConnector` as bellow:
```
Available Connect Plugins:
[
{
"class": "com.taosdata.kafka.connect.sink.TDengineSinkConnector",
"type": "sink",
"version": "1.0.0"
},
{
"class": "com.taosdata.kafka.connect.source.TDengineSourceConnector",
"type": "source",
"version": "1.0.0"
},
......
```
If not, please check the log file of Kafka Connect. To view the log file path, please execute:
```
echo `cat /tmp/confluent.current`/connect/connect.stdout
```
It should produce a path like:`/tmp/confluent.104086/connect/connect.stdout`
Besides log file `connect.stdout` there is a file named `connect.properties`. At the end of this file you can see the effective `plugin.path` which is a series of paths joined by comma. If Kafka Connect not found our plugins, it's probably because the installed path is not included in `plugin.path`.
## The use of TDengine Sink Connector
The role of the TDengine Sink Connector is to synchronize the data of the specified topic to TDengine. Users do not need to create databases and super tables in advance. The name of the target database can be specified manually (see the configuration parameter connection.database), or it can be generated according to specific rules (see the configuration parameter connection.database.prefix).
......@@ -142,7 +195,7 @@ vi sink-demo.properties
sink-demo.properties' content is following:
```ini title="sink-demo.properties"
name=tdengine-sink-demo
name=TDengineSinkConnector
connector.class=com.taosdata.kafka.connect.sink.TDengineSinkConnector
tasks.max=1
topics=meters
......@@ -151,6 +204,7 @@ connection.user=root
connection.password=taosdata
connection.database=power
db.schemaless=line
data.precision=ns
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.storage.StringConverter
```
......@@ -177,6 +231,7 @@ If the above command is executed successfully, the output is as follows:
"connection.url": "jdbc:TAOS://127.0.0.1:6030",
"connection.user": "root",
"connector.class": "com.taosdata.kafka.connect.sink.TDengineSinkConnector",
"data.precision": "ns",
"db.schemaless": "line",
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"tasks.max": "1",
......@@ -221,10 +276,10 @@ Database changed.
taos> select * from meters;
ts | current | voltage | phase | groupid | location |
===============================================================================================================================================================
2022-03-28 09:56:51.249000000 | 11.800000000 | 221.000000000 | 0.280000000 | 2 | California.LoSangeles |
2022-03-28 09:56:51.250000000 | 13.400000000 | 223.000000000 | 0.290000000 | 2 | California.LoSangeles |
2022-03-28 09:56:51.249000000 | 10.800000000 | 223.000000000 | 0.290000000 | 3 | California.LoSangeles |
2022-03-28 09:56:51.250000000 | 11.300000000 | 221.000000000 | 0.350000000 | 3 | California.LoSangeles |
2022-03-28 09:56:51.249000000 | 11.800000000 | 221.000000000 | 0.280000000 | 2 | California.LosAngeles |
2022-03-28 09:56:51.250000000 | 13.400000000 | 223.000000000 | 0.290000000 | 2 | California.LosAngeles |
2022-03-28 09:56:51.249000000 | 10.800000000 | 223.000000000 | 0.290000000 | 3 | California.LosAngeles |
2022-03-28 09:56:51.250000000 | 11.300000000 | 221.000000000 | 0.350000000 | 3 | California.LosAngeles |
Query OK, 4 row(s) in set (0.004208s)
```
......@@ -356,6 +411,7 @@ The following configuration items apply to TDengine Sink Connector and TDengine
4. `max.retries`: The maximum number of retries when an error occurs. Defaults to 1.
5. `retry.backoff.ms`: The time interval for retry when sending an error. The unit is milliseconds. The default is 3000.
6. `db.schemaless`: Data format, could be one of `line`, `json`, and `telnet`. Represent InfluxDB line protocol format, OpenTSDB JSON format, and OpenTSDB Telnet line protocol format.
7. `data.precision`: The time precision when use InfluxDB line protocol format data, could be one of `ms`, `us` and `ns`. The default is `ns`.
### TDengine Source Connector specific configuration
......@@ -366,7 +422,13 @@ The following configuration items apply to TDengine Sink Connector and TDengine
5. `fetch.max.rows`: The maximum number of rows retrieved when retrieving the database. Default is 100.
6. `out.format`: The data format. The value could be line or json. The line represents the InfluxDB Line protocol format, and json represents the OpenTSDB JSON format. Default is `line`.
## feedback
## Other notes
1. To install plugin to a customized location, refer to https://docs.confluent.io/home/connect/self-managed/install.html#install-connector-manually.
2. To use Kafka Connect without confluent, refer to https://kafka.apache.org/documentation/#connect.
## Feedback
https://github.com/taosdata/kafka-connect-tdengine/issues
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册