---
title: TDengine Kafka Connector Tutorial
sidebar_label: Kafka
description: This document describes how to integrate TDengine with Kafka.
---

TDengine Kafka Connector contains two plugins: TDengine Source Connector and TDengine Sink Connector. With only a simple configuration file, users can synchronize the data of a specified Kafka topic to TDengine, or the data of a specified TDengine database to Kafka, in batches or in real time.

## What is Kafka Connect?

Kafka Connect is a component of [Apache Kafka](https://kafka.apache.org/) that enables other systems, such as databases, cloud services, and file systems, to connect to Kafka easily. Data flows from other systems into Kafka, and from Kafka into other systems, through Kafka Connect. Plugins that read data from other systems are called Source Connectors, and plugins that write data to other systems are called Sink Connectors. Neither kind of connector connects to the Kafka broker directly: a Source Connector passes data to Kafka Connect, and a Sink Connector receives data from Kafka Connect.

![TDengine Database Kafka Connector -- Kafka Connect](kafka/Kafka_Connect.webp)

TDengine Source Connector reads data from TDengine in real time and sends it to Kafka Connect. TDengine Sink Connector receives data from Kafka Connect and writes it to TDengine.

![TDengine Database Kafka Connector -- streaming integration with kafka connect](kafka/streaming-integration-with-kafka-connect.webp)

## Prerequisites

1. Linux operating system
2. Java 8 and Maven installed
3. Git, curl, and vi installed
4. TDengine installed and started. If it is not, refer to [Installation and Uninstallation](/operation/pkg-install).

## Install Kafka

Execute in any directory:

```shell
curl -O https://downloads.apache.org/kafka/3.4.0/kafka_2.13-3.4.0.tgz
tar xzf kafka_2.13-3.4.0.tgz -C /opt/
ln -s /opt/kafka_2.13-3.4.0 /opt/kafka
```

Then add the `$KAFKA_HOME/bin` directory to the `PATH`:

```shell title=".profile"
export KAFKA_HOME=/opt/kafka
export PATH=$PATH:$KAFKA_HOME/bin
```

Users can append the above lines to the current user's profile file (`~/.profile` or `~/.bash_profile`).

## Install TDengine Connector plugin

### Install from source code

```shell
git clone --branch 3.0 https://github.com/taosdata/kafka-connect-tdengine.git
cd kafka-connect-tdengine
mvn clean package -Dmaven.test.skip=true
unzip -d $KAFKA_HOME/components/ target/components/packages/taosdata-kafka-connect-tdengine-*.zip
```

The above script first clones the project source code and then compiles and packages it with Maven. After packaging completes, the zip package of the plugin is generated in the `target/components/packages/` directory. Unzip this package into a plugin directory. The example uses `$KAFKA_HOME/components/`, which must be listed in the `plugin.path` setting of Kafka Connect.

### Add configuration file

Add the kafka-connect-tdengine plugin path to `plugin.path` in `$KAFKA_HOME/config/connect-distributed.properties`:

```properties
plugin.path=/usr/share/java,/opt/kafka/components
```
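
A mistyped `plugin.path` is one possible reason for a plugin not being found, so a quick check of the properties file can help. The following is a minimal sketch for bash; `plugin_path_contains` is a hypothetical helper name and not part of Kafka.

```shell
# Hypothetical helper: succeed if DIR is listed in plugin.path of the given properties file.
plugin_path_contains() {
  local file=$1 dir=$2
  # Take the last uncommented plugin.path assignment, split it on commas,
  # and look for an exact match of the directory.
  grep -E '^plugin\.path=' "$file" | tail -n 1 | cut -d= -f2- | tr ',' '\n' | grep -qxF -- "$dir"
}

# Usage:
# plugin_path_contains "$KAFKA_HOME/config/connect-distributed.properties" /opt/kafka/components \
#   && echo "plugin path configured"
```

The match is exact, so `/opt/kafka/components/` with a trailing slash would not match `/opt/kafka/components`.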

## Start Kafka Services

Use the commands below to start all services:

```shell
zookeeper-server-start.sh -daemon $KAFKA_HOME/config/zookeeper.properties

kafka-server-start.sh -daemon $KAFKA_HOME/config/server.properties

connect-distributed.sh -daemon $KAFKA_HOME/config/connect-distributed.properties
```
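
Because the services are started in the background, the Kafka Connect REST API may not be reachable until some time after the last command returns. The following is a minimal bash sketch of a retry loop; `wait_until` is a hypothetical helper, and the usage line assumes the REST API listens on port 8083 as in the rest of this tutorial.

```shell
# Hypothetical helper: retry a command once per second, up to ATTEMPTS times.
wait_until() {
  local attempts=$1
  shift
  local i
  for ((i = 0; i < attempts; i++)); do
    # Stop as soon as the command succeeds.
    "$@" && return 0
    sleep 1
  done
  return 1
}

# Usage:
# wait_until 60 curl -sf -o /dev/null http://localhost:8083/connectors && echo "Kafka Connect is ready"
```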

### Check Successfully Loaded Plugin

After Kafka Connect has fully started, use the following command to check that it is running and to list the active connectors:

```shell
curl http://localhost:8083/connectors
```

The output is as follows, because no connector has been created yet:

```txt
[]
```
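
The empty list above only shows that no connector instance exists yet. To confirm that the TDengine plugin classes were actually loaded, the `/connector-plugins` endpoint of the Kafka Connect REST API can be queried. The following is a minimal sketch; `has_plugin` is a hypothetical helper, and the check is a plain substring match rather than real JSON parsing.

```shell
# Hypothetical helper: read the JSON returned by /connector-plugins on stdin
# and succeed if the given connector class name appears in it as a quoted string.
has_plugin() {
  grep -qF -- "\"$1\""
}

# Usage (requires a running Kafka Connect worker):
# curl -s http://localhost:8083/connector-plugins \
#   | has_plugin com.taosdata.kafka.connect.sink.TDengineSinkConnector && echo "sink plugin loaded"
```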

## The use of TDengine Sink Connector

TDengine Sink Connector synchronizes the data of the specified topics to TDengine. Users do not need to create databases and super tables in advance. The name of the target database can be specified manually (see the configuration parameter `connection.database`) or generated according to specific rules (see the configuration parameter `connection.database.prefix`).

TDengine Sink Connector internally uses the TDengine [schemaless write interface](/reference/connector/cpp#schemaless-writing-api) to write data to TDengine. It currently supports three data formats: [InfluxDB line protocol format](/develop/insert-data/influxdb-line), [OpenTSDB Telnet protocol format](/develop/insert-data/opentsdb-telnet), and [OpenTSDB JSON protocol format](/develop/insert-data/opentsdb-json).

The following example synchronizes the data of the topic `meters` to the target database `power`. The data is in InfluxDB line protocol format.

### Add Sink Connector configuration file

```shell
mkdir ~/test
cd ~/test
vi sink-demo.json
```

The content of `sink-demo.json` is as follows:

```json title="sink-demo.json"
{
  "name": "TDengineSinkConnector",
  "config": {
    "connector.class":"com.taosdata.kafka.connect.sink.TDengineSinkConnector",
    "tasks.max": "1",
    "topics": "meters",
    "connection.url": "jdbc:TAOS://127.0.0.1:6030",
    "connection.user": "root",
    "connection.password": "taosdata",
    "connection.database": "power",
    "db.schemaless": "line",
    "data.precision": "ns",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "value.converter": "org.apache.kafka.connect.storage.StringConverter",
    "errors.tolerance": "all",
    "errors.deadletterqueue.topic.name": "dead_letter_topic",
    "errors.deadletterqueue.topic.replication.factor": 1
  }
}
```

Key configuration instructions:

1. `"topics": "meters"` and `"connection.database": "power"` mean that the connector subscribes to the topic `meters` and writes its data to the database `power`.
2. `"db.schemaless": "line"` means that the data is in InfluxDB line protocol format.

### Create Sink Connector instance

```shell
curl -X POST -d @sink-demo.json http://localhost:8083/connectors -H "Content-Type: application/json"
```

If the above command is executed successfully, the output is as follows:

```json
{
  "name": "TDengineSinkConnector",
  "config": {
    "connection.database": "power",
    "connection.password": "taosdata",
    "connection.url": "jdbc:TAOS://127.0.0.1:6030",
    "connection.user": "root",
    "connector.class": "com.taosdata.kafka.connect.sink.TDengineSinkConnector",
    "data.precision": "ns",
    "db.schemaless": "line",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "tasks.max": "1",
    "topics": "meters",
    "value.converter": "org.apache.kafka.connect.storage.StringConverter",
    "name": "TDengineSinkConnector",
    "errors.tolerance": "all",
    "errors.deadletterqueue.topic.name": "dead_letter_topic",
    "errors.deadletterqueue.topic.replication.factor": "1"
  },
  "tasks": [],
  "type": "sink"
}
```

### Write test data

Prepare a text file with test data. Its content is as follows:

```txt title="test-data.txt"
meters,location=California.LosAngeles,groupid=2 current=11.8,voltage=221,phase=0.28 1648432611249000000
meters,location=California.LosAngeles,groupid=2 current=13.4,voltage=223,phase=0.29 1648432611250000000
meters,location=California.LosAngeles,groupid=3 current=10.8,voltage=223,phase=0.29 1648432611249000000
meters,location=California.LosAngeles,groupid=3 current=11.3,voltage=221,phase=0.35 1648432611250000000
```

Use kafka-console-producer to write test data to the topic `meters`.

```shell
cat test-data.txt | kafka-console-producer.sh --bootstrap-server localhost:9092 --topic meters
```

:::note
TDengine Sink Connector will automatically create the database if the target database does not exist. The time precision used to create the database automatically is nanoseconds, which requires that the timestamp precision of the written data is also nanoseconds. An exception will be thrown if the timestamp precision of the written data is not nanoseconds.
:::
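
Since the precision requirement is easy to violate by accident, it may help to inspect the timestamps before producing the data. The following is a minimal bash sketch, assuming GNU `date` (as found on Linux) and assuming that a nanosecond epoch timestamp for current dates has 19 digits; both helper names are hypothetical.

```shell
# Hypothetical helpers for inspecting line protocol timestamps (GNU date assumed).

# Succeed if the last space-separated field of a line is a 19-digit epoch timestamp.
has_ns_timestamp() {
  local ts=${1##* }
  [[ $ts =~ ^[0-9]{19}$ ]]
}

# Print the UTC time for a nanosecond epoch timestamp, truncated to whole seconds.
ns_to_utc() {
  date -u -d "@$(( $1 / 1000000000 ))" '+%Y-%m-%d %H:%M:%S'
}

ns_to_utc 1648432611249000000   # prints 2022-03-28 01:56:51
```

For example, `while read -r line; do has_ns_timestamp "$line" || echo "bad: $line"; done < test-data.txt` would list the lines whose timestamp is not in nanoseconds.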

### Verify that the sync was successful

Use the TDengine CLI to verify that the sync was successful.

```sql
taos> use power;
Database changed.

taos> select * from meters;
              _ts               |          current          |          voltage          |           phase           | groupid |            location            |
===============================================================================================================================================================
 2022-03-28 09:56:51.249000000 |              11.800000000 |             221.000000000 |               0.280000000 | 2       | California.LosAngeles          |
 2022-03-28 09:56:51.250000000 |              13.400000000 |             223.000000000 |               0.290000000 | 2       | California.LosAngeles          |
 2022-03-28 09:56:51.249000000 |              10.800000000 |             223.000000000 |               0.290000000 | 3       | California.LosAngeles          |
 2022-03-28 09:56:51.250000000 |              11.300000000 |             221.000000000 |               0.350000000 | 3       | California.LosAngeles          |
Query OK, 4 row(s) in set (0.004208s)
```

If you see the above data, the synchronization is successful. If not, check the Kafka Connect logs. For a detailed description of the configuration parameters, see [Configuration reference](#configuration-reference).
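
Besides the logs, the status endpoint of the Kafka Connect REST API reports the state of a connector and its tasks. The following is a minimal sketch; `connector_state` is a hypothetical helper that extracts the first `state` value with text tools, assuming the connector object precedes the task list in the response.

```shell
# Hypothetical helper: read the JSON from /connectors/<name>/status on stdin
# and print the first "state" value (for example RUNNING or FAILED).
connector_state() {
  grep -o '"state"[[:space:]]*:[[:space:]]*"[A-Z_]*"' | head -n 1 | cut -d'"' -f4
}

# Usage (requires a running Kafka Connect worker):
# curl -s http://localhost:8083/connectors/TDengineSinkConnector/status | connector_state
```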

## The use of TDengine Source Connector

TDengine Source Connector pushes all data written to a specific TDengine database after a particular time to Kafka. It first pulls historical data in batches and then synchronizes incremental data by querying at regular intervals. It also monitors table changes, so newly added tables are synchronized automatically. If Kafka Connect is restarted, synchronization resumes where it left off.

TDengine Source Connector converts the data in TDengine tables into [InfluxDB line protocol format](/develop/insert-data/influxdb-line/) or [OpenTSDB JSON protocol format](/develop/insert-data/opentsdb-json) and then writes it to Kafka.

The following example synchronizes the data in the database `test` to the topic `tdengine-test-meters`.

### Add Source Connector configuration file

```shell
vi source-demo.json
```

Enter the following content:

```json title="source-demo.json"
{
  "name": "TDengineSourceConnector",
  "config": {
    "connector.class": "com.taosdata.kafka.connect.source.TDengineSourceConnector",
    "tasks.max": 1,
    "connection.url": "jdbc:TAOS://127.0.0.1:6030",
    "connection.user": "root",
    "connection.password": "taosdata",
    "connection.database": "test",
    "connection.attempts": 3,
    "connection.backoff.ms": 5000,
    "topic.prefix": "tdengine",
    "topic.delimiter": "-",
    "poll.interval.ms": 1000,
    "fetch.max.rows": 100,
    "topic.per.stable": true,
    "topic.ignore.db": false,
    "out.format": "line",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "value.converter": "org.apache.kafka.connect.storage.StringConverter"
  }
}
```

### Prepare test data

Prepare a SQL script file to generate test data:

```sql title="prepare-source-data.sql"
DROP DATABASE IF EXISTS test;
CREATE DATABASE test;
USE test;
CREATE STABLE meters (ts TIMESTAMP, current FLOAT, voltage INT, phase FLOAT) TAGS (location BINARY(64), groupId INT);

INSERT INTO d1001 USING meters TAGS('California.SanFrancisco', 2) VALUES('2018-10-03 14:38:05.000',10.30000,219,0.31000) \
            d1001 USING meters TAGS('California.SanFrancisco', 2) VALUES('2018-10-03 14:38:15.000',12.60000,218,0.33000) \
            d1001 USING meters TAGS('California.SanFrancisco', 2) VALUES('2018-10-03 14:38:16.800',12.30000,221,0.31000) \
            d1002 USING meters TAGS('California.SanFrancisco', 3) VALUES('2018-10-03 14:38:16.650',10.30000,218,0.25000) \
            d1003 USING meters TAGS('California.LosAngeles', 2)   VALUES('2018-10-03 14:38:05.500',11.80000,221,0.28000) \
            d1003 USING meters TAGS('California.LosAngeles', 2)   VALUES('2018-10-03 14:38:16.600',13.40000,223,0.29000) \
            d1004 USING meters TAGS('California.LosAngeles', 3)   VALUES('2018-10-03 14:38:05.000',10.80000,223,0.29000) \
            d1004 USING meters TAGS('California.LosAngeles', 3)   VALUES('2018-10-03 14:38:06.500',11.50000,221,0.35000);
```

Use the TDengine CLI to execute the SQL script:

```shell
taos -f prepare-source-data.sql
```

### Create Connector instance

```shell
curl -X POST -d @source-demo.json http://localhost:8083/connectors -H "Content-Type: application/json"
```

### View topic data

Use the kafka-console-consumer command-line tool to monitor data in the topic `tdengine-test-meters`. At first, all historical data is output. After two new rows are inserted into TDengine, kafka-console-consumer immediately outputs them. The output is in InfluxDB line protocol format.

```shell
kafka-console-consumer.sh --bootstrap-server localhost:9092 --from-beginning --topic tdengine-test-meters
```

Output:

```txt
......
meters,location="California.SanFrancisco",groupid=2i32 current=10.3f32,voltage=219i32,phase=0.31f32 1538548685000000000
meters,location="California.SanFrancisco",groupid=2i32 current=12.6f32,voltage=218i32,phase=0.33f32 1538548695000000000
......
```

All historical data is displayed. Switch to the TDengine CLI and insert two new pieces of data:

```sql
USE test;
INSERT INTO d1001 VALUES (now, 13.3, 229, 0.38);
INSERT INTO d1002 VALUES (now, 16.3, 233, 0.22);
```

Switch back to kafka-console-consumer, and the command line window has printed out the two pieces of data just inserted.

### Unload plugin

After testing, use the unload command to stop the loaded connector.

View currently active connectors:

```shell
curl http://localhost:8083/connectors
```

You should now have two active connectors if you followed the previous steps. Use the following command to unload:

```shell
curl -X DELETE http://localhost:8083/connectors/TDengineSinkConnector
curl -X DELETE http://localhost:8083/connectors/TDengineSourceConnector
```
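
When more connectors are registered, deleting them one by one becomes tedious. The following is a minimal sketch that turns the JSON array returned by `/connectors` into one name per line; `connector_names` is a hypothetical helper that relies on simple text processing and assumes connector names contain no commas, quotes, or brackets.

```shell
# Hypothetical helper: read the JSON array from /connectors on stdin
# and print one connector name per line.
connector_names() {
  tr -d '[]"' | tr ',' '\n' | sed '/^[[:space:]]*$/d'
}

# Usage: delete every connector registered on the worker.
# curl -s http://localhost:8083/connectors | connector_names | while read -r name; do
#   curl -X DELETE "http://localhost:8083/connectors/$name"
# done
```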

## Configuration reference

### General configuration

The following configuration items apply to TDengine Sink Connector and TDengine Source Connector.

1. `name`: The name of the connector.
2. `connector.class`: The full class name of the connector, for example `com.taosdata.kafka.connect.sink.TDengineSinkConnector`.
3. `tasks.max`: The maximum number of tasks. The default is 1.
4. `topics`: A comma-separated list of topics to be synchronized, such as `topic1,topic2`.
5. `connection.url`: The TDengine JDBC connection string, such as `jdbc:TAOS://127.0.0.1:6030`.
6. `connection.user`: The TDengine username. The default is `root`.
7. `connection.password`: The TDengine user password. The default is `taosdata`.
8. `connection.attempts`: The maximum number of connection attempts. The default is 3.
9. `connection.backoff.ms`: The interval between retries when creating a connection fails, in milliseconds. The default is 5000.

### TDengine Sink Connector specific configuration

1. `connection.database`: The name of the target database. If the specified database does not exist, it is created automatically with nanosecond time precision. The default value is null. When it is null, the name of the target database follows the rules described for the `connection.database.prefix` parameter.
2. `connection.database.prefix`: The prefix of the target database name, used when `connection.database` is null. It can contain the placeholder `${topic}`. For example, with the prefix `kafka_${topic}`, data from the topic `orders` is written to the database `kafka_orders`. The default is null. When it is null, the name of the target database is the same as the name of the topic.
3. `batch.size`: The number of records written in each batch. When the sink connector receives more records than this value at one time, it writes them in multiple batches.
4. `max.retries`: The maximum number of retries when an error occurs. The default is 1.
5. `retry.backoff.ms`: The interval between retries after a write error, in milliseconds. The default is 3000.
6. `db.schemaless`: The data format. It can be `line`, `json`, or `telnet`, which represent InfluxDB line protocol format, OpenTSDB JSON format, and OpenTSDB Telnet line protocol format respectively.
7. `data.precision`: The time precision of data in InfluxDB line protocol format. It can be `ms`, `us`, or `ns`. The default is `ns`.
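
The naming rules for the target database in items 1 and 2 can be sketched as a small bash function. This is only an illustration of the rules as described above, not the connector's actual code; `target_db` is a hypothetical helper, and returning a prefix without the placeholder unchanged is an assumption.

```shell
# Hypothetical helper mirroring the documented naming rules for the target database.
# Arguments: connection.database, connection.database.prefix, topic (pass "" for unset).
target_db() {
  local database=$1 prefix=$2 topic=$3
  local placeholder='${topic}'
  if [ -n "$database" ]; then
    # An explicit database name always wins.
    printf '%s\n' "$database"
  elif [ -n "$prefix" ]; then
    # Replace every literal ${topic} placeholder in the prefix with the topic name.
    printf '%s\n' "${prefix//"$placeholder"/$topic}"
  else
    # Neither parameter is set: the database is named after the topic.
    printf '%s\n' "$topic"
  fi
}

target_db "" 'kafka_${topic}' orders   # prints kafka_orders
```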

### TDengine Source Connector specific configuration

1. `connection.database`: The name of the source database. There is no default value.
2. `topic.prefix`: The topic name prefix used when importing data into Kafka. The default is the empty string `""`.
3. `timestamp.initial`: The start time of data synchronization, in the format 'yyyy-MM-dd HH:mm:ss'. If it is not set, the import into Kafka starts from the oldest row in the database.
4. `poll.interval.ms`: The interval for checking for newly created or removed tables, in milliseconds. The default is 1000.
5. `fetch.max.rows`: The maximum number of rows retrieved in one query. The default is 100.
6. `query.interval.ms`: The time range of data read from TDengine in each query, in milliseconds. Adjust it according to the data inflow rate. The default is 0, which means that each query reads all data up to the latest time.
7. `out.format`: The output format. `line` means InfluxDB line protocol format and `json` means JSON format. The default is `line`.
8. `topic.per.stable`: If set to true, each super table in TDengine corresponds to one topic in Kafka, and the topic naming rule is `<topic.prefix><topic.delimiter><connection.database><topic.delimiter><stable.name>`. If set to false, the whole database corresponds to one topic in Kafka, and the topic naming rule is `<topic.prefix><topic.delimiter><connection.database>`.
9. `topic.ignore.db`: Whether the database name is omitted from the topic name. With true, the naming rule is `<topic.prefix><topic.delimiter><stable.name>`. With false, the naming rule is `<topic.prefix><topic.delimiter><connection.database><topic.delimiter><stable.name>`. The default is false. This parameter has no effect when `topic.per.stable` is set to false.
10. `topic.delimiter`: The topic name delimiter. The default is `-`.
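
The topic naming rules in items 8 to 10 can be sketched as a small bash function. It applies the documented rules literally and is not the connector's actual code; `topic_name` is a hypothetical helper, and no special handling of an empty prefix is attempted.

```shell
# Hypothetical helper applying the documented topic naming rules literally.
# Arguments: topic.prefix, topic.delimiter, connection.database, super table name,
#            topic.per.stable (true/false), topic.ignore.db (true/false).
topic_name() {
  local prefix=$1 delimiter=$2 database=$3 stable=$4 per_stable=$5 ignore_db=$6
  if [ "$per_stable" != "true" ]; then
    # One topic for the whole database; topic.ignore.db has no effect here.
    printf '%s\n' "${prefix}${delimiter}${database}"
  elif [ "$ignore_db" = "true" ]; then
    # One topic per super table, without the database name.
    printf '%s\n' "${prefix}${delimiter}${stable}"
  else
    # One topic per super table, including the database name.
    printf '%s\n' "${prefix}${delimiter}${database}${delimiter}${stable}"
  fi
}

topic_name tdengine - test meters true false   # prints tdengine-test-meters
```

With the values from `source-demo.json` in this tutorial, the result matches the topic `tdengine-test-meters` used in the consumer example.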

## Other notes

1. To use Kafka Connect, refer to <https://kafka.apache.org/documentation/#connect>.

## Feedback

<https://github.com/taosdata/kafka-connect-tdengine/issues>

## Reference

1. For more information, see <https://kafka.apache.org/documentation/>.