Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
慢慢CG
TDengine
提交
cef59866
T
TDengine
项目概览
慢慢CG
/
TDengine
与 Fork 源项目一致
Fork自
taosdata / TDengine
通知
1
Star
0
Fork
0
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
0
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
0
Issue
0
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
cef59866
编写于
5月 24, 2020
作者:
麦壳饼
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
use MQTT-C
上级
331c4544
变更
10
隐藏空白更改
内联
并排
Showing
10 changed file
with
179 addition
and
172 deletion
+179
-172
.gitignore
.gitignore
+0
-1
.gitmodules
.gitmodules
+4
-0
deps/CMakeLists.txt
deps/CMakeLists.txt
+1
-1
deps/MQTT-C
deps/MQTT-C
+1
-0
src/dnode/src/dnodeModule.c
src/dnode/src/dnodeModule.c
+1
-1
src/inc/tmqtt.h
src/inc/tmqtt.h
+0
-0
src/plugins/mqtt/CMakeLists.txt
src/plugins/mqtt/CMakeLists.txt
+4
-3
src/plugins/mqtt/inc/mqttInit.h
src/plugins/mqtt/inc/mqttInit.h
+53
-16
src/plugins/mqtt/src/mqttPayload.c
src/plugins/mqtt/src/mqttPayload.c
+3
-2
src/plugins/mqtt/src/mqttSystem.c
src/plugins/mqtt/src/mqttSystem.c
+112
-148
未找到文件。
.gitignore
浏览文件 @
cef59866
...
...
@@ -46,7 +46,6 @@ html/
/CMakeCache.txt
/Makefile
/*.cmake
/deps
/src/cq/test/CMakeFiles/cqtest.dir/*.cmake
*.cmake
/src/cq/test/CMakeFiles/cqtest.dir/*.make
...
...
.gitmodules
浏览文件 @
cef59866
[submodule "src/connector/go"]
path = src/connector/go
url = https://github.com/taosdata/driver-go
[submodule "deps/MQTT-C"]
path = deps/MQTT-C
url = https://github.com/LiamBindle/MQTT-C.git
branch = master
deps/CMakeLists.txt
浏览文件 @
cef59866
...
...
@@ -7,4 +7,4 @@ ADD_SUBDIRECTORY(regex)
ADD_SUBDIRECTORY
(
iconv
)
ADD_SUBDIRECTORY
(
lz4
)
ADD_SUBDIRECTORY
(
cJson
)
ADD_SUBDIRECTORY
(
paho.mqtt.c
)
ADD_SUBDIRECTORY
(
MQTT-C
)
MQTT-C
@
79c1b887
Subproject commit 79c1b887856332fc12278e52c29532e9ebad2a8a
src/dnode/src/dnodeModule.c
浏览文件 @
cef59866
...
...
@@ -20,7 +20,7 @@
#include "trpc.h"
#include "mnode.h"
#include "http.h"
#include "mqtt.h"
#include "
t
mqtt.h"
#include "monitor.h"
#include "dnodeInt.h"
#include "dnodeModule.h"
...
...
src/inc/mqtt.h
→
src/inc/
t
mqtt.h
浏览文件 @
cef59866
文件已移动
src/plugins/mqtt/CMakeLists.txt
浏览文件 @
cef59866
...
...
@@ -8,14 +8,15 @@ IF ((TD_LINUX_64) OR (TD_LINUX_32 AND TD_ARM))
INCLUDE_DIRECTORIES
(
${
TD_COMMUNITY_DIR
}
/src/query/inc
)
INCLUDE_DIRECTORIES
(
${
TD_COMMUNITY_DIR
}
/src/common/inc
)
INCLUDE_DIRECTORIES
(
${
TD_COMMUNITY_DIR
}
/deps/cJson/inc
)
INCLUDE_DIRECTORIES
(
${
TD_COMMUNITY_DIR
}
/deps/paho.mqtt.c/inc
)
INCLUDE_DIRECTORIES
(
${
TD_COMMUNITY_DIR
}
/deps/MQTT-C/include
)
INCLUDE_DIRECTORIES
(
${
TD_COMMUNITY_DIR
}
/deps/MQTT-C/examples/templates
)
INCLUDE_DIRECTORIES
(
${
TD_OS_DIR
}
/inc
)
INCLUDE_DIRECTORIES
(
inc
)
AUX_SOURCE_DIRECTORY
(
src SRC
)
ADD_LIBRARY
(
mqtt
${
SRC
}
)
TARGET_LINK_LIBRARIES
(
mqtt
pahomqtt taos_static cJson
)
TARGET_LINK_LIBRARIES
(
mqtt
taos_static cJson mqttc
)
IF
(
TD_ADMIN
)
TARGET_LINK_LIBRARIES
(
mqtt
pahomqtt
admin cJson
)
TARGET_LINK_LIBRARIES
(
mqtt admin cJson
)
ENDIF
()
ENDIF
()
src/plugins/mqtt/inc/mqttInit.h
浏览文件 @
cef59866
...
...
@@ -19,25 +19,62 @@
extern
"C"
{
#endif
#include <stdint.h>
#include "MQTTAsync.h"
#include "os.h"
/**
* @file
* A simple subscriber program that performs automatic reconnections.
*/
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include "mqtt.h"
#include "taos.h"
#include "tglobal.h"
#include "tsocket.h"
#include "ttimer.h"
#include "tsclient.h"
char
split
(
char
str
[],
char
delims
[],
char
**
p_p_cmd_part
,
int
max
);
void
mqttConnnectLost
(
void
*
context
,
char
*
cause
);
int
mqttMessageArrived
(
void
*
context
,
char
*
topicName
,
int
topicLen
,
MQTTAsync_message
*
message
);
void
mqttQueryInsertCallback
(
void
*
param
,
TAOS_RES
*
result
,
int32_t
code
);
void
onDisconnectFailure
(
void
*
context
,
MQTTAsync_failureData
*
response
);
void
onDisconnect
(
void
*
context
,
MQTTAsync_successData
*
response
);
void
onSubscribe
(
void
*
context
,
MQTTAsync_successData
*
response
);
void
onSubscribeFailure
(
void
*
context
,
MQTTAsync_failureData
*
response
);
void
mqttInitConnCb
(
void
*
param
,
TAOS_RES
*
result
,
int32_t
code
);
/**
* @brief A structure that I will use to keep track of some data needed
* to setup the connection to the broker.
*
* An instance of this struct will be created in my \c main(). Then, whenever
* \ref mqttReconnectClient is called, this instance will be passed.
*/
struct
reconnect_state_t
{
const
char
*
hostname
;
const
char
*
port
;
const
char
*
topic
;
uint8_t
*
sendbuf
;
size_t
sendbufsz
;
uint8_t
*
recvbuf
;
size_t
recvbufsz
;
};
/**
* @brief My reconnect callback. It will reestablish the connection whenever
* an error occurs.
*/
void
mqttReconnectClient
(
struct
mqtt_client
*
client
,
void
**
reconnect_state_vptr
);
/**
* @brief The function will be called whenever a PUBLISH message is received.
*/
void
mqtt_PublishCallback
(
void
**
unused
,
struct
mqtt_response_publish
*
published
);
/**
* @brief The client's refresher. This function triggers back-end routines to
* handle ingress/egress traffic to the broker.
*
* @note All this function needs to do is call \ref __mqtt_recv and
* \ref __mqtt_send every so often. I've picked 100 ms meaning that
* client ingress/egress traffic will be handled every 100 ms.
*/
void
*
mqttClientRefresher
(
void
*
client
);
/**
* @brief Safelty closes the \p sockfd and cancels the \p client_daemon before \c exit.
*/
void
mqttCleanup
(
int
status
,
int
sockfd
,
pthread_t
*
client_daemon
);
void
mqttInitConnCb
(
void
*
param
,
TAOS_RES
*
result
,
int32_t
code
);
void
mqttQueryInsertCallback
(
void
*
param
,
TAOS_RES
*
result
,
int32_t
code
);
#define CLIENTID "taos"
#define TOPIC "/taos/+/+/+/" // taos/<token>/<db name>/<table name>/
#define PAYLOAD "Hello World!"
...
...
src/plugins/mqtt/src/mqttPayload.c
浏览文件 @
cef59866
...
...
@@ -39,7 +39,7 @@ char* converJsonToSql(char* json, char* _dbname, char* _tablename) {
char
_values
[
102400
]
=
{
0
};
int
i
=
0
;
int
count
=
cJSON_GetArraySize
(
jPlayload
);
for
(;
i
<
count
;
i
++
)
//jsonֵ
for
(;
i
<
count
;
i
++
)
{
cJSON
*
item
=
cJSON_GetArrayItem
(
jPlayload
,
i
);
if
(
cJSON_Object
==
item
->
type
)
{
...
...
@@ -58,7 +58,8 @@ char* converJsonToSql(char* json, char* _dbname, char* _tablename) {
}
}
cJSON_free
(
jPlayload
);
char
*
_sql
=
calloc
(
0
,
strlen
(
_names
)
+
strlen
(
_values
)
+
strlen
(
_dbname
)
+
strlen
(
_tablename
)
+
1024
);
int
sqllen
=
strlen
(
_names
)
+
strlen
(
_values
)
+
strlen
(
_dbname
)
+
strlen
(
_tablename
)
+
1024
;
char
*
_sql
=
calloc
(
1
,
sqllen
);
sprintf
(
_sql
,
"INSERT INTO %s.%s (%s) VALUES(%s);"
,
_dbname
,
_tablename
,
_names
,
_values
);
return
_sql
;
}
\ No newline at end of file
src/plugins/mqtt/src/mqttSystem.c
浏览文件 @
cef59866
...
...
@@ -14,50 +14,78 @@
*/
#define _DEFAULT_SOURCE
#include "mqttSystem.h"
#include "MQTTAsync.h"
#include "cJSON.h"
#include "mqtt.h"
#include "mqttInit.h"
#include "mqttLog.h"
#include "mqttPayload.h"
#include "os.h"
#include "posix_sockets.h"
#include "string.h"
#include "taos.h"
#include "tglobal.h"
#include "tmqtt.h"
#include "tsclient.h"
#include "tsocket.h"
#include "ttimer.h"
#include "mqttInit.h"
#include "mqttPayload.h"
#include "mqttSystem.h"
struct
mqtt_client
client
;
pthread_t
client_daemon
;
void
*
mqtt_conn
;
struct
reconnect_state_t
reconnect_state
;
int32_t
mqttInitSystem
()
{
int
rc
=
0
;
const
char
*
addr
;
const
char
*
port
;
addr
=
tsMqttBrokerAddress
;
port
=
"1883"
;
reconnect_state
.
hostname
=
addr
;
reconnect_state
.
port
=
port
;
reconnect_state
.
topic
=
TOPIC
;
uint8_t
sendbuf
[
2048
];
uint8_t
recvbuf
[
1024
];
reconnect_state
.
sendbuf
=
sendbuf
;
reconnect_state
.
sendbufsz
=
sizeof
(
sendbuf
);
reconnect_state
.
recvbuf
=
recvbuf
;
reconnect_state
.
recvbufsz
=
sizeof
(
recvbuf
);
taos_init
();
mqttPrint
(
"mqttInitSystem %s"
,
tsMqttBrokerAddress
);
return
rc
;
}
MQTTAsync
client
;
MQTTAsync_connectOptions
conn_opts
=
MQTTAsync_connectOptions_initializer
;
MQTTAsync_disconnectOptions
disc_opts
=
MQTTAsync_disconnectOptions_initializer
;
void
*
mqtt_conn
=
NULL
;
int
disc_finished
=
0
;
int
subscribed
=
0
;
int
finished
=
0
;
int
can_exit
=
0
;
void
mqttConnnectLost
(
void
*
context
,
char
*
cause
)
{
MQTTAsync
client
=
(
MQTTAsync
)
context
;
MQTTAsync_connectOptions
conn_opts
=
MQTTAsync_connectOptions_initializer
;
int
rc
;
mqttError
(
"
\n
Connection lost"
);
if
(
cause
)
mqttError
(
" cause: %s"
,
cause
);
mqttPrint
(
"Reconnecting"
);
conn_opts
.
keepAliveInterval
=
20
;
conn_opts
.
cleansession
=
1
;
if
((
rc
=
MQTTAsync_connect
(
client
,
&
conn_opts
))
!=
MQTTASYNC_SUCCESS
)
{
mqttError
(
"Failed to start connect, return code %d"
,
rc
);
finished
=
1
;
int32_t
mqttStartSystem
()
{
int
rc
=
0
;
mqtt_conn
=
NULL
;
mqtt_init_reconnect
(
&
client
,
mqttReconnectClient
,
&
reconnect_state
,
mqtt_PublishCallback
);
if
(
pthread_create
(
&
client_daemon
,
NULL
,
mqttClientRefresher
,
&
client
))
{
mqttError
(
"Failed to start client daemon."
);
mqttCleanup
(
EXIT_FAILURE
,
-
1
,
NULL
);
rc
=
-
1
;
}
mqttPrint
(
"listening for '%s' messages."
,
TOPIC
);
return
rc
;
}
void
mqttStopSystem
()
{
mqttError
(
"Injecting error:
\"
MQTT_ERROR_SOCKET_ERROR
\"
"
);
client
.
error
=
MQTT_ERROR_SOCKET_ERROR
;
}
void
mqttCleanUpSystem
()
{
mqttPrint
(
"mqttCleanUpSystem"
);
mqttCleanup
(
EXIT_SUCCESS
,
client
.
socketfd
,
&
client_daemon
);
taos_cleanup
(
mqtt_conn
);
}
int
mqttMessageArrived
(
void
*
context
,
char
*
topicName
,
int
topicLen
,
MQTTAsync_message
*
message
)
{
mqttTrace
(
"Message arrived,topic is %s,message is %.*s"
,
topicName
,
message
->
payloadlen
,
(
char
*
)
message
->
payload
);
void
mqtt_PublishCallback
(
void
**
unused
,
struct
mqtt_response_publish
*
published
)
{
mqttPrint
(
"mqtt_PublishCallback"
);
/* note that published->topic_name is NOT null-terminated (here we'll change it to a c-string) */
char
*
topic_name
=
(
char
*
)
malloc
(
published
->
topic_name_size
+
1
);
memcpy
(
topic_name
,
published
->
topic_name
,
published
->
topic_name_size
);
topic_name
[
published
->
topic_name_size
]
=
'\0'
;
mqttPrint
(
"Received publish('%s'): %s"
,
topic_name
,
(
const
char
*
)
published
->
application_message
);
char
_token
[
128
]
=
{
0
};
char
_dbname
[
128
]
=
{
0
};
char
_tablename
[
128
]
=
{
0
};
...
...
@@ -65,162 +93,98 @@ int mqttMessageArrived(void* context, char* topicName, int topicLen, MQTTAsync_m
mqttPrint
(
"connect database"
);
taos_connect_a
(
NULL
,
"monitor"
,
tsInternalPass
,
""
,
0
,
mqttInitConnCb
,
&
client
,
&
mqtt_conn
);
}
if
(
strncmp
(
topic
N
ame
,
"/taos/"
,
6
)
==
0
)
{
if
(
strncmp
(
topic
_n
ame
,
"/taos/"
,
6
)
==
0
)
{
char
*
p_p_cmd_part
[
5
]
=
{
0
};
char
copystr
[
1024
]
=
{
0
};
strncpy
(
copystr
,
topic
Name
,
MIN
(
1024
,
strlen
(
topicName
)
));
strncpy
(
copystr
,
topic
_name
,
MIN
(
1024
,
published
->
topic_name_size
));
char
part_index
=
split
(
copystr
,
"/"
,
p_p_cmd_part
,
10
);
if
(
part_index
<
4
)
{
mqttError
(
"The topic %s is't format '%s'."
,
topic
N
ame
,
TOPIC
);
mqttError
(
"The topic %s is't format '%s'."
,
topic
_n
ame
,
TOPIC
);
}
else
{
strncpy
(
_token
,
p_p_cmd_part
[
1
],
127
);
strncpy
(
_dbname
,
p_p_cmd_part
[
2
],
127
);
strncpy
(
_tablename
,
p_p_cmd_part
[
3
],
127
);
mqttPrint
(
"part count=%d,access token:%s,database name:%s, table name:%s"
,
part_index
,
_token
,
_dbname
,
_tablename
);
if
(
mqtt_conn
!=
NULL
)
{
char
*
_sql
=
converJsonToSql
((
char
*
)
message
->
payload
,
_dbname
,
_tablename
);
char
*
_sql
=
converJsonToSql
((
char
*
)
published
->
application_message
,
_dbname
,
_tablename
);
mqttPrint
(
"query:%s"
,
_sql
);
taos_query_a
(
mqtt_conn
,
_sql
,
mqttQueryInsertCallback
,
&
client
);
mqttPrint
(
"free sql:%s"
,
_sql
);
free
(
_sql
);
}
}
}
MQTTAsync_freeMessage
(
&
message
);
MQTTAsync_free
(
topicName
);
return
1
;
free
(
topic_name
);
}
void
mqttQueryInsertCallback
(
void
*
param
,
TAOS_RES
*
result
,
int32_t
code
)
{
if
(
code
<
0
)
{
mqttError
(
"mqtt:%d, save data failed, code:%s"
,
code
,
tstrerror
(
code
));
}
else
if
(
code
==
0
)
{
mqttError
(
"mqtt:%d, save data failed, affect rows:%d"
,
code
,
code
);
}
else
{
mqttPrint
(
"mqtt:%d, save data success, code:%s"
,
code
,
tstrerror
(
code
));
void
*
mqttClientRefresher
(
void
*
client
)
{
while
(
1
)
{
mqtt_sync
((
struct
mqtt_client
*
)
client
);
usleep
(
100000U
);
}
return
NULL
;
}
void
onDisconnectFailure
(
void
*
context
,
MQTTAsync_failureData
*
response
)
{
mqttError
(
"Disconnect failed, rc %d"
,
response
->
code
);
disc_finished
=
1
;
void
mqttCleanup
(
int
status
,
int
sockfd
,
pthread_t
*
client_daemon
)
{
mqttPrint
(
"mqttCleanup"
);
if
(
sockfd
!=
-
1
)
close
(
sockfd
);
if
(
client_daemon
!=
NULL
)
pthread_cancel
(
*
client_daemon
);
}
void
onDisconnect
(
void
*
context
,
MQTTAsync_successData
*
respons
e
)
{
mqttError
(
"Successful disconnection"
);
if
(
mqtt_conn
!=
NULL
)
{
void
mqttInitConnCb
(
void
*
param
,
TAOS_RES
*
result
,
int32_t
cod
e
)
{
if
(
code
<
0
)
{
mqttError
(
"mqtt:%d, connect to database failed, reason:%s"
,
code
,
tstrerror
(
code
));
taos_close
(
mqtt_conn
);
mqtt_conn
=
NULL
;
return
;
}
disc_finished
=
1
;
}
void
onSubscribe
(
void
*
context
,
MQTTAsync_successData
*
response
)
{
mqttPrint
(
"Subscribe succeeded"
);
subscribed
=
1
;
}
void
onSubscribeFailure
(
void
*
context
,
MQTTAsync_failureData
*
response
)
{
mqttError
(
"Subscribe failed, rc %d"
,
response
->
code
);
finished
=
1
;
mqttTrace
(
"mqtt:%d, connect to database success, reason:%s"
,
code
,
tstrerror
(
code
));
}
void
onConnectFailure
(
void
*
context
,
MQTTAsync_failureData
*
response
)
{
mqttError
(
"Connect failed, rc %d,,Retry later"
,
response
->
code
);
finished
=
1
;
taosMsleep
(
1000
);
int
rc
=
0
;
if
((
rc
=
MQTTAsync_connect
(
client
,
&
conn_opts
))
!=
MQTTASYNC_SUCCESS
)
{
mqttError
(
"Failed to start connect, return code %d"
,
rc
);
finished
=
1
;
void
mqttQueryInsertCallback
(
void
*
param
,
TAOS_RES
*
result
,
int32_t
code
)
{
if
(
code
<
0
)
{
mqttError
(
"mqtt:%d, save data failed, code:%s"
,
code
,
tstrerror
(
code
));
}
else
if
(
code
==
0
)
{
mqttError
(
"mqtt:%d, save data failed, affect rows:%d"
,
code
,
code
);
}
else
{
mqttPrint
(
"mqtt:%d, save data success, code:%s"
,
code
,
tstrerror
(
code
));
}
}
void
onConnect
(
void
*
context
,
MQTTAsync_successData
*
response
)
{
MQTTAsync
client
=
(
MQTTAsync
)
context
;
MQTTAsync_responseOptions
opts
=
MQTTAsync_responseOptions_initializer
;
int
rc
;
void
mqttReconnectClient
(
struct
mqtt_client
*
client
,
void
**
reconnect_state_vptr
)
{
mqttPrint
(
"mqttReconnectClient"
);
struct
reconnect_state_t
*
reconnect_state
=
*
((
struct
reconnect_state_t
**
)
reconnect_state_vptr
);
mqttPrint
(
"Successful connection
\n
"
);
mqttPrint
(
"Subscribing to topic %s
\n
for client %s using QoS%d"
,
TOPIC
,
CLIENTID
,
QOS
);
opts
.
onSuccess
=
onSubscribe
;
opts
.
onFailure
=
onSubscribeFailure
;
opts
.
context
=
client
;
if
((
rc
=
MQTTAsync_subscribe
(
client
,
TOPIC
,
QOS
,
&
opts
))
!=
MQTTASYNC_SUCCESS
)
{
mqttError
(
"Failed to start subscribe, return code %d
\n
"
,
rc
);
finished
=
1
;
/* Close the clients socket if this isn't the initial reconnect call */
if
(
client
->
error
!=
MQTT_ERROR_INITIAL_RECONNECT
)
{
close
(
client
->
socketfd
);
}
}
int32_t
mqttInitSystem
()
{
int
rc
=
0
;
if
(
strnlen
(
tsMqttBrokerAddress
,
128
)
==
0
)
{
rc
=
EXIT_FAILURE
;
mqttError
(
"Can't to create client, mqtt broker address is empty %d"
,
rc
);
}
else
{
if
((
rc
=
MQTTAsync_create
(
&
client
,
tsMqttBrokerAddress
,
CLIENTID
,
MQTTCLIENT_PERSISTENCE_NONE
,
NULL
))
!=
MQTTASYNC_SUCCESS
)
{
mqttError
(
"Failed to create client, return code %d"
,
rc
);
rc
=
EXIT_FAILURE
;
}
else
{
if
((
rc
=
MQTTAsync_setCallbacks
(
client
,
client
,
mqttConnnectLost
,
mqttMessageArrived
,
NULL
))
!=
MQTTASYNC_SUCCESS
)
{
mqttError
(
"Failed to set callbacks, return code %d"
,
rc
);
rc
=
EXIT_FAILURE
;
}
else
{
conn_opts
.
keepAliveInterval
=
20
;
conn_opts
.
cleansession
=
1
;
conn_opts
.
onSuccess
=
onConnect
;
conn_opts
.
onFailure
=
onConnectFailure
;
conn_opts
.
context
=
client
;
taos_init
();
}
}
/* Perform error handling here. */
if
(
client
->
error
!=
MQTT_ERROR_INITIAL_RECONNECT
)
{
mqttError
(
"mqttReconnectClient: called while client was in error state
\"
%s
\"
"
,
mqtt_error_str
(
client
->
error
));
}
return
rc
;
}
int32_t
mqttStartSystem
()
{
int
rc
=
0
;
mqttPrint
(
"mqttStartSystem"
);
if
((
rc
=
MQTTAsync_connect
(
client
,
&
conn_opts
))
!=
MQTTASYNC_SUCCESS
)
{
mqttError
(
"Failed to start connect, return code %d"
,
rc
);
rc
=
EXIT_FAILURE
;
}
else
{
while
(
!
subscribed
&&
!
finished
)
usleep
(
10000L
);
disc_opts
.
onSuccess
=
onDisconnect
;
disc_opts
.
onFailure
=
onDisconnectFailure
;
mqttPrint
(
"Successful started
\n
"
);
/* Open a new socket. */
int
sockfd
=
open_nb_socket
(
reconnect_state
->
hostname
,
reconnect_state
->
port
);
if
(
sockfd
==
-
1
)
{
mqttError
(
"Failed to open socket: "
);
mqttCleanup
(
EXIT_FAILURE
,
sockfd
,
NULL
);
}
return
rc
;
}
void
mqttInitConnCb
(
void
*
param
,
TAOS_RES
*
result
,
int32_t
code
)
{
if
(
code
<
0
)
{
mqttError
(
"mqtt:%d, connect to database failed, reason:%s"
,
code
,
tstrerror
(
code
));
taos_close
(
mqtt_conn
);
mqtt_conn
=
NULL
;
return
;
}
mqttTrace
(
"mqtt:%d, connect to database success, reason:%s"
,
code
,
tstrerror
(
code
));
}
/* Reinitialize the client. */
mqtt_reinit
(
client
,
sockfd
,
reconnect_state
->
sendbuf
,
reconnect_state
->
sendbufsz
,
reconnect_state
->
recvbuf
,
reconnect_state
->
recvbufsz
);
void
mqttStopSystem
()
{
int
rc
=
0
;
if
((
rc
=
MQTTAsync_disconnect
(
client
,
&
disc_opts
))
!=
MQTTASYNC_SUCCESS
)
{
mqttError
(
"Failed to start disconnect, return code %d"
,
rc
);
rc
=
EXIT_FAILURE
;
}
else
{
while
(
!
disc_finished
)
{
usleep
(
10000L
);
}
}
taos_close
(
mqtt_conn
);
}
/* Create an anonymous session */
const
char
*
client_id
=
NULL
;
/* Ensure we have a clean session */
uint8_t
connect_flags
=
MQTT_CONNECT_CLEAN_SESSION
;
/* Send connection request to the broker. */
mqtt_connect
(
client
,
client_id
,
NULL
,
NULL
,
0
,
NULL
,
NULL
,
connect_flags
,
400
);
void
mqttCleanUpSystem
()
{
mqttPrint
(
"mqttCleanUpSystem"
);
MQTTAsync_destroy
(
&
client
);
taos_cleanup
(
mqtt_conn
);
}
/* Subscribe to the topic. */
mqtt_subscribe
(
client
,
reconnect_state
->
topic
,
0
);
}
\ No newline at end of file
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录