11-kafka.md 15.9 KB
Newer Older
1 2
---
sidebar_label: Kafka
sangshuduo's avatar
sangshuduo 已提交
3
title: TDengine Kafka Connector 使用教程
4 5
---

sangshuduo's avatar
sangshuduo 已提交
6
TDengine Kafka Connector 包含两个插件: TDengine Source Connector 和 TDengine Sink Connector。用户只需提供简单的配置文件,就可以将 Kafka 中指定 topic 的数据(批量或实时)同步到 TDengine, 或将 TDengine 中指定数据库的数据(批量或实时)同步到 Kafka。
7

sangshuduo's avatar
sangshuduo 已提交
8
## 什么是 Kafka Connect?
9

sangshuduo's avatar
sangshuduo 已提交
10
Kafka Connect 是 Apache Kafka 的一个组件,用于使其它系统,比如数据库、云服务、文件系统等能方便地连接到 Kafka。数据既可以通过 Kafka Connect 从其它系统流向 Kafka, 也可以通过 Kafka Connect 从 Kafka 流向其它系统。从其它系统读数据的插件称为 Source Connector, 写数据到其它系统的插件称为 Sink Connector。Source Connector 和 Sink Connector 都不会直接连接 Kafka Broker,Source Connector 把数据转交给 Kafka Connect。Sink Connector 从 Kafka Connect 接收数据。
11

12
![](kafka/Kafka_Connect.png)
13

sangshuduo's avatar
sangshuduo 已提交
14
TDengine Source Connector 用于把数据实时地从 TDengine 读出来发送给 Kafka Connect。TDengine Sink Connector 用于 从 Kafka Connect 接收数据并写入 TDengine。
15

16
![](kafka/streaming-integration-with-kafka-connect.png)
17

sangshuduo's avatar
sangshuduo 已提交
18
## 什么是 Confluent?
19

sangshuduo's avatar
sangshuduo 已提交
20
Confluent 在 Kafka 的基础上增加很多扩展功能。包括:
21 22

1. Schema Registry
sangshuduo's avatar
sangshuduo 已提交
23 24 25 26
2. REST 代理
3. 非 Java 客户端
4. 很多打包好的 Kafka Connect 插件
5. 管理和监控 Kafka 的 GUI —— Confluent 控制中心
27

sangshuduo's avatar
sangshuduo 已提交
28
这些扩展功能有的包含在社区版本的 Confluent 中,有的只有企业版能用。
29
![](kafka/confluentPlatform.png)
30

sangshuduo's avatar
sangshuduo 已提交
31
Confluent 企业版提供了 `confluent` 命令行工具管理各个组件。
32

sangshuduo's avatar
sangshuduo 已提交
33
## 前置条件
34

sangshuduo's avatar
sangshuduo 已提交
35
运行本教程中示例的前提条件。
36

sangshuduo's avatar
sangshuduo 已提交
37 38 39 40
1. Linux 操作系统
2. 已安装 Java 8 和 Maven
3. 已安装 Git
4. 已安装并启动 TDengine。如果还没有可参考[安装和卸载](/operation/pkg-install)
41

sangshuduo's avatar
sangshuduo 已提交
42
## 安装 Confluent
43

sangshuduo's avatar
sangshuduo 已提交
44
Confluent 提供了 Docker 和二进制包两种安装方式。本文仅介绍二进制包方式安装。
45

sangshuduo's avatar
sangshuduo 已提交
46 47 48
在任意目录下执行:

```
49 50
curl -O http://packages.confluent.io/archive/7.1/confluent-7.1.1.tar.gz
tar xzf confluent-7.1.1.tar.gz -C /opt/test
sangshuduo's avatar
sangshuduo 已提交
51
```
52

sangshuduo's avatar
sangshuduo 已提交
53
然后需要把 `$CONFLUENT_HOME/bin` 目录加入 PATH。
54 55 56 57 58 59 60

```title=".profile"
export CONFLUENT_HOME=/opt/confluent-7.1.1
PATH=$CONFLUENT_HOME/bin
export PATH
```

sangshuduo's avatar
sangshuduo 已提交
61
以上脚本可以追加到当前用户的 profile 文件(~/.profile 或 ~/.bash_profile)
62

sangshuduo's avatar
sangshuduo 已提交
63
安装完成之后,可以输入`confluent version`做简单验证:
64 65 66 67 68 69 70 71 72 73 74 75

```
# confluent version
confluent - Confluent CLI

Version:     v2.6.1
Git Ref:     6d920590
Build Date:  2022-02-18T06:14:21Z
Go Version:  go1.17.6 (linux/amd64)
Development: false
```

sangshuduo's avatar
sangshuduo 已提交
76
## 安装 TDengine Connector 插件
77

sangshuduo's avatar
sangshuduo 已提交
78
### 从源码安装
79 80 81 82 83 84 85 86

```
git clone https://github.com:taosdata/kafka-connect-tdengine.git
cd kafka-connect-tdengine
mvn clean package
unzip -d $CONFLUENT_HOME/share/confluent-hub-components/ target/components/packages/taosdata-kafka-connect-tdengine-0.1.0.zip
```

sangshuduo's avatar
sangshuduo 已提交
87
以上脚本先 clone 项目源码,然后用 Maven 编译打包。打包完成后在 `target/components/packages/` 目录生成了插件的 zip 包。把这个 zip 包解压到安装插件的路径即可。安装插件的路径在配置文件 `$CONFLUENT_HOME/etc/kafka/connect-standalone.properties` 中。默认的路径为 `$CONFLUENT_HOME/share/confluent-hub-components/`
88

sangshuduo's avatar
sangshuduo 已提交
89
### 用 confluent-hub 安装
90

sangshuduo's avatar
sangshuduo 已提交
91 92
[Confluent Hub](https://www.confluent.io/hub) 提供下载 Kafka Connect 插件的服务。在 TDengine Kafka Connector 发布到 Confluent Hub 后可以使用命令工具 `confluent-hub` 安装。
**TDengine Kafka Connector 目前没有正式发布,不能用这种方式安装**
93

sangshuduo's avatar
sangshuduo 已提交
94
## 启动 Confluent
95 96 97 98 99 100

```
confluent local services start
```

:::note
sangshuduo's avatar
sangshuduo 已提交
101
一定要先安装插件再启动 Confluent, 否则会出现找不到类的错误。Kafka Connect 的日志(默认路径: /tmp/confluent.xxxx/connect/logs/connect.log)中会输出成功安装的插件,据此可判断插件是否安装成功。
102 103 104
:::

:::tip
sangshuduo's avatar
sangshuduo 已提交
105
若某组件启动失败,可尝试清空数据,重新启动。数据目录在启动时将被打印到控制台,比如 :
106

sangshuduo's avatar
sangshuduo 已提交
107
```title="控制台输出日志" {1}
108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124
Using CONFLUENT_CURRENT: /tmp/confluent.106668
Starting ZooKeeper
ZooKeeper is [UP]
Starting Kafka
Kafka is [UP]
Starting Schema Registry
Schema Registry is [UP]
Starting Kafka REST
Kafka REST is [UP]
Starting Connect
Connect is [UP]
Starting ksqlDB Server
ksqlDB Server is [UP]
Starting Control Center
Control Center is [UP]
```

sangshuduo's avatar
sangshuduo 已提交
125
清空数据可执行 `rm -rf /tmp/confluent.106668`
126 127
:::

sangshuduo's avatar
sangshuduo 已提交
128
## TDengine Sink Connector 的使用
129

sangshuduo's avatar
sangshuduo 已提交
130
TDengine Sink Connector 的作用是同步指定 topic 的数据到 TDengine。用户无需提前创建数据库和超级表。可手动指定目标数据库的名字(见配置参数 connection.database), 也可按一定规则生成(见配置参数 connection.database.prefix)。
131

sangshuduo's avatar
sangshuduo 已提交
132
TDengine Sink Connector 内部使用 TDengine [无模式写入接口](/reference/connector/cpp#无模式写入-api)写数据到 TDengine,目前支持三种格式的数据:[InfluxDB 行协议格式](/develop/insert-data/influxdb-line)[OpenTSDB Telnet 协议格式](/develop/insert-data/opentsdb-telnet)[OpenTSDB JSON 协议格式](/develop/insert-data/opentsdb-json)
133

sangshuduo's avatar
sangshuduo 已提交
134
下面的示例将主题 meters 的数据,同步到目标数据库 power。数据格式为 InfluxDB Line 协议格式。
135

sangshuduo's avatar
sangshuduo 已提交
136
### 添加配置文件
137 138 139 140 141 142 143

```
mkdir ~/test
cd ~/test
vi sink-demo.properties
```

sangshuduo's avatar
sangshuduo 已提交
144
sink-demo.properties 内容如下:
145 146 147 148 149 150 151 152 153 154 155 156 157 158 159

```ini title="sink-demo.properties"
name=tdengine-sink-demo
connector.class=com.taosdata.kafka.connect.sink.TDengineSinkConnector
tasks.max=1
topics=meters
connection.url=jdbc:TAOS://127.0.0.1:6030
connection.user=root
connection.password=taosdata
connection.database=power
db.schemaless=line
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.storage.StringConverter
```

sangshuduo's avatar
sangshuduo 已提交
160
关键配置说明:
161

sangshuduo's avatar
sangshuduo 已提交
162 163
1. `topics=meters``connection.database=power`, 表示订阅主题 meters 的数据,并写入数据库 power。
2. `db.schemaless=line`, 表示使用 InfluxDB Line 协议格式的数据。
164

sangshuduo's avatar
sangshuduo 已提交
165
### 创建 Connector 实例
166

sangshuduo's avatar
sangshuduo 已提交
167
```
168
confluent local services connect connector load TDengineSinkConnector --config ./sink-demo.properties
sangshuduo's avatar
sangshuduo 已提交
169
```
170

sangshuduo's avatar
sangshuduo 已提交
171
若以上命令执行成功,则有如下输出:
172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193

```json
{
  "name": "TDengineSinkConnector",
  "config": {
    "connection.database": "power",
    "connection.password": "taosdata",
    "connection.url": "jdbc:TAOS://127.0.0.1:6030",
    "connection.user": "root",
    "connector.class": "com.taosdata.kafka.connect.sink.TDengineSinkConnector",
    "db.schemaless": "line",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "tasks.max": "1",
    "topics": "meters",
    "value.converter": "org.apache.kafka.connect.storage.StringConverter",
    "name": "TDengineSinkConnector"
  },
  "tasks": [],
  "type": "sink"
}
```

sangshuduo's avatar
sangshuduo 已提交
194
### 写入测试数据
195

sangshuduo's avatar
sangshuduo 已提交
196
准备测试数据的文本文件,内容如下:
197 198 199 200 201 202 203 204

```txt title="test-data.txt"
meters,location=Beijing.Haidian,groupid=2 current=11.8,voltage=221,phase=0.28 1648432611249000000
meters,location=Beijing.Haidian,groupid=2 current=13.4,voltage=223,phase=0.29 1648432611250000000
meters,location=Beijing.Haidian,groupid=3 current=10.8,voltage=223,phase=0.29 1648432611249000000
meters,location=Beijing.Haidian,groupid=3 current=11.3,voltage=221,phase=0.35 1648432611250000000
```

sangshuduo's avatar
sangshuduo 已提交
205
使用 kafka-console-producer 向主题 meters 添加测试数据。
206 207 208 209 210 211

```
cat test-data.txt | kafka-console-producer --broker-list localhost:9092 --topic meters
```

:::note
sangshuduo's avatar
sangshuduo 已提交
212
如果目标数据库 power 不存在,那么 TDengine Sink Connector 会自动创建数据库。自动创建数据库使用的时间精度为纳秒,这就要求写入数据的时间戳精度也是纳秒。如果写入数据的时间戳精度不是纳秒,将会抛异常。
213 214
:::

sangshuduo's avatar
sangshuduo 已提交
215
### 验证同步是否成功
216

sangshuduo's avatar
sangshuduo 已提交
217
使用 TDengine CLI 验证同步是否成功。
218 219 220 221 222 223 224 225 226 227 228 229 230 231 232

```
taos> use power;
Database changed.

taos> select * from meters;
              ts               |          current          |          voltage          |           phase           | groupid |            location            |
===============================================================================================================================================================
 2022-03-28 09:56:51.249000000 |              11.800000000 |             221.000000000 |               0.280000000 | 2       | Beijing.Haidian                |
 2022-03-28 09:56:51.250000000 |              13.400000000 |             223.000000000 |               0.290000000 | 2       | Beijing.Haidian                |
 2022-03-28 09:56:51.249000000 |              10.800000000 |             223.000000000 |               0.290000000 | 3       | Beijing.Haidian                |
 2022-03-28 09:56:51.250000000 |              11.300000000 |             221.000000000 |               0.350000000 | 3       | Beijing.Haidian                |
Query OK, 4 row(s) in set (0.004208s)
```

sangshuduo's avatar
sangshuduo 已提交
233
若看到了以上数据,则说明同步成功。若没有,请检查 Kafka Connect 的日志。配置参数的详细说明见[配置参考](#配置参考)
234

sangshuduo's avatar
sangshuduo 已提交
235
## TDengine Source Connector 的使用
236

sangshuduo's avatar
sangshuduo 已提交
237
TDengine Source Connector 的作用是将 TDengine 某个数据库某一时刻之后的数据全部推送到 Kafka。TDengine Source Connector 的实现原理是,先分批拉取历史数据,再用定时查询的策略同步增量数据。同时会监控表的变化,可以自动同步新增的表。如果重启 Kafka Connect, 会从上次中断的位置继续同步。
238

sangshuduo's avatar
sangshuduo 已提交
239
TDengine Source Connector 会将 TDengine 数据表中的数据转换成 [InfluxDB Line 协议格式](/develop/insert-data/influxdb-line/)[OpenTSDB JSON 协议格式](/develop/insert-data/opentsdb-json), 然后写入 Kafka。
240

sangshuduo's avatar
sangshuduo 已提交
241
下面的示例程序同步数据库 test 中的数据到主题 tdengine-source-test。
242

sangshuduo's avatar
sangshuduo 已提交
243
### 添加配置文件
244 245 246 247 248

```
vi source-demo.properties
```

sangshuduo's avatar
sangshuduo 已提交
249
输入以下内容:
250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268

```ini title="source-demo.properties"
name=TDengineSourceConnector
connector.class=com.taosdata.kafka.connect.source.TDengineSourceConnector
tasks.max=1
connection.url=jdbc:TAOS://127.0.0.1:6030
connection.username=root
connection.password=taosdata
connection.database=test
connection.attempts=3
connection.backoff.ms=5000
topic.prefix=tdengine-source-
poll.interval.ms=1000
fetch.max.rows=100
out.format=line
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.storage.StringConverter
```

sangshuduo's avatar
sangshuduo 已提交
269
### 准备测试数据
270

sangshuduo's avatar
sangshuduo 已提交
271
准备生成测试数据的 SQL 文件。
272 273 274 275 276 277 278 279 280

```sql title="prepare-source-data.sql"
DROP DATABASE IF EXISTS test;
CREATE DATABASE test;
USE test;
CREATE STABLE meters (ts TIMESTAMP, current FLOAT, voltage INT, phase FLOAT) TAGS (location BINARY(64), groupId INT);
INSERT INTO d1001 USING meters TAGS(Beijing.Chaoyang, 2) VALUES('2018-10-03 14:38:05.000',10.30000,219,0.31000) d1001 USING meters TAGS(Beijing.Chaoyang, 2) VALUES('2018-10-03 14:38:15.000',12.60000,218,0.33000) d1001 USING meters TAGS(Beijing.Chaoyang, 2) VALUES('2018-10-03 14:38:16.800',12.30000,221,0.31000) d1002 USING meters TAGS(Beijing.Chaoyang, 3) VALUES('2018-10-03 14:38:16.650',10.30000,218,0.25000) d1003 USING meters TAGS(Beijing.Haidian, 2) VALUES('2018-10-03 14:38:05.500',11.80000,221,0.28000) d1003 USING meters TAGS(Beijing.Haidian, 2) VALUES('2018-10-03 14:38:16.600',13.40000,223,0.29000) d1004 USING meters TAGS(Beijing.Haidian, 3) VALUES('2018-10-03 14:38:05.000',10.80000,223,0.29000) d1004 USING meters TAGS(Beijing.Haidian, 3) VALUES('2018-10-03 14:38:06.500',11.50000,221,0.35000);
```

sangshuduo's avatar
sangshuduo 已提交
281
使用 TDengine CLI, 执行 SQL 文件。
282 283

```
D
dingbo 已提交
284
taos -f prepare-source-data.sql
285 286
```

sangshuduo's avatar
sangshuduo 已提交
287
### 创建 Connector 实例
288

sangshuduo's avatar
sangshuduo 已提交
289
```
290
confluent local services connect connector load TDengineSourceConnector --config source-demo.properties
sangshuduo's avatar
sangshuduo 已提交
291
```
292

sangshuduo's avatar
sangshuduo 已提交
293
### 查看 topic 数据
294

sangshuduo's avatar
sangshuduo 已提交
295
使用 kafka-console-consumer 命令行工具监控主题 tdengine-source-test 中的数据。一开始会输出所有历史数据, 往 TDengine 插入两条新的数据之后,kafka-console-consumer 也立即输出了新增的两条数据。
296

sangshuduo's avatar
sangshuduo 已提交
297
```
298
kafka-console-consumer --bootstrap-server localhost:9092 --from-beginning --topic tdengine-source-test
sangshuduo's avatar
sangshuduo 已提交
299
```
300

sangshuduo's avatar
sangshuduo 已提交
301
输出:
302

sangshuduo's avatar
sangshuduo 已提交
303
```
304 305 306 307
......
meters,location="beijing.chaoyang",groupid=2i32 current=10.3f32,voltage=219i32,phase=0.31f32 1538548685000000000
meters,location="beijing.chaoyang",groupid=2i32 current=12.6f32,voltage=218i32,phase=0.33f32 1538548695000000000
......
sangshuduo's avatar
sangshuduo 已提交
308
```
309

sangshuduo's avatar
sangshuduo 已提交
310
此时会显示所有历史数据。切换到 TDengine CLI, 插入两条新的数据:
311

sangshuduo's avatar
sangshuduo 已提交
312
```
313 314 315
USE test;
INSERT INTO d1001 VALUES (now, 13.3, 229, 0.38);
INSERT INTO d1002 VALUES (now, 16.3, 233, 0.22);
sangshuduo's avatar
sangshuduo 已提交
316
```
317

sangshuduo's avatar
sangshuduo 已提交
318
再切换回 kafka-console-consumer, 此时命令行窗口已经打印出刚插入的 2 条数据。
319

sangshuduo's avatar
sangshuduo 已提交
320
### unload 插件
321

sangshuduo's avatar
sangshuduo 已提交
322
测试完毕之后,用 unload 命令停止已加载的 connector。
323

sangshuduo's avatar
sangshuduo 已提交
324
查看当前活跃的 connector:
325

sangshuduo's avatar
sangshuduo 已提交
326
```
327
confluent local services connect connector status
sangshuduo's avatar
sangshuduo 已提交
328
```
329

sangshuduo's avatar
sangshuduo 已提交
330
如果按照前述操作,此时应有两个活跃的 connector。使用下面的命令 unload:
331

sangshuduo's avatar
sangshuduo 已提交
332
```
333 334
confluent local services connect connector unload TDengineSourceConnector
confluent local services connect connector unload TDengineSourceConnector
sangshuduo's avatar
sangshuduo 已提交
335
```
336

sangshuduo's avatar
sangshuduo 已提交
337
## 配置参考
338

sangshuduo's avatar
sangshuduo 已提交
339
### 通用配置
340

sangshuduo's avatar
sangshuduo 已提交
341
以下配置项对 TDengine Sink Connector 和 TDengine Source Connector 均适用。
342

sangshuduo's avatar
sangshuduo 已提交
343 344 345 346 347 348 349 350 351
1. `name`: connector 名称。
2. `connector.class`: connector 的完整类名, 如: com.taosdata.kafka.connect.sink.TDengineSinkConnector。
3. `tasks.max`: 最大任务数, 默认 1。
4. `topics`: 需要同步的 topic 列表, 多个用逗号分隔, 如 `topic1,topic2`
5. `connection.url`: TDengine JDBC 连接字符串, 如 `jdbc:TAOS://127.0.0.1:6030`
6. `connection.user`: TDengine 用户名, 默认 root。
7. `connection.password` :TDengine 用户密码, 默认 taosdata。
8. `connection.attempts` :最大尝试连接次数。默认 3。
9. `connection.backoff.ms` : 创建连接失败重试时间隔时间,单位为 ms。 默认 5000。
352

sangshuduo's avatar
sangshuduo 已提交
353
### TDengine Sink Connector 特有的配置
354

sangshuduo's avatar
sangshuduo 已提交
355 356 357 358 359 360
1. `connection.database`: 目标数据库名。如果指定的数据库不存在会则自动创建。自动建库使用的时间精度为纳秒。默认值为 null。为 null 时目标数据库命名规则参考 `connection.database.prefix` 参数的说明
2. `connection.database.prefix`: 当 connection.database 为 null 时, 目标数据库的前缀。可以包含占位符 '${topic}'。 比如 kafka_${topic}, 对于主题 'orders' 将写入数据库 'kafka_orders'。 默认 null。当为 null 时,目标数据库的名字和主题的名字是一致的。
3. `batch.size`: 分批写入每批记录数。当 Sink Connector 一次接收到的数据大于这个值时将分批写入。
4. `max.retries`: 发生错误时的最大重试次数。默认为 1。
5. `retry.backoff.ms`: 发送错误时重试的时间间隔。单位毫秒,默认 3000。
6. `db.schemaless`: 数据格式,必须指定为: line、json、telnet 中的一个。分别代表 InfluxDB 行协议格式、 OpenTSDB JSON 格式、 OpenTSDB Telnet 行协议格式。
361

sangshuduo's avatar
sangshuduo 已提交
362
### TDengine Source Connector 特有的配置
363

sangshuduo's avatar
sangshuduo 已提交
364 365 366 367 368 369
1. `connection.database`: 源数据库名称,无缺省值。
2. `topic.prefix`: 数据导入 kafka 后 topic 名称前缀。 使用 `topic.prefix` + `connection.database` 名称作为完整 topic 名。默认为空字符串 ""。
3. `timestamp.initial`: 数据同步起始时间。格式为'yyyy-MM-dd HH:mm:ss'。默认 "1970-01-01 00:00:00"。
4. `poll.interval.ms`: 拉取数据间隔,单位为 ms。默认 1000。
5. `fetch.max.rows` : 检索数据库时最大检索条数。 默认为 100。
6. `out.format`: 数据格式。取值 line 或 json。line 表示 InfluxDB Line 协议格式, json 表示 OpenTSDB JSON 格式。默认 line。
370

sangshuduo's avatar
sangshuduo 已提交
371
## 问题反馈
372 373 374

https://github.com/taosdata/kafka-connect-tdengine/issues

sangshuduo's avatar
sangshuduo 已提交
375
## 参考
376 377 378 379

1. https://www.confluent.io/what-is-apache-kafka
2. https://developer.confluent.io/learn-kafka/kafka-connect/intro
3. https://docs.confluent.io/platform/current/platform.html