提交 adb6249a 编写于 作者: 麦壳饼's avatar 麦壳饼
上级 b8a96b40
...@@ -64,3 +64,4 @@ CMakeError.log ...@@ -64,3 +64,4 @@ CMakeError.log
/out/isenseconfig/WSL-Clang-Debug /out/isenseconfig/WSL-Clang-Debug
/out/isenseconfig/WSL-GCC-Debug /out/isenseconfig/WSL-GCC-Debug
/test/cfg /test/cfg
/src/.vs
...@@ -94,7 +94,10 @@ extern int32_t tsMaxTables; ...@@ -94,7 +94,10 @@ extern int32_t tsMaxTables;
extern char tsDefaultDB[]; extern char tsDefaultDB[];
extern char tsDefaultUser[]; extern char tsDefaultUser[];
extern char tsDefaultPass[]; extern char tsDefaultPass[];
extern char tsMqttBrokerAddress[]; extern char tsMqttBrokerAddress[];
extern char tsMqttBrokerClientId[];
extern int32_t tsMaxMeterConnections; extern int32_t tsMaxMeterConnections;
extern int32_t tsMaxVnodeConnections; extern int32_t tsMaxVnodeConnections;
extern int32_t tsMaxMgmtConnections; extern int32_t tsMaxMgmtConnections;
......
...@@ -202,7 +202,7 @@ char tsTimezone[64] = {0}; ...@@ -202,7 +202,7 @@ char tsTimezone[64] = {0};
char tsLocale[TSDB_LOCALE_LEN] = {0}; char tsLocale[TSDB_LOCALE_LEN] = {0};
char tsCharset[TSDB_LOCALE_LEN] = {0}; // default encode string char tsCharset[TSDB_LOCALE_LEN] = {0}; // default encode string
char tsMqttBrokerAddress[128] = {0}; char tsMqttBrokerAddress[128] = {0};
char tsMqttBrokerClientId[128] = {0};
int32_t tsMaxBinaryDisplayWidth = 30; int32_t tsMaxBinaryDisplayWidth = 30;
...@@ -742,6 +742,16 @@ static void doInitGlobalConfig() { ...@@ -742,6 +742,16 @@ static void doInitGlobalConfig() {
cfg.ptrLength = 126; cfg.ptrLength = 126;
cfg.unitType = TAOS_CFG_UTYPE_NONE; cfg.unitType = TAOS_CFG_UTYPE_NONE;
taosInitConfigOption(cfg); taosInitConfigOption(cfg);
cfg.option = "mqttBrokerClientId";
cfg.ptr = tsMqttBrokerClientId;
cfg.valType = TAOS_CFG_VTYPE_STRING;
cfg.cfgType = TSDB_CFG_CTYPE_B_CONFIG | TSDB_CFG_CTYPE_B_CLIENT | TSDB_CFG_CTYPE_B_NOT_PRINT;
cfg.minValue = 0;
cfg.maxValue = 0;
cfg.ptrLength = 126;
cfg.unitType = TAOS_CFG_UTYPE_NONE;
taosInitConfigOption(cfg);
// socket type; udp by default // socket type; udp by default
cfg.option = "sockettype"; cfg.option = "sockettype";
......
...@@ -19,7 +19,6 @@ ...@@ -19,7 +19,6 @@
extern "C" { extern "C" {
#endif #endif
/** /**
* @file * @file
* A simple subscriber program that performs automatic reconnections. * A simple subscriber program that performs automatic reconnections.
...@@ -38,13 +37,16 @@ extern "C" { ...@@ -38,13 +37,16 @@ extern "C" {
* \ref mqttReconnectClient is called, this instance will be passed. * \ref mqttReconnectClient is called, this instance will be passed.
*/ */
struct reconnect_state_t { struct reconnect_state_t {
const char* hostname; char* hostname;
const char* port; char* port;
const char* topic; char* topic;
uint8_t* sendbuf; char* client_id;
size_t sendbufsz; char* user_name;
uint8_t* recvbuf; char* password;
size_t recvbufsz; uint8_t* sendbuf;
size_t sendbufsz;
uint8_t* recvbuf;
size_t recvbufsz;
}; };
/** /**
...@@ -76,7 +78,7 @@ void mqttCleanup(int status, int sockfd, pthread_t* client_daemon); ...@@ -76,7 +78,7 @@ void mqttCleanup(int status, int sockfd, pthread_t* client_daemon);
void mqttInitConnCb(void* param, TAOS_RES* result, int32_t code); void mqttInitConnCb(void* param, TAOS_RES* result, int32_t code);
void mqttQueryInsertCallback(void* param, TAOS_RES* result, int32_t code); void mqttQueryInsertCallback(void* param, TAOS_RES* result, int32_t code);
#define CLIENTID "taos" #define CLIENTID "taos"
#define TOPIC "/taos/+/+/+/" // taos/<token>/<db name>/<table name>/ #define TOPIC "+/+/+/" // path/<token>/<db name>/<table name>/
#define PAYLOAD "Hello World!" #define PAYLOAD "Hello World!"
#define QOS 1 #define QOS 1
#define TIMEOUT 10000L #define TIMEOUT 10000L
......
...@@ -33,32 +33,54 @@ ...@@ -33,32 +33,54 @@
struct mqtt_client client; struct mqtt_client client;
pthread_t client_daemon; pthread_t client_daemon;
void* mqtt_conn; void* mqtt_conn;
struct reconnect_state_t reconnect_state; struct reconnect_state_t recnt_status;
char* topicPath;
int32_t mqttInitSystem() { int32_t mqttInitSystem() {
int rc = 0; int rc = 0;
const char* addr;
const char* port;
addr = tsMqttBrokerAddress;
port = "1883";
reconnect_state.hostname = addr;
reconnect_state.port = port;
reconnect_state.topic = TOPIC;
uint8_t sendbuf[2048]; uint8_t sendbuf[2048];
uint8_t recvbuf[1024]; uint8_t recvbuf[1024];
reconnect_state.sendbuf = sendbuf; recnt_status.sendbuf = sendbuf;
reconnect_state.sendbufsz = sizeof(sendbuf); recnt_status.sendbufsz = sizeof(sendbuf);
reconnect_state.recvbuf = recvbuf; recnt_status.recvbuf = recvbuf;
reconnect_state.recvbufsz = sizeof(recvbuf); recnt_status.recvbufsz = sizeof(recvbuf);
char* url = tsMqttBrokerAddress;
recnt_status.user_name = strstr(url, "@") != NULL ? strbetween(url, "//", ":") : NULL;
recnt_status.password = strstr(url, "@") != NULL ? strbetween(strstr(url, recnt_status.user_name), ":", "@") : NULL;
if (strstr(url, "@") != NULL) {
recnt_status.hostname = strbetween(url, "@", ":");
} else if (strstr(strstr(url, "://") + 3, ":") != NULL) {
recnt_status.hostname = strbetween(url, "//", ":");
} else {
recnt_status.hostname = strbetween(url, "//", "/");
}
char* _begin_hostname = strstr(url, recnt_status.hostname);
if (strstr(_begin_hostname, ":") != NULL) {
recnt_status.port = strbetween(_begin_hostname, ":", "/");
} else {
recnt_status.port = strbetween("'1883'", "'", "'");
}
topicPath = strbetween(strstr(url, strstr(_begin_hostname, ":") != NULL ? recnt_status.port : recnt_status.hostname),
"/", "/");
int _tpsize = strlen(topicPath) + strlen(TOPIC) + 1;
recnt_status.topic = calloc(1, _tpsize);
snprintf(recnt_status.topic, _tpsize-1, "/%s/" TOPIC, topicPath);
recnt_status.client_id = tsMqttBrokerClientId==NULL || strlen(tsMqttBrokerClientId)<3? tsMqttBrokerClientId:"taos_mqtt";
taos_init(); taos_init();
mqttPrint("mqttInitSystem %s", tsMqttBrokerAddress); mqttPrint("mqttInitSystem mqtt://%s:%s@%s:%s/%s/", recnt_status.user_name, recnt_status.password,
recnt_status.hostname, recnt_status.port, topicPath);
return rc; return rc;
} }
int32_t mqttStartSystem() { int32_t mqttStartSystem() {
int rc = 0; int rc = 0;
mqtt_conn = NULL; mqtt_conn = NULL;
mqtt_init_reconnect(&client, mqttReconnectClient, &reconnect_state, mqtt_PublishCallback); mqtt_init_reconnect(&client, mqttReconnectClient, &recnt_status, mqtt_PublishCallback);
if (pthread_create(&client_daemon, NULL, mqttClientRefresher, &client)) { if (pthread_create(&client_daemon, NULL, mqttClientRefresher, &client)) {
mqttError("Failed to start client daemon."); mqttError("Failed to start client daemon.");
mqttCleanup(EXIT_FAILURE, -1, NULL); mqttCleanup(EXIT_FAILURE, -1, NULL);
...@@ -77,6 +99,12 @@ void mqttCleanUpSystem() { ...@@ -77,6 +99,12 @@ void mqttCleanUpSystem() {
mqttPrint("mqttCleanUpSystem"); mqttPrint("mqttCleanUpSystem");
mqttCleanup(EXIT_SUCCESS, client.socketfd, &client_daemon); mqttCleanup(EXIT_SUCCESS, client.socketfd, &client_daemon);
taos_cleanup(mqtt_conn); taos_cleanup(mqtt_conn);
free(recnt_status.user_name);
free(recnt_status.password);
free(recnt_status.hostname);
free(recnt_status.port);
free(recnt_status.topic);
free(topicPath);
} }
void mqtt_PublishCallback(void** unused, struct mqtt_response_publish* published) { void mqtt_PublishCallback(void** unused, struct mqtt_response_publish* published) {
...@@ -178,12 +206,10 @@ void mqttReconnectClient(struct mqtt_client* client, void** reconnect_state_vptr ...@@ -178,12 +206,10 @@ void mqttReconnectClient(struct mqtt_client* client, void** reconnect_state_vptr
mqtt_reinit(client, sockfd, reconnect_state->sendbuf, reconnect_state->sendbufsz, reconnect_state->recvbuf, mqtt_reinit(client, sockfd, reconnect_state->sendbuf, reconnect_state->sendbufsz, reconnect_state->recvbuf,
reconnect_state->recvbufsz); reconnect_state->recvbufsz);
/* Create an anonymous session */
const char* client_id = NULL;
/* Ensure we have a clean session */ /* Ensure we have a clean session */
uint8_t connect_flags = MQTT_CONNECT_CLEAN_SESSION; uint8_t connect_flags = MQTT_CONNECT_CLEAN_SESSION;
/* Send connection request to the broker. */ /* Send connection request to the broker. */
mqtt_connect(client, client_id, NULL, NULL, 0, NULL, NULL, connect_flags, 400); mqtt_connect(client, reconnect_state->client_id, NULL, NULL, 0, reconnect_state->user_name, reconnect_state->password,connect_flags, 400);
/* Subscribe to the topic. */ /* Subscribe to the topic. */
mqtt_subscribe(client, reconnect_state->topic, 0); mqtt_subscribe(client, reconnect_state->topic, 0);
......
...@@ -124,6 +124,8 @@ int64_t strnatoi(char *num, int32_t len); ...@@ -124,6 +124,8 @@ int64_t strnatoi(char *num, int32_t len);
char* strreplace(const char* str, const char* pattern, const char* rep); char* strreplace(const char* str, const char* pattern, const char* rep);
char *strbetween(char *string, char *begin, char *end);
char *paGetToken(char *src, char **token, int32_t *tokenLen); char *paGetToken(char *src, char **token, int32_t *tokenLen);
void taosMsleep(int32_t mseconds); void taosMsleep(int32_t mseconds);
......
...@@ -331,6 +331,20 @@ char *strreplace(const char *str, const char *pattern, const char *rep) { ...@@ -331,6 +331,20 @@ char *strreplace(const char *str, const char *pattern, const char *rep) {
return dest; return dest;
} }
char *strbetween(char *string, char *begin, char *end) {
char *result = NULL;
char *_begin = strstr(string, begin);
if (_begin != NULL) {
char *_end = strstr(_begin + strlen(begin), end);
int size = _end - _begin;
if (_end != NULL && size > 0) {
result = (char *)calloc(1, size);
memcpy(result, _begin + strlen(begin), size - +strlen(begin));
}
}
return result;
}
int32_t taosByteArrayToHexStr(char bytes[], int32_t len, char hexstr[]) { int32_t taosByteArrayToHexStr(char bytes[], int32_t len, char hexstr[]) {
int32_t i; int32_t i;
char hexval[16] = {'0', '1', '2', '3', '4', '5', '6', '7', '8', '9', 'a', 'b', 'c', 'd', 'e', 'f'}; char hexval[16] = {'0', '1', '2', '3', '4', '5', '6', '7', '8', '9', 'a', 'b', 'c', 'd', 'e', 'f'};
...@@ -691,4 +705,4 @@ void taosRemoveDir(char *rootDir) { ...@@ -691,4 +705,4 @@ void taosRemoveDir(char *rootDir) {
rmdir(rootDir); rmdir(rootDir);
uPrint("dir:%s is removed", rootDir); uPrint("dir:%s is removed", rootDir);
} }
\ No newline at end of file
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册