提交 400c1a86 编写于 作者: 麦壳饼's avatar 麦壳饼

Implemented subscription json data from Mqtt broker! we subscription topic ...

Implemented subscription json data from Mqtt broker! we subscription topic  "/taos/+/+/+/" , format  just like this '/taos/<token>/<db name>/<table name>/', next will add auth by token.
上级 9e50edfe
...@@ -94,6 +94,7 @@ extern int32_t tsMaxTables; ...@@ -94,6 +94,7 @@ extern int32_t tsMaxTables;
extern char tsDefaultDB[]; extern char tsDefaultDB[];
extern char tsDefaultUser[]; extern char tsDefaultUser[];
extern char tsDefaultPass[]; extern char tsDefaultPass[];
extern char tsMqttBrokerAddress[];
extern int32_t tsMaxMeterConnections; extern int32_t tsMaxMeterConnections;
extern int32_t tsMaxVnodeConnections; extern int32_t tsMaxVnodeConnections;
extern int32_t tsMaxMgmtConnections; extern int32_t tsMaxMgmtConnections;
......
...@@ -201,6 +201,8 @@ int32_t tsMonitorInterval = 30; // seconds ...@@ -201,6 +201,8 @@ int32_t tsMonitorInterval = 30; // seconds
char tsTimezone[64] = {0}; char tsTimezone[64] = {0};
char tsLocale[TSDB_LOCALE_LEN] = {0}; char tsLocale[TSDB_LOCALE_LEN] = {0};
char tsCharset[TSDB_LOCALE_LEN] = {0}; // default encode string char tsCharset[TSDB_LOCALE_LEN] = {0}; // default encode string
char tsMqttBrokerAddress[128] = {0};
static pthread_once_t tsInitGlobalCfgOnce = PTHREAD_ONCE_INIT; static pthread_once_t tsInitGlobalCfgOnce = PTHREAD_ONCE_INIT;
...@@ -729,6 +731,16 @@ static void doInitGlobalConfig() { ...@@ -729,6 +731,16 @@ static void doInitGlobalConfig() {
cfg.unitType = TAOS_CFG_UTYPE_NONE; cfg.unitType = TAOS_CFG_UTYPE_NONE;
taosInitConfigOption(cfg); taosInitConfigOption(cfg);
cfg.option = "mqttBrokerAddress";
cfg.ptr = tsMqttBrokerAddress;
cfg.valType = TAOS_CFG_VTYPE_STRING;
cfg.cfgType = TSDB_CFG_CTYPE_B_CONFIG | TSDB_CFG_CTYPE_B_CLIENT | TSDB_CFG_CTYPE_B_NOT_PRINT;
cfg.minValue = 0;
cfg.maxValue = 0;
cfg.ptrLength = 126;
cfg.unitType = TAOS_CFG_UTYPE_NONE;
taosInitConfigOption(cfg);
// socket type; udp by default // socket type; udp by default
cfg.option = "sockettype"; cfg.option = "sockettype";
cfg.ptr = tsSocketType; cfg.ptr = tsSocketType;
......
...@@ -19,10 +19,7 @@ ...@@ -19,10 +19,7 @@
#ifdef __cplusplus #ifdef __cplusplus
extern "C" { extern "C" {
#endif #endif
#include <stdint.h> #include <stdint.h>
int32_t mqttGetReqCount();
int32_t mqttInitSystem(); int32_t mqttInitSystem();
int32_t mqttStartSystem(); int32_t mqttStartSystem();
void mqttStopSystem(); void mqttStopSystem();
......
...@@ -13,9 +13,9 @@ IF ((TD_LINUX_64) OR (TD_LINUX_32 AND TD_ARM)) ...@@ -13,9 +13,9 @@ IF ((TD_LINUX_64) OR (TD_LINUX_32 AND TD_ARM))
INCLUDE_DIRECTORIES(inc) INCLUDE_DIRECTORIES(inc)
AUX_SOURCE_DIRECTORY(src SRC) AUX_SOURCE_DIRECTORY(src SRC)
ADD_LIBRARY(mqtt ${SRC}) ADD_LIBRARY(mqtt ${SRC})
TARGET_LINK_LIBRARIES( mqtt pahomqtt taos_static ) TARGET_LINK_LIBRARIES(mqtt pahomqtt taos_static cJson)
IF (TD_ADMIN) IF (TD_ADMIN)
TARGET_LINK_LIBRARIES(mqtt pahomqtt admin) TARGET_LINK_LIBRARIES(mqtt pahomqtt admin cJson)
ENDIF () ENDIF ()
ENDIF () ENDIF ()
...@@ -27,11 +27,11 @@ extern "C" { ...@@ -27,11 +27,11 @@ extern "C" {
#include "tsocket.h" #include "tsocket.h"
#include "ttimer.h" #include "ttimer.h"
#include "tsclient.h" #include "tsclient.h"
int32_t mqttGetReqCount();
int32_t mqttInitSystem(); int32_t mqttInitSystem();
int32_t mqttStartSystem(); int32_t mqttStartSystem();
void mqttStopSystem(); void mqttStopSystem();
void mqttCleanUpSystem(); void mqttCleanUpSystem();
char split(char str[], char delims[], char** p_p_cmd_part, int max);
void connlost(void* context, char* cause); void connlost(void* context, char* cause);
int msgarrvd(void* context, char* topicName, int topicLen, MQTTAsync_message* message); int msgarrvd(void* context, char* topicName, int topicLen, MQTTAsync_message* message);
void mqtt_query_insert_callback(void* param, TAOS_RES* result, int32_t code); void mqtt_query_insert_callback(void* param, TAOS_RES* result, int32_t code);
......
...@@ -15,18 +15,20 @@ ...@@ -15,18 +15,20 @@
#define _DEFAULT_SOURCE #define _DEFAULT_SOURCE
#include "mqttSystem.h" #include "mqttSystem.h"
#include "MQTTAsync.h"
#include "cJson.h"
#include "mqtt.h" #include "mqtt.h"
#include "mqttLog.h" #include "mqttLog.h"
#include "os.h" #include "os.h"
#include "string.h"
#include "taos.h" #include "taos.h"
#include "tglobal.h" #include "tglobal.h"
#include "tsclient.h"
#include "tsocket.h" #include "tsocket.h"
#include "ttimer.h" #include "ttimer.h"
#include "MQTTAsync.h"
#include "tsclient.h" #define CLIENTID "taos"
#define ADDRESS "tcp://mqtt.eclipse.org:1883" #define TOPIC "/taos/+/+/+/" // taos/<token>/<db name>/<table name>/
#define CLIENTID "ExampleClientSub"
#define TOPIC "MQTT Examples"
#define PAYLOAD "Hello World!" #define PAYLOAD "Hello World!"
#define QOS 1 #define QOS 1
#define TIMEOUT 10000L #define TIMEOUT 10000L
...@@ -34,50 +36,107 @@ ...@@ -34,50 +36,107 @@
MQTTAsync client; MQTTAsync client;
MQTTAsync_connectOptions conn_opts = MQTTAsync_connectOptions_initializer; MQTTAsync_connectOptions conn_opts = MQTTAsync_connectOptions_initializer;
MQTTAsync_disconnectOptions disc_opts = MQTTAsync_disconnectOptions_initializer; MQTTAsync_disconnectOptions disc_opts = MQTTAsync_disconnectOptions_initializer;
void* mqtt_conn=NULL; void* mqtt_conn = NULL;
int disc_finished = 0; int disc_finished = 0;
int subscribed = 0; int subscribed = 0;
int finished = 0; int finished = 0;
int can_exit = 0; int can_exit = 0;
void connlost(void* context, char* cause) { void connlost(void* context, char* cause) {
MQTTAsync client = (MQTTAsync)context; MQTTAsync client = (MQTTAsync)context;
MQTTAsync_connectOptions conn_opts = MQTTAsync_connectOptions_initializer; MQTTAsync_connectOptions conn_opts = MQTTAsync_connectOptions_initializer;
int rc; int rc;
mqttError("\nConnection lost\n"); mqttError("\nConnection lost");
if (cause) mqttError(" cause: %s\n", cause); if (cause) mqttError(" cause: %s", cause);
mqttPrint("Reconnecting\n"); mqttPrint("Reconnecting");
conn_opts.keepAliveInterval = 20; conn_opts.keepAliveInterval = 20;
conn_opts.cleansession = 1; conn_opts.cleansession = 1;
if ((rc = MQTTAsync_connect(client, &conn_opts)) != MQTTASYNC_SUCCESS) { if ((rc = MQTTAsync_connect(client, &conn_opts)) != MQTTASYNC_SUCCESS) {
mqttError("Failed to start connect, return code %d\n", rc); mqttError("Failed to start connect, return code %d", rc);
finished = 1; finished = 1;
} }
} }
char split(char str[], char delims[], char** p_p_cmd_part, int max) {
char* token = strtok(str, delims);
char part_index = 0;
char** tmp_part = p_p_cmd_part;
while (token) {
*tmp_part++ = token;
token = strtok(NULL, delims);
part_index++;
if (part_index >= max) break;
}
return part_index;
}
int msgarrvd(void* context, char* topicName, int topicLen, MQTTAsync_message* message) { int msgarrvd(void* context, char* topicName, int topicLen, MQTTAsync_message* message) {
mqttPrint("Message arrived\n"); mqttTrace("Message arrived,topic is %s,message is %.*s", topicName, message->payloadlen, (char*)message->payload);
mqttPrint(" topic: %s\n", topicName); char _token[128] = {0};
mqttPrint(" message: %.*s\n", message->payloadlen, (char*)message->payload); char _dbname[128] = {0};
char _tablename[128] = {0};
if (mqtt_conn == NULL) { if (mqtt_conn == NULL) {
mqttPrint("connect database");
taos_connect_a(NULL, "monitor", tsInternalPass, "", 0, mqttInitConnCb, &client, &mqtt_conn); taos_connect_a(NULL, "monitor", tsInternalPass, "", 0, mqttInitConnCb, &client, &mqtt_conn);
} }
if (mqtt_conn != NULL) { if (strncmp(topicName, "/taos/", 6) == 0) {
taos_query_a(mqtt_conn, (char*)message->payload, mqtt_query_insert_callback, &client); char* p_p_cmd_part[5] = {0};
char copystr[1024] = {0};
strncpy(copystr, topicName, MIN(1024, strlen(topicName)));
char part_index = split(copystr, "/", p_p_cmd_part, 10);
if (part_index < 4) {
mqttError("The topic %s is't format '%s'.", topicName, TOPIC);
} else {
strncpy(_token, p_p_cmd_part[1], 127);
strncpy(_dbname, p_p_cmd_part[2], 127);
strncpy(_tablename, p_p_cmd_part[3], 127);
mqttPrint("part count=%d,access token:%s,database name:%s, table name:%s", part_index, _token, _dbname,
_tablename);
cJSON* jPlayload = cJSON_Parse((char*)message->payload);
char _names[102400] = {0};
char _values[102400] = {0};
int i = 0;
int count = cJSON_GetArraySize(jPlayload);
for (; i < count; i++) //jsonֵ
{
cJSON* item = cJSON_GetArrayItem(jPlayload, i);
if (cJSON_Object == item->type) {
mqttPrint("The item '%s' is not supported", item->string);
} else {
strcat(_names, item->string);
if (i < count - 1) {
strcat(_names, ",");
}
char* __value_json = cJSON_Print(item);
strcat(_values, __value_json);
free(__value_json);
if (i < count - 1) {
strcat(_values, ",");
}
}
}
cJSON_free(jPlayload);
char _sql[102400] = {0};
sprintf(_sql, "INSERT INTO %s.%s (%s) VALUES(%s);", _dbname, _tablename, _names, _values);
if (mqtt_conn != NULL) {
mqttPrint("query:%s", _sql);
taos_query_a(mqtt_conn, _sql, mqtt_query_insert_callback, &client);
}
}
} }
MQTTAsync_freeMessage(&message); MQTTAsync_freeMessage(&message);
MQTTAsync_free(topicName); MQTTAsync_free(topicName);
return 1; return 1;
} }
void mqtt_query_insert_callback(void* param, TAOS_RES* result, int32_t code) { void mqtt_query_insert_callback(void* param, TAOS_RES* result, int32_t code) {
if (code < 0) { if (code < 0) {
mqttError("mqtt:%p, save data failed, code:%s", mqtt_conn, tstrerror(code)); mqttError("mqtt:%d, save data failed, code:%s", code, tstrerror(code));
} else if (code == 0) { } else if (code == 0) {
mqttError("mqtt:%p, save data failed, affect rows:%d", mqtt_conn, code); mqttError("mqtt:%d, save data failed, affect rows:%d", code, code);
} else { } else {
mqttError("mqtt:%p, save data success, code:%s", mqtt_conn, tstrerror(code)); mqttPrint("mqtt:%d, save data success, code:%s", code, tstrerror(code));
} }
} }
...@@ -89,7 +148,7 @@ void onDisconnectFailure(void* context, MQTTAsync_failureData* response) { ...@@ -89,7 +148,7 @@ void onDisconnectFailure(void* context, MQTTAsync_failureData* response) {
void onDisconnect(void* context, MQTTAsync_successData* response) { void onDisconnect(void* context, MQTTAsync_successData* response) {
mqttError("Successful disconnection\n"); mqttError("Successful disconnection\n");
if (mqtt_conn != NULL) { if (mqtt_conn != NULL) {
taos_close(&(mqtt_conn)); taos_close(mqtt_conn);
mqtt_conn = NULL; mqtt_conn = NULL;
} }
disc_finished = 1; disc_finished = 1;
...@@ -98,16 +157,11 @@ void onDisconnect(void* context, MQTTAsync_successData* response) { ...@@ -98,16 +157,11 @@ void onDisconnect(void* context, MQTTAsync_successData* response) {
void onSubscribe(void* context, MQTTAsync_successData* response) { void onSubscribe(void* context, MQTTAsync_successData* response) {
mqttPrint("Subscribe succeeded\n"); mqttPrint("Subscribe succeeded\n");
subscribed = 1; subscribed = 1;
} }
void onSubscribeFailure(void* context, MQTTAsync_failureData* response) { void onSubscribeFailure(void* context, MQTTAsync_failureData* response) {
mqttError("Subscribe failed, rc %d\n", response->code); mqttError("Subscribe failed, rc %d\n", response->code);
finished = 1; finished = 1;
if (mqtt_conn != NULL) {
taos_close(mqtt_conn);
mqtt_conn = NULL;
}
} }
void onConnectFailure(void* context, MQTTAsync_failureData* response) { void onConnectFailure(void* context, MQTTAsync_failureData* response) {
...@@ -122,7 +176,7 @@ void onConnect(void* context, MQTTAsync_successData* response) { ...@@ -122,7 +176,7 @@ void onConnect(void* context, MQTTAsync_successData* response) {
mqttPrint("Successful connection\n"); mqttPrint("Successful connection\n");
mqttPrint("Subscribing to topic %s\nfor client %s using QoS%d\n\n",TOPIC, CLIENTID, QOS); mqttPrint("Subscribing to topic %s\nfor client %s using QoS%d", TOPIC, CLIENTID, QOS);
opts.onSuccess = onSubscribe; opts.onSuccess = onSubscribe;
opts.onFailure = onSubscribeFailure; opts.onFailure = onSubscribeFailure;
opts.context = client; opts.context = client;
...@@ -132,26 +186,29 @@ void onConnect(void* context, MQTTAsync_successData* response) { ...@@ -132,26 +186,29 @@ void onConnect(void* context, MQTTAsync_successData* response) {
} }
} }
int32_t mqttGetReqCount() { return 0; }
int32_t mqttInitSystem() { int32_t mqttInitSystem() {
int rc = 0; int rc = 0;
if ((rc = MQTTAsync_create(&client, ADDRESS, CLIENTID, MQTTCLIENT_PERSISTENCE_NONE, NULL)) != MQTTASYNC_SUCCESS) { if (strnlen(tsMqttBrokerAddress, 128) == 0) {
mqttError("Failed to create client, return code %d\n", rc);
rc = EXIT_FAILURE; rc = EXIT_FAILURE;
mqttError("Can't to create client, mqtt broker address is empty %d", rc);
} else { } else {
if ((rc = MQTTAsync_setCallbacks(client, client, connlost, msgarrvd, NULL)) != MQTTASYNC_SUCCESS) { if ((rc = MQTTAsync_create(&client, tsMqttBrokerAddress, CLIENTID, MQTTCLIENT_PERSISTENCE_NONE, NULL)) !=
mqttError("Failed to set callbacks, return code %d\n", rc); MQTTASYNC_SUCCESS) {
mqttError("Failed to create client, return code %d", rc);
rc = EXIT_FAILURE; rc = EXIT_FAILURE;
} else { } else {
conn_opts.keepAliveInterval = 20; if ((rc = MQTTAsync_setCallbacks(client, client, connlost, msgarrvd, NULL)) != MQTTASYNC_SUCCESS) {
conn_opts.cleansession = 1; mqttError("Failed to set callbacks, return code %d", rc);
conn_opts.onSuccess = onConnect; rc = EXIT_FAILURE;
conn_opts.onFailure = onConnectFailure; } else {
conn_opts.context = client; conn_opts.keepAliveInterval = 20;
taos_init(); conn_opts.cleansession = 1;
conn_opts.onSuccess = onConnect;
conn_opts.onFailure = onConnectFailure;
conn_opts.context = client;
taos_init();
}
} }
} }
return rc; return rc;
...@@ -161,34 +218,31 @@ int32_t mqttStartSystem() { ...@@ -161,34 +218,31 @@ int32_t mqttStartSystem() {
int rc = 0; int rc = 0;
mqttPrint("mqttStartSystem"); mqttPrint("mqttStartSystem");
if ((rc = MQTTAsync_connect(client, &conn_opts)) != MQTTASYNC_SUCCESS) { if ((rc = MQTTAsync_connect(client, &conn_opts)) != MQTTASYNC_SUCCESS) {
mqttError("Failed to start connect, return code %d\n", rc); mqttError("Failed to start connect, return code %d", rc);
rc = EXIT_FAILURE; rc = EXIT_FAILURE;
} else { } else {
while (!subscribed && !finished) usleep(10000L); while (!subscribed && !finished) usleep(10000L);
disc_opts.onSuccess = onDisconnect; disc_opts.onSuccess = onDisconnect;
disc_opts.onFailure = onDisconnectFailure; disc_opts.onFailure = onDisconnectFailure;
mqttPrint("Successful started\n"); mqttPrint("Successful started\n");
} }
return rc; return rc;
} }
void mqttInitConnCb(void* param, TAOS_RES* result, int32_t code) {
void mqttInitConnCb(void* param, TAOS_RES* result, int32_t code) {
if (code < 0) { if (code < 0) {
mqttError("mqtt:%d, connect to database failed, reason:%s", code, tstrerror(code)); mqttError("mqtt:%d, connect to database failed, reason:%s", code, tstrerror(code));
taos_close(mqtt_conn); taos_close(mqtt_conn);
mqtt_conn = NULL; mqtt_conn = NULL;
return; return;
} }
mqttTrace("mqtt:%d, connect to database success, reason:%s", code, tstrerror(code)); mqttTrace("mqtt:%d, connect to database success, reason:%s", code, tstrerror(code));
} }
void mqttStopSystem() { void mqttStopSystem() {
int rc=0; int rc = 0;
if ((rc = MQTTAsync_disconnect(client, &disc_opts)) != MQTTASYNC_SUCCESS) { if ((rc = MQTTAsync_disconnect(client, &disc_opts)) != MQTTASYNC_SUCCESS) {
mqttError("Failed to start disconnect, return code %d\n", rc); mqttError("Failed to start disconnect, return code %d", rc);
rc = EXIT_FAILURE; rc = EXIT_FAILURE;
} else { } else {
while (!disc_finished) { while (!disc_finished) {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册