提交 25c0b8cb 编写于 作者: S Shengliang Guan

[TD-1293 ]<fix>: monitor uses too many resources and may cause auth failure

上级 189c3034
......@@ -251,8 +251,11 @@ bool taosCfgDynamicOptions(char *msg) {
for (int32_t i = 0; i < tsGlobalConfigNum; ++i) {
SGlobalCfg *cfg = tsGlobalConfig + i;
if (!(cfg->cfgType & TSDB_CFG_CTYPE_B_LOG)) continue;
//if (!(cfg->cfgType & TSDB_CFG_CTYPE_B_LOG)) continue;
if (cfg->valType != TAOS_CFG_VTYPE_INT32) continue;
int32_t cfgLen = strlen(cfg->option);
if (cfgLen != olen) continue;
if (strncasecmp(option, cfg->option, olen) != 0) continue;
*((int32_t *)cfg->ptr) = vint;
......
......@@ -581,7 +581,7 @@ void mnodeDropAllUsers(SAcctObj *pAcct) {
int32_t mnodeRetriveAuth(char *user, char *spi, char *encrypt, char *secret, char *ckey) {
if (!sdbIsMaster()) {
*secret = 0;
mDebug("user:%s, failed to auth user, reason:%s", user, tstrerror(TSDB_CODE_APP_NOT_READY));
mDebug("user:%s, failed to auth user, mnode is not master", user);
return TSDB_CODE_APP_NOT_READY;
}
......
......@@ -25,6 +25,7 @@
#include "tsclient.h"
#include "dnode.h"
#include "monitor.h"
#include "taoserror.h"
#define monitorFatal(...) { if (monitorDebugFlag & DEBUG_FATAL) { taosPrintLog("MON FATAL ", 255, __VA_ARGS__); }}
#define monitorError(...) { if (monitorDebugFlag & DEBUG_ERROR) { taosPrintLog("MON ERROR ", 255, __VA_ARGS__); }}
......@@ -33,129 +34,159 @@
#define monitorDebug(...) { if (monitorDebugFlag & DEBUG_DEBUG) { taosPrintLog("MON ", monitorDebugFlag, __VA_ARGS__); }}
#define monitorTrace(...) { if (monitorDebugFlag & DEBUG_TRACE) { taosPrintLog("MON ", monitorDebugFlag, __VA_ARGS__); }}
#define SQL_LENGTH 1024
#define SQL_LENGTH 1030
#define LOG_LEN_STR 100
#define IP_LEN_STR TSDB_EP_LEN
#define CHECK_INTERVAL 1000
typedef enum {
MONITOR_CMD_CREATE_DB,
MONITOR_CMD_CREATE_TB_LOG,
MONITOR_CMD_CREATE_MT_DN,
MONITOR_CMD_CREATE_MT_ACCT,
MONITOR_CMD_CREATE_TB_DN,
MONITOR_CMD_CREATE_TB_ACCT_ROOT,
MONITOR_CMD_CREATE_TB_SLOWQUERY,
MONITOR_CMD_MAX
MON_CMD_CREATE_DB,
MON_CMD_CREATE_TB_LOG,
MON_CMD_CREATE_MT_DN,
MON_CMD_CREATE_MT_ACCT,
MON_CMD_CREATE_TB_DN,
MON_CMD_CREATE_TB_ACCT_ROOT,
MON_CMD_CREATE_TB_SLOWQUERY,
MON_CMD_MAX
} EMonitorCommand;
typedef enum {
MONITOR_STATE_UN_INIT,
MONITOR_STATE_INITIALIZING,
MONITOR_STATE_INITIALIZED,
MONITOR_STATE_STOPPED
MON_STATE_NOT_INIT,
MON_STATE_INITED
} EMonitorState;
typedef struct {
void * conn;
void * timer;
char ep[TSDB_EP_LEN];
int8_t cmdIndex;
int8_t state;
char sql[SQL_LENGTH + 1];
void * initTimer;
void * diskTimer;
pthread_t thread;
void * conn;
char ep[TSDB_EP_LEN];
int8_t cmdIndex;
int8_t state;
int8_t start; // enable/disable by mnode
int8_t quiting; // taosd is quiting
char sql[SQL_LENGTH + 1];
} SMonitorConn;
static SMonitorConn tsMonitorConn;
static void monitorInitConn(void *para, void *unused);
static void monitorInitConnCb(void *param, TAOS_RES *result, int32_t code);
static void monitorInitDatabase();
static void monitorInitDatabaseCb(void *param, TAOS_RES *result, int32_t code);
static void monitorStartTimer();
static void monitorSaveSystemInfo();
static SMonitorConn tsMonitor = {0};
static void monitorSaveSystemInfo();
static void *monitorThreadFunc(void *param);
static void monitorBuildMonitorSql(char *sql, int32_t cmd);
extern int32_t (*monitorStartSystemFp)();
extern void (*monitorStopSystemFp)();
extern void (*monitorExecuteSQLFp)(char *sql);
static void monitorCheckDiskUsage(void *para, void *unused) {
taosGetDisk();
taosTmrReset(monitorCheckDiskUsage, CHECK_INTERVAL, NULL, tscTmr, &tsMonitorConn.diskTimer);
}
extern void (*monitorStopSystemFp)();
extern void (*monitorExecuteSQLFp)(char *sql);
int32_t monitorInitSystem() {
taos_init();
taosTmrReset(monitorCheckDiskUsage, CHECK_INTERVAL, NULL, tscTmr, &tsMonitorConn.diskTimer);
if (tsMonitor.ep[0] == 0) {
strcpy(tsMonitor.ep, tsLocalEp);
}
int len = strlen(tsMonitor.ep);
for (int i = 0; i < len; ++i) {
if (tsMonitor.ep[i] == ':' || tsMonitor.ep[i] == '-' || tsMonitor.ep[i] == '.') {
tsMonitor.ep[i] = '_';
}
}
pthread_attr_t thAttr;
pthread_attr_init(&thAttr);
pthread_attr_setdetachstate(&thAttr, PTHREAD_CREATE_JOINABLE);
if (pthread_create(&tsMonitor.thread, &thAttr, monitorThreadFunc, NULL)) {
monitorError("failed to create thread to for monitor module, reason:%s", strerror(errno));
return -1;
}
pthread_attr_destroy(&thAttr);
monitorDebug("monitor thread is launched");
monitorStartSystemFp = monitorStartSystem;
monitorStopSystemFp = monitorStopSystem;
return 0;
}
int32_t monitorStartSystem() {
monitorInfo("start monitor module");
monitorInitSystem();
taosTmrReset(monitorInitConn, 10, NULL, tscTmr, &tsMonitorConn.initTimer);
taos_init();
tsMonitor.start = 1;
monitorExecuteSQLFp = monitorExecuteSQL;
monitorInfo("monitor module start");
return 0;
}
static void monitorStartSystemRetry() {
if (tsMonitorConn.initTimer != NULL) {
taosTmrReset(monitorInitConn, 3000, NULL, tscTmr, &tsMonitorConn.initTimer);
}
}
static void *monitorThreadFunc(void *param) {
monitorDebug("starting to initialize monitor module ...");
static void monitorInitConn(void *para, void *unused) {
if (dnodeGetDnodeId() <= 0) {
monitorStartSystemRetry();
return;
}
monitorInfo("starting to initialize monitor service ..");
tsMonitorConn.state = MONITOR_STATE_INITIALIZING;
while (1) {
if (tsMonitor.quiting) {
tsMonitor.state = MON_STATE_NOT_INIT;
monitorInfo("monitor thread will quit, for taosd is quiting");
break;
} else {
taosGetDisk();
}
if (tsMonitorConn.ep[0] == 0)
strcpy(tsMonitorConn.ep, tsLocalEp);
if (tsMonitor.start == 0) {
continue;
}
int len = strlen(tsMonitorConn.ep);
for (int i = 0; i < len; ++i) {
if (tsMonitorConn.ep[i] == ':' || tsMonitorConn.ep[i] == '-') {
tsMonitorConn.ep[i] = '_';
static int32_t accessTimes = 0;
accessTimes++;
taosMsleep(1000);
if (dnodeGetDnodeId() <= 0) {
monitorDebug("dnode not initialized, waiting for 3000 ms to start monitor module");
continue;
}
}
if (tsMonitorConn.conn == NULL) {
taos_connect_a(NULL, "monitor", tsInternalPass, "", 0, monitorInitConnCb, &tsMonitorConn, &(tsMonitorConn.conn));
} else {
monitorInitDatabase();
}
}
if (tsMonitor.conn == NULL) {
tsMonitor.state = MON_STATE_NOT_INIT;
tsMonitor.conn = taos_connect(NULL, "monitor", tsInternalPass, "", 0);
if (tsMonitor.conn == NULL) {
monitorError("failed to connect to database, reason:%s", tstrerror(terrno));
continue;
} else {
monitorDebug("connect to database success");
}
}
static void monitorInitConnCb(void *param, TAOS_RES *result, int32_t code) {
// free it firstly in any cases.
taos_free_result(result);
if (tsMonitor.state == MON_STATE_NOT_INIT) {
for (; tsMonitor.cmdIndex < MON_CMD_MAX; ++tsMonitor.cmdIndex) {
monitorBuildMonitorSql(tsMonitor.sql, tsMonitor.cmdIndex);
void *res = taos_query(tsMonitor.conn, tsMonitor.sql);
int code = taos_errno(res);
taos_free_result(res);
if (code != 0) {
monitorError("failed to exec sql:%s, reason:%s", tsMonitor.sql, tstrerror(code));
break;
} else {
monitorDebug("successfully to exec sql:%s", tsMonitor.sql);
}
}
if (tsMonitor.start) {
tsMonitor.state = MON_STATE_INITED;
}
}
if (code != TSDB_CODE_SUCCESS) {
monitorError("monitor:%p, connect to database failed, reason:%s", tsMonitorConn.conn, tstrerror(code));
taos_close(tsMonitorConn.conn);
tsMonitorConn.conn = NULL;
tsMonitorConn.state = MONITOR_STATE_UN_INIT;
monitorStartSystemRetry();
return;
if (tsMonitor.state == MON_STATE_INITED) {
if (accessTimes % tsMonitorInterval == 0) {
monitorSaveSystemInfo();
}
}
}
monitorDebug("monitor:%p, connect to database success, reason:%s", tsMonitorConn.conn, tstrerror(code));
monitorInitDatabase();
monitorInfo("monitor thread is stopped");
return NULL;
}
static void dnodeBuildMonitorSql(char *sql, int32_t cmd) {
static void monitorBuildMonitorSql(char *sql, int32_t cmd) {
memset(sql, 0, SQL_LENGTH);
if (cmd == MONITOR_CMD_CREATE_DB) {
if (cmd == MON_CMD_CREATE_DB) {
snprintf(sql, SQL_LENGTH,
"create database if not exists %s replica 1 days 10 keep 30 cache %d "
"blocks %d maxtables 16 precision 'us'",
tsMonitorDbName, TSDB_MIN_CACHE_BLOCK_SIZE, TSDB_MIN_TOTAL_BLOCKS);
} else if (cmd == MONITOR_CMD_CREATE_MT_DN) {
} else if (cmd == MON_CMD_CREATE_MT_DN) {
snprintf(sql, SQL_LENGTH,
"create table if not exists %s.dn(ts timestamp"
", cpu_taosd float, cpu_system float, cpu_cores int"
......@@ -166,10 +197,10 @@ static void dnodeBuildMonitorSql(char *sql, int32_t cmd) {
", req_http int, req_select int, req_insert int"
") tags (dnodeid int, fqdn binary(%d))",
tsMonitorDbName, TSDB_FQDN_LEN);
} else if (cmd == MONITOR_CMD_CREATE_TB_DN) {
} else if (cmd == MON_CMD_CREATE_TB_DN) {
snprintf(sql, SQL_LENGTH, "create table if not exists %s.dn%d using %s.dn tags(%d, '%s')", tsMonitorDbName,
dnodeGetDnodeId(), tsMonitorDbName, dnodeGetDnodeId(), tsLocalEp);
} else if (cmd == MONITOR_CMD_CREATE_MT_ACCT) {
} else if (cmd == MON_CMD_CREATE_MT_ACCT) {
snprintf(sql, SQL_LENGTH,
"create table if not exists %s.acct(ts timestamp "
", currentPointsPerSecond bigint, maxPointsPerSecond bigint"
......@@ -185,15 +216,15 @@ static void dnodeBuildMonitorSql(char *sql, int32_t cmd) {
", accessState smallint"
") tags (acctId binary(%d))",
tsMonitorDbName, TSDB_USER_LEN);
} else if (cmd == MONITOR_CMD_CREATE_TB_ACCT_ROOT) {
} else if (cmd == MON_CMD_CREATE_TB_ACCT_ROOT) {
snprintf(sql, SQL_LENGTH, "create table if not exists %s.acct_%s using %s.acct tags('%s')", tsMonitorDbName, TSDB_DEFAULT_USER,
tsMonitorDbName, TSDB_DEFAULT_USER);
} else if (cmd == MONITOR_CMD_CREATE_TB_SLOWQUERY) {
} else if (cmd == MON_CMD_CREATE_TB_SLOWQUERY) {
snprintf(sql, SQL_LENGTH,
"create table if not exists %s.slowquery(ts timestamp, username "
"binary(%d), created_time timestamp, time bigint, sql binary(%d))",
tsMonitorDbName, TSDB_TABLE_FNAME_LEN - 1, TSDB_SLOW_QUERY_SQL_LEN);
} else if (cmd == MONITOR_CMD_CREATE_TB_LOG) {
} else if (cmd == MON_CMD_CREATE_TB_LOG) {
snprintf(sql, SQL_LENGTH,
"create table if not exists %s.log(ts timestamp, level tinyint, "
"content binary(%d), ipaddr binary(%d))",
......@@ -203,75 +234,22 @@ static void dnodeBuildMonitorSql(char *sql, int32_t cmd) {
sql[SQL_LENGTH] = 0;
}
static void monitorInitDatabase() {
if (tsMonitorConn.cmdIndex < MONITOR_CMD_MAX) {
dnodeBuildMonitorSql(tsMonitorConn.sql, tsMonitorConn.cmdIndex);
taos_query_a(tsMonitorConn.conn, tsMonitorConn.sql, monitorInitDatabaseCb, NULL);
} else {
tsMonitorConn.state = MONITOR_STATE_INITIALIZED;
monitorExecuteSQLFp = monitorExecuteSQL;
monitorInfo("monitor service init success");
monitorStartTimer();
}
}
static void monitorInitDatabaseCb(void *param, TAOS_RES *result, int32_t code) {
if (code == TSDB_CODE_MND_TABLE_ALREADY_EXIST || code == TSDB_CODE_MND_DB_ALREADY_EXIST || code >= 0) {
monitorDebug("monitor:%p, sql success, reason:%s, %s", tsMonitorConn.conn, tstrerror(code), tsMonitorConn.sql);
if (tsMonitorConn.cmdIndex == MONITOR_CMD_CREATE_TB_LOG) {
monitorInfo("dnode:%s is started", tsLocalEp);
}
tsMonitorConn.cmdIndex++;
monitorInitDatabase();
} else {
monitorError("monitor:%p, sql failed, reason:%s, %s", tsMonitorConn.conn, tstrerror(code), tsMonitorConn.sql);
tsMonitorConn.state = MONITOR_STATE_UN_INIT;
monitorStartSystemRetry();
}
taos_free_result(result);
}
void monitorStopSystem() {
if (tsMonitorConn.state == MONITOR_STATE_STOPPED) return;
tsMonitorConn.state = MONITOR_STATE_STOPPED;
tsMonitor.start = 0;
tsMonitor.state = MON_STATE_NOT_INIT;
monitorExecuteSQLFp = NULL;
monitorInfo("monitor module is stopped");
if (tsMonitorConn.initTimer != NULL) {
taosTmrStopA(&(tsMonitorConn.initTimer));
}
if (tsMonitorConn.timer != NULL) {
taosTmrStopA(&(tsMonitorConn.timer));
}
if (tsMonitorConn.conn != NULL) {
taos_close(tsMonitorConn.conn);
tsMonitorConn.conn = NULL;
}
monitorInfo("monitor module stopped");
}
void monitorCleanUpSystem() {
tsMonitor.quiting = 1;
monitorStopSystem();
monitorInfo("monitor module cleanup");
}
static void monitorStartTimer() {
taosTmrReset(monitorSaveSystemInfo, tsMonitorInterval * 1000, NULL, tscTmr, &tsMonitorConn.timer);
}
static void dnodeMontiorLogCallback(void *param, TAOS_RES *result, int32_t code) {
int32_t c = taos_errno(result);
if (c != TSDB_CODE_SUCCESS) {
monitorError("monitor:%p, save %s failed, reason:%s", tsMonitorConn.conn, (char *)param, tstrerror(c));
} else {
int32_t rows = taos_affected_rows(result);
monitorDebug("monitor:%p, save %s succ, rows:%d", tsMonitorConn.conn, (char *)param, rows);
pthread_join(tsMonitor.thread, NULL);
if (tsMonitor.conn != NULL) {
taos_close(tsMonitor.conn);
tsMonitor.conn = NULL;
}
taos_free_result(result);
monitorInfo("monitor module is cleaned up");
}
// unit is MB
......@@ -279,13 +257,13 @@ static int32_t monitorBuildMemorySql(char *sql) {
float sysMemoryUsedMB = 0;
bool suc = taosGetSysMemory(&sysMemoryUsedMB);
if (!suc) {
monitorError("monitor:%p, get sys memory info failed.", tsMonitorConn.conn);
monitorDebug("failed to get sys memory info");
}
float procMemoryUsedMB = 0;
suc = taosGetProcMemory(&procMemoryUsedMB);
if (!suc) {
monitorError("monitor:%p, get proc memory info failed.", tsMonitorConn.conn);
monitorDebug("failed to get proc memory info");
}
return sprintf(sql, ", %f, %f, %d", procMemoryUsedMB, sysMemoryUsedMB, tsTotalMemoryMB);
......@@ -296,11 +274,11 @@ static int32_t monitorBuildCpuSql(char *sql) {
float sysCpuUsage = 0, procCpuUsage = 0;
bool suc = taosGetCpuUsage(&sysCpuUsage, &procCpuUsage);
if (!suc) {
monitorError("monitor:%p, get cpu usage failed.", tsMonitorConn.conn);
monitorDebug("failed to get cpu usage");
}
if (sysCpuUsage <= procCpuUsage) {
sysCpuUsage = procCpuUsage + (float)0.1;
sysCpuUsage = procCpuUsage + 0.1f;
}
return sprintf(sql, ", %f, %f, %d", procCpuUsage, sysCpuUsage, tsNumOfCores);
......@@ -316,14 +294,14 @@ static int32_t monitorBuildBandSql(char *sql) {
float bandSpeedKb = 0;
bool suc = taosGetBandSpeed(&bandSpeedKb);
if (!suc) {
monitorError("monitor:%p, get bandwidth speed failed.", tsMonitorConn.conn);
monitorDebug("failed to get bandwidth speed");
}
return sprintf(sql, ", %f", bandSpeedKb);
}
static int32_t monitorBuildReqSql(char *sql) {
SDnodeStatisInfo info = dnodeGetStatisInfo();
SDnodeStatisInfo info = dnodeGetStatisInfo();
return sprintf(sql, ", %d, %d, %d)", info.httpReqNum, info.queryReqNum, info.submitReqNum);
}
......@@ -331,20 +309,15 @@ static int32_t monitorBuildIoSql(char *sql) {
float readKB = 0, writeKB = 0;
bool suc = taosGetProcIO(&readKB, &writeKB);
if (!suc) {
monitorError("monitor:%p, get io info failed.", tsMonitorConn.conn);
monitorDebug("failed to get io info");
}
return sprintf(sql, ", %f, %f", readKB, writeKB);
}
static void monitorSaveSystemInfo() {
if (tsMonitorConn.state != MONITOR_STATE_INITIALIZED) {
monitorStartTimer();
return;
}
int64_t ts = taosGetTimestampUs();
char * sql = tsMonitorConn.sql;
char * sql = tsMonitor.sql;
int32_t pos = snprintf(sql, SQL_LENGTH, "insert into %s.dn%d values(%" PRId64, tsMonitorDbName, dnodeGetDnodeId(), ts);
pos += monitorBuildCpuSql(sql + pos);
......@@ -354,16 +327,31 @@ static void monitorSaveSystemInfo() {
pos += monitorBuildIoSql(sql + pos);
pos += monitorBuildReqSql(sql + pos);
monitorDebug("monitor:%p, save system info, sql:%s", tsMonitorConn.conn, sql);
taos_query_a(tsMonitorConn.conn, sql, dnodeMontiorLogCallback, "sys");
void *res = taos_query(tsMonitor.conn, tsMonitor.sql);
int code = taos_errno(res);
taos_free_result(res);
if (code != 0) {
monitorError("failed to save system info, reason:%s, sql:%s", tstrerror(code), tsMonitor.sql);
} else {
monitorDebug("successfully to save system info, sql:%s", tsMonitor.sql);
}
}
if (tsMonitorConn.timer != NULL && tsMonitorConn.state != MONITOR_STATE_STOPPED) {
monitorStartTimer();
static void montiorExecSqlCb(void *param, TAOS_RES *result, int32_t code) {
int32_t c = taos_errno(result);
if (c != TSDB_CODE_SUCCESS) {
monitorError("save %s failed, reason:%s", (char *)param, tstrerror(c));
} else {
int32_t rows = taos_affected_rows(result);
monitorDebug("save %s succ, rows:%d", (char *)param, rows);
}
taos_free_result(result);
}
void monitorSaveAcctLog(SAcctMonitorObj *pMon) {
if (tsMonitorConn.state != MONITOR_STATE_INITIALIZED) return;
if (tsMonitor.state != MON_STATE_INITED) return;
char sql[1024] = {0};
sprintf(sql,
......@@ -392,19 +380,16 @@ void monitorSaveAcctLog(SAcctMonitorObj *pMon) {
pMon->totalConns, pMon->maxConns,
pMon->accessState);
monitorDebug("monitor:%p, save account info, sql %s", tsMonitorConn.conn, sql);
taos_query_a(tsMonitorConn.conn, sql, dnodeMontiorLogCallback, "account");
monitorDebug("save account info, sql:%s", sql);
taos_query_a(tsMonitor.conn, sql, montiorExecSqlCb, "account info");
}
void monitorSaveLog(int32_t level, const char *const format, ...) {
if (tsMonitorConn.state != MONITOR_STATE_INITIALIZED) return;
if (tsMonitor.state != MON_STATE_INITED) return;
va_list argpointer;
char sql[SQL_LENGTH] = {0};
int32_t max_length = SQL_LENGTH - 30;
if (tsMonitorConn.state != MONITOR_STATE_INITIALIZED) return;
int32_t len = snprintf(sql, (size_t)max_length, "insert into %s.log values(%" PRId64 ", %d,'", tsMonitorDbName,
taosGetTimestampUs(), level);
......@@ -416,12 +401,13 @@ void monitorSaveLog(int32_t level, const char *const format, ...) {
len += sprintf(sql + len, "', '%s')", tsLocalEp);
sql[len++] = 0;
monitorDebug("monitor:%p, save log, sql: %s", tsMonitorConn.conn, sql);
taos_query_a(tsMonitorConn.conn, sql, dnodeMontiorLogCallback, "log");
monitorDebug("save log, sql: %s", sql);
taos_query_a(tsMonitor.conn, sql, montiorExecSqlCb, "log");
}
void monitorExecuteSQL(char *sql) {
if (tsMonitorConn.state != MONITOR_STATE_INITIALIZED) return;
monitorDebug("monitor:%p, execute sql: %s", tsMonitorConn.conn, sql);
taos_query_a(tsMonitorConn.conn, sql, dnodeMontiorLogCallback, "sql");
if (tsMonitor.state != MON_STATE_INITED) return;
monitorDebug("execute sql:%s", sql);
taos_query_a(tsMonitor.conn, sql, montiorExecSqlCb, "sql");
}
......@@ -23,12 +23,13 @@
#include "posix_sockets.h"
#include "taos.h"
#include "tglobal.h"
#include "taoserror.h"
struct mqtt_client tsMqttClient = {0};
struct SMqttReconnectState tsMqttStatus = {0};
static pthread_t tsMqttClientDaemonThread = {0};
static void* tsMqttConnect = NULL;
static bool tsMqttIsRuning = false;
struct mqtt_client tsMqttClient = {0};
static pthread_t tsMqttClientDaemonThread = {0};
static void* tsMqttConnect = NULL;
static bool tsMqttIsRuning = false;
int32_t mqttInitSystem() { return 0; }
......@@ -69,32 +70,32 @@ void mqttCleanUpSystem() {
void mqttPublishCallback(void** unused, struct mqtt_response_publish* published) {
const char* content = published->application_message;
mqttDebug("receive message size:%d", (int)published->application_message_size);
mqttDebug("receive mqtt message, size:%d", (int)published->application_message_size);
if (tsMqttConnect == NULL) {
tsMqttConnect = taos_connect(NULL, "_root", tsInternalPass, "", 0);
if (tsMqttConnect == NULL) {
mqttError("failed to connect to tdengine");
mqttError("failed to connect to tdengine, reason:%s", tstrerror(terrno));
return;
} else {
mqttInfo("successfully connected to the tdengine");
}
}
mqttTrace("receive message content:%s", content);
mqttTrace("receive mqtt message, content:%s", content);
char* sql = mqttConverJsonToSql((char*)content, (int)published->application_message_size);
if (sql != NULL) {
void* res = taos_query(tsMqttConnect, sql);
int code = taos_errno(res);
if (code != 0) {
mqttError("failed to exec sql:%s", sql);
mqttError("failed to exec sql, reason:%s sql:%s", tstrerror(code), sql);
} else {
mqttDebug("successfully to exec sql:%s", sql);
mqttTrace("successfully to exec sql:%s", sql);
}
taos_free_result(res);
} else {
mqttDebug("failed to parse mqtt message");
mqttError("failed to parse mqtt message");
}
}
......
......@@ -293,6 +293,7 @@ cd ../../../debug; make
./test.sh -f unique/stable/replica3_dnode6.sim
./test.sh -f unique/stable/replica3_vnode3.sim
./test.sh -f unique/mnode/mgmt20.sim
./test.sh -f unique/mnode/mgmt21.sim
./test.sh -f unique/mnode/mgmt22.sim
./test.sh -f unique/mnode/mgmt23.sim
......
......@@ -119,7 +119,7 @@ echo "tsdbDebugFlag 135" >> $TAOS_CFG
echo "cDebugFlag 135" >> $TAOS_CFG
echo "jnidebugFlag 135" >> $TAOS_CFG
echo "odbcdebugFlag 135" >> $TAOS_CFG
echo "httpDebugFlag 143" >> $TAOS_CFG
echo "httpDebugFlag 135" >> $TAOS_CFG
echo "monitorDebugFlag 135" >> $TAOS_CFG
echo "mqttDebugFlag 135" >> $TAOS_CFG
echo "qdebugFlag 135" >> $TAOS_CFG
......
system sh/stop_dnodes.sh
system sh/deploy.sh -n dnode1 -i 1
system sh/deploy.sh -n dnode2 -i 2
system sh/cfg.sh -n dnode1 -c numOfMnodes -v 2
system sh/cfg.sh -n dnode2 -c numOfMnodes -v 2
system sh/cfg.sh -n dnode1 -c monitor -v 1
system sh/cfg.sh -n dnode2 -c monitor -v 1
print ============== step1
system sh/exec.sh -n dnode1 -s start
system sh/exec.sh -n dnode2 -s start
sleep 3000
sql connect
print ============== step2
sql create dnode $hostname2
$x = 0
show2:
$x = $x + 1
sleep 2000
if $x == 10 then
return -1
endi
sql show mnodes
print dnode1 ==> $data2_1
print dnode2 ==> $data2_2
if $data2_1 != master then
goto show2
endi
if $data2_2 != slave then
goto show2
endi
system sh/exec.sh -n dnode1 -s stop -x SIGINT
system sh/exec.sh -n dnode2 -s stop -x SIGINT
print ============== step3
system sh/exec.sh -n dnode2 -s start
sleep 10000
system sh/exec.sh -n dnode1 -s start
sql connect
print =============== step4
sql select count(*) from log.dn1
$d1_first = $rows
sql select count(*) from log.dn2
$d2_first = $rows
sleep 3000
sql select count(*) from log.dn1
$d1_second = $rows
sql select count(*) from log.dn2
$d2_second = $rows
print dnode1 $d1_first $d1_second
print dnode2 $d2_first $d2_first
if $d1_first >= $d1_second then
return -1
endi
if $d2_first >= $d2_first then
return -1
endi
system sh/exec.sh -n dnode1 -s stop -x SIGINT
system sh/exec.sh -n dnode2 -s stop -x SIGINT
\ No newline at end of file
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册