diff --git a/src/common/src/tglobal.c b/src/common/src/tglobal.c index 7f3cd46a558e3242fd2f7ac59a8141167b090d01..c7763a257a82a2cec100c850eab644a0a99a3113 100644 --- a/src/common/src/tglobal.c +++ b/src/common/src/tglobal.c @@ -251,8 +251,11 @@ bool taosCfgDynamicOptions(char *msg) { for (int32_t i = 0; i < tsGlobalConfigNum; ++i) { SGlobalCfg *cfg = tsGlobalConfig + i; - if (!(cfg->cfgType & TSDB_CFG_CTYPE_B_LOG)) continue; + //if (!(cfg->cfgType & TSDB_CFG_CTYPE_B_LOG)) continue; if (cfg->valType != TAOS_CFG_VTYPE_INT32) continue; + + int32_t cfgLen = strlen(cfg->option); + if (cfgLen != olen) continue; if (strncasecmp(option, cfg->option, olen) != 0) continue; *((int32_t *)cfg->ptr) = vint; diff --git a/src/mnode/src/mnodeUser.c b/src/mnode/src/mnodeUser.c index f4cb1a9ef3d435cd044df03fd96835043b73ff71..c03ff688d2282f931706e80331a884439d78ee04 100644 --- a/src/mnode/src/mnodeUser.c +++ b/src/mnode/src/mnodeUser.c @@ -581,7 +581,7 @@ void mnodeDropAllUsers(SAcctObj *pAcct) { int32_t mnodeRetriveAuth(char *user, char *spi, char *encrypt, char *secret, char *ckey) { if (!sdbIsMaster()) { *secret = 0; - mDebug("user:%s, failed to auth user, reason:%s", user, tstrerror(TSDB_CODE_APP_NOT_READY)); + mDebug("user:%s, failed to auth user, mnode is not master", user); return TSDB_CODE_APP_NOT_READY; } diff --git a/src/plugins/monitor/src/monitorMain.c b/src/plugins/monitor/src/monitorMain.c index d76bb4bd82f34fe151a973208916f10b1e336ace..7800ddec57f1f98bdb832e4b6f91cd9488ec507e 100644 --- a/src/plugins/monitor/src/monitorMain.c +++ b/src/plugins/monitor/src/monitorMain.c @@ -25,6 +25,7 @@ #include "tsclient.h" #include "dnode.h" #include "monitor.h" +#include "taoserror.h" #define monitorFatal(...) { if (monitorDebugFlag & DEBUG_FATAL) { taosPrintLog("MON FATAL ", 255, __VA_ARGS__); }} #define monitorError(...) { if (monitorDebugFlag & DEBUG_ERROR) { taosPrintLog("MON ERROR ", 255, __VA_ARGS__); }} @@ -33,129 +34,159 @@ #define monitorDebug(...) { if (monitorDebugFlag & DEBUG_DEBUG) { taosPrintLog("MON ", monitorDebugFlag, __VA_ARGS__); }} #define monitorTrace(...) { if (monitorDebugFlag & DEBUG_TRACE) { taosPrintLog("MON ", monitorDebugFlag, __VA_ARGS__); }} -#define SQL_LENGTH 1024 +#define SQL_LENGTH 1030 #define LOG_LEN_STR 100 #define IP_LEN_STR TSDB_EP_LEN #define CHECK_INTERVAL 1000 typedef enum { - MONITOR_CMD_CREATE_DB, - MONITOR_CMD_CREATE_TB_LOG, - MONITOR_CMD_CREATE_MT_DN, - MONITOR_CMD_CREATE_MT_ACCT, - MONITOR_CMD_CREATE_TB_DN, - MONITOR_CMD_CREATE_TB_ACCT_ROOT, - MONITOR_CMD_CREATE_TB_SLOWQUERY, - MONITOR_CMD_MAX + MON_CMD_CREATE_DB, + MON_CMD_CREATE_TB_LOG, + MON_CMD_CREATE_MT_DN, + MON_CMD_CREATE_MT_ACCT, + MON_CMD_CREATE_TB_DN, + MON_CMD_CREATE_TB_ACCT_ROOT, + MON_CMD_CREATE_TB_SLOWQUERY, + MON_CMD_MAX } EMonitorCommand; typedef enum { - MONITOR_STATE_UN_INIT, - MONITOR_STATE_INITIALIZING, - MONITOR_STATE_INITIALIZED, - MONITOR_STATE_STOPPED + MON_STATE_NOT_INIT, + MON_STATE_INITED } EMonitorState; typedef struct { - void * conn; - void * timer; - char ep[TSDB_EP_LEN]; - int8_t cmdIndex; - int8_t state; - char sql[SQL_LENGTH + 1]; - void * initTimer; - void * diskTimer; + pthread_t thread; + void * conn; + char ep[TSDB_EP_LEN]; + int8_t cmdIndex; + int8_t state; + int8_t start; // enable/disable by mnode + int8_t quiting; // taosd is quiting + char sql[SQL_LENGTH + 1]; } SMonitorConn; -static SMonitorConn tsMonitorConn; -static void monitorInitConn(void *para, void *unused); -static void monitorInitConnCb(void *param, TAOS_RES *result, int32_t code); -static void monitorInitDatabase(); -static void monitorInitDatabaseCb(void *param, TAOS_RES *result, int32_t code); -static void monitorStartTimer(); -static void monitorSaveSystemInfo(); +static SMonitorConn tsMonitor = {0}; +static void monitorSaveSystemInfo(); +static void *monitorThreadFunc(void *param); +static void monitorBuildMonitorSql(char *sql, int32_t cmd); extern int32_t (*monitorStartSystemFp)(); -extern void (*monitorStopSystemFp)(); -extern void (*monitorExecuteSQLFp)(char *sql); - -static void monitorCheckDiskUsage(void *para, void *unused) { - taosGetDisk(); - taosTmrReset(monitorCheckDiskUsage, CHECK_INTERVAL, NULL, tscTmr, &tsMonitorConn.diskTimer); -} +extern void (*monitorStopSystemFp)(); +extern void (*monitorExecuteSQLFp)(char *sql); int32_t monitorInitSystem() { - taos_init(); - taosTmrReset(monitorCheckDiskUsage, CHECK_INTERVAL, NULL, tscTmr, &tsMonitorConn.diskTimer); + if (tsMonitor.ep[0] == 0) { + strcpy(tsMonitor.ep, tsLocalEp); + } + + int len = strlen(tsMonitor.ep); + for (int i = 0; i < len; ++i) { + if (tsMonitor.ep[i] == ':' || tsMonitor.ep[i] == '-' || tsMonitor.ep[i] == '.') { + tsMonitor.ep[i] = '_'; + } + } + + pthread_attr_t thAttr; + pthread_attr_init(&thAttr); + pthread_attr_setdetachstate(&thAttr, PTHREAD_CREATE_JOINABLE); + + if (pthread_create(&tsMonitor.thread, &thAttr, monitorThreadFunc, NULL)) { + monitorError("failed to create thread to for monitor module, reason:%s", strerror(errno)); + return -1; + } + + pthread_attr_destroy(&thAttr); + monitorDebug("monitor thread is launched"); + monitorStartSystemFp = monitorStartSystem; monitorStopSystemFp = monitorStopSystem; return 0; } int32_t monitorStartSystem() { - monitorInfo("start monitor module"); - monitorInitSystem(); - taosTmrReset(monitorInitConn, 10, NULL, tscTmr, &tsMonitorConn.initTimer); + taos_init(); + tsMonitor.start = 1; + monitorExecuteSQLFp = monitorExecuteSQL; + monitorInfo("monitor module start"); return 0; } -static void monitorStartSystemRetry() { - if (tsMonitorConn.initTimer != NULL) { - taosTmrReset(monitorInitConn, 3000, NULL, tscTmr, &tsMonitorConn.initTimer); - } -} +static void *monitorThreadFunc(void *param) { + monitorDebug("starting to initialize monitor module ..."); -static void monitorInitConn(void *para, void *unused) { - if (dnodeGetDnodeId() <= 0) { - monitorStartSystemRetry(); - return; - } - - monitorInfo("starting to initialize monitor service .."); - tsMonitorConn.state = MONITOR_STATE_INITIALIZING; + while (1) { + if (tsMonitor.quiting) { + tsMonitor.state = MON_STATE_NOT_INIT; + monitorInfo("monitor thread will quit, for taosd is quiting"); + break; + } else { + taosGetDisk(); + } - if (tsMonitorConn.ep[0] == 0) - strcpy(tsMonitorConn.ep, tsLocalEp); + if (tsMonitor.start == 0) { + continue; + } - int len = strlen(tsMonitorConn.ep); - for (int i = 0; i < len; ++i) { - if (tsMonitorConn.ep[i] == ':' || tsMonitorConn.ep[i] == '-') { - tsMonitorConn.ep[i] = '_'; + static int32_t accessTimes = 0; + accessTimes++; + taosMsleep(1000); + + if (dnodeGetDnodeId() <= 0) { + monitorDebug("dnode not initialized, waiting for 3000 ms to start monitor module"); + continue; } - } - if (tsMonitorConn.conn == NULL) { - taos_connect_a(NULL, "monitor", tsInternalPass, "", 0, monitorInitConnCb, &tsMonitorConn, &(tsMonitorConn.conn)); - } else { - monitorInitDatabase(); - } -} + if (tsMonitor.conn == NULL) { + tsMonitor.state = MON_STATE_NOT_INIT; + tsMonitor.conn = taos_connect(NULL, "monitor", tsInternalPass, "", 0); + if (tsMonitor.conn == NULL) { + monitorError("failed to connect to database, reason:%s", tstrerror(terrno)); + continue; + } else { + monitorDebug("connect to database success"); + } + } -static void monitorInitConnCb(void *param, TAOS_RES *result, int32_t code) { - // free it firstly in any cases. - taos_free_result(result); + if (tsMonitor.state == MON_STATE_NOT_INIT) { + for (; tsMonitor.cmdIndex < MON_CMD_MAX; ++tsMonitor.cmdIndex) { + monitorBuildMonitorSql(tsMonitor.sql, tsMonitor.cmdIndex); + void *res = taos_query(tsMonitor.conn, tsMonitor.sql); + int code = taos_errno(res); + taos_free_result(res); + + if (code != 0) { + monitorError("failed to exec sql:%s, reason:%s", tsMonitor.sql, tstrerror(code)); + break; + } else { + monitorDebug("successfully to exec sql:%s", tsMonitor.sql); + } + } + + if (tsMonitor.start) { + tsMonitor.state = MON_STATE_INITED; + } + } - if (code != TSDB_CODE_SUCCESS) { - monitorError("monitor:%p, connect to database failed, reason:%s", tsMonitorConn.conn, tstrerror(code)); - taos_close(tsMonitorConn.conn); - tsMonitorConn.conn = NULL; - tsMonitorConn.state = MONITOR_STATE_UN_INIT; - monitorStartSystemRetry(); - return; + if (tsMonitor.state == MON_STATE_INITED) { + if (accessTimes % tsMonitorInterval == 0) { + monitorSaveSystemInfo(); + } + } } - monitorDebug("monitor:%p, connect to database success, reason:%s", tsMonitorConn.conn, tstrerror(code)); - monitorInitDatabase(); + monitorInfo("monitor thread is stopped"); + return NULL; } -static void dnodeBuildMonitorSql(char *sql, int32_t cmd) { +static void monitorBuildMonitorSql(char *sql, int32_t cmd) { memset(sql, 0, SQL_LENGTH); - if (cmd == MONITOR_CMD_CREATE_DB) { + if (cmd == MON_CMD_CREATE_DB) { snprintf(sql, SQL_LENGTH, "create database if not exists %s replica 1 days 10 keep 30 cache %d " "blocks %d maxtables 16 precision 'us'", tsMonitorDbName, TSDB_MIN_CACHE_BLOCK_SIZE, TSDB_MIN_TOTAL_BLOCKS); - } else if (cmd == MONITOR_CMD_CREATE_MT_DN) { + } else if (cmd == MON_CMD_CREATE_MT_DN) { snprintf(sql, SQL_LENGTH, "create table if not exists %s.dn(ts timestamp" ", cpu_taosd float, cpu_system float, cpu_cores int" @@ -166,10 +197,10 @@ static void dnodeBuildMonitorSql(char *sql, int32_t cmd) { ", req_http int, req_select int, req_insert int" ") tags (dnodeid int, fqdn binary(%d))", tsMonitorDbName, TSDB_FQDN_LEN); - } else if (cmd == MONITOR_CMD_CREATE_TB_DN) { + } else if (cmd == MON_CMD_CREATE_TB_DN) { snprintf(sql, SQL_LENGTH, "create table if not exists %s.dn%d using %s.dn tags(%d, '%s')", tsMonitorDbName, dnodeGetDnodeId(), tsMonitorDbName, dnodeGetDnodeId(), tsLocalEp); - } else if (cmd == MONITOR_CMD_CREATE_MT_ACCT) { + } else if (cmd == MON_CMD_CREATE_MT_ACCT) { snprintf(sql, SQL_LENGTH, "create table if not exists %s.acct(ts timestamp " ", currentPointsPerSecond bigint, maxPointsPerSecond bigint" @@ -185,15 +216,15 @@ static void dnodeBuildMonitorSql(char *sql, int32_t cmd) { ", accessState smallint" ") tags (acctId binary(%d))", tsMonitorDbName, TSDB_USER_LEN); - } else if (cmd == MONITOR_CMD_CREATE_TB_ACCT_ROOT) { + } else if (cmd == MON_CMD_CREATE_TB_ACCT_ROOT) { snprintf(sql, SQL_LENGTH, "create table if not exists %s.acct_%s using %s.acct tags('%s')", tsMonitorDbName, TSDB_DEFAULT_USER, tsMonitorDbName, TSDB_DEFAULT_USER); - } else if (cmd == MONITOR_CMD_CREATE_TB_SLOWQUERY) { + } else if (cmd == MON_CMD_CREATE_TB_SLOWQUERY) { snprintf(sql, SQL_LENGTH, "create table if not exists %s.slowquery(ts timestamp, username " "binary(%d), created_time timestamp, time bigint, sql binary(%d))", tsMonitorDbName, TSDB_TABLE_FNAME_LEN - 1, TSDB_SLOW_QUERY_SQL_LEN); - } else if (cmd == MONITOR_CMD_CREATE_TB_LOG) { + } else if (cmd == MON_CMD_CREATE_TB_LOG) { snprintf(sql, SQL_LENGTH, "create table if not exists %s.log(ts timestamp, level tinyint, " "content binary(%d), ipaddr binary(%d))", @@ -203,75 +234,22 @@ static void dnodeBuildMonitorSql(char *sql, int32_t cmd) { sql[SQL_LENGTH] = 0; } -static void monitorInitDatabase() { - if (tsMonitorConn.cmdIndex < MONITOR_CMD_MAX) { - dnodeBuildMonitorSql(tsMonitorConn.sql, tsMonitorConn.cmdIndex); - taos_query_a(tsMonitorConn.conn, tsMonitorConn.sql, monitorInitDatabaseCb, NULL); - } else { - tsMonitorConn.state = MONITOR_STATE_INITIALIZED; - monitorExecuteSQLFp = monitorExecuteSQL; - monitorInfo("monitor service init success"); - - monitorStartTimer(); - } -} - -static void monitorInitDatabaseCb(void *param, TAOS_RES *result, int32_t code) { - if (code == TSDB_CODE_MND_TABLE_ALREADY_EXIST || code == TSDB_CODE_MND_DB_ALREADY_EXIST || code >= 0) { - monitorDebug("monitor:%p, sql success, reason:%s, %s", tsMonitorConn.conn, tstrerror(code), tsMonitorConn.sql); - if (tsMonitorConn.cmdIndex == MONITOR_CMD_CREATE_TB_LOG) { - monitorInfo("dnode:%s is started", tsLocalEp); - } - tsMonitorConn.cmdIndex++; - monitorInitDatabase(); - } else { - monitorError("monitor:%p, sql failed, reason:%s, %s", tsMonitorConn.conn, tstrerror(code), tsMonitorConn.sql); - tsMonitorConn.state = MONITOR_STATE_UN_INIT; - monitorStartSystemRetry(); - } - - taos_free_result(result); -} - void monitorStopSystem() { - if (tsMonitorConn.state == MONITOR_STATE_STOPPED) return; - tsMonitorConn.state = MONITOR_STATE_STOPPED; + tsMonitor.start = 0; + tsMonitor.state = MON_STATE_NOT_INIT; monitorExecuteSQLFp = NULL; - - monitorInfo("monitor module is stopped"); - - if (tsMonitorConn.initTimer != NULL) { - taosTmrStopA(&(tsMonitorConn.initTimer)); - } - if (tsMonitorConn.timer != NULL) { - taosTmrStopA(&(tsMonitorConn.timer)); - } - if (tsMonitorConn.conn != NULL) { - taos_close(tsMonitorConn.conn); - tsMonitorConn.conn = NULL; - } + monitorInfo("monitor module stopped"); } void monitorCleanUpSystem() { + tsMonitor.quiting = 1; monitorStopSystem(); - monitorInfo("monitor module cleanup"); -} - -static void monitorStartTimer() { - taosTmrReset(monitorSaveSystemInfo, tsMonitorInterval * 1000, NULL, tscTmr, &tsMonitorConn.timer); -} - -static void dnodeMontiorLogCallback(void *param, TAOS_RES *result, int32_t code) { - int32_t c = taos_errno(result); - - if (c != TSDB_CODE_SUCCESS) { - monitorError("monitor:%p, save %s failed, reason:%s", tsMonitorConn.conn, (char *)param, tstrerror(c)); - } else { - int32_t rows = taos_affected_rows(result); - monitorDebug("monitor:%p, save %s succ, rows:%d", tsMonitorConn.conn, (char *)param, rows); + pthread_join(tsMonitor.thread, NULL); + if (tsMonitor.conn != NULL) { + taos_close(tsMonitor.conn); + tsMonitor.conn = NULL; } - - taos_free_result(result); + monitorInfo("monitor module is cleaned up"); } // unit is MB @@ -279,13 +257,13 @@ static int32_t monitorBuildMemorySql(char *sql) { float sysMemoryUsedMB = 0; bool suc = taosGetSysMemory(&sysMemoryUsedMB); if (!suc) { - monitorError("monitor:%p, get sys memory info failed.", tsMonitorConn.conn); + monitorDebug("failed to get sys memory info"); } float procMemoryUsedMB = 0; suc = taosGetProcMemory(&procMemoryUsedMB); if (!suc) { - monitorError("monitor:%p, get proc memory info failed.", tsMonitorConn.conn); + monitorDebug("failed to get proc memory info"); } return sprintf(sql, ", %f, %f, %d", procMemoryUsedMB, sysMemoryUsedMB, tsTotalMemoryMB); @@ -296,11 +274,11 @@ static int32_t monitorBuildCpuSql(char *sql) { float sysCpuUsage = 0, procCpuUsage = 0; bool suc = taosGetCpuUsage(&sysCpuUsage, &procCpuUsage); if (!suc) { - monitorError("monitor:%p, get cpu usage failed.", tsMonitorConn.conn); + monitorDebug("failed to get cpu usage"); } if (sysCpuUsage <= procCpuUsage) { - sysCpuUsage = procCpuUsage + (float)0.1; + sysCpuUsage = procCpuUsage + 0.1f; } return sprintf(sql, ", %f, %f, %d", procCpuUsage, sysCpuUsage, tsNumOfCores); @@ -316,14 +294,14 @@ static int32_t monitorBuildBandSql(char *sql) { float bandSpeedKb = 0; bool suc = taosGetBandSpeed(&bandSpeedKb); if (!suc) { - monitorError("monitor:%p, get bandwidth speed failed.", tsMonitorConn.conn); + monitorDebug("failed to get bandwidth speed"); } return sprintf(sql, ", %f", bandSpeedKb); } static int32_t monitorBuildReqSql(char *sql) { - SDnodeStatisInfo info = dnodeGetStatisInfo(); + SDnodeStatisInfo info = dnodeGetStatisInfo(); return sprintf(sql, ", %d, %d, %d)", info.httpReqNum, info.queryReqNum, info.submitReqNum); } @@ -331,20 +309,15 @@ static int32_t monitorBuildIoSql(char *sql) { float readKB = 0, writeKB = 0; bool suc = taosGetProcIO(&readKB, &writeKB); if (!suc) { - monitorError("monitor:%p, get io info failed.", tsMonitorConn.conn); + monitorDebug("failed to get io info"); } return sprintf(sql, ", %f, %f", readKB, writeKB); } static void monitorSaveSystemInfo() { - if (tsMonitorConn.state != MONITOR_STATE_INITIALIZED) { - monitorStartTimer(); - return; - } - int64_t ts = taosGetTimestampUs(); - char * sql = tsMonitorConn.sql; + char * sql = tsMonitor.sql; int32_t pos = snprintf(sql, SQL_LENGTH, "insert into %s.dn%d values(%" PRId64, tsMonitorDbName, dnodeGetDnodeId(), ts); pos += monitorBuildCpuSql(sql + pos); @@ -354,16 +327,31 @@ static void monitorSaveSystemInfo() { pos += monitorBuildIoSql(sql + pos); pos += monitorBuildReqSql(sql + pos); - monitorDebug("monitor:%p, save system info, sql:%s", tsMonitorConn.conn, sql); - taos_query_a(tsMonitorConn.conn, sql, dnodeMontiorLogCallback, "sys"); + void *res = taos_query(tsMonitor.conn, tsMonitor.sql); + int code = taos_errno(res); + taos_free_result(res); + + if (code != 0) { + monitorError("failed to save system info, reason:%s, sql:%s", tstrerror(code), tsMonitor.sql); + } else { + monitorDebug("successfully to save system info, sql:%s", tsMonitor.sql); + } +} - if (tsMonitorConn.timer != NULL && tsMonitorConn.state != MONITOR_STATE_STOPPED) { - monitorStartTimer(); +static void montiorExecSqlCb(void *param, TAOS_RES *result, int32_t code) { + int32_t c = taos_errno(result); + if (c != TSDB_CODE_SUCCESS) { + monitorError("save %s failed, reason:%s", (char *)param, tstrerror(c)); + } else { + int32_t rows = taos_affected_rows(result); + monitorDebug("save %s succ, rows:%d", (char *)param, rows); } + + taos_free_result(result); } void monitorSaveAcctLog(SAcctMonitorObj *pMon) { - if (tsMonitorConn.state != MONITOR_STATE_INITIALIZED) return; + if (tsMonitor.state != MON_STATE_INITED) return; char sql[1024] = {0}; sprintf(sql, @@ -392,19 +380,16 @@ void monitorSaveAcctLog(SAcctMonitorObj *pMon) { pMon->totalConns, pMon->maxConns, pMon->accessState); - monitorDebug("monitor:%p, save account info, sql %s", tsMonitorConn.conn, sql); - taos_query_a(tsMonitorConn.conn, sql, dnodeMontiorLogCallback, "account"); + monitorDebug("save account info, sql:%s", sql); + taos_query_a(tsMonitor.conn, sql, montiorExecSqlCb, "account info"); } void monitorSaveLog(int32_t level, const char *const format, ...) { - if (tsMonitorConn.state != MONITOR_STATE_INITIALIZED) return; + if (tsMonitor.state != MON_STATE_INITED) return; va_list argpointer; char sql[SQL_LENGTH] = {0}; int32_t max_length = SQL_LENGTH - 30; - - if (tsMonitorConn.state != MONITOR_STATE_INITIALIZED) return; - int32_t len = snprintf(sql, (size_t)max_length, "insert into %s.log values(%" PRId64 ", %d,'", tsMonitorDbName, taosGetTimestampUs(), level); @@ -416,12 +401,13 @@ void monitorSaveLog(int32_t level, const char *const format, ...) { len += sprintf(sql + len, "', '%s')", tsLocalEp); sql[len++] = 0; - monitorDebug("monitor:%p, save log, sql: %s", tsMonitorConn.conn, sql); - taos_query_a(tsMonitorConn.conn, sql, dnodeMontiorLogCallback, "log"); + monitorDebug("save log, sql: %s", sql); + taos_query_a(tsMonitor.conn, sql, montiorExecSqlCb, "log"); } void monitorExecuteSQL(char *sql) { - if (tsMonitorConn.state != MONITOR_STATE_INITIALIZED) return; - monitorDebug("monitor:%p, execute sql: %s", tsMonitorConn.conn, sql); - taos_query_a(tsMonitorConn.conn, sql, dnodeMontiorLogCallback, "sql"); + if (tsMonitor.state != MON_STATE_INITED) return; + + monitorDebug("execute sql:%s", sql); + taos_query_a(tsMonitor.conn, sql, montiorExecSqlCb, "sql"); } diff --git a/src/plugins/mqtt/src/mqttSystem.c b/src/plugins/mqtt/src/mqttSystem.c index 8079cedb274261f7c24e65c4c653d4906bc21b3b..eacc3b1f74cdfc0d00bb53a38beb8c85b164e200 100644 --- a/src/plugins/mqtt/src/mqttSystem.c +++ b/src/plugins/mqtt/src/mqttSystem.c @@ -23,12 +23,13 @@ #include "posix_sockets.h" #include "taos.h" #include "tglobal.h" +#include "taoserror.h" -struct mqtt_client tsMqttClient = {0}; struct SMqttReconnectState tsMqttStatus = {0}; -static pthread_t tsMqttClientDaemonThread = {0}; -static void* tsMqttConnect = NULL; -static bool tsMqttIsRuning = false; +struct mqtt_client tsMqttClient = {0}; +static pthread_t tsMqttClientDaemonThread = {0}; +static void* tsMqttConnect = NULL; +static bool tsMqttIsRuning = false; int32_t mqttInitSystem() { return 0; } @@ -69,32 +70,32 @@ void mqttCleanUpSystem() { void mqttPublishCallback(void** unused, struct mqtt_response_publish* published) { const char* content = published->application_message; - mqttDebug("receive message size:%d", (int)published->application_message_size); + mqttDebug("receive mqtt message, size:%d", (int)published->application_message_size); if (tsMqttConnect == NULL) { tsMqttConnect = taos_connect(NULL, "_root", tsInternalPass, "", 0); if (tsMqttConnect == NULL) { - mqttError("failed to connect to tdengine"); + mqttError("failed to connect to tdengine, reason:%s", tstrerror(terrno)); return; } else { mqttInfo("successfully connected to the tdengine"); } } - mqttTrace("receive message content:%s", content); + mqttTrace("receive mqtt message, content:%s", content); char* sql = mqttConverJsonToSql((char*)content, (int)published->application_message_size); if (sql != NULL) { void* res = taos_query(tsMqttConnect, sql); int code = taos_errno(res); if (code != 0) { - mqttError("failed to exec sql:%s", sql); + mqttError("failed to exec sql, reason:%s sql:%s", tstrerror(code), sql); } else { - mqttDebug("successfully to exec sql:%s", sql); + mqttTrace("successfully to exec sql:%s", sql); } taos_free_result(res); } else { - mqttDebug("failed to parse mqtt message"); + mqttError("failed to parse mqtt message"); } } diff --git a/tests/script/jenkins/basic.txt b/tests/script/jenkins/basic.txt index 40598332d998191402cc411e0c6156f302d99267..88a844333beb70a9737a9464b4a97efeae3b130c 100644 --- a/tests/script/jenkins/basic.txt +++ b/tests/script/jenkins/basic.txt @@ -293,6 +293,7 @@ cd ../../../debug; make ./test.sh -f unique/stable/replica3_dnode6.sim ./test.sh -f unique/stable/replica3_vnode3.sim +./test.sh -f unique/mnode/mgmt20.sim ./test.sh -f unique/mnode/mgmt21.sim ./test.sh -f unique/mnode/mgmt22.sim ./test.sh -f unique/mnode/mgmt23.sim diff --git a/tests/script/sh/deploy.sh b/tests/script/sh/deploy.sh index 498f5367261c395be3ef1c508a8dc14700a7f8d1..0d444a5a6ed25329b857422cd8eb8bd06af87530 100755 --- a/tests/script/sh/deploy.sh +++ b/tests/script/sh/deploy.sh @@ -119,7 +119,7 @@ echo "tsdbDebugFlag 135" >> $TAOS_CFG echo "cDebugFlag 135" >> $TAOS_CFG echo "jnidebugFlag 135" >> $TAOS_CFG echo "odbcdebugFlag 135" >> $TAOS_CFG -echo "httpDebugFlag 143" >> $TAOS_CFG +echo "httpDebugFlag 135" >> $TAOS_CFG echo "monitorDebugFlag 135" >> $TAOS_CFG echo "mqttDebugFlag 135" >> $TAOS_CFG echo "qdebugFlag 135" >> $TAOS_CFG diff --git a/tests/script/unique/mnode/mgmt20.sim b/tests/script/unique/mnode/mgmt20.sim new file mode 100644 index 0000000000000000000000000000000000000000..d69e377955f55141eee39c222fa2d26bd48f75dd --- /dev/null +++ b/tests/script/unique/mnode/mgmt20.sim @@ -0,0 +1,71 @@ +system sh/stop_dnodes.sh +system sh/deploy.sh -n dnode1 -i 1 +system sh/deploy.sh -n dnode2 -i 2 + +system sh/cfg.sh -n dnode1 -c numOfMnodes -v 2 +system sh/cfg.sh -n dnode2 -c numOfMnodes -v 2 + +system sh/cfg.sh -n dnode1 -c monitor -v 1 +system sh/cfg.sh -n dnode2 -c monitor -v 1 + +print ============== step1 +system sh/exec.sh -n dnode1 -s start +system sh/exec.sh -n dnode2 -s start +sleep 3000 +sql connect + +print ============== step2 +sql create dnode $hostname2 + +$x = 0 +show2: + $x = $x + 1 + sleep 2000 + if $x == 10 then + return -1 + endi + +sql show mnodes +print dnode1 ==> $data2_1 +print dnode2 ==> $data2_2 +if $data2_1 != master then + goto show2 +endi +if $data2_2 != slave then + goto show2 +endi + +system sh/exec.sh -n dnode1 -s stop -x SIGINT +system sh/exec.sh -n dnode2 -s stop -x SIGINT + +print ============== step3 +system sh/exec.sh -n dnode2 -s start +sleep 10000 + +system sh/exec.sh -n dnode1 -s start +sql connect + +print =============== step4 +sql select count(*) from log.dn1 +$d1_first = $rows +sql select count(*) from log.dn2 +$d2_first = $rows + +sleep 3000 +sql select count(*) from log.dn1 +$d1_second = $rows +sql select count(*) from log.dn2 +$d2_second = $rows + +print dnode1 $d1_first $d1_second +print dnode2 $d2_first $d2_first +if $d1_first >= $d1_second then + return -1 +endi + +if $d2_first >= $d2_first then + return -1 +endi + +system sh/exec.sh -n dnode1 -s stop -x SIGINT +system sh/exec.sh -n dnode2 -s stop -x SIGINT \ No newline at end of file