未验证 提交 5e5750bd 编写于 作者: S Shengliang Guan 提交者: GitHub

Merge pull request #3406 from taosdata/feature/mqtt

Feature/mqtt
......@@ -13,6 +13,8 @@ ENDIF ()
SET(TD_ACCOUNT FALSE)
SET(TD_ADMIN FALSE)
SET(TD_GRANT FALSE)
SET(TD_SYNC TRUE)
SET(TD_MQTT TRUE)
SET(TD_TSDB_PLUGINS FALSE)
SET(TD_COVER FALSE)
......
......@@ -13,6 +13,14 @@ IF (TD_GRANT)
ADD_DEFINITIONS(-D_GRANT)
ENDIF ()
IF (TD_SYNC)
ADD_DEFINITIONS(-D_SYNC)
ENDIF ()
IF (TD_MQTT)
ADD_DEFINITIONS(-D_MQTT)
ENDIF ()
IF (TD_TSDB_PLUGINS)
ADD_DEFINITIONS(-D_TSDB_PLUGINS)
ENDIF ()
......
......@@ -42,6 +42,16 @@ IF (${MEM_CHECK} MATCHES "true")
MESSAGE(STATUS "build with memory check")
ENDIF ()
IF (${MQTT} MATCHES "false")
SET(TD_MQTT FALSE)
MESSAGE(STATUS "build without mqtt module")
ENDIF ()
IF (${SYNC} MATCHES "false")
SET(TD_SYNC FALSE)
MESSAGE(STATUS "build without sync module")
ENDIF ()
IF (${RANDOM_FILE_FAIL} MATCHES "true")
SET(TD_RANDOM_FILE_FAIL TRUE)
MESSAGE(STATUS "build with random-file-fail enabled")
......
......@@ -10,6 +10,6 @@ ADD_SUBDIRECTORY(cJson)
ADD_SUBDIRECTORY(wepoll)
ADD_SUBDIRECTORY(MsvcLibX)
IF (TD_LINUX)
IF (TD_LINUX AND TD_MQTT)
ADD_SUBDIRECTORY(MQTT-C)
ENDIF ()
\ No newline at end of file
cmake_minimum_required(VERSION 3.5)
project(MQTT-C VERSION 1.1.2 LANGUAGES C)
CMAKE_MINIMUM_REQUIRED(VERSION 2.8)
# MQTT-C build options
option(MQTT_C_OpenSSL_SUPPORT "Build MQTT-C with OpenSSL support?" OFF)
......
......@@ -122,11 +122,14 @@
# number of replications, for cluster only
# replica 1
# mqtt uri
# mqttBrokerAddress mqtt://username:password@hostname:1883/taos/
# mqtt hostname
# mqttHostName test.mosquitto.org
# mqtt client name
# mqttBrokerClientId taos_mqtt
# mqtt port
# mqttPort 1883
# mqtt topic
# mqttTopic /weather/loop
# the compressed rpc message, option:
# -1 (no compression)
......
......@@ -10,7 +10,9 @@ ADD_SUBDIRECTORY(client)
ADD_SUBDIRECTORY(query)
ADD_SUBDIRECTORY(kit)
ADD_SUBDIRECTORY(plugins)
ADD_SUBDIRECTORY(sync)
IF (TD_SYNC)
ADD_SUBDIRECTORY(sync)
ENDIF ()
ADD_SUBDIRECTORY(balance)
ADD_SUBDIRECTORY(mnode)
ADD_SUBDIRECTORY(vnode)
......
......@@ -104,8 +104,12 @@ extern int32_t tsTelegrafUseFieldNum;
// mqtt
extern int32_t tsEnableMqttModule;
extern char tsMqttBrokerAddress[];
extern char tsMqttBrokerClientId[];
extern char tsMqttHostName[];
extern char tsMqttPort[];
extern char tsMqttUser[];
extern char tsMqttPass[];
extern char tsMqttClientId[];
extern char tsMqttTopic[];
// monitor
extern int32_t tsEnableMonitorModule;
......
......@@ -137,8 +137,12 @@ int32_t tsTelegrafUseFieldNum = 0;
// mqtt
int32_t tsEnableMqttModule = 0; // not finished yet, not started it by default
char tsMqttBrokerAddress[128] = {0};
char tsMqttBrokerClientId[128] = {0};
char tsMqttHostName[TSDB_MQTT_HOSTNAME_LEN] = "test.mosquitto.org";
char tsMqttPort[TSDB_MQTT_PORT_LEN] = "1883";
char tsMqttUser[TSDB_MQTT_USER_LEN] = {0};
char tsMqttPass[TSDB_MQTT_PASS_LEN] = {0};
char tsMqttClientId[TSDB_MQTT_CLIENT_ID_LEN] = "TDengineMqttSubscriber";
char tsMqttTopic[TSDB_MQTT_TOPIC_LEN] = "/test"; // #
// monitor
int32_t tsEnableMonitorModule = 1;
......@@ -247,8 +251,11 @@ bool taosCfgDynamicOptions(char *msg) {
for (int32_t i = 0; i < tsGlobalConfigNum; ++i) {
SGlobalCfg *cfg = tsGlobalConfig + i;
if (!(cfg->cfgType & TSDB_CFG_CTYPE_B_LOG)) continue;
//if (!(cfg->cfgType & TSDB_CFG_CTYPE_B_LOG)) continue;
if (cfg->valType != TAOS_CFG_VTYPE_INT32) continue;
int32_t cfgLen = strlen(cfg->option);
if (cfgLen != olen) continue;
if (strncasecmp(option, cfg->option, olen) != 0) continue;
*((int32_t *)cfg->ptr) = vint;
......@@ -767,26 +774,36 @@ static void doInitGlobalConfig(void) {
cfg.unitType = TAOS_CFG_UTYPE_NONE;
taosInitConfigOption(cfg);
cfg.option = "mqttBrokerAddress";
cfg.ptr = tsMqttBrokerAddress;
cfg.option = "mqttHostName";
cfg.ptr = tsMqttHostName;
cfg.valType = TAOS_CFG_VTYPE_STRING;
cfg.cfgType = TSDB_CFG_CTYPE_B_CONFIG | TSDB_CFG_CTYPE_B_CLIENT | TSDB_CFG_CTYPE_B_NOT_PRINT;
cfg.cfgType = TSDB_CFG_CTYPE_B_CONFIG | TSDB_CFG_CTYPE_B_NOT_PRINT;
cfg.minValue = 0;
cfg.maxValue = 0;
cfg.ptrLength = 126;
cfg.ptrLength = TSDB_MQTT_HOSTNAME_LEN;
cfg.unitType = TAOS_CFG_UTYPE_NONE;
taosInitConfigOption(cfg);
cfg.option = "mqttBrokerClientId";
cfg.ptr = tsMqttBrokerClientId;
cfg.option = "mqttPort";
cfg.ptr = tsMqttPort;
cfg.valType = TAOS_CFG_VTYPE_STRING;
cfg.cfgType = TSDB_CFG_CTYPE_B_CONFIG | TSDB_CFG_CTYPE_B_CLIENT | TSDB_CFG_CTYPE_B_NOT_PRINT;
cfg.cfgType = TSDB_CFG_CTYPE_B_CONFIG | TSDB_CFG_CTYPE_B_NOT_PRINT;
cfg.minValue = 0;
cfg.maxValue = 0;
cfg.ptrLength = 126;
cfg.ptrLength = TSDB_MQTT_PORT_LEN;
cfg.unitType = TAOS_CFG_UTYPE_NONE;
taosInitConfigOption(cfg);
cfg.option = "mqttTopic";
cfg.ptr = tsMqttTopic;
cfg.valType = TAOS_CFG_VTYPE_STRING;
cfg.cfgType = TSDB_CFG_CTYPE_B_CONFIG | TSDB_CFG_CTYPE_B_NOT_PRINT;
cfg.minValue = 0;
cfg.maxValue = 0;
cfg.ptrLength = TSDB_MQTT_TOPIC_LEN;
cfg.unitType = TAOS_CFG_UTYPE_NONE;
taosInitConfigOption(cfg);
cfg.option = "compressMsgSize";
cfg.ptr = &tsCompressMsgSize;
cfg.valType = TAOS_CFG_VTYPE_INT32;
......@@ -1270,6 +1287,9 @@ void taosInitGlobalCfg() {
}
bool taosCheckGlobalCfg() {
char fqdn[TSDB_FQDN_LEN];
uint16_t port;
if (debugFlag & DEBUG_TRACE || debugFlag & DEBUG_DEBUG || debugFlag & DEBUG_DUMP) {
taosSetAllDebugFlag();
}
......@@ -1283,12 +1303,18 @@ bool taosCheckGlobalCfg() {
if (tsFirst[0] == 0) {
strcpy(tsFirst, tsLocalEp);
} else {
taosGetFqdnPortFromEp(tsFirst, fqdn, &port);
snprintf(tsFirst, sizeof(tsFirst), "%s:%d", fqdn, port);
}
if (tsSecond[0] == 0) {
strcpy(tsSecond, tsLocalEp);
} else {
taosGetFqdnPortFromEp(tsSecond, fqdn, &port);
snprintf(tsSecond, sizeof(tsSecond), "%s:%d", fqdn, port);
}
taosGetSystemInfo();
tsSetLocale();
......
......@@ -11,10 +11,12 @@ AUX_SOURCE_DIRECTORY(src SRC)
IF (TD_LINUX)
ADD_EXECUTABLE(taosd ${SRC})
TARGET_LINK_LIBRARIES(taosd mnode monitor http tsdb twal vnode cJson lz4)
IF (TD_SOMODE_STATIC)
TARGET_LINK_LIBRARIES(taosd mnode taos_static monitor http mqtt tsdb twal vnode cJson lz4 balance sync)
TARGET_LINK_LIBRARIES(taosd taos_static)
ELSE ()
TARGET_LINK_LIBRARIES(taosd mnode taos monitor http mqtt tsdb twal vnode cJson lz4 balance sync)
TARGET_LINK_LIBRARIES(taosd taos)
ENDIF ()
IF (TD_ACCOUNT)
......@@ -25,6 +27,14 @@ IF (TD_LINUX)
TARGET_LINK_LIBRARIES(taosd grant)
ENDIF ()
IF (TD_MQTT)
TARGET_LINK_LIBRARIES(taosd mqtt)
ENDIF ()
IF (TD_SYNC)
TARGET_LINK_LIBRARIES(taosd balance sync)
ENDIF ()
SET(PREPARE_ENV_CMD "prepare_env_cmd")
SET(PREPARE_ENV_TARGET "prepare_env_target")
ADD_CUSTOM_COMMAND(OUTPUT ${PREPARE_ENV_CMD}
......
......@@ -611,7 +611,7 @@ static bool dnodeReadMnodeInfos() {
}
for (int i = 0; i < size; ++i) {
cJSON* nodeInfo = cJSON_GetArrayItem(nodeInfos, i);
cJSON *nodeInfo = cJSON_GetArrayItem(nodeInfos, i);
if (nodeInfo == NULL) continue;
cJSON *nodeId = cJSON_GetObjectItem(nodeInfo, "nodeId");
......@@ -627,7 +627,7 @@ static bool dnodeReadMnodeInfos() {
goto PARSE_OVER;
}
strncpy(tsDMnodeInfos.nodeInfos[i].nodeEp, nodeEp->valuestring, TSDB_EP_LEN);
}
}
ret = true;
......
......@@ -62,6 +62,7 @@ static void dnodeAllocModules() {
dnodeSetModuleStatus(TSDB_MOD_HTTP);
}
#ifdef _MQTT
tsModule[TSDB_MOD_MQTT].enable = (tsEnableMqttModule == 1);
tsModule[TSDB_MOD_MQTT].name = "mqtt";
tsModule[TSDB_MOD_MQTT].initFp = mqttInitSystem;
......@@ -71,6 +72,7 @@ static void dnodeAllocModules() {
if (tsEnableMqttModule) {
dnodeSetModuleStatus(TSDB_MOD_MQTT);
}
#endif
tsModule[TSDB_MOD_MONITOR].enable = (tsEnableMonitorModule == 1);
tsModule[TSDB_MOD_MONITOR].name = "monitor";
......
......@@ -272,6 +272,13 @@ void tsDataSwap(void *pLeft, void *pRight, int32_t type, int32_t size);
#define TSDB_SHOW_SQL_LEN 64
#define TSDB_SLOW_QUERY_SQL_LEN 512
#define TSDB_MQTT_HOSTNAME_LEN 64
#define TSDB_MQTT_PORT_LEN 8
#define TSDB_MQTT_USER_LEN 24
#define TSDB_MQTT_PASS_LEN 24
#define TSDB_MQTT_TOPIC_LEN 64
#define TSDB_MQTT_CLIENT_ID_LEN 32
#define TSDB_METER_STATE_OFFLINE 0
#define TSDB_METER_STATE_ONLLINE 1
......
......@@ -19,11 +19,11 @@
#ifdef __cplusplus
extern "C" {
#endif
#include <stdint.h>
int32_t mqttInitSystem();
int32_t mqttStartSystem();
void mqttStopSystem();
void mqttCleanUpSystem();
void mqttStopSystem();
void mqttCleanUpSystem();
#ifdef __cplusplus
}
......
......@@ -310,6 +310,13 @@ static int32_t mnodeCheckDbCfg(SDbCfg *pCfg) {
return TSDB_CODE_MND_INVALID_DB_OPTION;
}
#ifndef _SYNC
if (pCfg->replications != 1) {
mError("invalid db option replications:%d can only be 1 in this version", pCfg->replications);
return TSDB_CODE_MND_INVALID_DB_OPTION;
}
#endif
return TSDB_CODE_SUCCESS;
}
......
......@@ -78,6 +78,9 @@ static int32_t mnodeDnodeActionInsert(SSdbOper *pOper) {
static int32_t mnodeDnodeActionDelete(SSdbOper *pOper) {
SDnodeObj *pDnode = pOper->pObj;
#ifndef _SYNC
mnodeDropAllDnodeVgroups(pDnode);
#endif
mnodeDropMnodeLocal(pDnode->dnodeId);
balanceAsyncNotify();
......@@ -585,7 +588,11 @@ static int32_t mnodeDropDnodeByEp(char *ep, SMnodeMsg *pMsg) {
mInfo("dnode:%d, start to drop it", pDnode->dnodeId);
#ifndef _SYNC
int32_t code = mnodeDropDnode(pDnode, pMsg);
#else
int32_t code = balanceDropDnode(pDnode);
#endif
mnodeDecDnodeRef(pDnode);
return code;
}
......@@ -1043,3 +1050,59 @@ static char* mnodeGetDnodeAlternativeRoleStr(int32_t alternativeRole) {
}
}
#ifndef _SYNC
int32_t balanceInit() { return TSDB_CODE_SUCCESS; }
void balanceCleanUp() {}
void balanceAsyncNotify() {}
void balanceSyncNotify() {}
void balanceReset() {}
int32_t balanceAlterDnode(struct SDnodeObj *pDnode, int32_t vnodeId, int32_t dnodeId) { return TSDB_CODE_SYN_NOT_ENABLED; }
char* syncRole[] = {
"offline",
"unsynced",
"syncing",
"slave",
"master"
};
int32_t balanceAllocVnodes(SVgObj *pVgroup) {
void * pIter = NULL;
SDnodeObj *pDnode = NULL;
SDnodeObj *pSelDnode = NULL;
float vnodeUsage = 1000.0;
while (1) {
pIter = mnodeGetNextDnode(pIter, &pDnode);
if (pDnode == NULL) break;
if (pDnode->numOfCores > 0 && pDnode->openVnodes < TSDB_MAX_VNODES) {
float openVnodes = pDnode->openVnodes;
if (pDnode->isMgmt) openVnodes += tsMnodeEqualVnodeNum;
float usage = openVnodes / pDnode->numOfCores;
if (usage <= vnodeUsage) {
pSelDnode = pDnode;
vnodeUsage = usage;
}
}
mnodeDecDnodeRef(pDnode);
}
sdbFreeIter(pIter);
if (pSelDnode == NULL) {
mError("failed to alloc vnode to vgroup");
return TSDB_CODE_MND_NO_ENOUGH_DNODES;
}
pVgroup->vnodeGid[0].dnodeId = pSelDnode->dnodeId;
pVgroup->vnodeGid[0].pDnode = pSelDnode;
mDebug("dnode:%d, alloc one vnode to vgroup, openVnodes:%d", pSelDnode->dnodeId, pSelDnode->openVnodes);
return TSDB_CODE_SUCCESS;
}
#endif
......@@ -98,8 +98,10 @@ SConnObj *mnodeCreateConn(char *user, uint32_t ip, uint16_t port) {
.connId = connId,
.stime = taosGetTimestampMs()
};
tstrncpy(connObj.user, user, sizeof(connObj.user));
connObj.lastAccess = connObj.stime;
SConnObj *pConn = taosCachePut(tsMnodeConnCache, &connId, sizeof(int32_t), &connObj, sizeof(connObj), CONN_KEEP_TIME * 1000);
mDebug("connId:%d, is created, user:%s ip:%s:%u", connId, user, taosIpStr(ip), port);
......@@ -244,6 +246,7 @@ static int32_t mnodeRetrieveConns(SShowObj *pShow, char *data, int32_t rows, voi
cols++;
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
if (pConnObj->lastAccess < pConnObj->stime) pConnObj->lastAccess = pConnObj->stime;
*(int64_t *)pWrite = pConnObj->lastAccess;
cols++;
......
......@@ -1711,14 +1711,20 @@ static int32_t mnodeDoCreateChildTable(SMnodeMsg *pMsg, int32_t tid) {
mnodeDestroyChildTable(pTable);
return TSDB_CODE_MND_INVALID_TABLE_NAME;
}
pTable->suid = pMsg->pSTable->uid;
pTable->uid = (((uint64_t)pTable->vgId) << 40) + ((((uint64_t)pTable->sid) & ((1ul << 24) - 1ul)) << 16) +
(sdbGetVersion() & ((1ul << 16) - 1ul));
pTable->uid = (((uint64_t)pTable->vgId) << 48) + ((((uint64_t)pTable->sid) & ((1ul << 24) - 1ul)) << 24) +
((sdbGetVersion() & ((1ul << 16) - 1ul)) << 8) + (taosRand() & ((1ul << 8) - 1ul));
pTable->superTable = pMsg->pSTable;
} else {
pTable->uid = (((uint64_t)pTable->vgId) << 40) + ((((uint64_t)pTable->sid) & ((1ul << 24) - 1ul)) << 16) +
(sdbGetVersion() & ((1ul << 16) - 1ul));
if (pTable->info.type == TSDB_SUPER_TABLE) {
int64_t us = taosGetTimestampUs();
pTable->uid = (us << 24) + ((sdbGetVersion() & ((1ul << 16) - 1ul)) << 8) + (taosRand() & ((1ul << 8) - 1ul));
} else {
pTable->uid = (((uint64_t)pTable->vgId) << 48) + ((((uint64_t)pTable->sid) & ((1ul << 24) - 1ul)) << 24) +
((sdbGetVersion() & ((1ul << 16) - 1ul)) << 8) + (taosRand() & ((1ul << 8) - 1ul));
}
pTable->sversion = 0;
pTable->numOfColumns = htons(pCreate->numOfColumns);
pTable->sqlLen = htons(pCreate->sqlLen);
......
......@@ -581,7 +581,7 @@ void mnodeDropAllUsers(SAcctObj *pAcct) {
int32_t mnodeRetriveAuth(char *user, char *spi, char *encrypt, char *secret, char *ckey) {
if (!sdbIsMaster()) {
*secret = 0;
mDebug("user:%s, failed to auth user, reason:%s", user, tstrerror(TSDB_CODE_APP_NOT_READY));
mDebug("user:%s, failed to auth user, mnode is not master", user);
return TSDB_CODE_APP_NOT_READY;
}
......
......@@ -3,4 +3,6 @@ PROJECT(TDengine)
ADD_SUBDIRECTORY(monitor)
ADD_SUBDIRECTORY(http)
ADD_SUBDIRECTORY(mqtt)
IF (TD_MQTT)
ADD_SUBDIRECTORY(mqtt)
ENDIF ()
\ No newline at end of file
......@@ -11,11 +11,12 @@ AUX_SOURCE_DIRECTORY(src SRC)
IF (TD_LINUX)
ADD_LIBRARY(http ${SRC})
TARGET_LINK_LIBRARIES(http z)
IF (TD_SOMODE_STATIC)
TARGET_LINK_LIBRARIES(http taos_static z)
TARGET_LINK_LIBRARIES(http taos_static)
ELSE ()
TARGET_LINK_LIBRARIES(http taos z)
TARGET_LINK_LIBRARIES(http taos)
ENDIF ()
IF (TD_ADMIN)
......
......@@ -2,11 +2,11 @@ CMAKE_MINIMUM_REQUIRED(VERSION 2.8)
PROJECT(TDengine)
INCLUDE_DIRECTORIES(inc)
INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/src/client/inc)
INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/src/query/inc)
AUX_SOURCE_DIRECTORY(./src SRC)
IF (TD_LINUX)
INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/src/client/inc)
INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/src/query/inc)
ADD_LIBRARY(monitor ${SRC})
IF (TD_SOMODE_STATIC)
......
......@@ -25,6 +25,7 @@
#include "tsclient.h"
#include "dnode.h"
#include "monitor.h"
#include "taoserror.h"
#define monitorFatal(...) { if (monitorDebugFlag & DEBUG_FATAL) { taosPrintLog("MON FATAL ", 255, __VA_ARGS__); }}
#define monitorError(...) { if (monitorDebugFlag & DEBUG_ERROR) { taosPrintLog("MON ERROR ", 255, __VA_ARGS__); }}
......@@ -33,129 +34,159 @@
#define monitorDebug(...) { if (monitorDebugFlag & DEBUG_DEBUG) { taosPrintLog("MON ", monitorDebugFlag, __VA_ARGS__); }}
#define monitorTrace(...) { if (monitorDebugFlag & DEBUG_TRACE) { taosPrintLog("MON ", monitorDebugFlag, __VA_ARGS__); }}
#define SQL_LENGTH 1024
#define SQL_LENGTH 1030
#define LOG_LEN_STR 100
#define IP_LEN_STR TSDB_EP_LEN
#define CHECK_INTERVAL 1000
typedef enum {
MONITOR_CMD_CREATE_DB,
MONITOR_CMD_CREATE_TB_LOG,
MONITOR_CMD_CREATE_MT_DN,
MONITOR_CMD_CREATE_MT_ACCT,
MONITOR_CMD_CREATE_TB_DN,
MONITOR_CMD_CREATE_TB_ACCT_ROOT,
MONITOR_CMD_CREATE_TB_SLOWQUERY,
MONITOR_CMD_MAX
MON_CMD_CREATE_DB,
MON_CMD_CREATE_TB_LOG,
MON_CMD_CREATE_MT_DN,
MON_CMD_CREATE_MT_ACCT,
MON_CMD_CREATE_TB_DN,
MON_CMD_CREATE_TB_ACCT_ROOT,
MON_CMD_CREATE_TB_SLOWQUERY,
MON_CMD_MAX
} EMonitorCommand;
typedef enum {
MONITOR_STATE_UN_INIT,
MONITOR_STATE_INITIALIZING,
MONITOR_STATE_INITIALIZED,
MONITOR_STATE_STOPPED
MON_STATE_NOT_INIT,
MON_STATE_INITED
} EMonitorState;
typedef struct {
void * conn;
void * timer;
char ep[TSDB_EP_LEN];
int8_t cmdIndex;
int8_t state;
char sql[SQL_LENGTH + 1];
void * initTimer;
void * diskTimer;
pthread_t thread;
void * conn;
char ep[TSDB_EP_LEN];
int8_t cmdIndex;
int8_t state;
int8_t start; // enable/disable by mnode
int8_t quiting; // taosd is quiting
char sql[SQL_LENGTH + 1];
} SMonitorConn;
static SMonitorConn tsMonitorConn;
static void monitorInitConn(void *para, void *unused);
static void monitorInitConnCb(void *param, TAOS_RES *result, int32_t code);
static void monitorInitDatabase();
static void monitorInitDatabaseCb(void *param, TAOS_RES *result, int32_t code);
static void monitorStartTimer();
static void monitorSaveSystemInfo();
static SMonitorConn tsMonitor = {0};
static void monitorSaveSystemInfo();
static void *monitorThreadFunc(void *param);
static void monitorBuildMonitorSql(char *sql, int32_t cmd);
extern int32_t (*monitorStartSystemFp)();
extern void (*monitorStopSystemFp)();
extern void (*monitorExecuteSQLFp)(char *sql);
static void monitorCheckDiskUsage(void *para, void *unused) {
taosGetDisk();
taosTmrReset(monitorCheckDiskUsage, CHECK_INTERVAL, NULL, tscTmr, &tsMonitorConn.diskTimer);
}
extern void (*monitorStopSystemFp)();
extern void (*monitorExecuteSQLFp)(char *sql);
int32_t monitorInitSystem() {
taos_init();
taosTmrReset(monitorCheckDiskUsage, CHECK_INTERVAL, NULL, tscTmr, &tsMonitorConn.diskTimer);
if (tsMonitor.ep[0] == 0) {
strcpy(tsMonitor.ep, tsLocalEp);
}
int len = strlen(tsMonitor.ep);
for (int i = 0; i < len; ++i) {
if (tsMonitor.ep[i] == ':' || tsMonitor.ep[i] == '-' || tsMonitor.ep[i] == '.') {
tsMonitor.ep[i] = '_';
}
}
pthread_attr_t thAttr;
pthread_attr_init(&thAttr);
pthread_attr_setdetachstate(&thAttr, PTHREAD_CREATE_JOINABLE);
if (pthread_create(&tsMonitor.thread, &thAttr, monitorThreadFunc, NULL)) {
monitorError("failed to create thread to for monitor module, reason:%s", strerror(errno));
return -1;
}
pthread_attr_destroy(&thAttr);
monitorDebug("monitor thread is launched");
monitorStartSystemFp = monitorStartSystem;
monitorStopSystemFp = monitorStopSystem;
return 0;
}
int32_t monitorStartSystem() {
monitorInfo("start monitor module");
monitorInitSystem();
taosTmrReset(monitorInitConn, 10, NULL, tscTmr, &tsMonitorConn.initTimer);
taos_init();
tsMonitor.start = 1;
monitorExecuteSQLFp = monitorExecuteSQL;
monitorInfo("monitor module start");
return 0;
}
static void monitorStartSystemRetry() {
if (tsMonitorConn.initTimer != NULL) {
taosTmrReset(monitorInitConn, 3000, NULL, tscTmr, &tsMonitorConn.initTimer);
}
}
static void *monitorThreadFunc(void *param) {
monitorDebug("starting to initialize monitor module ...");
static void monitorInitConn(void *para, void *unused) {
if (dnodeGetDnodeId() <= 0) {
monitorStartSystemRetry();
return;
}
monitorInfo("starting to initialize monitor service ..");
tsMonitorConn.state = MONITOR_STATE_INITIALIZING;
while (1) {
if (tsMonitor.quiting) {
tsMonitor.state = MON_STATE_NOT_INIT;
monitorInfo("monitor thread will quit, for taosd is quiting");
break;
} else {
taosGetDisk();
}
if (tsMonitorConn.ep[0] == 0)
strcpy(tsMonitorConn.ep, tsLocalEp);
if (tsMonitor.start == 0) {
continue;
}
int len = strlen(tsMonitorConn.ep);
for (int i = 0; i < len; ++i) {
if (tsMonitorConn.ep[i] == ':' || tsMonitorConn.ep[i] == '-') {
tsMonitorConn.ep[i] = '_';
static int32_t accessTimes = 0;
accessTimes++;
taosMsleep(1000);
if (dnodeGetDnodeId() <= 0) {
monitorDebug("dnode not initialized, waiting for 3000 ms to start monitor module");
continue;
}
}
if (tsMonitorConn.conn == NULL) {
taos_connect_a(NULL, "monitor", tsInternalPass, "", 0, monitorInitConnCb, &tsMonitorConn, &(tsMonitorConn.conn));
} else {
monitorInitDatabase();
}
}
if (tsMonitor.conn == NULL) {
tsMonitor.state = MON_STATE_NOT_INIT;
tsMonitor.conn = taos_connect(NULL, "monitor", tsInternalPass, "", 0);
if (tsMonitor.conn == NULL) {
monitorError("failed to connect to database, reason:%s", tstrerror(terrno));
continue;
} else {
monitorDebug("connect to database success");
}
}
static void monitorInitConnCb(void *param, TAOS_RES *result, int32_t code) {
// free it firstly in any cases.
taos_free_result(result);
if (tsMonitor.state == MON_STATE_NOT_INIT) {
for (; tsMonitor.cmdIndex < MON_CMD_MAX; ++tsMonitor.cmdIndex) {
monitorBuildMonitorSql(tsMonitor.sql, tsMonitor.cmdIndex);
void *res = taos_query(tsMonitor.conn, tsMonitor.sql);
int code = taos_errno(res);
taos_free_result(res);
if (code != 0) {
monitorError("failed to exec sql:%s, reason:%s", tsMonitor.sql, tstrerror(code));
break;
} else {
monitorDebug("successfully to exec sql:%s", tsMonitor.sql);
}
}
if (tsMonitor.start) {
tsMonitor.state = MON_STATE_INITED;
}
}
if (code != TSDB_CODE_SUCCESS) {
monitorError("monitor:%p, connect to database failed, reason:%s", tsMonitorConn.conn, tstrerror(code));
taos_close(tsMonitorConn.conn);
tsMonitorConn.conn = NULL;
tsMonitorConn.state = MONITOR_STATE_UN_INIT;
monitorStartSystemRetry();
return;
if (tsMonitor.state == MON_STATE_INITED) {
if (accessTimes % tsMonitorInterval == 0) {
monitorSaveSystemInfo();
}
}
}
monitorDebug("monitor:%p, connect to database success, reason:%s", tsMonitorConn.conn, tstrerror(code));
monitorInitDatabase();
monitorInfo("monitor thread is stopped");
return NULL;
}
static void dnodeBuildMonitorSql(char *sql, int32_t cmd) {
static void monitorBuildMonitorSql(char *sql, int32_t cmd) {
memset(sql, 0, SQL_LENGTH);
if (cmd == MONITOR_CMD_CREATE_DB) {
if (cmd == MON_CMD_CREATE_DB) {
snprintf(sql, SQL_LENGTH,
"create database if not exists %s replica 1 days 10 keep 30 cache %d "
"blocks %d maxtables 16 precision 'us'",
"blocks %d precision 'us'",
tsMonitorDbName, TSDB_MIN_CACHE_BLOCK_SIZE, TSDB_MIN_TOTAL_BLOCKS);
} else if (cmd == MONITOR_CMD_CREATE_MT_DN) {
} else if (cmd == MON_CMD_CREATE_MT_DN) {
snprintf(sql, SQL_LENGTH,
"create table if not exists %s.dn(ts timestamp"
", cpu_taosd float, cpu_system float, cpu_cores int"
......@@ -166,10 +197,10 @@ static void dnodeBuildMonitorSql(char *sql, int32_t cmd) {
", req_http int, req_select int, req_insert int"
") tags (dnodeid int, fqdn binary(%d))",
tsMonitorDbName, TSDB_FQDN_LEN);
} else if (cmd == MONITOR_CMD_CREATE_TB_DN) {
} else if (cmd == MON_CMD_CREATE_TB_DN) {
snprintf(sql, SQL_LENGTH, "create table if not exists %s.dn%d using %s.dn tags(%d, '%s')", tsMonitorDbName,
dnodeGetDnodeId(), tsMonitorDbName, dnodeGetDnodeId(), tsLocalEp);
} else if (cmd == MONITOR_CMD_CREATE_MT_ACCT) {
} else if (cmd == MON_CMD_CREATE_MT_ACCT) {
snprintf(sql, SQL_LENGTH,
"create table if not exists %s.acct(ts timestamp "
", currentPointsPerSecond bigint, maxPointsPerSecond bigint"
......@@ -185,15 +216,15 @@ static void dnodeBuildMonitorSql(char *sql, int32_t cmd) {
", accessState smallint"
") tags (acctId binary(%d))",
tsMonitorDbName, TSDB_USER_LEN);
} else if (cmd == MONITOR_CMD_CREATE_TB_ACCT_ROOT) {
} else if (cmd == MON_CMD_CREATE_TB_ACCT_ROOT) {
snprintf(sql, SQL_LENGTH, "create table if not exists %s.acct_%s using %s.acct tags('%s')", tsMonitorDbName, TSDB_DEFAULT_USER,
tsMonitorDbName, TSDB_DEFAULT_USER);
} else if (cmd == MONITOR_CMD_CREATE_TB_SLOWQUERY) {
} else if (cmd == MON_CMD_CREATE_TB_SLOWQUERY) {
snprintf(sql, SQL_LENGTH,
"create table if not exists %s.slowquery(ts timestamp, username "
"binary(%d), created_time timestamp, time bigint, sql binary(%d))",
tsMonitorDbName, TSDB_TABLE_FNAME_LEN - 1, TSDB_SLOW_QUERY_SQL_LEN);
} else if (cmd == MONITOR_CMD_CREATE_TB_LOG) {
} else if (cmd == MON_CMD_CREATE_TB_LOG) {
snprintf(sql, SQL_LENGTH,
"create table if not exists %s.log(ts timestamp, level tinyint, "
"content binary(%d), ipaddr binary(%d))",
......@@ -203,75 +234,22 @@ static void dnodeBuildMonitorSql(char *sql, int32_t cmd) {
sql[SQL_LENGTH] = 0;
}
static void monitorInitDatabase() {
if (tsMonitorConn.cmdIndex < MONITOR_CMD_MAX) {
dnodeBuildMonitorSql(tsMonitorConn.sql, tsMonitorConn.cmdIndex);
taos_query_a(tsMonitorConn.conn, tsMonitorConn.sql, monitorInitDatabaseCb, NULL);
} else {
tsMonitorConn.state = MONITOR_STATE_INITIALIZED;
monitorExecuteSQLFp = monitorExecuteSQL;
monitorInfo("monitor service init success");
monitorStartTimer();
}
}
static void monitorInitDatabaseCb(void *param, TAOS_RES *result, int32_t code) {
if (code == TSDB_CODE_MND_TABLE_ALREADY_EXIST || code == TSDB_CODE_MND_DB_ALREADY_EXIST || code >= 0) {
monitorDebug("monitor:%p, sql success, reason:%s, %s", tsMonitorConn.conn, tstrerror(code), tsMonitorConn.sql);
if (tsMonitorConn.cmdIndex == MONITOR_CMD_CREATE_TB_LOG) {
monitorInfo("dnode:%s is started", tsLocalEp);
}
tsMonitorConn.cmdIndex++;
monitorInitDatabase();
} else {
monitorError("monitor:%p, sql failed, reason:%s, %s", tsMonitorConn.conn, tstrerror(code), tsMonitorConn.sql);
tsMonitorConn.state = MONITOR_STATE_UN_INIT;
monitorStartSystemRetry();
}
taos_free_result(result);
}
void monitorStopSystem() {
if (tsMonitorConn.state == MONITOR_STATE_STOPPED) return;
tsMonitorConn.state = MONITOR_STATE_STOPPED;
tsMonitor.start = 0;
tsMonitor.state = MON_STATE_NOT_INIT;
monitorExecuteSQLFp = NULL;
monitorInfo("monitor module is stopped");
if (tsMonitorConn.initTimer != NULL) {
taosTmrStopA(&(tsMonitorConn.initTimer));
}
if (tsMonitorConn.timer != NULL) {
taosTmrStopA(&(tsMonitorConn.timer));
}
if (tsMonitorConn.conn != NULL) {
taos_close(tsMonitorConn.conn);
tsMonitorConn.conn = NULL;
}
monitorInfo("monitor module stopped");
}
void monitorCleanUpSystem() {
tsMonitor.quiting = 1;
monitorStopSystem();
monitorInfo("monitor module cleanup");
}
static void monitorStartTimer() {
taosTmrReset(monitorSaveSystemInfo, tsMonitorInterval * 1000, NULL, tscTmr, &tsMonitorConn.timer);
}
static void dnodeMontiorLogCallback(void *param, TAOS_RES *result, int32_t code) {
int32_t c = taos_errno(result);
if (c != TSDB_CODE_SUCCESS) {
monitorError("monitor:%p, save %s failed, reason:%s", tsMonitorConn.conn, (char *)param, tstrerror(c));
} else {
int32_t rows = taos_affected_rows(result);
monitorDebug("monitor:%p, save %s succ, rows:%d", tsMonitorConn.conn, (char *)param, rows);
pthread_join(tsMonitor.thread, NULL);
if (tsMonitor.conn != NULL) {
taos_close(tsMonitor.conn);
tsMonitor.conn = NULL;
}
taos_free_result(result);
monitorInfo("monitor module is cleaned up");
}
// unit is MB
......@@ -279,13 +257,13 @@ static int32_t monitorBuildMemorySql(char *sql) {
float sysMemoryUsedMB = 0;
bool suc = taosGetSysMemory(&sysMemoryUsedMB);
if (!suc) {
monitorError("monitor:%p, get sys memory info failed.", tsMonitorConn.conn);
monitorDebug("failed to get sys memory info");
}
float procMemoryUsedMB = 0;
suc = taosGetProcMemory(&procMemoryUsedMB);
if (!suc) {
monitorError("monitor:%p, get proc memory info failed.", tsMonitorConn.conn);
monitorDebug("failed to get proc memory info");
}
return sprintf(sql, ", %f, %f, %d", procMemoryUsedMB, sysMemoryUsedMB, tsTotalMemoryMB);
......@@ -296,11 +274,11 @@ static int32_t monitorBuildCpuSql(char *sql) {
float sysCpuUsage = 0, procCpuUsage = 0;
bool suc = taosGetCpuUsage(&sysCpuUsage, &procCpuUsage);
if (!suc) {
monitorError("monitor:%p, get cpu usage failed.", tsMonitorConn.conn);
monitorDebug("failed to get cpu usage");
}
if (sysCpuUsage <= procCpuUsage) {
sysCpuUsage = procCpuUsage + (float)0.1;
sysCpuUsage = procCpuUsage + 0.1f;
}
return sprintf(sql, ", %f, %f, %d", procCpuUsage, sysCpuUsage, tsNumOfCores);
......@@ -316,14 +294,14 @@ static int32_t monitorBuildBandSql(char *sql) {
float bandSpeedKb = 0;
bool suc = taosGetBandSpeed(&bandSpeedKb);
if (!suc) {
monitorError("monitor:%p, get bandwidth speed failed.", tsMonitorConn.conn);
monitorDebug("failed to get bandwidth speed");
}
return sprintf(sql, ", %f", bandSpeedKb);
}
static int32_t monitorBuildReqSql(char *sql) {
SDnodeStatisInfo info = dnodeGetStatisInfo();
SDnodeStatisInfo info = dnodeGetStatisInfo();
return sprintf(sql, ", %d, %d, %d)", info.httpReqNum, info.queryReqNum, info.submitReqNum);
}
......@@ -331,20 +309,15 @@ static int32_t monitorBuildIoSql(char *sql) {
float readKB = 0, writeKB = 0;
bool suc = taosGetProcIO(&readKB, &writeKB);
if (!suc) {
monitorError("monitor:%p, get io info failed.", tsMonitorConn.conn);
monitorDebug("failed to get io info");
}
return sprintf(sql, ", %f, %f", readKB, writeKB);
}
static void monitorSaveSystemInfo() {
if (tsMonitorConn.state != MONITOR_STATE_INITIALIZED) {
monitorStartTimer();
return;
}
int64_t ts = taosGetTimestampUs();
char * sql = tsMonitorConn.sql;
char * sql = tsMonitor.sql;
int32_t pos = snprintf(sql, SQL_LENGTH, "insert into %s.dn%d values(%" PRId64, tsMonitorDbName, dnodeGetDnodeId(), ts);
pos += monitorBuildCpuSql(sql + pos);
......@@ -354,16 +327,31 @@ static void monitorSaveSystemInfo() {
pos += monitorBuildIoSql(sql + pos);
pos += monitorBuildReqSql(sql + pos);
monitorDebug("monitor:%p, save system info, sql:%s", tsMonitorConn.conn, sql);
taos_query_a(tsMonitorConn.conn, sql, dnodeMontiorLogCallback, "sys");
void *res = taos_query(tsMonitor.conn, tsMonitor.sql);
int code = taos_errno(res);
taos_free_result(res);
if (code != 0) {
monitorError("failed to save system info, reason:%s, sql:%s", tstrerror(code), tsMonitor.sql);
} else {
monitorDebug("successfully to save system info, sql:%s", tsMonitor.sql);
}
}
if (tsMonitorConn.timer != NULL && tsMonitorConn.state != MONITOR_STATE_STOPPED) {
monitorStartTimer();
static void montiorExecSqlCb(void *param, TAOS_RES *result, int32_t code) {
int32_t c = taos_errno(result);
if (c != TSDB_CODE_SUCCESS) {
monitorError("save %s failed, reason:%s", (char *)param, tstrerror(c));
} else {
int32_t rows = taos_affected_rows(result);
monitorDebug("save %s succ, rows:%d", (char *)param, rows);
}
taos_free_result(result);
}
void monitorSaveAcctLog(SAcctMonitorObj *pMon) {
if (tsMonitorConn.state != MONITOR_STATE_INITIALIZED) return;
if (tsMonitor.state != MON_STATE_INITED) return;
char sql[1024] = {0};
sprintf(sql,
......@@ -392,19 +380,16 @@ void monitorSaveAcctLog(SAcctMonitorObj *pMon) {
pMon->totalConns, pMon->maxConns,
pMon->accessState);
monitorDebug("monitor:%p, save account info, sql %s", tsMonitorConn.conn, sql);
taos_query_a(tsMonitorConn.conn, sql, dnodeMontiorLogCallback, "account");
monitorDebug("save account info, sql:%s", sql);
taos_query_a(tsMonitor.conn, sql, montiorExecSqlCb, "account info");
}
void monitorSaveLog(int32_t level, const char *const format, ...) {
if (tsMonitorConn.state != MONITOR_STATE_INITIALIZED) return;
if (tsMonitor.state != MON_STATE_INITED) return;
va_list argpointer;
char sql[SQL_LENGTH] = {0};
int32_t max_length = SQL_LENGTH - 30;
if (tsMonitorConn.state != MONITOR_STATE_INITIALIZED) return;
int32_t len = snprintf(sql, (size_t)max_length, "insert into %s.log values(%" PRId64 ", %d,'", tsMonitorDbName,
taosGetTimestampUs(), level);
......@@ -416,12 +401,13 @@ void monitorSaveLog(int32_t level, const char *const format, ...) {
len += sprintf(sql + len, "', '%s')", tsLocalEp);
sql[len++] = 0;
monitorDebug("monitor:%p, save log, sql: %s", tsMonitorConn.conn, sql);
taos_query_a(tsMonitorConn.conn, sql, dnodeMontiorLogCallback, "log");
monitorDebug("save log, sql: %s", sql);
taos_query_a(tsMonitor.conn, sql, montiorExecSqlCb, "log");
}
void monitorExecuteSQL(char *sql) {
if (tsMonitorConn.state != MONITOR_STATE_INITIALIZED) return;
monitorDebug("monitor:%p, execute sql: %s", tsMonitorConn.conn, sql);
taos_query_a(tsMonitorConn.conn, sql, dnodeMontiorLogCallback, "sql");
if (tsMonitor.state != MON_STATE_INITED) return;
monitorDebug("execute sql:%s", sql);
taos_query_a(tsMonitor.conn, sql, montiorExecSqlCb, "sql");
}
......@@ -2,21 +2,19 @@ CMAKE_MINIMUM_REQUIRED(VERSION 2.8)
PROJECT(TDengine)
INCLUDE_DIRECTORIES(inc)
INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/src/client/inc)
INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/src/query/inc)
INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/deps/MQTT-C/include)
INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/deps/MQTT-C/examples/templates)
AUX_SOURCE_DIRECTORY(src SRC)
IF (TD_LINUX)
INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/src/client/inc)
INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/src/query/inc)
INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/deps/MQTT-C/include)
INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/deps/MQTT-C/examples/templates)
ADD_LIBRARY(mqtt ${SRC})
TARGET_LINK_LIBRARIES(mqtt cJson mqttc)
IF (TD_SOMODE_STATIC)
TARGET_LINK_LIBRARIES(mqtt taos_static cJson mqttc)
TARGET_LINK_LIBRARIES(mqtt taos_static)
ELSE ()
TARGET_LINK_LIBRARIES(mqtt taos cJson mqttc)
ENDIF ()
IF (TD_ADMIN)
TARGET_LINK_LIBRARIES(mqtt admin cJson)
TARGET_LINK_LIBRARIES(mqtt taos)
ENDIF ()
ENDIF ()
......@@ -23,11 +23,12 @@ extern "C" {
* @file
* A simple subscriber program that performs automatic reconnections.
*/
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include "mqtt.h"
#include "taos.h"
#define QOS 1
#define TIMEOUT 10000L
#define MQTT_SEND_BUF_SIZE 102400
#define MQTT_RECV_BUF_SIZE 102400
/**
* @brief A structure that I will use to keep track of some data needed
......@@ -36,18 +37,12 @@ extern "C" {
* An instance of this struct will be created in my \c main(). Then, whenever
* \ref mqttReconnectClient is called, this instance will be passed.
*/
struct reconnect_state_t {
char* hostname;
char* port;
char* topic;
char* client_id;
char* user_name;
char* password;
typedef struct SMqttReconnectState {
uint8_t* sendbuf;
size_t sendbufsz;
uint8_t* recvbuf;
size_t recvbufsz;
};
} SMqttReconnectState;
/**
* @brief My reconnect callback. It will reestablish the connection whenever
......@@ -58,7 +53,7 @@ void mqttReconnectClient(struct mqtt_client* client, void** reconnect_state_vptr
/**
* @brief The function will be called whenever a PUBLISH message is received.
*/
void mqtt_PublishCallback(void** unused, struct mqtt_response_publish* published);
void mqttPublishCallback(void** unused, struct mqtt_response_publish* published);
/**
* @brief The client's refresher. This function triggers back-end routines to
......@@ -73,12 +68,7 @@ void* mqttClientRefresher(void* client);
/**
* @brief Safelty closes the \p sockfd and cancels the \p client_daemon before \c exit.
*/
void mqttCleanup(int status, int sockfd, pthread_t* client_daemon);
void mqttInitConnCb(void* param, TAOS_RES* result, int32_t code);
void mqttQueryInsertCallback(void* param, TAOS_RES* result, int32_t code);
#define QOS 1
#define TIMEOUT 10000L
void mqttCleanupRes(int status, int sockfd, pthread_t* client_daemon);
#ifdef __cplusplus
}
......
......@@ -15,11 +15,13 @@
#ifndef TDENGINE_MQTT_PLYLOAD_H
#define TDENGINE_MQTT_PLYLOAD_H
#ifdef __cplusplus
extern "C" {
#endif
char split(char str[], char delims[], char** p_p_cmd_part, int max);
char* converJsonToSql(char* json, char* _dbname, char* _tablename);
char* mqttConverJsonToSql(char* json, int maxSize);
#ifdef __cplusplus
}
#endif
......
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef TDENGINE_MQTT_SYSTEM_H
#define TDENGINE_MQTT_SYSTEM_H
#ifdef __cplusplus
extern "C" {
#endif
#include <stdint.h>
int32_t mqttInitSystem();
int32_t mqttStartSystem();
void mqttStopSystem();
void mqttCleanUpSystem();
#ifdef __cplusplus
}
#endif
#endif
......@@ -14,52 +14,146 @@
*/
#define _DEFAULT_SOURCE
#include "mqttPayload.h"
#include "os.h"
#include "cJSON.h"
#include "string.h"
#include "taos.h"
#include "mqttLog.h"
#include "os.h"
char split(char str[], char delims[], char** p_p_cmd_part, int max) {
char* token = strtok(str, delims);
char part_index = 0;
char** tmp_part = p_p_cmd_part;
while (token) {
*tmp_part++ = token;
token = strtok(NULL, delims);
part_index++;
if (part_index >= max) break;
}
return part_index;
#include "mqttPayload.h"
// subscribe message like this
/*
/test {
"timestamp": 1599121290,
"gateway": {
"name": "AcuLink 810 Gateway",
"model": "AcuLink810-868",
"serial": "S8P20200207"
},
"device": {
"name": "Acuvim L V3 .221",
"model": "Acuvim-L-V3",
"serial": "221",
"online": true,
"readings": [
{
"param": "Freq_Hz",
"value": "59.977539",
"unit": "Hz"
},
{
"param": "Va_V",
"value": "122.002907",
"unit": "V"
},
{
"param": "DI4",
"value": "5.000000",
"unit": ""
}
]
}
}
*/
// send msg cmd
// mosquitto_pub -h test.mosquitto.org -t "/test" -m '{"timestamp": 1599121290,"gateway": {"name": "AcuLink 810 Gateway","model": "AcuLink810-868","serial": "S8P20200207"},"device": {"name": "Acuvim L V3 .221","model": "Acuvim-L-V3","serial": "221","online": true,"readings": [{"param": "Freq_Hz","value": "59.977539","unit": "Hz"},{"param": "Va_V","value": "122.002907","unit": "V"},{"param": "DI4","value": "5.000000","unit": ""}]}}'
/*
* This is an example, this function needs to be implemented in order to parse the json file into a sql statement
* Note that you need to create a super table and database before writing data
* In this case:
* create database mqttdb;
* create table mqttdb.devices(ts timestamp, value bigint) tags(name binary(32), model binary(32), serial binary(16), param binary(16), unit binary(16));
*/
char* mqttConverJsonToSql(char* json, int maxSize) {
// const int32_t maxSize = 10240;
maxSize *= 5;
char* sql = malloc(maxSize);
cJSON* root = cJSON_Parse(json);
if (root == NULL) {
mqttError("failed to parse msg, invalid json format");
goto MQTT_PARSE_OVER;
}
cJSON* timestamp = cJSON_GetObjectItem(root, "timestamp");
if (!timestamp || timestamp->type != cJSON_Number) {
mqttError("failed to parse msg, timestamp not found");
goto MQTT_PARSE_OVER;
}
cJSON* device = cJSON_GetObjectItem(root, "device");
if (!device) {
mqttError("failed to parse msg, device not found");
goto MQTT_PARSE_OVER;
}
char* converJsonToSql(char* json, char* _dbname, char* _tablename) {
cJSON* jPlayload = cJSON_Parse(json);
char _names[102400] = {0};
char _values[102400] = {0};
int i = 0;
int count = cJSON_GetArraySize(jPlayload);
for (; i < count; i++)
{
cJSON* item = cJSON_GetArrayItem(jPlayload, i);
if (cJSON_Object == item->type) {
mqttInfo("The item '%s' is not supported", item->string);
} else {
strcat(_names, item->string);
if (i < count - 1) {
strcat(_names, ",");
}
char* __value_json = cJSON_Print(item);
strcat(_values, __value_json);
free(__value_json);
if (i < count - 1) {
strcat(_values, ",");
}
cJSON* name = cJSON_GetObjectItem(device, "name");
if (!name || name->type != cJSON_String) {
mqttError("failed to parse msg, name not found");
goto MQTT_PARSE_OVER;
}
cJSON* model = cJSON_GetObjectItem(device, "model");
if (!model || model->type != cJSON_String) {
mqttError("failed to parse msg, model not found");
goto MQTT_PARSE_OVER;
}
cJSON* serial = cJSON_GetObjectItem(device, "serial");
if (!serial || serial->type != cJSON_String) {
mqttError("failed to parse msg, serial not found");
goto MQTT_PARSE_OVER;
}
cJSON* readings = cJSON_GetObjectItem(device, "readings");
if (!readings || readings->type != cJSON_Array) {
mqttError("failed to parse msg, readings not found");
goto MQTT_PARSE_OVER;
}
int count = cJSON_GetArraySize(readings);
if (count <= 0) {
mqttError("failed to parse msg, readings size smaller than 0");
goto MQTT_PARSE_OVER;
}
int len = snprintf(sql, maxSize, "insert into");
for (int i = 0; i < count; ++i) {
cJSON* reading = cJSON_GetArrayItem(readings, i);
if (reading == NULL) continue;
cJSON* param = cJSON_GetObjectItem(reading, "param");
if (!param || param->type != cJSON_String) {
mqttError("failed to parse msg, param not found");
goto MQTT_PARSE_OVER;
}
cJSON* value = cJSON_GetObjectItem(reading, "value");
if (!value || value->type != cJSON_String) {
mqttError("failed to parse msg, value not found");
goto MQTT_PARSE_OVER;
}
cJSON* unit = cJSON_GetObjectItem(reading, "unit");
if (!unit || unit->type != cJSON_String) {
mqttError("failed to parse msg, unit not found");
goto MQTT_PARSE_OVER;
}
len += snprintf(sql + len, maxSize - len,
" mqttdb.serial_%s_%s using mqttdb.devices tags('%s', '%s', '%s', '%s', '%s') values(%" PRId64 ", %s)",
serial->valuestring, param->valuestring, name->valuestring, model->valuestring, serial->valuestring,
param->valuestring, unit->valuestring, timestamp->valueint * 1000, value->valuestring);
}
cJSON_free(jPlayload);
int sqllen = strlen(_names) + strlen(_values) + strlen(_dbname) + strlen(_tablename) + 1024;
char* _sql = calloc(1, sqllen);
sprintf(_sql, "INSERT INTO %s.%s (%s) VALUES(%s);", _dbname, _tablename, _names, _values);
return _sql;
cJSON_free(root);
return sql;
MQTT_PARSE_OVER:
cJSON_free(root);
free(sql);
return NULL;
}
\ No newline at end of file
......@@ -14,244 +14,131 @@
*/
#define _DEFAULT_SOURCE
#include "cJSON.h"
#include "os.h"
#include "mqtt.h"
#include "mqttInit.h"
#include "mqttLog.h"
#include "mqttPayload.h"
#include "os.h"
#include "tmqtt.h"
#include "posix_sockets.h"
#include "string.h"
#include "taos.h"
#include "tglobal.h"
#include "tmqtt.h"
#include "tsclient.h"
#include "tsocket.h"
#include "ttimer.h"
#include "mqttSystem.h"
struct mqtt_client mqttClient = {0};
pthread_t clientDaemonThread = {0};
void* mqttConnect=NULL;
struct reconnect_state_t recntStatus = {0};
char* topicPath=NULL;
int mttIsRuning = 1;
#include "taoserror.h"
int32_t mqttInitSystem() {
int rc = 0;
#if 0
uint8_t sendbuf[2048];
uint8_t recvbuf[1024];
recntStatus.sendbuf = sendbuf;
recntStatus.sendbufsz = sizeof(sendbuf);
recntStatus.recvbuf = recvbuf;
recntStatus.recvbufsz = sizeof(recvbuf);
char* url = tsMqttBrokerAddress;
recntStatus.user_name = strstr(url, "@") != NULL ? strbetween(url, "//", ":") : NULL;
char * passStr = strstr(url, recntStatus.user_name);
if (passStr != NULL) {
recntStatus.password = strstr(url, "@") != NULL ? strbetween(passStr, ":", "@") : NULL;
}
struct SMqttReconnectState tsMqttStatus = {0};
struct mqtt_client tsMqttClient = {0};
static pthread_t tsMqttClientDaemonThread = {0};
static void* tsMqttConnect = NULL;
static bool tsMqttIsRuning = false;
if (strlen(url) == 0) {
mqttDebug("mqtt module not init, url is null");
return rc;
}
if (strstr(url, "@") != NULL) {
recntStatus.hostname = strbetween(url, "@", ":");
} else if (strstr(strstr(url, "://") + 3, ":") != NULL) {
recntStatus.hostname = strbetween(url, "//", ":");
} else {
recntStatus.hostname = strbetween(url, "//", "/");
}
int32_t mqttInitSystem() { return 0; }
char* _begin_hostname = strstr(url, recntStatus.hostname);
if (_begin_hostname != NULL && strstr(_begin_hostname, ":") != NULL) {
recntStatus.port = strbetween(_begin_hostname, ":", "/");
} else {
recntStatus.port = strbetween("'1883'", "'", "'");
}
int32_t mqttStartSystem() {
tsMqttStatus.sendbufsz = MQTT_SEND_BUF_SIZE;
tsMqttStatus.recvbufsz = MQTT_RECV_BUF_SIZE;
tsMqttStatus.sendbuf = malloc(MQTT_SEND_BUF_SIZE);
tsMqttStatus.recvbuf = malloc(MQTT_RECV_BUF_SIZE);
tsMqttIsRuning = true;
char* portStr = recntStatus.hostname;
if (_begin_hostname != NULL) {
char* colonStr = strstr(_begin_hostname, ":");
if (colonStr != NULL) {
portStr = recntStatus.port;
}
mqtt_init_reconnect(&tsMqttClient, mqttReconnectClient, &tsMqttStatus, mqttPublishCallback);
if (pthread_create(&tsMqttClientDaemonThread, NULL, mqttClientRefresher, &tsMqttClient)) {
mqttError("mqtt failed to start daemon.");
mqttCleanupRes(EXIT_FAILURE, -1, NULL);
return -1;
}
char* topicStr = strstr(url, portStr);
if (topicStr != NULL) {
topicPath = strbetween(topicStr, "/", "/");
char* _topic = "+/+/+/";
int _tpsize = strlen(topicPath) + strlen(_topic) + 1;
recntStatus.topic = calloc(1, _tpsize);
sprintf(recntStatus.topic, "/%s/%s", topicPath, _topic);
recntStatus.client_id = strlen(tsMqttBrokerClientId) < 3 ? tsMqttBrokerClientId : "taos_mqtt";
mqttConnect = NULL;
} else {
topicPath = NULL;
}
#endif
return rc;
mqttInfo("mqtt listening for topic:%s messages", tsMqttTopic);
return 0;
}
int32_t mqttStartSystem() {
int rc = 0;
#if 0
if (recntStatus.user_name != NULL && recntStatus.password != NULL) {
mqttInfo("connecting to mqtt://%s:%s@%s:%s/%s/", recntStatus.user_name, recntStatus.password,
recntStatus.hostname, recntStatus.port, topicPath);
} else if (recntStatus.user_name != NULL && recntStatus.password == NULL) {
mqttInfo("connecting to mqtt://%s@%s:%s/%s/", recntStatus.user_name, recntStatus.hostname, recntStatus.port,
topicPath);
}
void mqttStopSystem() {
if (tsMqttIsRuning) {
tsMqttIsRuning = false;
tsMqttClient.error = MQTT_ERROR_SOCKET_ERROR;
mqtt_init_reconnect(&mqttClient, mqttReconnectClient, &recntStatus, mqtt_PublishCallback);
if (pthread_create(&clientDaemonThread, NULL, mqttClientRefresher, &mqttClient)) {
mqttError("Failed to start client daemon.");
mqttCleanup(EXIT_FAILURE, -1, NULL);
rc = -1;
} else {
mqttInfo("listening for '%s' messages.", recntStatus.topic);
}
#endif
return rc;
}
taosMsleep(300);
mqttCleanupRes(EXIT_SUCCESS, tsMqttClient.socketfd, &tsMqttClientDaemonThread);
void mqttStopSystem() {
#if 0
mqttClient.error = MQTT_ERROR_SOCKET_ERROR;
mttIsRuning = 0;
usleep(300000U);
mqttCleanup(EXIT_SUCCESS, mqttClient.socketfd, &clientDaemonThread);
mqttInfo("mqtt is stoped");
#endif
mqttInfo("mqtt is stopped");
}
}
void mqttCleanUpSystem() {
#if 0
mqttInfo("starting to cleanup mqtt");
free(recntStatus.user_name);
free(recntStatus.password);
free(recntStatus.hostname);
free(recntStatus.port);
free(recntStatus.topic);
free(topicPath);
mqttStopSystem();
mqttInfo("mqtt is cleaned up");
#endif
}
void mqtt_PublishCallback(void** unused, struct mqtt_response_publish* published) {
/* note that published->topic_name is NOT null-terminated (here we'll change it to a c-string) */
char* topic_name = (char*)malloc(published->topic_name_size + 1);
memcpy(topic_name, published->topic_name, published->topic_name_size);
topic_name[published->topic_name_size] = '\0';
mqttInfo("received publish('%s'): %s", topic_name, (const char*)published->application_message);
char _token[128] = {0};
char _dbname[128] = {0};
char _tablename[128] = {0};
if (mqttConnect == NULL) {
mqttInfo("connect database");
taos_connect_a(NULL, "_root", tsInternalPass, "", 0, mqttInitConnCb, &mqttClient, &mqttConnect);
}
if (topic_name[1]=='/' && strncmp((char*)&topic_name[1], topicPath, strlen(topicPath)) == 0) {
char* p_p_cmd_part[5] = {0};
char copystr[1024] = {0};
strncpy(copystr, topic_name, MIN(1024, published->topic_name_size));
char part_index = split(copystr, "/", p_p_cmd_part, 10);
if (part_index < 4) {
mqttError("The topic %s is't format '/path/token/dbname/table name/'. for expmle: '/taos/token/db/t'", topic_name);
void mqttPublishCallback(void** unused, struct mqtt_response_publish* published) {
const char* content = published->application_message;
mqttDebug("receive mqtt message, size:%d", (int)published->application_message_size);
if (tsMqttConnect == NULL) {
tsMqttConnect = taos_connect(NULL, "_root", tsInternalPass, "", 0);
if (tsMqttConnect == NULL) {
mqttError("failed to connect to tdengine, reason:%s", tstrerror(terrno));
return;
} else {
strncpy(_token, p_p_cmd_part[1], 127);
strncpy(_dbname, p_p_cmd_part[2], 127);
strncpy(_tablename, p_p_cmd_part[3], 127);
mqttInfo("part count=%d,access token:%s,database name:%s, table name:%s", part_index, _token, _dbname,
_tablename);
mqttInfo("successfully connected to the tdengine");
}
}
if (mqttConnect != NULL) {
char* _sql = converJsonToSql((char*)published->application_message, _dbname, _tablename);
mqttInfo("query:%s", _sql);
taos_query_a(mqttConnect, _sql, mqttQueryInsertCallback, &mqttClient);
mqttInfo("free sql:%s", _sql);
free(_sql);
}
mqttTrace("receive mqtt message, content:%s", content);
char* sql = mqttConverJsonToSql((char*)content, (int)published->application_message_size);
if (sql != NULL) {
void* res = taos_query(tsMqttConnect, sql);
int code = taos_errno(res);
if (code != 0) {
mqttError("failed to exec sql, reason:%s sql:%s", tstrerror(code), sql);
} else {
mqttTrace("successfully to exec sql:%s", sql);
}
taos_free_result(res);
} else {
mqttError("failed to parse mqtt message");
}
free(topic_name);
}
void* mqttClientRefresher(void* client) {
while (mttIsRuning) {
while (tsMqttIsRuning) {
mqtt_sync((struct mqtt_client*)client);
taosMsleep(100);
}
mqttDebug("quit refresher");
mqttDebug("mqtt quit refresher");
return NULL;
}
void mqttCleanup(int status, int sockfd, pthread_t* client_daemon) {
#if 0
void mqttCleanupRes(int status, int sockfd, pthread_t* client_daemon) {
mqttInfo("clean up mqtt module");
if (sockfd != -1) close(sockfd);
if (client_daemon != NULL) pthread_cancel(*client_daemon);
#endif
}
void mqttInitConnCb(void* param, TAOS_RES* result, int32_t code) {
if (code < 0) {
mqttError("mqtt:%d, connect to database failed, reason:%s", code, tstrerror(code));
taos_close(mqttConnect);
mqttConnect = NULL;
return;
if (sockfd != -1) {
close(sockfd);
}
mqttDebug("mqtt:%d, connect to database success, reason:%s", code, tstrerror(code));
}
void mqttQueryInsertCallback(void* param, TAOS_RES* result, int32_t code) {
if (code < 0) {
mqttError("mqtt:%d, save data failed, code:%s", code, tstrerror(code));
} else if (code == 0) {
mqttError("mqtt:%d, save data failed, affect rows:%d", code, code);
} else {
mqttInfo("mqtt:%d, save data success, code:%s", code, tstrerror(code));
if (client_daemon != NULL) {
pthread_cancel(*client_daemon);
}
}
void mqttReconnectClient(struct mqtt_client* client, void** reconnect_state_vptr) {
mqttInfo("reconnect client");
struct reconnect_state_t* reconnect_state = *((struct reconnect_state_t**)reconnect_state_vptr);
void mqttReconnectClient(struct mqtt_client* client, void** unused) {
mqttInfo("mqtt tries to connect to the mqtt server");
/* Close the clients socket if this isn't the initial reconnect call */
if (client->error != MQTT_ERROR_INITIAL_RECONNECT) {
close(client->socketfd);
}
/* Perform error handling here. */
if (client->error != MQTT_ERROR_INITIAL_RECONNECT) {
mqttError("mqttReconnectClient: called while client was in error state \"%s\"", mqtt_error_str(client->error));
mqttError("mqtt client was in error state %s", mqtt_error_str(client->error));
}
/* Open a new socket. */
int sockfd = open_nb_socket(reconnect_state->hostname, reconnect_state->port);
if (sockfd == -1) {
mqttError("failed to open socket: ");
mqttCleanup(EXIT_FAILURE, sockfd, NULL);
int sockfd = open_nb_socket(tsMqttHostName, tsMqttPort);
if (sockfd < 0) {
mqttError("mqtt client failed to open socket %s:%s", tsMqttHostName, tsMqttPort);
//mqttCleanupRes(EXIT_FAILURE, sockfd, NULL);
return;
}
/* Reinitialize the client. */
mqtt_reinit(client, sockfd, reconnect_state->sendbuf, reconnect_state->sendbufsz, reconnect_state->recvbuf,
reconnect_state->recvbufsz);
/* Ensure we have a clean session */
uint8_t connect_flags = MQTT_CONNECT_CLEAN_SESSION;
/* Send connection request to the broker. */
mqtt_connect(client, reconnect_state->client_id, NULL, NULL, 0, reconnect_state->user_name, reconnect_state->password,connect_flags, 400);
/* Subscribe to the topic. */
mqtt_subscribe(client, reconnect_state->topic, 0);
mqtt_reinit(client, sockfd, tsMqttStatus.sendbuf, tsMqttStatus.sendbufsz, tsMqttStatus.recvbuf, tsMqttStatus.recvbufsz);
mqtt_connect(client, tsMqttClientId, NULL, NULL, 0, tsMqttUser, tsMqttPass, MQTT_CONNECT_CLEAN_SESSION, 400);
mqtt_subscribe(client, tsMqttTopic, 0);
}
\ No newline at end of file
......@@ -8,10 +8,9 @@ INCLUDE_DIRECTORIES(inc)
AUX_SOURCE_DIRECTORY(src SRC)
ADD_LIBRARY(query ${SRC})
SET_SOURCE_FILES_PROPERTIES(src/sql.c PROPERTIES COMPILE_FLAGS -w)
TARGET_LINK_LIBRARIES(query tsdb tutil)
IF (TD_LINUX)
TARGET_LINK_LIBRARIES(query tsdb tutil m rt)
TARGET_LINK_LIBRARIES(query m rt)
ADD_SUBDIRECTORY(tests)
ELSEIF (TD_WINDOWS)
TARGET_LINK_LIBRARIES(query tsdb tutil)
ENDIF ()
......@@ -3,13 +3,10 @@ PROJECT(TDengine)
INCLUDE_DIRECTORIES(inc)
AUX_SOURCE_DIRECTORY(src SRC)
ADD_LIBRARY(tsdb ${SRC})
TARGET_LINK_LIBRARIES(tsdb common tutil)
IF (TD_LINUX)
ADD_LIBRARY(tsdb ${SRC})
TARGET_LINK_LIBRARIES(tsdb common tutil)
# Someone has no gtest directory, so comment it
# ADD_SUBDIRECTORY(tests)
ELSEIF (TD_WINDOWS)
ADD_LIBRARY(tsdb ${SRC})
TARGET_LINK_LIBRARIES(tsdb common tutil)
ENDIF ()
......@@ -3,9 +3,10 @@ PROJECT(TDengine)
AUX_SOURCE_DIRECTORY(src SRC)
ADD_LIBRARY(tutil ${SRC})
TARGET_LINK_LIBRARIES(tutil pthread osdetail lz4)
IF (TD_LINUX)
TARGET_LINK_LIBRARIES(tutil pthread osdetail m rt lz4)
TARGET_LINK_LIBRARIES(tutil m rt)
ADD_SUBDIRECTORY(tests)
FIND_PATH(ICONV_INCLUDE_EXIST iconv.h /usr/include/ /usr/local/include/)
......@@ -24,7 +25,7 @@ IF (TD_LINUX)
ENDIF ()
ELSEIF (TD_WINDOWS)
TARGET_LINK_LIBRARIES(tutil iconv regex pthread osdetail winmm IPHLPAPI ws2_32 lz4 wepoll)
TARGET_LINK_LIBRARIES(tutil iconv regex winmm IPHLPAPI ws2_32 wepoll)
ELSEIF(TD_DARWIN)
TARGET_LINK_LIBRARIES(tutil iconv pthread osdetail lz4)
TARGET_LINK_LIBRARIES(tutil iconv)
ENDIF()
......@@ -47,6 +47,15 @@ static void vnodeNotifyRole(void *ahandle, int8_t role);
static void vnodeCtrlFlow(void *handle, int32_t mseconds);
static int vnodeNotifyFileSynced(void *ahandle, uint64_t fversion);
#ifndef _SYNC
tsync_h syncStart(const SSyncInfo *info) { return NULL; }
int32_t syncForwardToPeer(tsync_h shandle, void *pHead, void *mhandle, int qtype) { return 0; }
void syncStop(tsync_h shandle) {}
int32_t syncReconfig(tsync_h shandle, const SSyncCfg * cfg) { return 0; }
int syncGetNodesRole(tsync_h shandle, SNodesRole * cfg) { return 0; }
void syncConfirmForward(tsync_h shandle, uint64_t version, int32_t code) {}
#endif
int32_t vnodeInitResources() {
vnodeInitWriteFp();
vnodeInitReadFp();
......@@ -289,12 +298,16 @@ int32_t vnodeOpen(int32_t vnode, char *rootDir) {
syncInfo.notifyFileSynced = vnodeNotifyFileSynced;
pVnode->sync = syncStart(&syncInfo);
#ifndef _SYNC
pVnode->role = TAOS_SYNC_ROLE_MASTER;
#else
if (pVnode->sync == NULL) {
vError("vgId:%d, failed to open sync module, replica:%d reason:%s", pVnode->vgId, pVnode->syncCfg.replica,
tstrerror(terrno));
vnodeCleanUp(pVnode);
return terrno;
}
#endif
pVnode->qMgmt = qOpenQueryMgmt(pVnode->vgId);
if (pVnode->qMgmt == NULL) {
......
......@@ -7,6 +7,5 @@ AUX_SOURCE_DIRECTORY(${CMAKE_CURRENT_SOURCE_DIR}/src SRC)
IF (TD_LINUX)
ADD_LIBRARY(twal ${SRC})
TARGET_LINK_LIBRARIES(twal tutil common)
ADD_SUBDIRECTORY(test)
ENDIF ()
system sh/stop_dnodes.sh
system sh/deploy.sh -n dnode1 -i 1
system sh/cfg.sh -n dnode1 -c walLevel -v 2
system sh/cfg.sh -n dnode1 -c numOfMnodes -v 1
system sh/cfg.sh -n dnode1 -c mnodeEqualVnodeNum -v 4
system sh/cfg.sh -n dnode1 -c maxTablesPerVnode -v 100000
system sh/cfg.sh -n dnode1 -c http -v 1
system sh/cfg.sh -n dnode1 -c mqtt -v 1
system sh/exec.sh -n dnode1 -s start
sleep 3000
sql connect
sql create database mqttdb;
sql create table mqttdb.devices(ts timestamp, value double) tags(name binary(32), model binary(32), serial binary(16), param binary(16), unit binary(16));
sleep 1000
system sh/exec.sh -n dnode1 -s stop -x SIGINT
......@@ -50,7 +50,7 @@ $tbPrefix = t
$i = 0
while $i < 10
$db = db . $i
sql create database $db maxTables 4
sql create database $db
sql use $db
sql create table st (ts timestamp, i int) tags(j int);
......
......@@ -8,7 +8,7 @@ sql connect
print ======== step1
sql create database db maxTables 4;
sql create database db;
sql use db
$tbPrefix = t
......
......@@ -15,7 +15,7 @@ sql connect
sql create dnode $hostname2
sleep 2000
sql create database db maxTables 4
sql create database db
sql use db
print ========== step1
......
......@@ -22,7 +22,7 @@ $stb = $stbPrefix . $i
sql drop database $db -x step1
step1:
sql create database $db maxrows 400 cache 1 maxTables 4
sql create database $db maxrows 400 cache 1
sql use $db
sql create table $stb (ts timestamp, c1 int, c2 bigint, c3 float, c4 double, c5 smallint, c6 tinyint, c7 bool, c8 binary(10), c9 nchar(10)) tags(t1 int)
......
......@@ -28,7 +28,7 @@ $tstart = 100000
sql drop database if exits $db -x step1
step1:
sql create database if not exists $db maxTables 4 keep 36500
sql create database if not exists $db keep 36500
sql use $db
sql create table $mt (ts timestamp, c1 int, c2 float, c3 bigint, c4 smallint, c5 tinyint, c6 double, c7 bool, c8 binary(10), c9 nchar(9)) TAGS(t1 int, t2 binary(12))
......
......@@ -24,7 +24,7 @@ $tstart = 100000
sql drop database if exits $db -x step1
step1:
sql create database if not exists $db maxTables 4 keep 36500
sql create database if not exists $db keep 36500
sql use $db
sql create table $mt (ts timestamp, c1 int, c2 float, c3 bigint, c4 smallint, c5 tinyint, c6 double, c7 bool, c8 binary(10), c9 nchar(9)) TAGS(t1 int, t2 binary(12))
......
......@@ -22,7 +22,7 @@ $tstart = 100000
sql drop database if exits $db -x step1
step1:
sql create database if not exists $db maxTables 4 keep 36500
sql create database if not exists $db keep 36500
sql use $db
sql create table $mt (ts timestamp, c1 int, c2 float, c3 bigint, c4 smallint, c5 tinyint, c6 double, c7 bool, c8 binary(10), c9 nchar(9)) TAGS(t1 int, t2 binary(12))
......
......@@ -21,7 +21,7 @@ $stb = $stbPrefix . $i
sql drop database $db -x step1
step1:
sql create database $db maxTables 4
sql create database $db
sql use $db
sql create table $stb (ts timestamp, c1 int, c2 bigint, c3 float, c4 double, c5 smallint, c6 tinyint, c7 bool, c8 binary(10), c9 nchar(10)) tags(t1 int)
......
......@@ -22,7 +22,7 @@ sql drop database if exists $db
$paramRows = 200
$rowNum = $paramRows * 4
$rowNum = $rowNum / 5
sql create database $db maxrows $paramRows maxTables 4
sql create database $db maxrows $paramRows
print ====== create tables
sql use $db
sql create table $stb (ts timestamp, c1 int, c2 bigint, c3 float, c4 double, c5 bool, c6 binary(10), c7 nchar(10)) tags(t1 int)
......
......@@ -22,7 +22,7 @@ $tstart = 100000
sql drop database if exits $db -x step1
step1:
sql create database if not exists $db maxTables 4 keep 36500
sql create database if not exists $db keep 36500
sql use $db
sql create table $mt (ts timestamp, c1 int, c2 float, c3 bigint, c4 smallint, c5 tinyint, c6 double, c7 bool, c8 binary(10), c9 nchar(9)) TAGS(t1 int, t2 binary(12))
......
......@@ -23,7 +23,7 @@ $tstart = 100000
sql drop database if exists $db -x step1
step1:
sql create database if not exists $db maxTables 4 keep 36500
sql create database if not exists $db keep 36500
sql use $db
sql create table $mt (ts timestamp, c1 int, c2 float, c3 bigint, c4 smallint, c5 tinyint, c6 double, c7 bool, c8 binary(10), c9 nchar(9)) TAGS(t1 int, t2 binary(12))
......
......@@ -15,7 +15,7 @@ $db = $dbPrefix
$stb = $stbPrefix
sql drop database if exists $db
sql create database $db maxrows 200 maxTables 4
sql create database $db maxrows 200
print ====== create tables
sql use $db
sql create table $stb (ts timestamp, c1 int, c2 bigint, c3 float, c4 double, c5 bool, c6 binary(10), c7 nchar(10)) tags(t1 int)
......
......@@ -21,7 +21,7 @@ $db = $dbPrefix . $i
$stb = $stbPrefix . $i
sql drop database if exists $db
sql create database $db maxrows 200 cache 16 maxTables 4
sql create database $db maxrows 200 cache 16
print ====== create tables
sql use $db
sql create table $stb (ts timestamp, c1 int, c2 bigint, c3 float, c4 double, c5 smallint, c6 tinyint, c7 bool, c8 binary(10), c9 nchar(10)) tags(t1 binary(15), t2 int, t3 bigint, t4 nchar(10), t5 double, t6 bool)
......
......@@ -20,7 +20,7 @@ $db = $dbPrefix . $i
$stb = $stbPrefix . $i
sql drop database if exists $db
sql create database $db maxrows 200 maxTables 4
sql create database $db maxrows 200
print ====== create tables
sql use $db
sql create table $stb (ts timestamp, c1 timestamp, c2 int) tags(t1 binary(20))
......
......@@ -21,7 +21,7 @@ $mt = $mtPrefix . $i
sql drop database if exits $db -x step1
step1:
sql create database if not exists $db maxTables 4
sql create database if not exists $db
sql use $db
sql create table $mt (ts timestamp, c1 int, c2 float, c3 bigint, c4 smallint, c5 tinyint, c6 double, c7 bool, c8 binary(10), c9 nchar(9)) TAGS(t1 int)
......
......@@ -54,7 +54,7 @@ $i = 0
$db = $dbPrefix . $i
$mt = $mtPrefix . $i
sql create database $db maxTables 4
sql create database $db
sql use $db
sql create table $mt (ts timestamp, tbcol int) TAGS(tgcol int)
......
......@@ -293,6 +293,7 @@ cd ../../../debug; make
./test.sh -f unique/stable/replica3_dnode6.sim
./test.sh -f unique/stable/replica3_vnode3.sim
./test.sh -f unique/mnode/mgmt20.sim
./test.sh -f unique/mnode/mgmt21.sim
./test.sh -f unique/mnode/mgmt22.sim
./test.sh -f unique/mnode/mgmt23.sim
......
......@@ -119,7 +119,7 @@ echo "tsdbDebugFlag 135" >> $TAOS_CFG
echo "cDebugFlag 135" >> $TAOS_CFG
echo "jnidebugFlag 135" >> $TAOS_CFG
echo "odbcdebugFlag 135" >> $TAOS_CFG
echo "httpDebugFlag 143" >> $TAOS_CFG
echo "httpDebugFlag 135" >> $TAOS_CFG
echo "monitorDebugFlag 135" >> $TAOS_CFG
echo "mqttDebugFlag 135" >> $TAOS_CFG
echo "qdebugFlag 135" >> $TAOS_CFG
......
......@@ -61,7 +61,7 @@ $totalTableNum = 10
$sleepTimer = 3000
$db = db
sql create database $db replica 2 maxTables $totalTableNum
sql create database $db replica 2
sql use $db
# create table , insert data
......
......@@ -61,7 +61,7 @@ $totalTableNum = 4
$sleepTimer = 3000
$db = db
sql create database $db cache 1 replica 2 maxTables $totalTableNum
sql create database $db cache 1 replica 2
sql use $db
# create table , insert data
......
......@@ -61,8 +61,8 @@ $totalTableNum = 20
$sleepTimer = 3000
$db = db
print create database $db replica 3 maxTables $totalTableNum
sql create database $db replica 3 maxTables $totalTableNum
print create database $db replica 3
sql create database $db replica 3
sql use $db
# create table , insert data
......
......@@ -62,8 +62,8 @@ $sleepTimer = 3000
$maxTables = $totalTableNum * 2
$db = db
print create database $db replica 3 maxTables $maxTables
sql create database $db replica 3 maxTables $maxTables
print create database $db replica 3
sql create database $db replica 3
sql use $db
# create table , insert data
......
......@@ -18,7 +18,7 @@ print ========== prepare data
system sh/exec.sh -n dnode1 -s start
sleep 3000
sql connect
sql create database db blocks 3 cache 1 maxTables $maxTables
sql create database db blocks 3 cache 1
sql use db
print ========== step1
......
......@@ -16,7 +16,7 @@ print ========== prepare data
system sh/exec.sh -n dnode1 -s start
sleep 3000
sql connect
sql create database db blocks 3 cache 1 maxTables $maxTables
sql create database db blocks 3 cache 1
sql use db
print ========== step1
......
......@@ -40,7 +40,7 @@ print ========= start dnode1
system sh/exec.sh -n dnode1 -s start
sql connect
sql create database c_b1_d1 maxTables 4
sql create database c_b1_d1
sql use c_b1_d1
sql create table c_b1_t1 (t timestamp, i int)
......@@ -50,7 +50,7 @@ sql insert into c_b1_t1 values(1520000022013, 13)
sql insert into c_b1_t1 values(1520000023012, 12)
sql insert into c_b1_t1 values(1520000024011, 11)
sql create database c_b1_d2 maxTables 4
sql create database c_b1_d2
sql use c_b1_d2
sql create table c_b1_t2 (t timestamp, i int)
sql insert into c_b1_t2 values(1520000020025, 25)
......@@ -107,7 +107,7 @@ print dnode2 ==> $dnode2Role
print ============================== step3
print ========= add db3
sql create database c_b1_d3 maxTables 4
sql create database c_b1_d3
sql use c_b1_d3
sql create table c_b1_t3 (t timestamp, i int)
sql insert into c_b1_t3 values(1520000020035, 35)
......@@ -280,7 +280,7 @@ if $dnode4Role != slave then
endi
print ============================== step10
sql create database c_b1_d4 maxTables 4
sql create database c_b1_d4
sql use c_b1_d4
sql create table c_b1_t4 (t timestamp, i int)
sql insert into c_b1_t4 values(1520000020045, 45)
......@@ -318,7 +318,7 @@ sql use c_b1_d2
sql insert into c_b1_t2 values(1520000025026, 26)
print ============================== step12
sql create database c_b1_d5 maxTables 4
sql create database c_b1_d5
sql use c_b1_d5
sql_error create table c_b1_t5 (t timestamp, i int) -x error3
......@@ -343,7 +343,7 @@ sql insert into c_b1_t5 values(1520000022053, 53)
sql insert into c_b1_t5 values(1520000023052, 52)
sql insert into c_b1_t5 values(1520000024051, 51)
sql create database c_b1_d6 maxTables 4
sql create database c_b1_d6
sql use c_b1_d6
sql create table c_b1_t6 (t timestamp, i int)
sql insert into c_b1_t6 values(1520000020065, 65)
......@@ -375,7 +375,7 @@ sql create dnode $hostname6
system sh/exec.sh -n dnode6 -s start
sleep 15000
sql create database c_b1_d7 maxTables 4
sql create database c_b1_d7
sql use c_b1_d7
sql create table c_b1_t7 (t timestamp, i int)
sql insert into c_b1_t7 values(1520000020075, 75)
......@@ -384,7 +384,7 @@ sql insert into c_b1_t7 values(1520000022073, 73)
sql insert into c_b1_t7 values(1520000023072, 72)
sql insert into c_b1_t7 values(1520000024071, 71)
sql create database c_b1_d8 maxTables 4
sql create database c_b1_d8
sql use c_b1_d8
sql create table c_b1_t8 (t timestamp, i int)
sql insert into c_b1_t8 values(1520000020085, 85)
......
......@@ -47,7 +47,7 @@ system sh/exec.sh -n dnode2 -s start
system sh/exec.sh -n dnode3 -s start
sleep 4001
sql create database c_b2_d1 replica 2 maxTables 4
sql create database c_b2_d1 replica 2
sql use c_b2_d1
sql create table c_b2_t1 (t timestamp, i int)
sql insert into c_b2_t1 values(1520000020015, 15)
......@@ -56,7 +56,7 @@ sql insert into c_b2_t1 values(1520000022013, 13)
sql insert into c_b2_t1 values(1520000023012, 12)
sql insert into c_b2_t1 values(1520000024011, 11)
sql create database c_b2_d2 replica 2 maxTables 4
sql create database c_b2_d2 replica 2
sql use c_b2_d2
sql create table c_b2_t2 (t timestamp, i int)
sql insert into c_b2_t2 values(1520000020025, 25)
......@@ -65,7 +65,7 @@ sql insert into c_b2_t2 values(1520000022023, 23)
sql insert into c_b2_t2 values(1520000023022, 22)
sql insert into c_b2_t2 values(1520000024021, 21)
sql create database c_b2_d3 replica 2 maxTables 4
sql create database c_b2_d3 replica 2
sql use c_b2_d3
sql create table c_b2_t3 (t timestamp, i int)
sql insert into c_b2_t3 values(1520000020035, 35)
......
......@@ -57,7 +57,7 @@ sleep 3000
print ============== step2: create db1 with replica 3
$db = db1
print create database $db replica 3
#sql create database $db replica 3 maxTables $totalTableNum
#sql create database $db replica 3
sql create database $db replica 3
sql use $db
......
......@@ -57,7 +57,7 @@ sleep 3000
print ============== step2: create db1 with replica 3
$db = db1
print create database $db replica 3
#sql create database $db replica 3 maxTables $totalTableNum
#sql create database $db replica 3
sql create database $db replica 3
sql use $db
......
......@@ -58,7 +58,7 @@ print ============== step2: create db1 with replica 3
$replica = 3
$db = db1
print create database $db replica $replica
#sql create database $db replica 3 maxTables $totalTableNum
#sql create database $db replica 3
sql create database $db replica $replica
sql use $db
......
......@@ -58,7 +58,7 @@ print ============== step2: create db1 with replica 3
$replica = 3
$db = db1
print create database $db replica $replica
#sql create database $db replica 3 maxTables $totalTableNum
#sql create database $db replica 3
sql create database $db replica $replica
sql use $db
......
......@@ -56,7 +56,7 @@ if $data2_3 != slave then
endi
print ========== step2
sql create database d1 maxTables 4
sql create database d1
sql create table d1.t1 (ts timestamp, i int)
sql create table d1.t2 (ts timestamp, i int)
sql create table d1.t3 (ts timestamp, i int)
......
......@@ -30,7 +30,7 @@ system sh/exec.sh -n dnode1 -s start
sql connect
sleep 3000
sql create database d1 maxTables 4
sql create database d1
sql create table d1.t1 (t timestamp, i int)
sql insert into d1.t1 values(now+1s, 15)
sql insert into d1.t1 values(now+2s, 14)
......@@ -68,7 +68,7 @@ if $data2_2 != 1 then
endi
print ========== step3
sql create database d2 maxTables 4
sql create database d2
sql create table d2.t2 (t timestamp, i int)
sql insert into d2.t2 values(now+1s, 25)
sql insert into d2.t2 values(now+2s, 24)
......@@ -139,7 +139,7 @@ if $data2_3 != 2 then
endi
print ========== step6
sql create database d3 maxTables 4
sql create database d3
sql create table d3.t3 (t timestamp, i int)
sql insert into d3.t3 values(now+1s, 35)
sql insert into d3.t3 values(now+2s, 34)
......@@ -193,7 +193,7 @@ if $data2_4 != 1 then
endi
print ========== step8
sql create database d4 maxTables 4
sql create database d4
sql create table d4.t4 (t timestamp, i int)
sql insert into d4.t4 values(now+1s, 45)
sql insert into d4.t4 values(now+2s, 44)
......
......@@ -28,7 +28,7 @@ system sh/exec.sh -n dnode2 -s start
system sh/exec.sh -n dnode3 -s start
sleep 3000
sql create database d1 replica 2 maxTables 4
sql create database d1 replica 2
sql create table d1.t1 (t timestamp, i int)
sql insert into d1.t1 values(now+1s, 15)
sql insert into d1.t1 values(now+2s, 14)
......@@ -36,7 +36,7 @@ sql insert into d1.t1 values(now+3s, 13)
sql insert into d1.t1 values(now+4s, 12)
sql insert into d1.t1 values(now+5s, 11)
sql create database d2 replica 2 maxTables 4
sql create database d2 replica 2
sql create table d2.t2 (t timestamp, i int)
sql insert into d2.t2 values(now+1s, 25)
sql insert into d2.t2 values(now+2s, 24)
......@@ -117,7 +117,7 @@ if $data2_4 != 2 then
endi
print ========== step4
sql create database d3 replica 2 maxTables 4
sql create database d3 replica 2
sql create table d3.t3 (t timestamp, i int)
sql insert into d3.t3 values(now+1s, 35)
sql insert into d3.t3 values(now+2s, 34)
......
......@@ -33,7 +33,7 @@ system sh/exec.sh -n dnode3 -s start
system sh/exec.sh -n dnode4 -s start
sleep 3000
sql create database d1 replica 3 maxTables 4
sql create database d1 replica 3
sql create table d1.t1 (t timestamp, i int)
sql insert into d1.t1 values(now+1s, 15)
sql insert into d1.t1 values(now+2s, 14)
......@@ -41,7 +41,7 @@ sql insert into d1.t1 values(now+3s, 13)
sql insert into d1.t1 values(now+4s, 12)
sql insert into d1.t1 values(now+5s, 11)
sql create database d2 replica 3 maxTables 4
sql create database d2 replica 3
sql create table d2.t2 (t timestamp, i int)
sql insert into d2.t2 values(now+1s, 25)
sql insert into d2.t2 values(now+2s, 24)
......@@ -136,7 +136,7 @@ if $data2_5 != 2 then
endi
print ========== step4
sql create database d3 replica 3 maxTables 4
sql create database d3 replica 3
sql create table d3.t3 (t timestamp, i int)
sql insert into d3.t3 values(now+1s, 35)
sql insert into d3.t3 values(now+2s, 34)
......
......@@ -20,7 +20,7 @@ system sh/exec.sh -n dnode1 -s start
sql connect
sleep 3000
sql create database d1 maxTables 4
sql create database d1
sql create table d1.t1 (t timestamp, i int)
sql insert into d1.t1 values(now+1s, 15)
sql insert into d1.t1 values(now+2s, 14)
......@@ -28,7 +28,7 @@ sql insert into d1.t1 values(now+3s, 13)
sql insert into d1.t1 values(now+4s, 12)
sql insert into d1.t1 values(now+5s, 11)
sql create database d2 maxTables 4
sql create database d2
sql create table d2.t2 (t timestamp, i int)
sql insert into d2.t2 values(now+1s, 25)
sql insert into d2.t2 values(now+2s, 24)
......@@ -65,7 +65,7 @@ if $data2_2 != 2 then
endi
print ========== step3
sql create database d3 replica 2 maxTables 4
sql create database d3 replica 2
sql create table d3.t3 (t timestamp, i int)
sql insert into d3.t3 values(now+1s, 35)
sql insert into d3.t3 values(now+2s, 34)
......
......@@ -27,7 +27,7 @@ sql create dnode $hostname2
system sh/exec.sh -n dnode2 -s start
sleep 3000
sql create database d1 replica 2 maxTables 4
sql create database d1 replica 2
sql create table d1.t1(ts timestamp, i int)
sql insert into d1.t1 values(1588262400001, 1)
......
......@@ -20,7 +20,7 @@ system sh/exec.sh -n dnode1 -s start
sleep 3000
sql connect
sql create database d1 maxTables 4
sql create database d1
sql create table d1.t1 (t timestamp, i int)
sql insert into d1.t1 values(now+1s, 15)
sql insert into d1.t1 values(now+2s, 14)
......@@ -28,7 +28,7 @@ sql insert into d1.t1 values(now+3s, 13)
sql insert into d1.t1 values(now+4s, 12)
sql insert into d1.t1 values(now+5s, 11)
sql create database d2 maxTables 4
sql create database d2
sql create table d2.t2 (t timestamp, i int)
sql insert into d2.t2 values(now+1s, 25)
sql insert into d2.t2 values(now+2s, 24)
......@@ -47,7 +47,7 @@ sql create dnode $hostname2
system sh/exec.sh -n dnode2 -s start
sleep 9000
sql create database d3 replica 2 maxTables 4
sql create database d3 replica 2
sql create table d3.t3 (t timestamp, i int)
sql insert into d3.t3 values(now+1s, 35)
sql insert into d3.t3 values(now+2s, 34)
......
......@@ -20,7 +20,7 @@ system sh/exec.sh -n dnode1 -s start
sleep 3000
sql connect
sql create database d1 maxTables 4
sql create database d1
sql create table d1.t1 (t timestamp, i int)
sql insert into d1.t1 values(1588262400001, 15)
sql insert into d1.t1 values(1588262400002, 14)
......@@ -28,7 +28,7 @@ sql insert into d1.t1 values(1588262400003, 13)
sql insert into d1.t1 values(1588262400004, 12)
sql insert into d1.t1 values(1588262400005, 11)
sql create database d2 maxTables 4
sql create database d2
sql create table d2.t2 (t timestamp, i int)
sql insert into d2.t2 values(1588262400001, 25)
sql insert into d2.t2 values(1588262400002, 24)
......@@ -47,7 +47,7 @@ sql create dnode $hostname2
system sh/exec.sh -n dnode2 -s start
sleep 9000
sql create database d3 replica 2 maxTables 4
sql create database d3 replica 2
sql create table d3.t3 (t timestamp, i int)
sql insert into d3.t3 values(1588262400001, 35)
sql insert into d3.t3 values(1588262400002, 34)
......
......@@ -19,7 +19,7 @@ print ========== step1
system sh/exec.sh -n dnode1 -s start
sql connect
sql create database d1 maxTables 4
sql create database d1
sql create table d1.t1 (t timestamp, i int)
sql insert into d1.t1 values(now+1s, 15)
sql insert into d1.t1 values(now+2s, 14)
......@@ -55,7 +55,7 @@ if $data2_2 != 1 then
endi
print ========== step3
sql create database d2 maxTables 4
sql create database d2
sql create table d2.t2 (t timestamp, i int)
sql insert into d2.t2 values(now+1s, 25)
......@@ -123,7 +123,7 @@ if $data2_3 != 2 then
endi
print ========== step6
sql create database d3 maxTables 4
sql create database d3
sql create table d3.t3 (t timestamp, i int)
sql insert into d3.t3 values(now+1s, 35)
sql insert into d3.t3 values(now+2s, 34)
......
system sh/stop_dnodes.sh
system sh/deploy.sh -n dnode1 -i 1
system sh/deploy.sh -n dnode2 -i 2
system sh/cfg.sh -n dnode1 -c numOfMnodes -v 2
system sh/cfg.sh -n dnode2 -c numOfMnodes -v 2
system sh/cfg.sh -n dnode1 -c monitor -v 1
system sh/cfg.sh -n dnode2 -c monitor -v 1
print ============== step1
system sh/exec.sh -n dnode1 -s start
system sh/exec.sh -n dnode2 -s start
sleep 3000
sql connect
print ============== step2
sql create dnode $hostname2
$x = 0
show2:
$x = $x + 1
sleep 2000
if $x == 10 then
return -1
endi
sql show mnodes
print dnode1 ==> $data2_1
print dnode2 ==> $data2_2
if $data2_1 != master then
goto show2
endi
if $data2_2 != slave then
goto show2
endi
system sh/exec.sh -n dnode1 -s stop -x SIGINT
system sh/exec.sh -n dnode2 -s stop -x SIGINT
print ============== step3
system sh/exec.sh -n dnode2 -s start
sleep 10000
system sh/exec.sh -n dnode1 -s start
sql connect
print =============== step4
sql select * from log.dn1
$d1_first = $rows
sql select * from log.dn2
$d2_first = $rows
sleep 3000
sql select * from log.dn1
$d1_second = $rows
sql select * from log.dn2
$d2_second = $rows
print dnode1 $d1_first $d1_second
print dnode2 $d2_first $d2_second
if $d1_first >= $d1_second then
return -1
endi
if $d2_first >= $d2_second then
return -1
endi
system sh/exec.sh -n dnode1 -s stop -x SIGINT
system sh/exec.sh -n dnode2 -s stop -x SIGINT
\ No newline at end of file
......@@ -29,7 +29,7 @@ $db = $dbPrefix
$mt = $mtPrefix
$st = $stPrefix . $i
sql create database $db maxTables 4
sql create database $db
sql use $db
sql create table $mt (ts timestamp, tbcol int, tbcol2 float) TAGS(tgcol int)
......
......@@ -38,7 +38,7 @@ $i = 0
$db = $dbPrefix . $i
$mt = $mtPrefix . $i
sql create database $db maxTables 4
sql create database $db
sql use $db
sql create table $mt (ts timestamp, tbcol int) TAGS(tgcol int)
......
......@@ -43,7 +43,7 @@ $i = 0
$db = $dbPrefix . $i
$mt = $mtPrefix . $i
sql create database $db maxTables 4
sql create database $db
sql use $db
sql create table $mt (ts timestamp, tbcol int) TAGS(tgcol int)
......
......@@ -47,7 +47,7 @@ $i = 0
$db = $dbPrefix . $i
$mt = $mtPrefix . $i
sql create database $db maxTables 4
sql create database $db
sql use $db
sql create table $mt (ts timestamp, tbcol int) TAGS(tgcol int)
......
......@@ -54,7 +54,7 @@ $i = 0
$db = $dbPrefix . $i
$mt = $mtPrefix . $i
sql create database $db replica 2 maxTables 4
sql create database $db replica 2
sql use $db
sql create table $mt (ts timestamp, tbcol int) TAGS(tgcol int)
......
......@@ -37,7 +37,7 @@ $i = 0
$db = $dbPrefix . $i
$mt = $mtPrefix . $i
sql create database $db replica 2 maxTables 4
sql create database $db replica 2
sql use $db
sql create table $mt (ts timestamp, tbcol int) TAGS(tgcol int)
......
......@@ -78,7 +78,7 @@ $i = 0
$db = $dbPrefix . $i
$mt = $mtPrefix . $i
sql create database $db replica 3 maxTables 4
sql create database $db replica 3
sql use $db
sql create table $mt (ts timestamp, tbcol int) TAGS(tgcol int)
......
......@@ -54,7 +54,7 @@ $i = 0
$db = $dbPrefix . $i
$mt = $mtPrefix . $i
sql create database $db replica 3 maxTables 4
sql create database $db replica 3
sql use $db
sql create table $mt (ts timestamp, tbcol int) TAGS(tgcol int)
......
......@@ -32,7 +32,7 @@ sql connect
print ============== step1
$db = $dbPrefix
sql create database $db maxTables 4
sql create database $db
sql use $db
$i = 0
......
......@@ -34,7 +34,7 @@ $db = $dbPrefix
$mt = $mtPrefix
$st = $stPrefix . $i
sql create database $db maxTables 4
sql create database $db
sql use $db
sql create table $mt (ts timestamp, tbcol int, tbcol2 float) TAGS(tgcol int)
......
......@@ -65,7 +65,7 @@ $db = $dbPrefix . $i
$mt = $mtPrefix . $i
$st = $stPrefix . $i
sql create database $db maxTables 4
sql create database $db
sql use $db
sql create table $mt (ts timestamp, tbcol int, tbcol2 float) TAGS(tgcol int)
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册