---
sidebar_label: Kafka
title: TDengine Kafka Connector Tutorial
---

TDengine Kafka Connector contains two plugins: TDengine Source Connector and TDengine Sink Connector. Users only need to provide a simple configuration file to synchronize the data of the specified topic in Kafka (batch or real-time) to TDengine or synchronize the data (batch or real-time) of the specified database in TDengine to Kafka.

## What is Kafka Connect?

Kafka Connect is a component of Apache Kafka that enables other systems, such as databases, cloud services, and file systems, to connect to Kafka easily. Data can flow from other systems into Kafka, and from Kafka to other systems, via Kafka Connect. Plugins that read data from other systems are called Source Connectors, and plugins that write data to other systems are called Sink Connectors. Neither a Source Connector nor a Sink Connector connects directly to a Kafka broker: a Source Connector transfers data to Kafka Connect, and a Sink Connector receives data from Kafka Connect.

![](kafka/Kafka_Connect.png)

TDengine Source Connector is used to read data from TDengine in real-time and send it to Kafka Connect. Users can use the TDengine Sink Connector to receive data from Kafka Connect and write it to TDengine.

![](kafka/streaming-integration-with-kafka-connect.png)

## What is Confluent?

Confluent adds many extensions to Kafka, including:

1. Schema Registry
2. REST Proxy
3. Non-Java Clients
4. Many packaged Kafka Connect plugins
5. GUI for managing and monitoring Kafka - Confluent Control Center

Some of these extensions are available in the community version of Confluent; some are only available in the enterprise version.

![](kafka/confluentPlatform.png)

Confluent Enterprise Edition provides the `confluent` command-line tool to manage various components.

## Prerequisites

1. Linux operating system
2. Java 8 and Maven installed
3. Git is installed
4. TDengine is installed and started. If not, please refer to [Installation and Uninstallation](/operation/pkg-install)

## Install Confluent

Confluent provides two installation methods: Docker and binary packages. This article only introduces binary package installation.

Execute in any directory:

```
curl -O http://packages.confluent.io/archive/7.1/confluent-7.1.1.tar.gz
tar xzf confluent-7.1.1.tar.gz -C /opt/
```

Then you need to add the `$CONFLUENT_HOME/bin` directory to the PATH.

```title=".profile"
export CONFLUENT_HOME=/opt/confluent-7.1.1
PATH=$CONFLUENT_HOME/bin:$PATH
export PATH
```

Users can append the above lines to the current user's profile file (`~/.profile` or `~/.bash_profile`).

After the installation is complete, you can enter `confluent version` for simple verification:

```
# confluent version
confluent - Confluent CLI

Version:     v2.6.1
Git Ref:     6d920590
Build Date:  2022-02-18T06:14:21Z
Go Version:  go1.17.6 (linux/amd64)
Development: false
```

## Install TDengine Connector plugin

### Install from source code

```
git clone https://github.com/taosdata/kafka-connect-tdengine.git
cd kafka-connect-tdengine
mvn clean package
unzip -d $CONFLUENT_HOME/share/confluent-hub-components/ target/components/packages/taosdata-kafka-connect-tdengine-0.1.0.zip
```

The above script first clones the project source code and then compiles and packages it with Maven. After packaging completes, the zip package of the plugin is generated in the `target/components/packages/` directory. Unzip this zip package to the plugin installation path. The plugin installation path is set in the configuration file `$CONFLUENT_HOME/etc/kafka/connect-standalone.properties`; the default path is `$CONFLUENT_HOME/share/confluent-hub-components/`.

### Install with confluent-hub

[Confluent Hub](https://www.confluent.io/hub) provides a service to download Kafka Connect plugins. After TDengine Kafka Connector is published to Confluent Hub, it can be installed using the command tool `confluent-hub`.
**TDengine Kafka Connector is currently not officially released and cannot be installed in this way**.

## Start Confluent

```
confluent local services start
```

:::note
Be sure to install the plugin before starting Confluent. Otherwise, there will be a class not found error. The log of Kafka Connect (default path: /tmp/confluent.xxxx/connect/logs/connect.log) will output the successfully installed plugin, which users can use to determine whether the plugin is installed successfully.
:::

:::tip
If a component fails to start, try clearing the data and restarting. The data directory will be printed to the console at startup, e.g.:

```title="Console output log" {1}
Using CONFLUENT_CURRENT: /tmp/confluent.106668
Starting ZooKeeper
ZooKeeper is [UP]
Starting Kafka
Kafka is [UP]
Starting Schema Registry
Schema Registry is [UP]
Starting Kafka REST
Kafka REST is [UP]
Starting Connect
Connect is [UP]
Starting ksqlDB Server
ksqlDB Server is [UP]
Starting Control Center
Control Center is [UP]
```

To clear data, execute `rm -rf /tmp/confluent.106668`.
:::

## The use of TDengine Sink Connector

The role of the TDengine Sink Connector is to synchronize the data of the specified topic to TDengine. Users do not need to create databases and super tables in advance. The name of the target database can be specified manually (see the configuration parameter `connection.database`), or it can be generated according to specific rules (see the configuration parameter `connection.database.prefix`).

TDengine Sink Connector internally uses the TDengine [schemaless write interface](/reference/connector/cpp#schemaless-writing-api) to write data to TDengine. It currently supports data in three formats: [InfluxDB line protocol format](/develop/insert-data/influxdb-line), [OpenTSDB Telnet protocol format](/develop/insert-data/opentsdb-telnet), and [OpenTSDB JSON protocol format](/develop/insert-data/opentsdb-json).
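
To make the three `db.schemaless` formats concrete, here is an illustration (my own sketch, not connector code) of the same meters reading expressed in each format; the telnet and JSON shapes follow the OpenTSDB conventions, and the field and tag names are taken from the examples later in this tutorial:

```python
import json

# db.schemaless=line: InfluxDB line protocol (measurement,tags fields timestamp)
influx_line = "meters,location=Beijing.Haidian,groupid=2 current=11.8,voltage=221,phase=0.28 1648432611249000000"

# db.schemaless=telnet: OpenTSDB telnet protocol (metric timestamp value tags),
# one metric per line, so each field becomes its own metric
telnet_line = "meters.current 1648432611249 11.8 location=Beijing.Haidian groupid=2"

# db.schemaless=json: OpenTSDB JSON protocol
opentsdb_json = json.dumps({
    "metric": "meters.current",
    "timestamp": 1648432611249,
    "value": 11.8,
    "tags": {"location": "Beijing.Haidian", "groupid": 2},
})

print(json.loads(opentsdb_json)["metric"])
```

Note that the line protocol carries all fields of a row in one record, while the two OpenTSDB formats carry a single value per record.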

The following example synchronizes the data of the topic meters to the target database power. The data format is the InfluxDB Line protocol format.

### Add configuration file

```
mkdir ~/test
cd ~/test
vi sink-demo.properties
```

The content of `sink-demo.properties` is as follows:

```ini title="sink-demo.properties"
name=tdengine-sink-demo
connector.class=com.taosdata.kafka.connect.sink.TDengineSinkConnector
tasks.max=1
topics=meters
connection.url=jdbc:TAOS://127.0.0.1:6030
connection.user=root
connection.password=taosdata
connection.database=power
db.schemaless=line
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.storage.StringConverter
```

Key configuration instructions:

1. `topics=meters` and `connection.database=power` means to subscribe to the data of the topic meters and write to the database power.
2. `db.schemaless=line` means the data is in the InfluxDB line protocol format.

### Create Connector instance

```
confluent local services connect connector load TDengineSinkConnector --config ./sink-demo.properties
```

If the above command is executed successfully, the output is as follows:

```json
{
  "name": "TDengineSinkConnector",
  "config": {
    "connection.database": "power",
    "connection.password": "taosdata",
    "connection.url": "jdbc:TAOS://127.0.0.1:6030",
    "connection.user": "root",
    "connector.class": "com.taosdata.kafka.connect.sink.TDengineSinkConnector",
    "db.schemaless": "line",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "tasks.max": "1",
    "topics": "meters",
    "value.converter": "org.apache.kafka.connect.storage.StringConverter",
    "name": "TDengineSinkConnector"
  },
  "tasks": [],
  "type": "sink"
}
```
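
As an alternative to the `confluent` CLI, any Kafka Connect worker also exposes the standard Connect REST API (by default on port 8083). The sketch below (my own illustration, assuming that standard API) converts a `.properties` file such as `sink-demo.properties` into the JSON body expected by `POST /connectors`:

```python
import json

def properties_to_payload(text):
    """Turn Kafka Connect .properties text into the REST API create payload."""
    config = {}
    for raw in text.splitlines():
        line = raw.strip()
        if line and not line.startswith("#"):   # skip blanks and comments
            key, _, value = line.partition("=")
            config[key.strip()] = value.strip()
    # The REST API wants the connector name at the top level, not in config.
    return {"name": config.pop("name"), "config": config}

demo = "name=tdengine-sink-demo\ntopics=meters\ndb.schemaless=line"
payload = properties_to_payload(demo)
print(json.dumps(payload, indent=2))
```

The resulting JSON could then be sent with, for example, `curl -X POST -H "Content-Type: application/json" -d @payload.json http://localhost:8083/connectors`.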

### Write test data

Prepare a text file as test data; its content is as follows:

```txt title="test-data.txt"
meters,location=Beijing.Haidian,groupid=2 current=11.8,voltage=221,phase=0.28 1648432611249000000
meters,location=Beijing.Haidian,groupid=2 current=13.4,voltage=223,phase=0.29 1648432611250000000
meters,location=Beijing.Haidian,groupid=3 current=10.8,voltage=223,phase=0.29 1648432611249000000
meters,location=Beijing.Haidian,groupid=3 current=11.3,voltage=221,phase=0.35 1648432611250000000
```

Use kafka-console-producer to write test data to the topic `meters`.

```
cat test-data.txt | kafka-console-producer --broker-list localhost:9092 --topic meters
```

:::note
TDengine Sink Connector will automatically create the database if the target database does not exist. The time precision used to create the database automatically is nanoseconds, which requires that the timestamp precision of the written data is also nanoseconds. An exception will be thrown if the timestamp precision of the written data is not nanoseconds.
:::
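
The nanosecond timestamps in the test data can be built as follows; this is my own sketch, and the UTC+8 time zone is an assumption matching where the sample data appears to have been generated:

```python
from datetime import datetime, timezone, timedelta

def to_nanoseconds(dt):
    # whole seconds * 1e9 plus microseconds * 1e3 avoids float rounding
    return int(dt.timestamp()) * 1_000_000_000 + dt.microsecond * 1_000

tz = timezone(timedelta(hours=8))
ts = to_nanoseconds(datetime(2022, 3, 28, 9, 56, 51, 249000, tzinfo=tz))
print(ts)  # 1648432611249000000, the first timestamp in test-data.txt
```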

### Verify that the sync was successful

Use the TDengine CLI to verify that the sync was successful.

```
taos> use power;
Database changed.

taos> select * from meters;
              ts               |          current          |          voltage          |           phase           | groupid |            location            |
===============================================================================================================================================================
 2022-03-28 09:56:51.249000000 |              11.800000000 |             221.000000000 |               0.280000000 | 2       | Beijing.Haidian                |
 2022-03-28 09:56:51.250000000 |              13.400000000 |             223.000000000 |               0.290000000 | 2       | Beijing.Haidian                |
 2022-03-28 09:56:51.249000000 |              10.800000000 |             223.000000000 |               0.290000000 | 3       | Beijing.Haidian                |
 2022-03-28 09:56:51.250000000 |              11.300000000 |             221.000000000 |               0.350000000 | 3       | Beijing.Haidian                |
Query OK, 4 row(s) in set (0.004208s)
```

If you see the above data, the synchronization is successful. If not, check the logs of Kafka Connect. For a detailed description of the configuration parameters, see [Configuration Reference](#configuration-reference).

## The use of TDengine Source Connector

The role of the TDengine Source Connector is to push all the data of a specific TDengine database after a particular time to Kafka. The TDengine Source Connector first pulls historical data in batches and then synchronizes incremental data with a regular polling strategy. At the same time, changes to the tables are monitored, and newly added tables are synchronized automatically. If Kafka Connect is restarted, synchronization resumes where it left off.
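
This "history first, then poll" strategy can be sketched conceptually as follows (my own illustration, not the connector's source code): drain rows newer than a saved offset in batches of `fetch.max.rows`, publishing each row and advancing the offset so a restart resumes from it.

```python
def sync_once(read_rows, publish, offset, fetch_max_rows=100):
    """Drain all rows newer than offset; return the new resume offset."""
    while True:
        batch = read_rows(offset, fetch_max_rows)  # e.g. WHERE ts > offset LIMIT n
        if not batch:
            return offset                          # caught up; poll again later
        for ts, row in batch:
            publish(row)                           # hand the row to Kafka Connect
            offset = ts                            # Kafka Connect persists this offset

# Simulated source: three timestamped rows, read in batches of two.
data = [(1, "r1"), (2, "r2"), (3, "r3")]
out = []
new_offset = sync_once(lambda off, n: [r for r in data if r[0] > off][:n],
                       out.append, 0, fetch_max_rows=2)
print(new_offset, out)  # 3 ['r1', 'r2', 'r3']
```

In the real connector, the offset is persisted by the Kafka Connect framework, which is what makes resumption after a restart possible.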

TDengine Source Connector will convert the data in a TDengine data table into [InfluxDB Line protocol format](/develop/insert-data/influxdb-line/) or [OpenTSDB JSON protocol format](/develop/insert-data/opentsdb-json) and then write it to Kafka.

The following sample program synchronizes the data in the database test to the topic tdengine-source-test.

### Add configuration file

```
vi source-demo.properties
```

Enter the following content:

```ini title="source-demo.properties"
name=TDengineSourceConnector
connector.class=com.taosdata.kafka.connect.source.TDengineSourceConnector
tasks.max=1
connection.url=jdbc:TAOS://127.0.0.1:6030
connection.username=root
connection.password=taosdata
connection.database=test
connection.attempts=3
connection.backoff.ms=5000
topic.prefix=tdengine-source-
poll.interval.ms=1000
fetch.max.rows=100
out.format=line
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.storage.StringConverter
```

### Prepare test data

Prepare an SQL script file to generate the test data:

```sql title="prepare-source-data.sql"
DROP DATABASE IF EXISTS test;
CREATE DATABASE test;
USE test;
CREATE STABLE meters (ts TIMESTAMP, current FLOAT, voltage INT, phase FLOAT) TAGS (location BINARY(64), groupId INT);
INSERT INTO d1001 USING meters TAGS('Beijing.Chaoyang', 2) VALUES('2018-10-03 14:38:05.000',10.30000,219,0.31000)
            d1001 USING meters TAGS('Beijing.Chaoyang', 2) VALUES('2018-10-03 14:38:15.000',12.60000,218,0.33000)
            d1001 USING meters TAGS('Beijing.Chaoyang', 2) VALUES('2018-10-03 14:38:16.800',12.30000,221,0.31000)
            d1002 USING meters TAGS('Beijing.Chaoyang', 3) VALUES('2018-10-03 14:38:16.650',10.30000,218,0.25000)
            d1003 USING meters TAGS('Beijing.Haidian', 2) VALUES('2018-10-03 14:38:05.500',11.80000,221,0.28000)
            d1003 USING meters TAGS('Beijing.Haidian', 2) VALUES('2018-10-03 14:38:16.600',13.40000,223,0.29000)
            d1004 USING meters TAGS('Beijing.Haidian', 3) VALUES('2018-10-03 14:38:05.000',10.80000,223,0.29000)
            d1004 USING meters TAGS('Beijing.Haidian', 3) VALUES('2018-10-03 14:38:06.500',11.50000,221,0.35000);
```

Use the TDengine CLI to execute the SQL script:

```
taos -f prepare-source-data.sql
```

### Create Connector instance

```
confluent local services connect connector load TDengineSourceConnector --config source-demo.properties
```

### View topic data

Use the kafka-console-consumer command-line tool to monitor data in the topic tdengine-source-test. At the beginning, all historical data will be output. After two new rows are inserted into TDengine, kafka-console-consumer immediately outputs them.

```
kafka-console-consumer --bootstrap-server localhost:9092 --from-beginning --topic tdengine-source-test
```

Output:

```
......
meters,location="beijing.chaoyang",groupid=2i32 current=10.3f32,voltage=219i32,phase=0.31f32 1538548685000000000
meters,location="beijing.chaoyang",groupid=2i32 current=12.6f32,voltage=218i32,phase=0.33f32 1538548695000000000
......
```
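
A downstream consumer would typically split each line-protocol record back into its parts. The following is a naive sketch of my own (it ignores line-protocol escaping rules and assumes well-formed records like those above):

```python
def parse_line(record):
    """Split one InfluxDB-line-protocol record into its four components."""
    head, fields, ts = record.rsplit(" ", 2)    # tag set has no spaces here
    measurement, *tags = head.split(",")
    return {
        "measurement": measurement,
        "tags": dict(t.split("=", 1) for t in tags),
        "fields": dict(f.split("=", 1) for f in fields.split(",")),
        "timestamp": int(ts),
    }

record = 'meters,location="beijing.chaoyang",groupid=2i32 current=10.3f32,voltage=219i32,phase=0.31f32 1538548685000000000'
parsed = parse_line(record)
print(parsed["fields"]["voltage"])  # 219i32
```

Note the type suffixes (`i32`, `f32`) that TDengine attaches to tag and field values; a real consumer would strip and interpret them.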

All historical data is displayed. Switch to the TDengine CLI and insert two new pieces of data:

```sql
USE test;
INSERT INTO d1001 VALUES (now, 13.3, 229, 0.38);
INSERT INTO d1002 VALUES (now, 16.3, 233, 0.22);
```

Switch back to kafka-console-consumer; the command-line window now shows the two rows just inserted.

### Unload plugin

After testing, use the unload command to stop the loaded connector.

View currently active connectors:

```
confluent local services connect connector status
```

You should now have two active connectors if you followed the previous steps. Use the following command to unload:

```
confluent local services connect connector unload TDengineSinkConnector
confluent local services connect connector unload TDengineSourceConnector
```

## Configuration reference

### General configuration

The following configuration items apply to TDengine Sink Connector and TDengine Source Connector.

1. `name`: The name of the connector.
2. `connector.class`: The full class name of the connector, for example, `com.taosdata.kafka.connect.sink.TDengineSinkConnector`.
3. `tasks.max`: The maximum number of tasks. The default is 1.
4. `topics`: A list of topics to be synchronized, separated by commas, such as `topic1,topic2`.
5. `connection.url`: TDengine JDBC connection string, such as `jdbc:TAOS://127.0.0.1:6030`.
6. `connection.user`: TDengine username. The default is `root`.
7. `connection.password`: TDengine user password. The default is `taosdata`.
8. `connection.attempts`: The maximum number of connection attempts. The default is 3.
9. `connection.backoff.ms`: The retry interval (in ms) after a failed connection attempt. The default is 5000.

### TDengine Sink Connector specific configuration

1. `connection.database`: The name of the target database. If the specified database does not exist, it will be created automatically; the time precision used for automatic creation is nanoseconds. The default value is null. When null, the target database is named according to the rules described for the `connection.database.prefix` parameter.
2. `connection.database.prefix`: When `connection.database` is null, the prefix of the target database name. It can contain the placeholder `${topic}`; for example, `kafka_${topic}` maps the topic `orders` to the database `kafka_orders`. The default is null; when null, the target database has the same name as the topic.
3. `batch.size`: The number of records written per batch. When the sink connector receives more data than this value at one time, it writes the data in multiple batches.
4. `max.retries`: The maximum number of retries when an error occurs. The default is 1.
5. `retry.backoff.ms`: The time interval (in milliseconds) between retries after a write error. The default is 3000.
6. `db.schemaless`: The data format, one of `line`, `json`, or `telnet`, representing the InfluxDB line protocol format, the OpenTSDB JSON format, and the OpenTSDB Telnet line protocol format respectively.
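
The interplay of the first two parameters can be summarized in a small sketch (my own illustration of the naming rules described above; only the `${topic}` placeholder case is shown for the prefix):

```python
def target_database(database, prefix, topic):
    """Pick the target database name per the connection.database rules."""
    if database:                            # connection.database wins when set
        return database
    if prefix:                              # e.g. "kafka_${topic}" -> "kafka_orders"
        return prefix.replace("${topic}", topic)
    return topic                            # both null: named after the topic

print(target_database(None, "kafka_${topic}", "orders"))  # kafka_orders
print(target_database("power", None, "meters"))           # power
```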

### TDengine Source Connector specific configuration

1. `connection.database`: The source database name. No default value.
2. `topic.prefix`: The topic name prefix used when importing data into Kafka. The full topic name is `topic.prefix` + `connection.database`. The default is the empty string "".
3. `timestamp.initial`: The start time for data synchronization, in the format 'yyyy-MM-dd HH:mm:ss'. The default is "1970-01-01 00:00:00".
4. `poll.interval.ms`: The polling interval (in ms). The default is 1000.
5. `fetch.max.rows`: The maximum number of rows retrieved per query. The default is 100.
6. `out.format`: The output data format, `line` or `json`, representing the InfluxDB line protocol format and the OpenTSDB JSON format respectively. The default is `line`.

## Feedback

https://github.com/taosdata/kafka-connect-tdengine/issues

## Reference

1. https://www.confluent.io/what-is-apache-kafka
2. https://developer.confluent.io/learn-kafka/kafka-connect/intro
3. https://docs.confluent.io/platform/current/platform.html