diff --git a/docs-cn/20-third-party/11-kafka.md b/docs-cn/20-third-party/11-kafka.md index 83d086308d0a593cc90e2b0a0c0945a52ce259ea..54a378c63380aacd763459ddc5b1acc012c88a06 100644 --- a/docs-cn/20-third-party/11-kafka.md +++ b/docs-cn/20-third-party/11-kafka.md @@ -1,54 +1,56 @@ --- sidebar_label: Kafka -title: TDengine Kafka Connector Tutorial +title: TDengine Kafka Connector 使用教程 --- -TDengine Kafka Connector contains two plugins: TDengine Source Connector and TDengine Sink Connector. Users only need to provide a simple configuration file to synchronize the data of the specified topic in Kafka (batch or real-time) to TDengine or synchronize the data (batch or real-time) of the specified database in TDengine to Kafka. +TDengine Kafka Connector 包含两个插件: TDengine Source Connector 和 TDengine Sink Connector。用户只需提供简单的配置文件,就可以将 Kafka 中指定 topic 的数据(批量或实时)同步到 TDengine, 或将 TDengine 中指定数据库的数据(批量或实时)同步到 Kafka。 -## What is Kafka Connect? +## 什么是 Kafka Connect? -Kafka Connect is a component of Apache Kafka that enables other systems, such as databases, cloud services, file systems, etc., to connect to Kafka easily. Data can flow from other software to Kafka via Kafka Connect and Kafka to other systems via Kafka Connect. Plugins that read data from other software are called Source Connectors, and plugins that write data to other software are called Sink Connectors. Neither Source Connector nor Sink Connector will directly connect to Kafka Broker, and Source Connector transfers data to Kafka Connect. Sink Connector receives data from Kafka Connect. +Kafka Connect 是 Apache Kafka 的一个组件,用于使其它系统,比如数据库、云服务、文件系统等能方便地连接到 Kafka。数据既可以通过 Kafka Connect 从其它系统流向 Kafka, 也可以通过 Kafka Connect 从 Kafka 流向其它系统。从其它系统读数据的插件称为 Source Connector, 写数据到其它系统的插件称为 Sink Connector。Source Connector 和 Sink Connector 都不会直接连接 Kafka Broker,Source Connector 把数据转交给 Kafka Connect。Sink Connector 从 Kafka Connect 接收数据。 ![](kafka/Kafka_Connect.png) -TDengine Source Connector is used to read data from TDengine in real-time and send it to Kafka Connect. Users can use The TDengine Sink Connector to receive data from Kafka Connect and write it to TDengine. +TDengine Source Connector 用于把数据实时地从 TDengine 读出来发送给 Kafka Connect。TDengine Sink Connector 用于 从 Kafka Connect 接收数据并写入 TDengine。 ![](kafka/streaming-integration-with-kafka-connect.png) -## What is Confluent? +## 什么是 Confluent? -Confluent adds many extensions to Kafka. include: +Confluent 在 Kafka 的基础上增加很多扩展功能。包括: 1. Schema Registry -2. REST Proxy -3. Non-Java Clients -4. Many packaged Kafka Connect plugins -5. GUI for managing and monitoring Kafka - Confluent Control Center +2. REST 代理 +3. 非 Java 客户端 +4. 很多打包好的 Kafka Connect 插件 +5. 管理和监控 Kafka 的 GUI —— Confluent 控制中心 -Some of these extensions are available in the community version of Confluent. Some are only available in the enterprise version. +这些扩展功能有的包含在社区版本的 Confluent 中,有的只有企业版能用。 ![](kafka/confluentPlatform.png) -Confluent Enterprise Edition provides the `confluent` command-line tool to manage various components. +Confluent 企业版提供了 `confluent` 命令行工具管理各个组件。 -## Prerequisites +## 前置条件 -1. Linux operating system -2. Java 8 and Maven installed -3. Git is installed -4. TDengine is installed and started. If not, please refer to [Installation and Uninstallation](/operation/pkg-install) +运行本教程中示例的前提条件。 -## Install Confluent +1. Linux 操作系统 +2. 已安装 Java 8 和 Maven +3. 已安装 Git +4. 已安装并启动 TDengine。如果还没有可参考[安装和卸载](/operation/pkg-install) -Confluent provides two installation methods: Docker and binary packages. 
This article only introduces binary package installation. +## 安装 Confluent -Execute in any directory: +Confluent 提供了 Docker 和二进制包两种安装方式。本文仅介绍二进制包方式安装。 -```` +在任意目录下执行: + +``` curl -O http://packages.confluent.io/archive/7.1/confluent-7.1.1.tar.gz tar xzf confluent-7.1.1.tar.gz -C /opt/test -```` +``` -Then you need to add the `$CONFLUENT_HOME/bin` directory to the PATH. +然后需要把 `$CONFLUENT_HOME/bin` 目录加入 PATH。 ```title=".profile" export CONFLUENT_HOME=/opt/confluent-7.1.1 @@ -56,9 +58,9 @@ PATH=$CONFLUENT_HOME/bin export PATH ``` -Users can append the above script to the current user's profile file (~/.profile or ~/.bash_profile) +以上脚本可以追加到当前用户的 profile 文件(~/.profile 或 ~/.bash_profile) -After the installation is complete, you can enter `confluent version` for simple verification: +安装完成之后,可以输入`confluent version`做简单验证: ``` # confluent version @@ -71,9 +73,9 @@ Go Version: go1.17.6 (linux/amd64) Development: false ``` -## Install TDengine Connector plugin +## 安装 TDengine Connector 插件 -### Install from source code +### 从源码安装 ``` git clone https://github.com:taosdata/kafka-connect-tdengine.git @@ -82,27 +84,27 @@ mvn clean package unzip -d $CONFLUENT_HOME/share/confluent-hub-components/ target/components/packages/taosdata-kafka-connect-tdengine-0.1.0.zip ``` -The above script first clones the project source code and then compiles and packages it with Maven. After the package is complete, the zip package of the plugin is generated in the `target/components/packages/` directory. Unzip this zip package to the path where the plugin is installed. The path to install the plugin is in the configuration file `$CONFLUENT_HOME/etc/kafka/connect-standalone.properties`. The default path is `$CONFLUENT_HOME/share/confluent-hub-components/`. +以上脚本先 clone 项目源码,然后用 Maven 编译打包。打包完成后在 `target/components/packages/` 目录生成了插件的 zip 包。把这个 zip 包解压到安装插件的路径即可。安装插件的路径在配置文件 `$CONFLUENT_HOME/etc/kafka/connect-standalone.properties` 中。默认的路径为 `$CONFLUENT_HOME/share/confluent-hub-components/`。 -### Install with confluent-hub +### 用 confluent-hub 安装 -[Confluent Hub](https://www.confluent.io/hub) provides a service to download Kafka Connect plugins. After TDengine Kafka Connector is published to Confluent Hub, it can be installed using the command tool `confluent-hub`. -**TDengine Kafka Connector is currently not officially released and cannot be installed in this way**. +[Confluent Hub](https://www.confluent.io/hub) 提供下载 Kafka Connect 插件的服务。在 TDengine Kafka Connector 发布到 Confluent Hub 后可以使用命令工具 `confluent-hub` 安装。 +**TDengine Kafka Connector 目前没有正式发布,不能用这种方式安装**。 -## Start Confluent +## 启动 Confluent ``` confluent local services start ``` :::note -Be sure to install the plugin before starting Confluent. Otherwise, there will be a class not found error. The log of Kafka Connect (default path: /tmp/confluent.xxxx/connect/logs/connect.log) will output the successfully installed plugin, which users can use to determine whether the plugin is installed successfully. +一定要先安装插件再启动 Confluent, 否则会出现找不到类的错误。Kafka Connect 的日志(默认路径: /tmp/confluent.xxxx/connect/logs/connect.log)中会输出成功安装的插件,据此可判断插件是否安装成功。 ::: :::tip -If a component fails to start, try clearing the data and restarting. 
The data directory will be printed to the console at startup, e.g.: +若某组件启动失败,可尝试清空数据,重新启动。数据目录在启动时将被打印到控制台,比如 : -```title="Console output log" {1} +```title="控制台输出日志" {1} Using CONFLUENT_CURRENT: /tmp/confluent.106668 Starting ZooKeeper ZooKeeper is [UP] @@ -120,18 +122,18 @@ Starting Control Center Control Center is [UP] ``` -To clear data, execute `rm -rf /tmp/confluent.106668`. +清空数据可执行 `rm -rf /tmp/confluent.106668`。 ::: -## The use of TDengine Sink Connector +## TDengine Sink Connector 的使用 -The role of the TDengine Sink Connector is to synchronize the data of the specified topic to TDengine. Users do not need to create databases and super tables in advance. The name of the target database can be specified manually (see the configuration parameter connection.database), or it can be generated according to specific rules (see the configuration parameter connection.database.prefix). +TDengine Sink Connector 的作用是同步指定 topic 的数据到 TDengine。用户无需提前创建数据库和超级表。可手动指定目标数据库的名字(见配置参数 connection.database), 也可按一定规则生成(见配置参数 connection.database.prefix)。 -TDengine Sink Connector internally uses TDengine [modeless write interface](/reference/connector/cpp#modeless write-api) to write data to TDengine, currently supports data in three formats: [InfluxDB line protocol format](/develop /insert-data/influxdb-line), [OpenTSDB Telnet protocol format](/develop/insert-data/opentsdb-telnet), and [OpenTSDB JSON protocol format](/develop/insert-data/opentsdb-json). +TDengine Sink Connector 内部使用 TDengine [无模式写入接口](/reference/connector/cpp#无模式写入-api)写数据到 TDengine,目前支持三种格式的数据:[InfluxDB 行协议格式](/develop/insert-data/influxdb-line)、 [OpenTSDB Telnet 协议格式](/develop/insert-data/opentsdb-telnet) 和 [OpenTSDB JSON 协议格式](/develop/insert-data/opentsdb-json)。 -The following example synchronizes the data of the topic meters to the target database power. The data format is the InfluxDB Line protocol format. +下面的示例将主题 meters 的数据,同步到目标数据库 power。数据格式为 InfluxDB Line 协议格式。 -### Add configuration file +### 添加配置文件 ``` mkdir ~/test @@ -139,7 +141,7 @@ cd ~/test vi sink-demo.properties ``` -sink-demo.properties' content is following: +sink-demo.properties 内容如下: ```ini title="sink-demo.properties" name=tdengine-sink-demo @@ -155,18 +157,18 @@ key.converter=org.apache.kafka.connect.storage.StringConverter value.converter=org.apache.kafka.connect.storage.StringConverter ``` -Key configuration instructions: +关键配置说明: -1. `topics=meters` and `connection.database=power` means to subscribe to the data of the topic meters and write to the database power. -2. `db.schemaless=line` means the data in the InfluxDB Line protocol format. +1. `topics=meters` 和 `connection.database=power`, 表示订阅主题 meters 的数据,并写入数据库 power。 +2. 
`db.schemaless=line`, 表示使用 InfluxDB Line 协议格式的数据。 -### Create Connector instance +### 创建 Connector 实例 -```` +``` confluent local services connect connector load TDengineSinkConnector --config ./sink-demo.properties -```` +``` -If the above command is executed successfully, the output is as follows: +若以上命令执行成功,则有如下输出: ```json { @@ -189,9 +191,9 @@ If the above command is executed successfully, the output is as follows: } ``` -### Write test data +### 写入测试数据 -Prepare text file as test data, its content is following: +准备测试数据的文本文件,内容如下: ```txt title="test-data.txt" meters,location=Beijing.Haidian,groupid=2 current=11.8,voltage=221,phase=0.28 1648432611249000000 @@ -200,19 +202,19 @@ meters,location=Beijing.Haidian,groupid=3 current=10.8,voltage=223,phase=0.29 16 meters,location=Beijing.Haidian,groupid=3 current=11.3,voltage=221,phase=0.35 1648432611250000000 ``` -Use kafka-console-producer to write test data to the topic `meters`. +使用 kafka-console-producer 向主题 meters 添加测试数据。 ``` cat test-data.txt | kafka-console-producer --broker-list localhost:9092 --topic meters ``` :::note -TDengine Sink Connector will automatically create the database if the target database does not exist. The time precision used to create the database automatically is nanoseconds, which requires that the timestamp precision of the written data is also nanoseconds. An exception will be thrown if the timestamp precision of the written data is not nanoseconds. +如果目标数据库 power 不存在,那么 TDengine Sink Connector 会自动创建数据库。自动创建数据库使用的时间精度为纳秒,这就要求写入数据的时间戳精度也是纳秒。如果写入数据的时间戳精度不是纳秒,将会抛异常。 ::: -### Verify that the sync was successful +### 验证同步是否成功 -Use the TDengine CLI to verify that the sync was successful. +使用 TDengine CLI 验证同步是否成功。 ``` taos> use power; @@ -228,23 +230,23 @@ taos> select * from meters; Query OK, 4 row(s) in set (0.004208s) ``` -If you see the above data, the synchronization is successful. If not, check the logs of Kafka Connect. For detailed description of configuration parameters, see [Configuration Reference](#Configuration Reference). +若看到了以上数据,则说明同步成功。若没有,请检查 Kafka Connect 的日志。配置参数的详细说明见[配置参考](#配置参考)。 -## The use of TDengine Source Connector +## TDengine Source Connector 的使用 -The role of the TDengine Source Connector is to push all the data of a specific TDengine database after a particular time to Kafka. The implementation principle of TDengine Source Connector is to first pull historical data in batches and then synchronize incremental data with the strategy of the regular query. At the same time, the changes in the table will be monitored, and the newly added table can be automatically synchronized. If Kafka Connect is restarted, synchronization will resume where it left off. +TDengine Source Connector 的作用是将 TDengine 某个数据库某一时刻之后的数据全部推送到 Kafka。TDengine Source Connector 的实现原理是,先分批拉取历史数据,再用定时查询的策略同步增量数据。同时会监控表的变化,可以自动同步新增的表。如果重启 Kafka Connect, 会从上次中断的位置继续同步。 -TDengine Source Connector will convert the data in TDengine data table into [InfluxDB Line protocol format](/develop/insert-data/influxdb-line/) or [OpenTSDB JSON protocol format](/develop/insert-data/opentsdb-json ) and then write to Kafka. +TDengine Source Connector 会将 TDengine 数据表中的数据转换成 [InfluxDB Line 协议格式](/develop/insert-data/influxdb-line/) 或 [OpenTSDB JSON 协议格式](/develop/insert-data/opentsdb-json), 然后写入 Kafka。 -The following sample program synchronizes the data in the database test to the topic tdengine-source-test. 
+下面的示例程序同步数据库 test 中的数据到主题 tdengine-source-test。 -### Add configuration file +### 添加配置文件 ``` vi source-demo.properties ``` -Input following content: +输入以下内容: ```ini title="source-demo.properties" name=TDengineSourceConnector @@ -264,9 +266,9 @@ key.converter=org.apache.kafka.connect.storage.StringConverter value.converter=org.apache.kafka.connect.storage.StringConverter ``` -### Prepare test data +### 准备测试数据 -Prepare SQL script file to generate test data +准备生成测试数据的 SQL 文件。 ```sql title="prepare-source-data.sql" DROP DATABASE IF EXISTS test; @@ -276,101 +278,101 @@ CREATE STABLE meters (ts TIMESTAMP, current FLOAT, voltage INT, phase FLOAT) TAG INSERT INTO d1001 USING meters TAGS(Beijing.Chaoyang, 2) VALUES('2018-10-03 14:38:05.000',10.30000,219,0.31000) d1001 USING meters TAGS(Beijing.Chaoyang, 2) VALUES('2018-10-03 14:38:15.000',12.60000,218,0.33000) d1001 USING meters TAGS(Beijing.Chaoyang, 2) VALUES('2018-10-03 14:38:16.800',12.30000,221,0.31000) d1002 USING meters TAGS(Beijing.Chaoyang, 3) VALUES('2018-10-03 14:38:16.650',10.30000,218,0.25000) d1003 USING meters TAGS(Beijing.Haidian, 2) VALUES('2018-10-03 14:38:05.500',11.80000,221,0.28000) d1003 USING meters TAGS(Beijing.Haidian, 2) VALUES('2018-10-03 14:38:16.600',13.40000,223,0.29000) d1004 USING meters TAGS(Beijing.Haidian, 3) VALUES('2018-10-03 14:38:05.000',10.80000,223,0.29000) d1004 USING meters TAGS(Beijing.Haidian, 3) VALUES('2018-10-03 14:38:06.500',11.50000,221,0.35000); ``` -Use TDengine CLI to execute SQL script +使用 TDengine CLI, 执行 SQL 文件。 ``` taos -f prepare-sorce-data.sql ``` -### Create Connector instance +### 创建 Connector 实例 -```` +``` confluent local services connect connector load TDengineSourceConnector --config source-demo.properties -```` +``` -### View topic data +### 查看 topic 数据 -Use the kafka-console-consumer command-line tool to monitor data in the topic tdengine-source-test. In the beginning, all historical data will be output. After inserting two new data into TDengine, kafka-console-consumer immediately outputs the two new data. +使用 kafka-console-consumer 命令行工具监控主题 tdengine-source-test 中的数据。一开始会输出所有历史数据, 往 TDengine 插入两条新的数据之后,kafka-console-consumer 也立即输出了新增的两条数据。 -```` +``` kafka-console-consumer --bootstrap-server localhost:9092 --from-beginning --topic tdengine-source-test -```` +``` -output: +输出: -```` +``` ...... meters,location="beijing.chaoyang",groupid=2i32 current=10.3f32,voltage=219i32,phase=0.31f32 1538548685000000000 meters,location="beijing.chaoyang",groupid=2i32 current=12.6f32,voltage=218i32,phase=0.33f32 1538548695000000000 ...... -```` +``` -All historical data is displayed. Switch to the TDengine CLI and insert two new pieces of data: +此时会显示所有历史数据。切换到 TDengine CLI, 插入两条新的数据: -```` +``` USE test; INSERT INTO d1001 VALUES (now, 13.3, 229, 0.38); INSERT INTO d1002 VALUES (now, 16.3, 233, 0.22); -```` +``` -Switch back to kafka-console-consumer, and the command line window has printed out the two pieces of data just inserted. +再切换回 kafka-console-consumer, 此时命令行窗口已经打印出刚插入的 2 条数据。 -### unload plugin +### unload 插件 -After testing, use the unload command to stop the loaded connector. +测试完毕之后,用 unload 命令停止已加载的 connector。 -View currently active connectors: +查看当前活跃的 connector: -```` +``` confluent local services connect connector status -```` +``` -You should now have two active connectors if you followed the previous steps. 
Use the following command to unload: +如果按照前述操作,此时应有两个活跃的 connector。使用下面的命令 unload: -```` +``` confluent local services connect connector unload TDengineSourceConnector confluent local services connect connector unload TDengineSourceConnector -```` +``` -## Configuration reference +## 配置参考 -### General configuration +### 通用配置 -The following configuration items apply to TDengine Sink Connector and TDengine Source Connector. +以下配置项对 TDengine Sink Connector 和 TDengine Source Connector 均适用。 -1. `name`: The name of the connector. -2. `connector.class`: The full class name of the connector, for example: com.taosdata.kafka.connect.sink.TDengineSinkConnector. -3. `tasks.max`: The maximum number of tasks, the default is 1. -4. `topics`: A list of topics to be synchronized, separated by commas, such as `topic1,topic2`. -5. `connection.url`: TDengine JDBC connection string, such as `jdbc:TAOS://127.0.0.1:6030`. -6. `connection.user`: TDengine username, default root. -7. `connection.password`: TDengine user password, default taosdata. -8. `connection.attempts` : The maximum number of connection attempts. Default 3. -9. `connection.backoff.ms`: The retry interval for connection creation failure, the unit is ms. Default is 5000. +1. `name`: connector 名称。 +2. `connector.class`: connector 的完整类名, 如: com.taosdata.kafka.connect.sink.TDengineSinkConnector。 +3. `tasks.max`: 最大任务数, 默认 1。 +4. `topics`: 需要同步的 topic 列表, 多个用逗号分隔, 如 `topic1,topic2`。 +5. `connection.url`: TDengine JDBC 连接字符串, 如 `jdbc:TAOS://127.0.0.1:6030`。 +6. `connection.user`: TDengine 用户名, 默认 root。 +7. `connection.password` :TDengine 用户密码, 默认 taosdata。 +8. `connection.attempts` :最大尝试连接次数。默认 3。 +9. `connection.backoff.ms` : 创建连接失败重试时间隔时间,单位为 ms。 默认 5000。 -### TDengine Sink Connector specific configuration +### TDengine Sink Connector 特有的配置 -1. `connection.database`: The name of the target database. If the specified database does not exist, it will be created automatically. The time precision used for automatic library building is nanoseconds. The default value is null. When it is NULL, refer to the description of the `connection.database.prefix` parameter for the naming rules of the target database -2. `connection.database.prefix`: When `connection.database` is null, the prefix of the target database. Can contain placeholder '${topic}'. For example, kafka_${topic}, for topic 'orders' will be written to database 'kafka_orders'. Default null. When null, the name of the target database is the same as the name of the topic. -3. `batch.size`: Write the number of records in each batch in batches. When the data received by the sink connector at one time is larger than this value, it will be written in some batches. -4. `max.retries`: The maximum number of retries when an error occurs. Defaults to 1. -5. `retry.backoff.ms`: The time interval for retry when sending an error. The unit is milliseconds. The default is 3000. -6. `db.schemaless`: Data format, could be one of `line`, `json`, and `telnet`. Represent InfluxDB line protocol format, OpenTSDB JSON format, and OpenTSDB Telnet line protocol format. +1. `connection.database`: 目标数据库名。如果指定的数据库不存在会则自动创建。自动建库使用的时间精度为纳秒。默认值为 null。为 null 时目标数据库命名规则参考 `connection.database.prefix` 参数的说明 +2. `connection.database.prefix`: 当 connection.database 为 null 时, 目标数据库的前缀。可以包含占位符 '${topic}'。 比如 kafka_${topic}, 对于主题 'orders' 将写入数据库 'kafka_orders'。 默认 null。当为 null 时,目标数据库的名字和主题的名字是一致的。 +3. `batch.size`: 分批写入每批记录数。当 Sink Connector 一次接收到的数据大于这个值时将分批写入。 +4. `max.retries`: 发生错误时的最大重试次数。默认为 1。 +5. 
`retry.backoff.ms`: 发送错误时重试的时间间隔。单位毫秒,默认 3000。 +6. `db.schemaless`: 数据格式,必须指定为: line、json、telnet 中的一个。分别代表 InfluxDB 行协议格式、 OpenTSDB JSON 格式、 OpenTSDB Telnet 行协议格式。 -### TDengine Source Connector specific configuration +### TDengine Source Connector 特有的配置 -1. `connection.database`: source database name, no default value. -2. `topic.prefix`: topic name prefix after data is imported into kafka. Use `topic.prefix` + `connection.database` name as the full topic name. Defaults to the empty string "". -3. `timestamp.initial`: Data synchronization start time. The format is 'yyyy-MM-dd HH:mm:ss'. Default "1970-01-01 00:00:00". -4. `poll.interval.ms`: Pull data interval, the unit is ms. Default is 1000. -5. `fetch.max.rows`: The maximum number of rows retrieved when retrieving the database. Default is 100. -6. `out.format`: The data format. The value could be line or json. The line represents the InfluxDB Line protocol format, and json represents the OpenTSDB JSON format. Default is `line`. +1. `connection.database`: 源数据库名称,无缺省值。 +2. `topic.prefix`: 数据导入 kafka 后 topic 名称前缀。 使用 `topic.prefix` + `connection.database` 名称作为完整 topic 名。默认为空字符串 ""。 +3. `timestamp.initial`: 数据同步起始时间。格式为'yyyy-MM-dd HH:mm:ss'。默认 "1970-01-01 00:00:00"。 +4. `poll.interval.ms`: 拉取数据间隔,单位为 ms。默认 1000。 +5. `fetch.max.rows` : 检索数据库时最大检索条数。 默认为 100。 +6. `out.format`: 数据格式。取值 line 或 json。line 表示 InfluxDB Line 协议格式, json 表示 OpenTSDB JSON 格式。默认 line。 -## feedback +## 问题反馈 https://github.com/taosdata/kafka-connect-tdengine/issues -## Reference +## 参考 1. https://www.confluent.io/what-is-apache-kafka 2. https://developer.confluent.io/learn-kafka/kafka-connect/intro diff --git a/docs-en/20-third-party/11-kafka.md b/docs-en/20-third-party/11-kafka.md index 54a378c63380aacd763459ddc5b1acc012c88a06..83d086308d0a593cc90e2b0a0c0945a52ce259ea 100644 --- a/docs-en/20-third-party/11-kafka.md +++ b/docs-en/20-third-party/11-kafka.md @@ -1,56 +1,54 @@ --- sidebar_label: Kafka -title: TDengine Kafka Connector 使用教程 +title: TDengine Kafka Connector Tutorial --- -TDengine Kafka Connector 包含两个插件: TDengine Source Connector 和 TDengine Sink Connector。用户只需提供简单的配置文件,就可以将 Kafka 中指定 topic 的数据(批量或实时)同步到 TDengine, 或将 TDengine 中指定数据库的数据(批量或实时)同步到 Kafka。 +TDengine Kafka Connector contains two plugins: TDengine Source Connector and TDengine Sink Connector. Users only need to provide a simple configuration file to synchronize the data of the specified topic in Kafka (batch or real-time) to TDengine or synchronize the data (batch or real-time) of the specified database in TDengine to Kafka. -## 什么是 Kafka Connect? +## What is Kafka Connect? -Kafka Connect 是 Apache Kafka 的一个组件,用于使其它系统,比如数据库、云服务、文件系统等能方便地连接到 Kafka。数据既可以通过 Kafka Connect 从其它系统流向 Kafka, 也可以通过 Kafka Connect 从 Kafka 流向其它系统。从其它系统读数据的插件称为 Source Connector, 写数据到其它系统的插件称为 Sink Connector。Source Connector 和 Sink Connector 都不会直接连接 Kafka Broker,Source Connector 把数据转交给 Kafka Connect。Sink Connector 从 Kafka Connect 接收数据。 +Kafka Connect is a component of Apache Kafka that enables other systems, such as databases, cloud services, file systems, etc., to connect to Kafka easily. Data can flow from other software to Kafka via Kafka Connect and Kafka to other systems via Kafka Connect. Plugins that read data from other software are called Source Connectors, and plugins that write data to other software are called Sink Connectors. Neither Source Connector nor Sink Connector will directly connect to Kafka Broker, and Source Connector transfers data to Kafka Connect. Sink Connector receives data from Kafka Connect. 
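Connectors do not run as standalone processes: they are hosted by a Kafka Connect worker, which also exposes an HTTP administration interface. As a quick sketch (assuming a local worker listening on the default REST port 8083), you can ask the worker which connector plugins it has loaded and which connector instances are currently running:

```
curl http://localhost:8083/connector-plugins   # connector plugins installed on this worker
curl http://localhost:8083/connectors          # connector instances currently running
```

Both commands return JSON and are a convenient way to confirm that a newly installed plugin, such as the TDengine connector used below, has been picked up by the worker.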
![](kafka/Kafka_Connect.png) -TDengine Source Connector 用于把数据实时地从 TDengine 读出来发送给 Kafka Connect。TDengine Sink Connector 用于 从 Kafka Connect 接收数据并写入 TDengine。 +TDengine Source Connector is used to read data from TDengine in real-time and send it to Kafka Connect. Users can use The TDengine Sink Connector to receive data from Kafka Connect and write it to TDengine. ![](kafka/streaming-integration-with-kafka-connect.png) -## 什么是 Confluent? +## What is Confluent? -Confluent 在 Kafka 的基础上增加很多扩展功能。包括: +Confluent adds many extensions to Kafka. include: 1. Schema Registry -2. REST 代理 -3. 非 Java 客户端 -4. 很多打包好的 Kafka Connect 插件 -5. 管理和监控 Kafka 的 GUI —— Confluent 控制中心 +2. REST Proxy +3. Non-Java Clients +4. Many packaged Kafka Connect plugins +5. GUI for managing and monitoring Kafka - Confluent Control Center -这些扩展功能有的包含在社区版本的 Confluent 中,有的只有企业版能用。 +Some of these extensions are available in the community version of Confluent. Some are only available in the enterprise version. ![](kafka/confluentPlatform.png) -Confluent 企业版提供了 `confluent` 命令行工具管理各个组件。 +Confluent Enterprise Edition provides the `confluent` command-line tool to manage various components. -## 前置条件 +## Prerequisites -运行本教程中示例的前提条件。 +1. Linux operating system +2. Java 8 and Maven installed +3. Git is installed +4. TDengine is installed and started. If not, please refer to [Installation and Uninstallation](/operation/pkg-install) -1. Linux 操作系统 -2. 已安装 Java 8 和 Maven -3. 已安装 Git -4. 已安装并启动 TDengine。如果还没有可参考[安装和卸载](/operation/pkg-install) +## Install Confluent -## 安装 Confluent +Confluent provides two installation methods: Docker and binary packages. This article only introduces binary package installation. -Confluent 提供了 Docker 和二进制包两种安装方式。本文仅介绍二进制包方式安装。 +Execute in any directory: -在任意目录下执行: - -``` +```` curl -O http://packages.confluent.io/archive/7.1/confluent-7.1.1.tar.gz tar xzf confluent-7.1.1.tar.gz -C /opt/test -``` +```` -然后需要把 `$CONFLUENT_HOME/bin` 目录加入 PATH。 +Then you need to add the `$CONFLUENT_HOME/bin` directory to the PATH. ```title=".profile" export CONFLUENT_HOME=/opt/confluent-7.1.1 @@ -58,9 +56,9 @@ PATH=$CONFLUENT_HOME/bin export PATH ``` -以上脚本可以追加到当前用户的 profile 文件(~/.profile 或 ~/.bash_profile) +Users can append the above script to the current user's profile file (~/.profile or ~/.bash_profile) -安装完成之后,可以输入`confluent version`做简单验证: +After the installation is complete, you can enter `confluent version` for simple verification: ``` # confluent version @@ -73,9 +71,9 @@ Go Version: go1.17.6 (linux/amd64) Development: false ``` -## 安装 TDengine Connector 插件 +## Install TDengine Connector plugin -### 从源码安装 +### Install from source code ``` git clone https://github.com:taosdata/kafka-connect-tdengine.git @@ -84,27 +82,27 @@ mvn clean package unzip -d $CONFLUENT_HOME/share/confluent-hub-components/ target/components/packages/taosdata-kafka-connect-tdengine-0.1.0.zip ``` -以上脚本先 clone 项目源码,然后用 Maven 编译打包。打包完成后在 `target/components/packages/` 目录生成了插件的 zip 包。把这个 zip 包解压到安装插件的路径即可。安装插件的路径在配置文件 `$CONFLUENT_HOME/etc/kafka/connect-standalone.properties` 中。默认的路径为 `$CONFLUENT_HOME/share/confluent-hub-components/`。 +The above script first clones the project source code and then compiles and packages it with Maven. After the package is complete, the zip package of the plugin is generated in the `target/components/packages/` directory. Unzip this zip package to the path where the plugin is installed. The path to install the plugin is in the configuration file `$CONFLUENT_HOME/etc/kafka/connect-standalone.properties`. 
The default path is `$CONFLUENT_HOME/share/confluent-hub-components/`. -### 用 confluent-hub 安装 +### Install with confluent-hub -[Confluent Hub](https://www.confluent.io/hub) 提供下载 Kafka Connect 插件的服务。在 TDengine Kafka Connector 发布到 Confluent Hub 后可以使用命令工具 `confluent-hub` 安装。 -**TDengine Kafka Connector 目前没有正式发布,不能用这种方式安装**。 +[Confluent Hub](https://www.confluent.io/hub) provides a service to download Kafka Connect plugins. After TDengine Kafka Connector is published to Confluent Hub, it can be installed using the command tool `confluent-hub`. +**TDengine Kafka Connector is currently not officially released and cannot be installed in this way**. -## 启动 Confluent +## Start Confluent ``` confluent local services start ``` :::note -一定要先安装插件再启动 Confluent, 否则会出现找不到类的错误。Kafka Connect 的日志(默认路径: /tmp/confluent.xxxx/connect/logs/connect.log)中会输出成功安装的插件,据此可判断插件是否安装成功。 +Be sure to install the plugin before starting Confluent. Otherwise, there will be a class not found error. The log of Kafka Connect (default path: /tmp/confluent.xxxx/connect/logs/connect.log) will output the successfully installed plugin, which users can use to determine whether the plugin is installed successfully. ::: :::tip -若某组件启动失败,可尝试清空数据,重新启动。数据目录在启动时将被打印到控制台,比如 : +If a component fails to start, try clearing the data and restarting. The data directory will be printed to the console at startup, e.g.: -```title="控制台输出日志" {1} +```title="Console output log" {1} Using CONFLUENT_CURRENT: /tmp/confluent.106668 Starting ZooKeeper ZooKeeper is [UP] @@ -122,18 +120,18 @@ Starting Control Center Control Center is [UP] ``` -清空数据可执行 `rm -rf /tmp/confluent.106668`。 +To clear data, execute `rm -rf /tmp/confluent.106668`. ::: -## TDengine Sink Connector 的使用 +## The use of TDengine Sink Connector -TDengine Sink Connector 的作用是同步指定 topic 的数据到 TDengine。用户无需提前创建数据库和超级表。可手动指定目标数据库的名字(见配置参数 connection.database), 也可按一定规则生成(见配置参数 connection.database.prefix)。 +The role of the TDengine Sink Connector is to synchronize the data of the specified topic to TDengine. Users do not need to create databases and super tables in advance. The name of the target database can be specified manually (see the configuration parameter connection.database), or it can be generated according to specific rules (see the configuration parameter connection.database.prefix). -TDengine Sink Connector 内部使用 TDengine [无模式写入接口](/reference/connector/cpp#无模式写入-api)写数据到 TDengine,目前支持三种格式的数据:[InfluxDB 行协议格式](/develop/insert-data/influxdb-line)、 [OpenTSDB Telnet 协议格式](/develop/insert-data/opentsdb-telnet) 和 [OpenTSDB JSON 协议格式](/develop/insert-data/opentsdb-json)。 +TDengine Sink Connector internally uses TDengine [modeless write interface](/reference/connector/cpp#modeless write-api) to write data to TDengine, currently supports data in three formats: [InfluxDB line protocol format](/develop /insert-data/influxdb-line), [OpenTSDB Telnet protocol format](/develop/insert-data/opentsdb-telnet), and [OpenTSDB JSON protocol format](/develop/insert-data/opentsdb-json). -下面的示例将主题 meters 的数据,同步到目标数据库 power。数据格式为 InfluxDB Line 协议格式。 +The following example synchronizes the data of the topic meters to the target database power. The data format is the InfluxDB Line protocol format. 
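For orientation, the same sample record might look as follows in each of the three formats. These snippets are illustrative only; timestamp precision rules differ per protocol and are covered by the schemaless write documentation linked above. The rest of this example uses the InfluxDB Line format:

```txt
# InfluxDB Line protocol (db.schemaless=line)
meters,location=Beijing.Haidian,groupid=2 current=11.8,voltage=221,phase=0.28 1648432611249000000

# OpenTSDB Telnet protocol (db.schemaless=telnet), one metric per line
meters.current 1648432611 11.8 location=Beijing.Haidian groupid=2

# OpenTSDB JSON protocol (db.schemaless=json)
{"metric": "meters.current", "timestamp": 1648432611, "value": 11.8, "tags": {"location": "Beijing.Haidian", "groupid": 2}}
```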
-### 添加配置文件 +### Add configuration file ``` mkdir ~/test @@ -141,7 +139,7 @@ cd ~/test vi sink-demo.properties ``` -sink-demo.properties 内容如下: +sink-demo.properties' content is following: ```ini title="sink-demo.properties" name=tdengine-sink-demo @@ -157,18 +155,18 @@ key.converter=org.apache.kafka.connect.storage.StringConverter value.converter=org.apache.kafka.connect.storage.StringConverter ``` -关键配置说明: +Key configuration instructions: -1. `topics=meters` 和 `connection.database=power`, 表示订阅主题 meters 的数据,并写入数据库 power。 -2. `db.schemaless=line`, 表示使用 InfluxDB Line 协议格式的数据。 +1. `topics=meters` and `connection.database=power` means to subscribe to the data of the topic meters and write to the database power. +2. `db.schemaless=line` means the data in the InfluxDB Line protocol format. -### 创建 Connector 实例 +### Create Connector instance -``` +```` confluent local services connect connector load TDengineSinkConnector --config ./sink-demo.properties -``` +```` -若以上命令执行成功,则有如下输出: +If the above command is executed successfully, the output is as follows: ```json { @@ -191,9 +189,9 @@ confluent local services connect connector load TDengineSinkConnector --config . } ``` -### 写入测试数据 +### Write test data -准备测试数据的文本文件,内容如下: +Prepare text file as test data, its content is following: ```txt title="test-data.txt" meters,location=Beijing.Haidian,groupid=2 current=11.8,voltage=221,phase=0.28 1648432611249000000 @@ -202,19 +200,19 @@ meters,location=Beijing.Haidian,groupid=3 current=10.8,voltage=223,phase=0.29 16 meters,location=Beijing.Haidian,groupid=3 current=11.3,voltage=221,phase=0.35 1648432611250000000 ``` -使用 kafka-console-producer 向主题 meters 添加测试数据。 +Use kafka-console-producer to write test data to the topic `meters`. ``` cat test-data.txt | kafka-console-producer --broker-list localhost:9092 --topic meters ``` :::note -如果目标数据库 power 不存在,那么 TDengine Sink Connector 会自动创建数据库。自动创建数据库使用的时间精度为纳秒,这就要求写入数据的时间戳精度也是纳秒。如果写入数据的时间戳精度不是纳秒,将会抛异常。 +TDengine Sink Connector will automatically create the database if the target database does not exist. The time precision used to create the database automatically is nanoseconds, which requires that the timestamp precision of the written data is also nanoseconds. An exception will be thrown if the timestamp precision of the written data is not nanoseconds. ::: -### 验证同步是否成功 +### Verify that the sync was successful -使用 TDengine CLI 验证同步是否成功。 +Use the TDengine CLI to verify that the sync was successful. ``` taos> use power; @@ -230,23 +228,23 @@ taos> select * from meters; Query OK, 4 row(s) in set (0.004208s) ``` -若看到了以上数据,则说明同步成功。若没有,请检查 Kafka Connect 的日志。配置参数的详细说明见[配置参考](#配置参考)。 +If you see the above data, the synchronization is successful. If not, check the logs of Kafka Connect. For detailed description of configuration parameters, see [Configuration Reference](#Configuration Reference). -## TDengine Source Connector 的使用 +## The use of TDengine Source Connector -TDengine Source Connector 的作用是将 TDengine 某个数据库某一时刻之后的数据全部推送到 Kafka。TDengine Source Connector 的实现原理是,先分批拉取历史数据,再用定时查询的策略同步增量数据。同时会监控表的变化,可以自动同步新增的表。如果重启 Kafka Connect, 会从上次中断的位置继续同步。 +The role of the TDengine Source Connector is to push all the data of a specific TDengine database after a particular time to Kafka. The implementation principle of TDengine Source Connector is to first pull historical data in batches and then synchronize incremental data with the strategy of the regular query. 
At the same time, the changes in the table will be monitored, and the newly added table can be automatically synchronized. If Kafka Connect is restarted, synchronization will resume where it left off. -TDengine Source Connector 会将 TDengine 数据表中的数据转换成 [InfluxDB Line 协议格式](/develop/insert-data/influxdb-line/) 或 [OpenTSDB JSON 协议格式](/develop/insert-data/opentsdb-json), 然后写入 Kafka。 +TDengine Source Connector will convert the data in TDengine data table into [InfluxDB Line protocol format](/develop/insert-data/influxdb-line/) or [OpenTSDB JSON protocol format](/develop/insert-data/opentsdb-json ) and then write to Kafka. -下面的示例程序同步数据库 test 中的数据到主题 tdengine-source-test。 +The following sample program synchronizes the data in the database test to the topic tdengine-source-test. -### 添加配置文件 +### Add configuration file ``` vi source-demo.properties ``` -输入以下内容: +Input following content: ```ini title="source-demo.properties" name=TDengineSourceConnector @@ -266,9 +264,9 @@ key.converter=org.apache.kafka.connect.storage.StringConverter value.converter=org.apache.kafka.connect.storage.StringConverter ``` -### 准备测试数据 +### Prepare test data -准备生成测试数据的 SQL 文件。 +Prepare SQL script file to generate test data ```sql title="prepare-source-data.sql" DROP DATABASE IF EXISTS test; @@ -278,101 +276,101 @@ CREATE STABLE meters (ts TIMESTAMP, current FLOAT, voltage INT, phase FLOAT) TAG INSERT INTO d1001 USING meters TAGS(Beijing.Chaoyang, 2) VALUES('2018-10-03 14:38:05.000',10.30000,219,0.31000) d1001 USING meters TAGS(Beijing.Chaoyang, 2) VALUES('2018-10-03 14:38:15.000',12.60000,218,0.33000) d1001 USING meters TAGS(Beijing.Chaoyang, 2) VALUES('2018-10-03 14:38:16.800',12.30000,221,0.31000) d1002 USING meters TAGS(Beijing.Chaoyang, 3) VALUES('2018-10-03 14:38:16.650',10.30000,218,0.25000) d1003 USING meters TAGS(Beijing.Haidian, 2) VALUES('2018-10-03 14:38:05.500',11.80000,221,0.28000) d1003 USING meters TAGS(Beijing.Haidian, 2) VALUES('2018-10-03 14:38:16.600',13.40000,223,0.29000) d1004 USING meters TAGS(Beijing.Haidian, 3) VALUES('2018-10-03 14:38:05.000',10.80000,223,0.29000) d1004 USING meters TAGS(Beijing.Haidian, 3) VALUES('2018-10-03 14:38:06.500',11.50000,221,0.35000); ``` -使用 TDengine CLI, 执行 SQL 文件。 +Use TDengine CLI to execute SQL script ``` taos -f prepare-sorce-data.sql ``` -### 创建 Connector 实例 +### Create Connector instance -``` +```` confluent local services connect connector load TDengineSourceConnector --config source-demo.properties -``` +```` -### 查看 topic 数据 +### View topic data -使用 kafka-console-consumer 命令行工具监控主题 tdengine-source-test 中的数据。一开始会输出所有历史数据, 往 TDengine 插入两条新的数据之后,kafka-console-consumer 也立即输出了新增的两条数据。 +Use the kafka-console-consumer command-line tool to monitor data in the topic tdengine-source-test. In the beginning, all historical data will be output. After inserting two new data into TDengine, kafka-console-consumer immediately outputs the two new data. -``` +```` kafka-console-consumer --bootstrap-server localhost:9092 --from-beginning --topic tdengine-source-test -``` +```` -输出: +output: -``` +```` ...... meters,location="beijing.chaoyang",groupid=2i32 current=10.3f32,voltage=219i32,phase=0.31f32 1538548685000000000 meters,location="beijing.chaoyang",groupid=2i32 current=12.6f32,voltage=218i32,phase=0.33f32 1538548695000000000 ...... -``` +```` -此时会显示所有历史数据。切换到 TDengine CLI, 插入两条新的数据: +All historical data is displayed. 
Switch to the TDengine CLI and insert two new pieces of data: -``` +```` USE test; INSERT INTO d1001 VALUES (now, 13.3, 229, 0.38); INSERT INTO d1002 VALUES (now, 16.3, 233, 0.22); -``` +```` -再切换回 kafka-console-consumer, 此时命令行窗口已经打印出刚插入的 2 条数据。 +Switch back to kafka-console-consumer, and the command line window has printed out the two pieces of data just inserted. -### unload 插件 +### unload plugin -测试完毕之后,用 unload 命令停止已加载的 connector。 +After testing, use the unload command to stop the loaded connector. -查看当前活跃的 connector: +View currently active connectors: -``` +```` confluent local services connect connector status -``` +```` -如果按照前述操作,此时应有两个活跃的 connector。使用下面的命令 unload: +You should now have two active connectors if you followed the previous steps. Use the following command to unload: -``` +```` confluent local services connect connector unload TDengineSourceConnector confluent local services connect connector unload TDengineSourceConnector -``` +```` -## 配置参考 +## Configuration reference -### 通用配置 +### General configuration -以下配置项对 TDengine Sink Connector 和 TDengine Source Connector 均适用。 +The following configuration items apply to TDengine Sink Connector and TDengine Source Connector. -1. `name`: connector 名称。 -2. `connector.class`: connector 的完整类名, 如: com.taosdata.kafka.connect.sink.TDengineSinkConnector。 -3. `tasks.max`: 最大任务数, 默认 1。 -4. `topics`: 需要同步的 topic 列表, 多个用逗号分隔, 如 `topic1,topic2`。 -5. `connection.url`: TDengine JDBC 连接字符串, 如 `jdbc:TAOS://127.0.0.1:6030`。 -6. `connection.user`: TDengine 用户名, 默认 root。 -7. `connection.password` :TDengine 用户密码, 默认 taosdata。 -8. `connection.attempts` :最大尝试连接次数。默认 3。 -9. `connection.backoff.ms` : 创建连接失败重试时间隔时间,单位为 ms。 默认 5000。 +1. `name`: The name of the connector. +2. `connector.class`: The full class name of the connector, for example: com.taosdata.kafka.connect.sink.TDengineSinkConnector. +3. `tasks.max`: The maximum number of tasks, the default is 1. +4. `topics`: A list of topics to be synchronized, separated by commas, such as `topic1,topic2`. +5. `connection.url`: TDengine JDBC connection string, such as `jdbc:TAOS://127.0.0.1:6030`. +6. `connection.user`: TDengine username, default root. +7. `connection.password`: TDengine user password, default taosdata. +8. `connection.attempts` : The maximum number of connection attempts. Default 3. +9. `connection.backoff.ms`: The retry interval for connection creation failure, the unit is ms. Default is 5000. -### TDengine Sink Connector 特有的配置 +### TDengine Sink Connector specific configuration -1. `connection.database`: 目标数据库名。如果指定的数据库不存在会则自动创建。自动建库使用的时间精度为纳秒。默认值为 null。为 null 时目标数据库命名规则参考 `connection.database.prefix` 参数的说明 -2. `connection.database.prefix`: 当 connection.database 为 null 时, 目标数据库的前缀。可以包含占位符 '${topic}'。 比如 kafka_${topic}, 对于主题 'orders' 将写入数据库 'kafka_orders'。 默认 null。当为 null 时,目标数据库的名字和主题的名字是一致的。 -3. `batch.size`: 分批写入每批记录数。当 Sink Connector 一次接收到的数据大于这个值时将分批写入。 -4. `max.retries`: 发生错误时的最大重试次数。默认为 1。 -5. `retry.backoff.ms`: 发送错误时重试的时间间隔。单位毫秒,默认 3000。 -6. `db.schemaless`: 数据格式,必须指定为: line、json、telnet 中的一个。分别代表 InfluxDB 行协议格式、 OpenTSDB JSON 格式、 OpenTSDB Telnet 行协议格式。 +1. `connection.database`: The name of the target database. If the specified database does not exist, it will be created automatically. The time precision used for automatic library building is nanoseconds. The default value is null. When it is NULL, refer to the description of the `connection.database.prefix` parameter for the naming rules of the target database +2. 
`connection.database.prefix`: When `connection.database` is null, the prefix of the target database. Can contain placeholder '${topic}'. For example, kafka_${topic}, for topic 'orders' will be written to database 'kafka_orders'. Default null. When null, the name of the target database is the same as the name of the topic. +3. `batch.size`: Write the number of records in each batch in batches. When the data received by the sink connector at one time is larger than this value, it will be written in some batches. +4. `max.retries`: The maximum number of retries when an error occurs. Defaults to 1. +5. `retry.backoff.ms`: The time interval for retry when sending an error. The unit is milliseconds. The default is 3000. +6. `db.schemaless`: Data format, could be one of `line`, `json`, and `telnet`. Represent InfluxDB line protocol format, OpenTSDB JSON format, and OpenTSDB Telnet line protocol format. -### TDengine Source Connector 特有的配置 +### TDengine Source Connector specific configuration -1. `connection.database`: 源数据库名称,无缺省值。 -2. `topic.prefix`: 数据导入 kafka 后 topic 名称前缀。 使用 `topic.prefix` + `connection.database` 名称作为完整 topic 名。默认为空字符串 ""。 -3. `timestamp.initial`: 数据同步起始时间。格式为'yyyy-MM-dd HH:mm:ss'。默认 "1970-01-01 00:00:00"。 -4. `poll.interval.ms`: 拉取数据间隔,单位为 ms。默认 1000。 -5. `fetch.max.rows` : 检索数据库时最大检索条数。 默认为 100。 -6. `out.format`: 数据格式。取值 line 或 json。line 表示 InfluxDB Line 协议格式, json 表示 OpenTSDB JSON 格式。默认 line。 +1. `connection.database`: source database name, no default value. +2. `topic.prefix`: topic name prefix after data is imported into kafka. Use `topic.prefix` + `connection.database` name as the full topic name. Defaults to the empty string "". +3. `timestamp.initial`: Data synchronization start time. The format is 'yyyy-MM-dd HH:mm:ss'. Default "1970-01-01 00:00:00". +4. `poll.interval.ms`: Pull data interval, the unit is ms. Default is 1000. +5. `fetch.max.rows`: The maximum number of rows retrieved when retrieving the database. Default is 100. +6. `out.format`: The data format. The value could be line or json. The line represents the InfluxDB Line protocol format, and json represents the OpenTSDB JSON format. Default is `line`. -## 问题反馈 +## feedback https://github.com/taosdata/kafka-connect-tdengine/issues -## 参考 +## Reference 1. https://www.confluent.io/what-is-apache-kafka 2. https://developer.confluent.io/learn-kafka/kafka-connect/intro
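As a supplement to the Source Connector configuration reference above, the sketch below shows how those parameters fit together in a single properties file. It is a hypothetical minimal example, not the complete source-demo.properties used earlier in this tutorial; the connector class name is assumed by analogy with the Sink class, and the remaining values are either the documented defaults or the names used in this tutorial:

```ini
name=TDengineSourceConnector
# assumed by analogy with the Sink class name above; check source-demo.properties for the actual value
connector.class=com.taosdata.kafka.connect.source.TDengineSourceConnector
connection.url=jdbc:TAOS://127.0.0.1:6030
connection.user=root
connection.password=taosdata
connection.database=test
# full topic name = topic.prefix + connection.database, i.e. tdengine-source-test here
topic.prefix=tdengine-source-
timestamp.initial=1970-01-01 00:00:00
poll.interval.ms=1000
fetch.max.rows=100
out.format=line
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.storage.StringConverter
```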