Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
3d35c7d6
TDengine
项目概览
taosdata
/
TDengine
大约 2 年 前同步成功
通知
1192
Star
22018
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
未验证
提交
3d35c7d6
编写于
5月 31, 2022
作者:
B
Bo Ding
提交者:
GitHub
5月 31, 2022
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
docs: enhance kafka connector tutorial (#13273)
上级
a3b148b1
变更
1
隐藏空白更改
内联
并排
Showing
1 changed file
with
85 addition
and
16 deletion
+85
-16
docs-cn/20-third-party/11-kafka.md
docs-cn/20-third-party/11-kafka.md
+85
-16
未找到文件。
docs-cn/20-third-party/11-kafka.md
浏览文件 @
3d35c7d6
...
...
@@ -7,7 +7,7 @@ TDengine Kafka Connector 包含两个插件: TDengine Source Connector 和 TDeng
## 什么是 Kafka Connect?
Kafka Connect 是
Apache Kafka
的一个组件,用于使其它系统,比如数据库、云服务、文件系统等能方便地连接到 Kafka。数据既可以通过 Kafka Connect 从其它系统流向 Kafka, 也可以通过 Kafka Connect 从 Kafka 流向其它系统。从其它系统读数据的插件称为 Source Connector, 写数据到其它系统的插件称为 Sink Connector。Source Connector 和 Sink Connector 都不会直接连接 Kafka Broker,Source Connector 把数据转交给 Kafka Connect。Sink Connector 从 Kafka Connect 接收数据。
Kafka Connect 是
[
Apache Kafka
](
https://kafka.apache.org/
)
的一个组件,用于使其它系统,比如数据库、云服务、文件系统等能方便地连接到 Kafka。数据既可以通过 Kafka Connect 从其它系统流向 Kafka, 也可以通过 Kafka Connect 从 Kafka 流向其它系统。从其它系统读数据的插件称为 Source Connector, 写数据到其它系统的插件称为 Sink Connector。Source Connector 和 Sink Connector 都不会直接连接 Kafka Broker,Source Connector 把数据转交给 Kafka Connect。Sink Connector 从 Kafka Connect 接收数据。

...
...
@@ -17,7 +17,7 @@ TDengine Source Connector 用于把数据实时地从 TDengine 读出来发送
## 什么是 Confluent?
Confluent
在 Kafka 的基础上增加很多扩展功能。包括:
[
Confluent
](
https://www.confluent.io/
)
在 Kafka 的基础上增加很多扩展功能。包括:
1.
Schema Registry
2.
REST 代理
...
...
@@ -81,10 +81,10 @@ Development: false
git clone https://github.com:taosdata/kafka-connect-tdengine.git
cd kafka-connect-tdengine
mvn clean package
unzip -d $CONFLUENT_HOME/share/
confluent-hub-components/ target/components/packages/taosdata-kafka-connect-tdengine-0.1.0
.zip
unzip -d $CONFLUENT_HOME/share/
java/ target/components/packages/taosdata-kafka-connect-tdengine-*
.zip
```
以上脚本先 clone 项目源码,然后用 Maven 编译打包。打包完成后在
`target/components/packages/`
目录生成了插件的 zip 包。把这个 zip 包解压到安装插件的路径即可。安装插件的路径在配置文件
`$CONFLUENT_HOME/etc/kafka/connect-standalone.properties`
中。
默认的路径为
`$CONFLUENT_HOME/share/confluent-hub-components
/`
。
以上脚本先 clone 项目源码,然后用 Maven 编译打包。打包完成后在
`target/components/packages/`
目录生成了插件的 zip 包。把这个 zip 包解压到安装插件的路径即可。安装插件的路径在配置文件
`$CONFLUENT_HOME/etc/kafka/connect-standalone.properties`
中。
上面的示例中使用了内置的插件路径:
`$CONFLUENT_HOME/share/java
/`
。
### 用 confluent-hub 安装
...
...
@@ -98,7 +98,7 @@ confluent local services start
```
:::note
一定要先安装插件再启动 Confluent, 否则
会出现找不到类的错误。Kafka Connect 的日志(默认路径: /tmp/confluent.xxxx/connect/logs/connect.log)中会输出成功安装的插件,据此可判断插件是否安装成功
。
一定要先安装插件再启动 Confluent, 否则
加载插件会失败
。
:::
:::tip
...
...
@@ -125,6 +125,61 @@ Control Center is [UP]
清空数据可执行
`rm -rf /tmp/confluent.106668`
。
:::
### 验证各个组件是否启动成功
输入命令:
```
confluent local services status
```
如果各组件都启动成功,会得到如下输出:
```
Connect is [UP]
Control Center is [UP]
Kafka is [UP]
Kafka REST is [UP]
ksqlDB Server is [UP]
Schema Registry is [UP]
ZooKeeper is [UP]
```
### 验证插件是否安装成功
在 Kafka Connect 组件完全启动后,可用以下命令列出成功加载的插件:
```
confluent local services connect plugin list
```
如果成功安装,会输出如下:
```
txt {4,9}
Available Connect Plugins:
[
{
"class": "com.taosdata.kafka.connect.sink.TDengineSinkConnector",
"type": "sink",
"version": "1.0.0"
},
{
"class": "com.taosdata.kafka.connect.source.TDengineSourceConnector",
"type": "source",
"version": "1.0.0"
},
......
```
如果插件安装失败,请检查 Kafka Connect 的启动日志是否有异常信息,用以下命令输出日志路径:
```
echo `cat /tmp/confluent.current`/connect/connect.stdout
```
该命令的输出类似:
`/tmp/confluent.104086/connect/connect.stdout`
。
与日志文件
`connect.stdout`
同一目录,还有一个文件名为:
`connect.properties`
。在这个文件的末尾,可以看到最终生效的
`plugin.path`
, 它是一系列用逗号分割的路径。如果插件安装失败,很可能是因为实际的安装路径不包含在
`plugin.path`
中。
## TDengine Sink Connector 的使用
TDengine Sink Connector 的作用是同步指定 topic 的数据到 TDengine。用户无需提前创建数据库和超级表。可手动指定目标数据库的名字(见配置参数 connection.database), 也可按一定规则生成(见配置参数 connection.database.prefix)。
...
...
@@ -144,7 +199,7 @@ vi sink-demo.properties
sink-demo.properties 内容如下:
```
ini title="sink-demo.properties"
name=
tdengine-sink-demo
name=
TDengineSinkConnector
connector.class=com.taosdata.kafka.connect.sink.TDengineSinkConnector
tasks.max=1
topics=meters
...
...
@@ -153,6 +208,7 @@ connection.user=root
connection.password=taosdata
connection.database=power
db.schemaless=line
data.precision=ns
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.storage.StringConverter
```
...
...
@@ -179,6 +235,7 @@ confluent local services connect connector load TDengineSinkConnector --config .
"connection.url"
:
"jdbc:TAOS://127.0.0.1:6030"
,
"connection.user"
:
"root"
,
"connector.class"
:
"com.taosdata.kafka.connect.sink.TDengineSinkConnector"
,
"data.precision"
:
"ns"
,
"db.schemaless"
:
"line"
,
"key.converter"
:
"org.apache.kafka.connect.storage.StringConverter"
,
"tasks.max"
:
"1"
,
...
...
@@ -223,10 +280,10 @@ Database changed.
taos> select * from meters;
ts | current | voltage | phase | groupid | location |
===============================================================================================================================================================
2022-03-28 09:56:51.249000000 | 11.800000000 | 221.000000000 | 0.280000000 | 2 | California.LosAngeles
|
2022-03-28 09:56:51.250000000 | 13.400000000 | 223.000000000 | 0.290000000 | 2 | California.LosAngeles
|
2022-03-28 09:56:51.249000000 | 10.800000000 | 223.000000000 | 0.290000000 | 3 | California.LosAngeles
|
2022-03-28 09:56:51.250000000 | 11.300000000 | 221.000000000 | 0.350000000 | 3 | California.LosAngeles
|
2022-03-28 09:56:51.249000000 | 11.800000000 | 221.000000000 | 0.280000000 | 2 | California.LosAngeles |
2022-03-28 09:56:51.250000000 | 13.400000000 | 223.000000000 | 0.290000000 | 2 | California.LosAngeles |
2022-03-28 09:56:51.249000000 | 10.800000000 | 223.000000000 | 0.290000000 | 3 | California.LosAngeles |
2022-03-28 09:56:51.250000000 | 11.300000000 | 221.000000000 | 0.350000000 | 3 | California.LosAngeles |
Query OK, 4 row(s) in set (0.004208s)
```
...
...
@@ -356,21 +413,33 @@ confluent local services connect connector unload TDengineSourceConnector
2.
`connection.database.prefix`
: 当 connection.database 为 null 时, 目标数据库的前缀。可以包含占位符 '${topic}'。 比如 kafka_${topic}, 对于主题 'orders' 将写入数据库 'kafka_orders'。 默认 null。当为 null 时,目标数据库的名字和主题的名字是一致的。
3.
`batch.size`
: 分批写入每批记录数。当 Sink Connector 一次接收到的数据大于这个值时将分批写入。
4.
`max.retries`
: 发生错误时的最大重试次数。默认为 1。
5.
`retry.backoff.ms`
: 发送错误时重试的时间间隔。单位毫秒,默认 3000。
6.
`db.schemaless`
: 数据格式,必须指定为: line、json、telnet 中的一个。分别代表 InfluxDB 行协议格式、 OpenTSDB JSON 格式、 OpenTSDB Telnet 行协议格式。
5.
`retry.backoff.ms`
: 发送错误时重试的时间间隔。单位毫秒,默认为 3000。
6.
`db.schemaless`
: 数据格式,可选值为:
1.
line :代表 InfluxDB 行协议格式
2.
json : 代表 OpenTSDB JSON 格式
3.
telnet :代表 OpenTSDB Telnet 行协议格式
7.
`data.precision`
: 使用 InfluxDB 行协议格式时,时间戳的精度。可选值为:
1.
ms : 表示毫秒
2.
us : 表示微秒
3.
ns : 表示纳秒。默认为纳秒。
### TDengine Source Connector 特有的配置
1.
`connection.database`
: 源数据库名称,无缺省值。
2.
`topic.prefix`
: 数据导入 kafka 后 topic 名称前缀。 使用
`topic.prefix`
+
`connection.database`
名称作为完整 topic 名。默认为空字符串 ""。
3.
`timestamp.initial`
: 数据同步起始时间。格式为'yyyy-MM-dd HH:mm:ss'。默认 "1970-01-01 00:00:00"。
4.
`poll.interval.ms`
: 拉取数据间隔,单位为 ms。默认 1000。
3.
`timestamp.initial`
: 数据同步起始时间。格式为'yyyy-MM-dd HH:mm:ss'。默认
为
"1970-01-01 00:00:00"。
4.
`poll.interval.ms`
: 拉取数据间隔,单位为 ms。默认
为
1000。
5.
`fetch.max.rows`
: 检索数据库时最大检索条数。 默认为 100。
6.
`out.format`
: 数据格式。取值 line 或 json。line 表示 InfluxDB Line 协议格式, json 表示 OpenTSDB JSON 格式。默认 line。
6.
`out.format`
: 数据格式。取值 line 或 json。line 表示 InfluxDB Line 协议格式, json 表示 OpenTSDB JSON 格式。默认为 line。
## 其他说明
1.
插件的安装位置可以自定义,请参考官方文档:https://docs.confluent.io/home/connect/self-managed/install.html#install-connector-manually。
2.
本教程的示例程序使用了 Confluent 平台,但是 TDengine Kafka Connector 本身同样适用于独立安装的 Kafka, 且配置方法相同。关于如何在独立安装的 Kafka 环境使用 Kafka Connect 插件, 请参考官方文档: https://kafka.apache.org/documentation/#connect。
## 问题反馈
https://github.com/taosdata/kafka-connect-tdengine/issues
无论遇到任何问题,都欢迎在本项目的 Github 仓库反馈: https://github.com/taosdata/kafka-connect-tdengine/issues。
## 参考
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录