Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
慢慢CG
TDengine
提交
957410a0
T
TDengine
项目概览
慢慢CG
/
TDengine
与 Fork 源项目一致
Fork自
taosdata / TDengine
通知
1
Star
0
Fork
0
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
0
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
0
Issue
0
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
957410a0
编写于
5月 26, 2020
作者:
麦壳饼
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
fix for topic and clean up
上级
c2b148c1
变更
2
隐藏空白更改
内联
并排
Showing
2 changed file
with
15 addition
and
11 deletion
+15
-11
src/plugins/mqtt/inc/mqttInit.h
src/plugins/mqtt/inc/mqttInit.h
+0
-3
src/plugins/mqtt/src/mqttSystem.c
src/plugins/mqtt/src/mqttSystem.c
+15
-8
未找到文件。
src/plugins/mqtt/inc/mqttInit.h
浏览文件 @
957410a0
...
@@ -77,9 +77,6 @@ void* mqttClientRefresher(void* client);
...
@@ -77,9 +77,6 @@ void* mqttClientRefresher(void* client);
void
mqttCleanup
(
int
status
,
int
sockfd
,
pthread_t
*
client_daemon
);
void
mqttCleanup
(
int
status
,
int
sockfd
,
pthread_t
*
client_daemon
);
void
mqttInitConnCb
(
void
*
param
,
TAOS_RES
*
result
,
int32_t
code
);
void
mqttInitConnCb
(
void
*
param
,
TAOS_RES
*
result
,
int32_t
code
);
void
mqttQueryInsertCallback
(
void
*
param
,
TAOS_RES
*
result
,
int32_t
code
);
void
mqttQueryInsertCallback
(
void
*
param
,
TAOS_RES
*
result
,
int32_t
code
);
#define CLIENTID "taos"
#define TOPIC "+/+/+/" // path/<token>/<db name>/<table name>/
#define PAYLOAD "Hello World!"
#define QOS 1
#define QOS 1
#define TIMEOUT 10000L
#define TIMEOUT 10000L
...
...
src/plugins/mqtt/src/mqttSystem.c
浏览文件 @
957410a0
...
@@ -35,6 +35,7 @@ pthread_t client_daemon;
...
@@ -35,6 +35,7 @@ pthread_t client_daemon;
void
*
mqtt_conn
;
void
*
mqtt_conn
;
struct
reconnect_state_t
recnt_status
;
struct
reconnect_state_t
recnt_status
;
char
*
topicPath
;
char
*
topicPath
;
int
isStop
=
1
;
int32_t
mqttInitSystem
()
{
int32_t
mqttInitSystem
()
{
int
rc
=
0
;
int
rc
=
0
;
uint8_t
sendbuf
[
2048
];
uint8_t
sendbuf
[
2048
];
...
@@ -65,9 +66,10 @@ int32_t mqttInitSystem() {
...
@@ -65,9 +66,10 @@ int32_t mqttInitSystem() {
topicPath
=
strbetween
(
strstr
(
url
,
strstr
(
_begin_hostname
,
":"
)
!=
NULL
?
recnt_status
.
port
:
recnt_status
.
hostname
),
topicPath
=
strbetween
(
strstr
(
url
,
strstr
(
_begin_hostname
,
":"
)
!=
NULL
?
recnt_status
.
port
:
recnt_status
.
hostname
),
"/"
,
"/"
);
"/"
,
"/"
);
int
_tpsize
=
strlen
(
topicPath
)
+
strlen
(
TOPIC
)
+
1
;
char
*
_topic
=
"+/+/+/"
;
int
_tpsize
=
strlen
(
topicPath
)
+
strlen
(
_topic
)
+
1
;
recnt_status
.
topic
=
calloc
(
1
,
_tpsize
);
recnt_status
.
topic
=
calloc
(
1
,
_tpsize
);
s
nprintf
(
recnt_status
.
topic
,
_tpsize
-
1
,
"/%s/"
TOPIC
,
topicPath
);
s
printf
(
recnt_status
.
topic
,
"/%s/%s"
,
topicPath
,
_topic
);
recnt_status
.
client_id
=
strlen
(
tsMqttBrokerClientId
)
<
3
?
tsMqttBrokerClientId
:
"taos_mqtt"
;
recnt_status
.
client_id
=
strlen
(
tsMqttBrokerClientId
)
<
3
?
tsMqttBrokerClientId
:
"taos_mqtt"
;
...
@@ -86,17 +88,19 @@ int32_t mqttStartSystem() {
...
@@ -86,17 +88,19 @@ int32_t mqttStartSystem() {
mqttCleanup
(
EXIT_FAILURE
,
-
1
,
NULL
);
mqttCleanup
(
EXIT_FAILURE
,
-
1
,
NULL
);
rc
=
-
1
;
rc
=
-
1
;
}
}
mqttPrint
(
"listening for '%s' messages."
,
TOPIC
);
mqttPrint
(
"listening for '%s' messages."
,
recnt_status
.
topic
);
return
rc
;
return
rc
;
}
}
void
mqttStopSystem
()
{
void
mqttStopSystem
()
{
mqttError
(
"Injecting error:
\"
MQTT_ERROR_SOCKET_ERROR
\"
"
);
mqttError
(
"Injecting error:
\"
MQTT_ERROR_SOCKET_ERROR
\"
"
);
client
.
error
=
MQTT_ERROR_SOCKET_ERROR
;
client
.
error
=
MQTT_ERROR_SOCKET_ERROR
;
isStop
=
0
;
usleep
(
300000U
);
}
}
void
mqttCleanUpSystem
()
{
void
mqttCleanUpSystem
()
{
mqttPrint
(
"
mqttCleanUpSystem
"
);
mqttPrint
(
"
starting to clean up mqtt
"
);
mqttCleanup
(
EXIT_SUCCESS
,
client
.
socketfd
,
&
client_daemon
);
mqttCleanup
(
EXIT_SUCCESS
,
client
.
socketfd
,
&
client_daemon
);
taos_cleanup
(
mqtt_conn
);
taos_cleanup
(
mqtt_conn
);
free
(
recnt_status
.
user_name
);
free
(
recnt_status
.
user_name
);
...
@@ -105,6 +109,8 @@ void mqttCleanUpSystem() {
...
@@ -105,6 +109,8 @@ void mqttCleanUpSystem() {
free
(
recnt_status
.
port
);
free
(
recnt_status
.
port
);
free
(
recnt_status
.
topic
);
free
(
recnt_status
.
topic
);
free
(
topicPath
);
free
(
topicPath
);
mqttPrint
(
"mqtt is cleaned up"
);
}
}
void
mqtt_PublishCallback
(
void
**
unused
,
struct
mqtt_response_publish
*
published
)
{
void
mqtt_PublishCallback
(
void
**
unused
,
struct
mqtt_response_publish
*
published
)
{
...
@@ -119,15 +125,15 @@ void mqtt_PublishCallback(void** unused, struct mqtt_response_publish* published
...
@@ -119,15 +125,15 @@ void mqtt_PublishCallback(void** unused, struct mqtt_response_publish* published
char
_tablename
[
128
]
=
{
0
};
char
_tablename
[
128
]
=
{
0
};
if
(
mqtt_conn
==
NULL
)
{
if
(
mqtt_conn
==
NULL
)
{
mqttPrint
(
"connect database"
);
mqttPrint
(
"connect database"
);
taos_connect_a
(
NULL
,
"
monitor
"
,
tsInternalPass
,
""
,
0
,
mqttInitConnCb
,
&
client
,
&
mqtt_conn
);
taos_connect_a
(
NULL
,
"
_root
"
,
tsInternalPass
,
""
,
0
,
mqttInitConnCb
,
&
client
,
&
mqtt_conn
);
}
}
if
(
strncmp
(
topic_name
,
"/taos/"
,
6
)
==
0
)
{
if
(
topic_name
[
1
]
==
'/'
&&
strncmp
((
char
*
)
&
topic_name
[
1
],
topicPath
,
strlen
(
topicPath
)
)
==
0
)
{
char
*
p_p_cmd_part
[
5
]
=
{
0
};
char
*
p_p_cmd_part
[
5
]
=
{
0
};
char
copystr
[
1024
]
=
{
0
};
char
copystr
[
1024
]
=
{
0
};
strncpy
(
copystr
,
topic_name
,
MIN
(
1024
,
published
->
topic_name_size
));
strncpy
(
copystr
,
topic_name
,
MIN
(
1024
,
published
->
topic_name_size
));
char
part_index
=
split
(
copystr
,
"/"
,
p_p_cmd_part
,
10
);
char
part_index
=
split
(
copystr
,
"/"
,
p_p_cmd_part
,
10
);
if
(
part_index
<
4
)
{
if
(
part_index
<
4
)
{
mqttError
(
"The topic %s is't format '
%s'."
,
topic_name
,
TOPIC
);
mqttError
(
"The topic %s is't format '
/path/token/dbname/table name/'. for expmle: '/taos/token/db/t'"
,
topic_name
);
}
else
{
}
else
{
strncpy
(
_token
,
p_p_cmd_part
[
1
],
127
);
strncpy
(
_token
,
p_p_cmd_part
[
1
],
127
);
strncpy
(
_dbname
,
p_p_cmd_part
[
2
],
127
);
strncpy
(
_dbname
,
p_p_cmd_part
[
2
],
127
);
...
@@ -148,10 +154,11 @@ void mqtt_PublishCallback(void** unused, struct mqtt_response_publish* published
...
@@ -148,10 +154,11 @@ void mqtt_PublishCallback(void** unused, struct mqtt_response_publish* published
}
}
void
*
mqttClientRefresher
(
void
*
client
)
{
void
*
mqttClientRefresher
(
void
*
client
)
{
while
(
1
)
{
while
(
isStop
)
{
mqtt_sync
((
struct
mqtt_client
*
)
client
);
mqtt_sync
((
struct
mqtt_client
*
)
client
);
usleep
(
100000U
);
usleep
(
100000U
);
}
}
mqttPrint
(
"Exit mqttClientRefresher"
);
return
NULL
;
return
NULL
;
}
}
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录