Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
慢慢CG
TDengine
提交
9e50edfe
T
TDengine
项目概览
慢慢CG
/
TDengine
与 Fork 源项目一致
Fork自
taosdata / TDengine
通知
1
Star
0
Fork
0
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
0
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
0
Issue
0
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
9e50edfe
编写于
5月 23, 2020
作者:
麦壳饼
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
Receive subscription messages and query playload sql
上级
f8ffd8cb
变更
3
隐藏空白更改
内联
并排
Showing
3 changed file
with
188 addition
and
14 deletion
+188
-14
src/plugins/mqtt/CMakeLists.txt
src/plugins/mqtt/CMakeLists.txt
+3
-4
src/plugins/mqtt/inc/mqttSystem.h
src/plugins/mqtt/inc/mqttSystem.h
+15
-2
src/plugins/mqtt/src/mqttSystem.c
src/plugins/mqtt/src/mqttSystem.c
+170
-8
未找到文件。
src/plugins/mqtt/CMakeLists.txt
浏览文件 @
9e50edfe
...
...
@@ -7,16 +7,15 @@ IF ((TD_LINUX_64) OR (TD_LINUX_32 AND TD_ARM))
INCLUDE_DIRECTORIES
(
${
TD_COMMUNITY_DIR
}
/src/util/inc
)
INCLUDE_DIRECTORIES
(
${
TD_COMMUNITY_DIR
}
/src/query/inc
)
INCLUDE_DIRECTORIES
(
${
TD_COMMUNITY_DIR
}
/src/common/inc
)
INCLUDE_DIRECTORIES
(
${
TD_COMMUNITY_DIR
}
/deps/zlib-1.2.11/inc
)
INCLUDE_DIRECTORIES
(
${
TD_COMMUNITY_DIR
}
/deps/cJson/inc
)
INCLUDE_DIRECTORIES
(
${
TD_COMMUNITY_DIR
}
/deps/
lz4
/inc
)
INCLUDE_DIRECTORIES
(
${
TD_COMMUNITY_DIR
}
/deps/
paho.mqtt.c
/inc
)
INCLUDE_DIRECTORIES
(
${
TD_OS_DIR
}
/inc
)
INCLUDE_DIRECTORIES
(
inc
)
AUX_SOURCE_DIRECTORY
(
src SRC
)
ADD_LIBRARY
(
mqtt
${
SRC
}
)
TARGET_LINK_LIBRARIES
(
mqtt taos_static z
)
TARGET_LINK_LIBRARIES
(
mqtt pahomqtt taos_static
)
IF
(
TD_ADMIN
)
TARGET_LINK_LIBRARIES
(
mqtt admin
)
TARGET_LINK_LIBRARIES
(
mqtt
pahomqtt
admin
)
ENDIF
()
ENDIF
()
src/plugins/mqtt/inc/mqttSystem.h
浏览文件 @
9e50edfe
...
...
@@ -20,13 +20,26 @@ extern "C" {
#endif
#include <stdint.h>
#include "MQTTAsync.h"
#include "os.h"
#include "taos.h"
#include "tglobal.h"
#include "tsocket.h"
#include "ttimer.h"
#include "tsclient.h"
int32_t
mqttGetReqCount
();
int32_t
mqttInitSystem
();
int32_t
mqttStartSystem
();
void
mqttStopSystem
();
void
mqttCleanUpSystem
();
void
connlost
(
void
*
context
,
char
*
cause
);
int
msgarrvd
(
void
*
context
,
char
*
topicName
,
int
topicLen
,
MQTTAsync_message
*
message
);
void
mqtt_query_insert_callback
(
void
*
param
,
TAOS_RES
*
result
,
int32_t
code
);
void
onDisconnectFailure
(
void
*
context
,
MQTTAsync_failureData
*
response
);
void
onDisconnect
(
void
*
context
,
MQTTAsync_successData
*
response
);
void
onSubscribe
(
void
*
context
,
MQTTAsync_successData
*
response
);
void
onSubscribeFailure
(
void
*
context
,
MQTTAsync_failureData
*
response
);
void
mqttInitConnCb
(
void
*
param
,
TAOS_RES
*
result
,
int32_t
code
);
#ifdef __cplusplus
}
#endif
...
...
src/plugins/mqtt/src/mqttSystem.c
浏览文件 @
9e50edfe
...
...
@@ -22,22 +22,184 @@
#include "tglobal.h"
#include "tsocket.h"
#include "ttimer.h"
#include "MQTTAsync.h"
#include "tsclient.h"
#define ADDRESS "tcp://mqtt.eclipse.org:1883"
#define CLIENTID "ExampleClientSub"
#define TOPIC "MQTT Examples"
#define PAYLOAD "Hello World!"
#define QOS 1
#define TIMEOUT 10000L
MQTTAsync
client
;
MQTTAsync_connectOptions
conn_opts
=
MQTTAsync_connectOptions_initializer
;
MQTTAsync_disconnectOptions
disc_opts
=
MQTTAsync_disconnectOptions_initializer
;
void
*
mqtt_conn
=
NULL
;
int
disc_finished
=
0
;
int
subscribed
=
0
;
int
finished
=
0
;
int
can_exit
=
0
;
void
connlost
(
void
*
context
,
char
*
cause
)
{
MQTTAsync
client
=
(
MQTTAsync
)
context
;
MQTTAsync_connectOptions
conn_opts
=
MQTTAsync_connectOptions_initializer
;
int
rc
;
mqttError
(
"
\n
Connection lost
\n
"
);
if
(
cause
)
mqttError
(
" cause: %s
\n
"
,
cause
);
mqttPrint
(
"Reconnecting
\n
"
);
conn_opts
.
keepAliveInterval
=
20
;
conn_opts
.
cleansession
=
1
;
if
((
rc
=
MQTTAsync_connect
(
client
,
&
conn_opts
))
!=
MQTTASYNC_SUCCESS
)
{
mqttError
(
"Failed to start connect, return code %d
\n
"
,
rc
);
finished
=
1
;
}
}
int
msgarrvd
(
void
*
context
,
char
*
topicName
,
int
topicLen
,
MQTTAsync_message
*
message
)
{
mqttPrint
(
"Message arrived
\n
"
);
mqttPrint
(
" topic: %s
\n
"
,
topicName
);
mqttPrint
(
" message: %.*s
\n
"
,
message
->
payloadlen
,
(
char
*
)
message
->
payload
);
if
(
mqtt_conn
==
NULL
)
{
taos_connect_a
(
NULL
,
"monitor"
,
tsInternalPass
,
""
,
0
,
mqttInitConnCb
,
&
client
,
&
mqtt_conn
);
}
if
(
mqtt_conn
!=
NULL
)
{
taos_query_a
(
mqtt_conn
,
(
char
*
)
message
->
payload
,
mqtt_query_insert_callback
,
&
client
);
}
MQTTAsync_freeMessage
(
&
message
);
MQTTAsync_free
(
topicName
);
return
1
;
}
void
mqtt_query_insert_callback
(
void
*
param
,
TAOS_RES
*
result
,
int32_t
code
)
{
if
(
code
<
0
)
{
mqttError
(
"mqtt:%p, save data failed, code:%s"
,
mqtt_conn
,
tstrerror
(
code
));
}
else
if
(
code
==
0
)
{
mqttError
(
"mqtt:%p, save data failed, affect rows:%d"
,
mqtt_conn
,
code
);
}
else
{
mqttError
(
"mqtt:%p, save data success, code:%s"
,
mqtt_conn
,
tstrerror
(
code
));
}
}
void
onDisconnectFailure
(
void
*
context
,
MQTTAsync_failureData
*
response
)
{
mqttError
(
"Disconnect failed, rc %d
\n
"
,
response
->
code
);
disc_finished
=
1
;
}
void
onDisconnect
(
void
*
context
,
MQTTAsync_successData
*
response
)
{
mqttError
(
"Successful disconnection
\n
"
);
if
(
mqtt_conn
!=
NULL
)
{
taos_close
(
&
(
mqtt_conn
));
mqtt_conn
=
NULL
;
}
disc_finished
=
1
;
}
void
onSubscribe
(
void
*
context
,
MQTTAsync_successData
*
response
)
{
mqttPrint
(
"Subscribe succeeded
\n
"
);
subscribed
=
1
;
}
void
onSubscribeFailure
(
void
*
context
,
MQTTAsync_failureData
*
response
)
{
mqttError
(
"Subscribe failed, rc %d
\n
"
,
response
->
code
);
finished
=
1
;
if
(
mqtt_conn
!=
NULL
)
{
taos_close
(
mqtt_conn
);
mqtt_conn
=
NULL
;
}
}
void
onConnectFailure
(
void
*
context
,
MQTTAsync_failureData
*
response
)
{
mqttError
(
"Connect failed, rc %d
\n
"
,
response
->
code
);
finished
=
1
;
}
void
onConnect
(
void
*
context
,
MQTTAsync_successData
*
response
)
{
MQTTAsync
client
=
(
MQTTAsync
)
context
;
MQTTAsync_responseOptions
opts
=
MQTTAsync_responseOptions_initializer
;
int
rc
;
mqttPrint
(
"Successful connection
\n
"
);
mqttPrint
(
"Subscribing to topic %s
\n
for client %s using QoS%d
\n\n
"
,
TOPIC
,
CLIENTID
,
QOS
);
opts
.
onSuccess
=
onSubscribe
;
opts
.
onFailure
=
onSubscribeFailure
;
opts
.
context
=
client
;
if
((
rc
=
MQTTAsync_subscribe
(
client
,
TOPIC
,
QOS
,
&
opts
))
!=
MQTTASYNC_SUCCESS
)
{
mqttError
(
"Failed to start subscribe, return code %d
\n
"
,
rc
);
finished
=
1
;
}
}
int32_t
mqttGetReqCount
()
{
return
0
;
}
int
mqttInitSystem
()
{
mqttPrint
(
"mqttInitSystem"
);
return
0
;
int32_t
mqttInitSystem
()
{
int
rc
=
0
;
if
((
rc
=
MQTTAsync_create
(
&
client
,
ADDRESS
,
CLIENTID
,
MQTTCLIENT_PERSISTENCE_NONE
,
NULL
))
!=
MQTTASYNC_SUCCESS
)
{
mqttError
(
"Failed to create client, return code %d
\n
"
,
rc
);
rc
=
EXIT_FAILURE
;
}
else
{
if
((
rc
=
MQTTAsync_setCallbacks
(
client
,
client
,
connlost
,
msgarrvd
,
NULL
))
!=
MQTTASYNC_SUCCESS
)
{
mqttError
(
"Failed to set callbacks, return code %d
\n
"
,
rc
);
rc
=
EXIT_FAILURE
;
}
else
{
conn_opts
.
keepAliveInterval
=
20
;
conn_opts
.
cleansession
=
1
;
conn_opts
.
onSuccess
=
onConnect
;
conn_opts
.
onFailure
=
onConnectFailure
;
conn_opts
.
context
=
client
;
taos_init
();
}
}
return
rc
;
}
int
mqttStartSystem
()
{
int32_t
mqttStartSystem
()
{
int
rc
=
0
;
mqttPrint
(
"mqttStartSystem"
);
return
0
;
if
((
rc
=
MQTTAsync_connect
(
client
,
&
conn_opts
))
!=
MQTTASYNC_SUCCESS
)
{
mqttError
(
"Failed to start connect, return code %d
\n
"
,
rc
);
rc
=
EXIT_FAILURE
;
}
else
{
while
(
!
subscribed
&&
!
finished
)
usleep
(
10000L
);
disc_opts
.
onSuccess
=
onDisconnect
;
disc_opts
.
onFailure
=
onDisconnectFailure
;
mqttPrint
(
"Successful started
\n
"
);
}
return
rc
;
}
void
mqttInitConnCb
(
void
*
param
,
TAOS_RES
*
result
,
int32_t
code
)
{
if
(
code
<
0
)
{
mqttError
(
"mqtt:%d, connect to database failed, reason:%s"
,
code
,
tstrerror
(
code
));
taos_close
(
mqtt_conn
);
mqtt_conn
=
NULL
;
return
;
}
mqttTrace
(
"mqtt:%d, connect to database success, reason:%s"
,
code
,
tstrerror
(
code
));
}
void
mqttStopSystem
()
{
mqttPrint
(
"mqttStopSystem"
);
int
rc
=
0
;
if
((
rc
=
MQTTAsync_disconnect
(
client
,
&
disc_opts
))
!=
MQTTASYNC_SUCCESS
)
{
mqttError
(
"Failed to start disconnect, return code %d
\n
"
,
rc
);
rc
=
EXIT_FAILURE
;
}
else
{
while
(
!
disc_finished
)
{
usleep
(
10000L
);
}
}
taos_close
(
mqtt_conn
);
}
void
mqttCleanUpSystem
()
{
mqttPrint
(
"mqttCleanUpSystem"
);
void
mqttCleanUpSystem
()
{
mqttPrint
(
"mqttCleanUpSystem"
);
MQTTAsync_destroy
(
&
client
);
taos_cleanup
(
mqtt_conn
);
}
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录