提交 671e8622 编写于 作者: S Shengliang Guan

Merge from develop

...@@ -24,3 +24,7 @@ ENDIF () ...@@ -24,3 +24,7 @@ ENDIF ()
IF (TD_MEM_CHECK) IF (TD_MEM_CHECK)
ADD_DEFINITIONS(-DTAOS_MEM_CHECK) ADD_DEFINITIONS(-DTAOS_MEM_CHECK)
ENDIF () ENDIF ()
IF (TD_RANDOM_FILE_FAIL)
ADD_DEFINITIONS(-DTAOS_RANDOM_FILE_FAIL)
ENDIF ()
...@@ -30,4 +30,9 @@ ENDIF () ...@@ -30,4 +30,9 @@ ENDIF ()
IF (${MEM_CHECK} MATCHES "true") IF (${MEM_CHECK} MATCHES "true")
SET(TD_MEM_CHECK TRUE) SET(TD_MEM_CHECK TRUE)
MESSAGE(STATUS "build with memory check") MESSAGE(STATUS "build with memory check")
ENDIF () ENDIF ()
\ No newline at end of file
IF (${RANDOM_FILE_FAIL} MATCHES "true")
SET(TD_RANDOM_FILE_FAIL TRUE)
MESSAGE(STATUS "build with random-file-fail enabled")
ENDIF ()
...@@ -406,7 +406,7 @@ int tscProcessLocalCmd(SSqlObj *pSql) { ...@@ -406,7 +406,7 @@ int tscProcessLocalCmd(SSqlObj *pSql) {
pSql->res.qhandle = 0x1; pSql->res.qhandle = 0x1;
pSql->res.numOfRows = 0; pSql->res.numOfRows = 0;
} else if (pCmd->command == TSDB_SQL_RESET_CACHE) { } else if (pCmd->command == TSDB_SQL_RESET_CACHE) {
taosCacheEmpty(tscCacheHandle); taosCacheEmpty(tscCacheHandle,false);
} else if (pCmd->command == TSDB_SQL_SERV_VERSION) { } else if (pCmd->command == TSDB_SQL_SERV_VERSION) {
tscProcessServerVer(pSql); tscProcessServerVer(pSql);
} else if (pCmd->command == TSDB_SQL_CLI_VERSION) { } else if (pCmd->command == TSDB_SQL_CLI_VERSION) {
......
...@@ -1950,7 +1950,7 @@ int tscProcessUseDbRsp(SSqlObj *pSql) { ...@@ -1950,7 +1950,7 @@ int tscProcessUseDbRsp(SSqlObj *pSql) {
} }
int tscProcessDropDbRsp(SSqlObj *UNUSED_PARAM(pSql)) { int tscProcessDropDbRsp(SSqlObj *UNUSED_PARAM(pSql)) {
taosCacheEmpty(tscCacheHandle); taosCacheEmpty(tscCacheHandle, false);
return 0; return 0;
} }
...@@ -1996,7 +1996,7 @@ int tscProcessAlterTableMsgRsp(SSqlObj *pSql) { ...@@ -1996,7 +1996,7 @@ int tscProcessAlterTableMsgRsp(SSqlObj *pSql) {
if (isSuperTable) { // if it is a super table, reset whole query cache if (isSuperTable) { // if it is a super table, reset whole query cache
tscDebug("%p reset query cache since table:%s is stable", pSql, pTableMetaInfo->name); tscDebug("%p reset query cache since table:%s is stable", pSql, pTableMetaInfo->name);
taosCacheEmpty(tscCacheHandle); taosCacheEmpty(tscCacheHandle, false);
} }
} }
......
...@@ -176,6 +176,7 @@ void dnodeCleanupMgmt() { ...@@ -176,6 +176,7 @@ void dnodeCleanupMgmt() {
tsMgmtQset = NULL; tsMgmtQset = NULL;
tsMgmtQueue = NULL; tsMgmtQueue = NULL;
vnodeCleanupResources();
} }
void dnodeDispatchToMgmtQueue(SRpcMsg *pMsg) { void dnodeDispatchToMgmtQueue(SRpcMsg *pMsg) {
...@@ -242,8 +243,14 @@ static int32_t dnodeGetVnodeList(int32_t vnodeList[], int32_t *numOfVnodes) { ...@@ -242,8 +243,14 @@ static int32_t dnodeGetVnodeList(int32_t vnodeList[], int32_t *numOfVnodes) {
int32_t vnode = atoi(de->d_name + 5); int32_t vnode = atoi(de->d_name + 5);
if (vnode == 0) continue; if (vnode == 0) continue;
vnodeList[*numOfVnodes] = vnode;
(*numOfVnodes)++; (*numOfVnodes)++;
if (*numOfVnodes >= TSDB_MAX_VNODES) {
dError("vgId:%d, too many vnode directory in disk, exist:%d max:%d", vnode, *numOfVnodes, TSDB_MAX_VNODES);
continue;
} else {
vnodeList[*numOfVnodes - 1] = vnode;
}
} }
} }
closedir(dir); closedir(dir);
...@@ -276,7 +283,7 @@ static void *dnodeOpenVnode(void *param) { ...@@ -276,7 +283,7 @@ static void *dnodeOpenVnode(void *param) {
static int32_t dnodeOpenVnodes() { static int32_t dnodeOpenVnodes() {
int32_t *vnodeList = calloc(TSDB_MAX_VNODES, sizeof(int32_t)); int32_t *vnodeList = calloc(TSDB_MAX_VNODES, sizeof(int32_t));
int32_t numOfVnodes; int32_t numOfVnodes = 0;
int32_t status = dnodeGetVnodeList(vnodeList, &numOfVnodes); int32_t status = dnodeGetVnodeList(vnodeList, &numOfVnodes);
if (status != TSDB_CODE_SUCCESS) { if (status != TSDB_CODE_SUCCESS) {
...@@ -337,7 +344,7 @@ static int32_t dnodeOpenVnodes() { ...@@ -337,7 +344,7 @@ static int32_t dnodeOpenVnodes() {
void dnodeStartStream() { void dnodeStartStream() {
int32_t vnodeList[TSDB_MAX_VNODES]; int32_t vnodeList[TSDB_MAX_VNODES];
int32_t numOfVnodes = 0; int32_t numOfVnodes = 0;
int32_t status = dnodeGetVnodeList(vnodeList, &numOfVnodes); int32_t status = vnodeGetVnodeList(vnodeList, &numOfVnodes);
if (status != TSDB_CODE_SUCCESS) { if (status != TSDB_CODE_SUCCESS) {
dInfo("get dnode list failed"); dInfo("get dnode list failed");
...@@ -352,15 +359,14 @@ void dnodeStartStream() { ...@@ -352,15 +359,14 @@ void dnodeStartStream() {
} }
static void dnodeCloseVnodes() { static void dnodeCloseVnodes() {
int32_t *vnodeList = (int32_t *)malloc(sizeof(int32_t) * TSDB_MAX_VNODES); int32_t vnodeList[TSDB_MAX_VNODES];
int32_t numOfVnodes; int32_t numOfVnodes = 0;
int32_t status; int32_t status;
status = dnodeGetVnodeList(vnodeList, &numOfVnodes); status = vnodeGetVnodeList(vnodeList, &numOfVnodes);
if (status != TSDB_CODE_SUCCESS) { if (status != TSDB_CODE_SUCCESS) {
dInfo("get dnode list failed"); dInfo("get dnode list failed");
free(vnodeList);
return; return;
} }
...@@ -368,7 +374,6 @@ static void dnodeCloseVnodes() { ...@@ -368,7 +374,6 @@ static void dnodeCloseVnodes() {
vnodeClose(vnodeList[i]); vnodeClose(vnodeList[i]);
} }
free(vnodeList);
dInfo("total vnodes:%d are all closed", numOfVnodes); dInfo("total vnodes:%d are all closed", numOfVnodes);
} }
...@@ -391,7 +396,7 @@ static int32_t dnodeProcessCreateVnodeMsg(SRpcMsg *rpcMsg) { ...@@ -391,7 +396,7 @@ static int32_t dnodeProcessCreateVnodeMsg(SRpcMsg *rpcMsg) {
pCreate->nodes[j].nodeId = htonl(pCreate->nodes[j].nodeId); pCreate->nodes[j].nodeId = htonl(pCreate->nodes[j].nodeId);
} }
void *pVnode = vnodeAccquireVnode(pCreate->cfg.vgId); void *pVnode = vnodeAcquireVnode(pCreate->cfg.vgId);
if (pVnode != NULL) { if (pVnode != NULL) {
int32_t code = vnodeAlter(pVnode, pCreate); int32_t code = vnodeAlter(pVnode, pCreate);
vnodeRelease(pVnode); vnodeRelease(pVnode);
......
...@@ -98,11 +98,7 @@ void dnodeDispatchToVnodeReadQueue(SRpcMsg *pMsg) { ...@@ -98,11 +98,7 @@ void dnodeDispatchToVnodeReadQueue(SRpcMsg *pMsg) {
pHead->vgId = htonl(pHead->vgId); pHead->vgId = htonl(pHead->vgId);
pHead->contLen = htonl(pHead->contLen); pHead->contLen = htonl(pHead->contLen);
if (pMsg->msgType == TSDB_MSG_TYPE_FETCH) { pVnode = vnodeAcquireVnode(pHead->vgId);
pVnode = vnodeGetVnode(pHead->vgId);
} else {
pVnode = vnodeAccquireVnode(pHead->vgId);
}
if (pVnode == NULL) { if (pVnode == NULL) {
leftLen -= pHead->contLen; leftLen -= pHead->contLen;
...@@ -179,24 +175,19 @@ void dnodeFreeVnodeRqueue(void *rqueue) { ...@@ -179,24 +175,19 @@ void dnodeFreeVnodeRqueue(void *rqueue) {
// dynamically adjust the number of threads // dynamically adjust the number of threads
} }
static void dnodeContinueExecuteQuery(void* pVnode, void* qhandle, SReadMsg *pMsg) { void dnodePutItemIntoReadQueue(void *pVnode, void *qhandle) {
SReadMsg *pRead = (SReadMsg *)taosAllocateQitem(sizeof(SReadMsg)); SReadMsg *pRead = (SReadMsg *)taosAllocateQitem(sizeof(SReadMsg));
pRead->rpcMsg = pMsg->rpcMsg;
pRead->pCont = qhandle;
pRead->contLen = 0;
pRead->rpcMsg.msgType = TSDB_MSG_TYPE_QUERY; pRead->rpcMsg.msgType = TSDB_MSG_TYPE_QUERY;
pRead->pCont = qhandle;
pRead->contLen = 0;
assert(pVnode != NULL);
taos_queue queue = vnodeAcquireRqueue(pVnode);
taos_queue queue = vnodeGetRqueue(pVnode); taosWriteQitem(queue, TAOS_QTYPE_QUERY, pRead);
taosWriteQitem(queue, TAOS_QTYPE_RPC, pRead);
} }
void dnodeSendRpcReadRsp(void *pVnode, SReadMsg *pRead, int32_t code) { void dnodeSendRpcReadRsp(void *pVnode, SReadMsg *pRead, int32_t code) {
if (code == TSDB_CODE_VND_ACTION_IN_PROGRESS) return;
if (code == TSDB_CODE_VND_ACTION_NEED_REPROCESSED) {
dnodeContinueExecuteQuery(pVnode, pRead->rspRet.qhandle, pRead);
code = TSDB_CODE_SUCCESS;
}
SRpcMsg rpcRsp = { SRpcMsg rpcRsp = {
.handle = pRead->rpcMsg.handle, .handle = pRead->rpcMsg.handle,
.pCont = pRead->rspRet.rsp, .pCont = pRead->rspRet.rsp,
...@@ -206,6 +197,12 @@ void dnodeSendRpcReadRsp(void *pVnode, SReadMsg *pRead, int32_t code) { ...@@ -206,6 +197,12 @@ void dnodeSendRpcReadRsp(void *pVnode, SReadMsg *pRead, int32_t code) {
rpcSendResponse(&rpcRsp); rpcSendResponse(&rpcRsp);
rpcFreeCont(pRead->rpcMsg.pCont); rpcFreeCont(pRead->rpcMsg.pCont);
vnodeRelease(pVnode);
}
void dnodeDispatchNonRspMsg(void *pVnode, SReadMsg *pRead, int32_t code) {
vnodeRelease(pVnode);
return;
} }
static void *dnodeProcessReadQueue(void *param) { static void *dnodeProcessReadQueue(void *param) {
...@@ -219,9 +216,16 @@ static void *dnodeProcessReadQueue(void *param) { ...@@ -219,9 +216,16 @@ static void *dnodeProcessReadQueue(void *param) {
break; break;
} }
dDebug("%p, msg:%s will be processed in vread queue", pReadMsg->rpcMsg.ahandle, taosMsg[pReadMsg->rpcMsg.msgType]); dDebug("%p, msg:%s will be processed in vread queue, qtype:%d", pReadMsg->rpcMsg.ahandle,
taosMsg[pReadMsg->rpcMsg.msgType], type);
int32_t code = vnodeProcessRead(pVnode, pReadMsg); int32_t code = vnodeProcessRead(pVnode, pReadMsg);
dnodeSendRpcReadRsp(pVnode, pReadMsg, code);
if (type == TAOS_QTYPE_RPC) {
dnodeSendRpcReadRsp(pVnode, pReadMsg, code);
} else {
dnodeDispatchNonRspMsg(pVnode, pReadMsg, code);
}
taosFreeQitem(pReadMsg); taosFreeQitem(pReadMsg);
} }
......
...@@ -53,6 +53,7 @@ void *dnodeAllocateVnodeWqueue(void *pVnode); ...@@ -53,6 +53,7 @@ void *dnodeAllocateVnodeWqueue(void *pVnode);
void dnodeFreeVnodeWqueue(void *queue); void dnodeFreeVnodeWqueue(void *queue);
void *dnodeAllocateVnodeRqueue(void *pVnode); void *dnodeAllocateVnodeRqueue(void *pVnode);
void dnodeFreeVnodeRqueue(void *rqueue); void dnodeFreeVnodeRqueue(void *rqueue);
void dnodePutItemIntoReadQueue(void *pVnode, void *qhandle);
void dnodeSendRpcVnodeWriteRsp(void *pVnode, void *param, int32_t code); void dnodeSendRpcVnodeWriteRsp(void *pVnode, void *param, int32_t code);
int32_t dnodeAllocateMnodePqueue(); int32_t dnodeAllocateMnodePqueue();
......
...@@ -84,6 +84,13 @@ bool qHasMoreResultsToRetrieve(qinfo_t qinfo); ...@@ -84,6 +84,13 @@ bool qHasMoreResultsToRetrieve(qinfo_t qinfo);
*/ */
int32_t qKillQuery(qinfo_t qinfo); int32_t qKillQuery(qinfo_t qinfo);
void* qOpenQueryMgmt(int32_t vgId);
void qSetQueryMgmtClosed(void* pExecutor);
void qCleanupQueryMgmt(void* pExecutor);
void** qRegisterQInfo(void* pMgmt, void* qInfo);
void** qAcquireQInfo(void* pMgmt, void** key);
void** qReleaseQInfo(void* pMgmt, void* pQInfo, bool needFree);
#ifdef __cplusplus #ifdef __cplusplus
} }
#endif #endif
......
...@@ -366,6 +366,7 @@ void tsDataSwap(void *pLeft, void *pRight, int32_t type, int32_t size); ...@@ -366,6 +366,7 @@ void tsDataSwap(void *pLeft, void *pRight, int32_t type, int32_t size);
#define TAOS_QTYPE_FWD 1 #define TAOS_QTYPE_FWD 1
#define TAOS_QTYPE_WAL 2 #define TAOS_QTYPE_WAL 2
#define TAOS_QTYPE_CQ 3 #define TAOS_QTYPE_CQ 3
#define TAOS_QTYPE_QUERY 4
typedef enum { typedef enum {
TSDB_SUPER_TABLE = 0, // super table TSDB_SUPER_TABLE = 0, // super table
......
...@@ -49,16 +49,19 @@ int32_t vnodeAlter(void *pVnode, SMDCreateVnodeMsg *pVnodeCfg); ...@@ -49,16 +49,19 @@ int32_t vnodeAlter(void *pVnode, SMDCreateVnodeMsg *pVnodeCfg);
int32_t vnodeClose(int32_t vgId); int32_t vnodeClose(int32_t vgId);
void vnodeRelease(void *pVnode); void vnodeRelease(void *pVnode);
void* vnodeAccquireVnode(int32_t vgId); // add refcount void* vnodeAcquireVnode(int32_t vgId); // add refcount
void* vnodeGetVnode(int32_t vgId); // keep refcount unchanged void* vnodeGetVnode(int32_t vgId); // keep refcount unchanged
void* vnodeAcquireRqueue(void *);
void* vnodeGetRqueue(void *); void* vnodeGetRqueue(void *);
void* vnodeGetWqueue(int32_t vgId); void* vnodeGetWqueue(int32_t vgId);
void* vnodeGetWal(void *pVnode); void* vnodeGetWal(void *pVnode);
int32_t vnodeProcessWrite(void *pVnode, int qtype, void *pHead, void *item); int32_t vnodeProcessWrite(void *pVnode, int qtype, void *pHead, void *item);
void vnodeBuildStatusMsg(void * param); int32_t vnodeGetVnodeList(int32_t vnodeList[], int32_t *numOfVnodes);
void vnodeBuildStatusMsg(void *param);
void vnodeSetAccess(SDMVgroupAccess *pAccess, int32_t numOfVnodes); void vnodeSetAccess(SDMVgroupAccess *pAccess, int32_t numOfVnodes);
void vnodeCleanupResources();
int32_t vnodeProcessRead(void *pVnode, SReadMsg *pReadMsg); int32_t vnodeProcessRead(void *pVnode, SReadMsg *pReadMsg);
......
...@@ -32,6 +32,7 @@ ...@@ -32,6 +32,7 @@
#include <time.h> #include <time.h>
#include <unistd.h> #include <unistd.h>
#include <wordexp.h> #include <wordexp.h>
#include <regex.h>
#include "taos.h" #include "taos.h"
#include "tutil.h" #include "tutil.h"
...@@ -54,6 +55,7 @@ static struct argp_option options[] = { ...@@ -54,6 +55,7 @@ static struct argp_option options[] = {
{0, 'P', "password", 0, "The password to use when connecting to the server. Default is 'taosdata'.", 3}, {0, 'P', "password", 0, "The password to use when connecting to the server. Default is 'taosdata'.", 3},
{0, 'd', "database", 0, "Destination database. Default is 'test'.", 3}, {0, 'd', "database", 0, "Destination database. Default is 'test'.", 3},
{0, 'm', "table_prefix", 0, "Table prefix name. Default is 't'.", 3}, {0, 'm', "table_prefix", 0, "Table prefix name. Default is 't'.", 3},
{0, 's', "sql file", 0, "The select sql file.", 3},
{0, 'M', 0, 0, "Use metric flag.", 13}, {0, 'M', 0, 0, "Use metric flag.", 13},
{0, 'o', "outputfile", 0, "Direct output to the named file. Default is './output.txt'.", 14}, {0, 'o', "outputfile", 0, "Direct output to the named file. Default is './output.txt'.", 14},
{0, 'q', "query_mode", 0, "Query mode--0: SYNC, 1: ASYNC. Default is SYNC.", 6}, {0, 'q', "query_mode", 0, "Query mode--0: SYNC, 1: ASYNC. Default is SYNC.", 6},
...@@ -79,6 +81,7 @@ typedef struct DemoArguments { ...@@ -79,6 +81,7 @@ typedef struct DemoArguments {
char *password; char *password;
char *database; char *database;
char *tb_prefix; char *tb_prefix;
char *sqlFile;
bool use_metric; bool use_metric;
bool insert_only; bool insert_only;
char *output_file; char *output_file;
...@@ -120,6 +123,9 @@ static error_t parse_opt(int key, char *arg, struct argp_state *state) { ...@@ -120,6 +123,9 @@ static error_t parse_opt(int key, char *arg, struct argp_state *state) {
case 'o': case 'o':
arguments->output_file = arg; arguments->output_file = arg;
break; break;
case 's':
arguments->sqlFile = arg;
break;
case 'q': case 'q':
arguments->mode = atoi(arg); arguments->mode = atoi(arg);
break; break;
...@@ -179,10 +185,10 @@ static error_t parse_opt(int key, char *arg, struct argp_state *state) { ...@@ -179,10 +185,10 @@ static error_t parse_opt(int key, char *arg, struct argp_state *state) {
arguments->tb_prefix = arg; arguments->tb_prefix = arg;
break; break;
case 'M': case 'M':
arguments->use_metric = true; arguments->use_metric = false;
break; break;
case 'x': case 'x':
arguments->insert_only = true; arguments->insert_only = false;
break; break;
case 'c': case 'c':
if (wordexp(arg, &full_path, 0) != 0) { if (wordexp(arg, &full_path, 0) != 0) {
...@@ -253,6 +259,9 @@ typedef struct { ...@@ -253,6 +259,9 @@ typedef struct {
int data_of_rate; int data_of_rate;
int64_t start_time; int64_t start_time;
bool do_aggreFunc; bool do_aggreFunc;
char* cols;
bool use_metric;
sem_t mutex_sem; sem_t mutex_sem;
int notFinished; int notFinished;
...@@ -305,6 +314,8 @@ void rand_string(char *str, int size); ...@@ -305,6 +314,8 @@ void rand_string(char *str, int size);
double getCurrentTime(); double getCurrentTime();
void callBack(void *param, TAOS_RES *res, int code); void callBack(void *param, TAOS_RES *res, int code);
void multiThreadCreateTable(char* cols, bool use_metric, int threads, int ntables, char* db_name, char* tb_prefix, char *ip_addr, uint16_t port, char *user, char *pass);
void querySqlFile(TAOS* taos, char* sqlFile);
int main(int argc, char *argv[]) { int main(int argc, char *argv[]) {
SDemoArguments arguments = { NULL, // host SDemoArguments arguments = { NULL, // host
...@@ -313,6 +324,7 @@ int main(int argc, char *argv[]) { ...@@ -313,6 +324,7 @@ int main(int argc, char *argv[]) {
"taosdata", // password "taosdata", // password
"test", // database "test", // database
"t", // tb_prefix "t", // tb_prefix
NULL,
false, // use_metric false, // use_metric
false, // insert_only false, // insert_only
"./output.txt", // output_file "./output.txt", // output_file
...@@ -361,7 +373,7 @@ int main(int argc, char *argv[]) { ...@@ -361,7 +373,7 @@ int main(int argc, char *argv[]) {
abort(); abort();
#endif #endif
} }
enum MODE query_mode = arguments.mode; enum MODE query_mode = arguments.mode;
char *ip_addr = arguments.host; char *ip_addr = arguments.host;
uint16_t port = arguments.port; uint16_t port = arguments.port;
...@@ -385,6 +397,13 @@ int main(int argc, char *argv[]) { ...@@ -385,6 +397,13 @@ int main(int argc, char *argv[]) {
char dataString[STRING_LEN]; char dataString[STRING_LEN];
bool do_aggreFunc = true; bool do_aggreFunc = true;
if (NULL != arguments.sqlFile) {
TAOS* qtaos = taos_connect(ip_addr, user, pass, db_name, port);
querySqlFile(qtaos, arguments.sqlFile);
taos_close(qtaos);
return 0;
}
memset(dataString, 0, STRING_LEN); memset(dataString, 0, STRING_LEN);
int len = 0; int len = 0;
...@@ -495,47 +514,19 @@ int main(int argc, char *argv[]) { ...@@ -495,47 +514,19 @@ int main(int argc, char *argv[]) {
len += snprintf(cols + len, STRING_LEN - len, ",f%d %s(%d))", colIndex + 1, data_type[colIndex % count_data_type], len_of_binary); len += snprintf(cols + len, STRING_LEN - len, ",f%d %s(%d))", colIndex + 1, data_type[colIndex % count_data_type], len_of_binary);
} }
if (!use_metric) { if (use_metric) {
/* Create all the tables; */
printf("Creating %d table(s)......\n", ntables);
for (int i = 0; i < ntables; i++) {
snprintf(command, BUFFER_SIZE, "create table if not exists %s.%s%d (ts timestamp%s;", db_name, tb_prefix, i, cols);
queryDB(taos, command);
}
printf("Table(s) created!\n");
taos_close(taos);
} else {
/* Create metric table */ /* Create metric table */
printf("Creating meters super table...\n"); printf("Creating meters super table...\n");
snprintf(command, BUFFER_SIZE, "create table if not exists %s.meters (ts timestamp%s tags (areaid int, loc binary(10))", db_name, cols); snprintf(command, BUFFER_SIZE, "create table if not exists %s.meters (ts timestamp%s tags (areaid int, loc binary(10))", db_name, cols);
queryDB(taos, command); queryDB(taos, command);
printf("meters created!\n"); printf("meters created!\n");
/* Create all the tables; */
printf("Creating %d table(s)......\n", ntables);
for (int i = 0; i < ntables; i++) {
int j;
if (i % 10 == 0) {
j = 10;
} else {
j = i % 10;
}
if (j % 2 == 0) {
snprintf(command, BUFFER_SIZE, "create table if not exists %s.%s%d using %s.meters tags (%d,\"%s\");", db_name, tb_prefix, i, db_name, j, "shanghai");
} else {
snprintf(command, BUFFER_SIZE, "create table if not exists %s.%s%d using %s.meters tags (%d,\"%s\");", db_name, tb_prefix, i, db_name, j, "beijing");
}
queryDB(taos, command);
}
printf("Table(s) created!\n");
taos_close(taos); taos_close(taos);
} }
/* Wait for table to create */ /* Wait for table to create */
multiThreadCreateTable(cols, use_metric, threads, ntables, db_name, tb_prefix, ip_addr, port, user, pass);
/* Insert data */ /* Insert data */
double ts = getCurrentTime(); double ts = getCurrentTime();
printf("Inserting data......\n"); printf("Inserting data......\n");
...@@ -685,6 +676,198 @@ int main(int argc, char *argv[]) { ...@@ -685,6 +676,198 @@ int main(int argc, char *argv[]) {
return 0; return 0;
} }
#define MAX_SQL_SIZE 65536
void selectSql(TAOS* taos, char* sqlcmd)
{
TAOS_RES *pSql = taos_query(taos, sqlcmd);
int32_t code = taos_errno(pSql);
if (code != 0) {
printf("Failed to sqlcmd:%s, reason:%s\n", sqlcmd, taos_errstr(pSql));
taos_free_result(pSql);
exit(1);
}
int count = 0;
while (taos_fetch_row(pSql) != NULL) {
count++;
}
taos_free_result(pSql);
return;
}
/* Function to do regular expression check */
static int regexMatch(const char *s, const char *reg, int cflags) {
regex_t regex;
char msgbuf[100] = {0};
/* Compile regular expression */
if (regcomp(&regex, reg, cflags) != 0) {
printf("Fail to compile regex\n");
exit(-1);
}
/* Execute regular expression */
int reti = regexec(&regex, s, 0, NULL, 0);
if (!reti) {
regfree(&regex);
return 1;
} else if (reti == REG_NOMATCH) {
regfree(&regex);
return 0;
} else {
regerror(reti, &regex, msgbuf, sizeof(msgbuf));
printf("Regex match failed: %s\n", msgbuf);
regfree(&regex);
exit(-1);
}
return 0;
}
static int isCommentLine(char *line) {
if (line == NULL) return 1;
return regexMatch(line, "^\\s*#.*", REG_EXTENDED);
}
void querySqlFile(TAOS* taos, char* sqlFile)
{
FILE *fp = fopen(sqlFile, "r");
if (fp == NULL) {
printf("failed to open file %s, reason:%s\n", sqlFile, strerror(errno));
exit(-1);
}
int read_len = 0;
char * cmd = calloc(1, MAX_SQL_SIZE);
size_t cmd_len = 0;
char * line = NULL;
size_t line_len = 0;
double t = getCurrentTime();
while ((read_len = getline(&line, &line_len, fp)) != -1) {
if (read_len >= MAX_SQL_SIZE) continue;
line[--read_len] = '\0';
if (read_len == 0 || isCommentLine(line)) { // line starts with #
continue;
}
if (line[read_len - 1] == '\\') {
line[read_len - 1] = ' ';
memcpy(cmd + cmd_len, line, read_len);
cmd_len += read_len;
continue;
}
memcpy(cmd + cmd_len, line, read_len);
selectSql(taos, cmd);
memset(cmd, 0, MAX_SQL_SIZE);
cmd_len = 0;
}
t = getCurrentTime() - t;
printf("run %s took %.6f second(s)\n\n", sqlFile, t);
free(cmd);
if (line) free(line);
fclose(fp);
return;
}
void * createTable(void *sarg)
{
char command[BUFFER_SIZE] = "\0";
info *winfo = (info *)sarg;
if (!winfo->use_metric) {
/* Create all the tables; */
printf("Creating table from %d to %d\n", winfo->start_table_id, winfo->end_table_id);
for (int i = winfo->start_table_id; i <= winfo->end_table_id; i++) {
snprintf(command, BUFFER_SIZE, "create table if not exists %s.%s%d (ts timestamp%s;", winfo->db_name, winfo->tb_prefix, i, winfo->cols);
queryDB(winfo->taos, command);
}
taos_close(winfo->taos);
} else {
/* Create all the tables; */
printf("Creating table from %d to %d\n", winfo->start_table_id, winfo->end_table_id);
for (int i = winfo->start_table_id; i <= winfo->end_table_id; i++) {
int j;
if (i % 10 == 0) {
j = 10;
} else {
j = i % 10;
}
if (j % 2 == 0) {
snprintf(command, BUFFER_SIZE, "create table if not exists %s.%s%d using %s.meters tags (%d,\"%s\");", winfo->db_name, winfo->tb_prefix, i, winfo->db_name, j, "shanghai");
} else {
snprintf(command, BUFFER_SIZE, "create table if not exists %s.%s%d using %s.meters tags (%d,\"%s\");", winfo->db_name, winfo->tb_prefix, i, winfo->db_name, j, "beijing");
}
queryDB(winfo->taos, command);
}
taos_close(winfo->taos);
}
return NULL;
}
void multiThreadCreateTable(char* cols, bool use_metric, int threads, int ntables, char* db_name, char* tb_prefix, char *ip_addr, uint16_t port, char *user, char *pass) {
double ts = getCurrentTime();
printf("create table......\n");
pthread_t *pids = malloc(threads * sizeof(pthread_t));
info *infos = malloc(threads * sizeof(info));
int a = ntables / threads;
if (a < 1) {
threads = ntables;
a = 1;
}
int b = 0;
if (threads != 0)
b = ntables % threads;
int last = 0;
for (int i = 0; i < threads; i++) {
info *t_info = infos + i;
t_info->threadID = i;
tstrncpy(t_info->db_name, db_name, MAX_DB_NAME_SIZE);
tstrncpy(t_info->tb_prefix, tb_prefix, MAX_TB_NAME_SIZE);
t_info->taos = taos_connect(ip_addr, user, pass, db_name, port);
t_info->start_table_id = last;
t_info->end_table_id = i < b ? last + a : last + a - 1;
last = t_info->end_table_id + 1;
t_info->use_metric = use_metric;
t_info->cols = cols;
pthread_create(pids + i, NULL, createTable, t_info);
}
for (int i = 0; i < threads; i++) {
pthread_join(pids[i], NULL);
}
double t = getCurrentTime() - ts;
printf("Spent %.4f seconds to create %d tables with %d connections\n", t, ntables, threads);
for (int i = 0; i < threads; i++) {
info *t_info = infos + i;
taos_close(t_info->taos);
sem_destroy(&(t_info->mutex_sem));
sem_destroy(&(t_info->lock_sem));
}
free(pids);
free(infos);
return ;
}
void *readTable(void *sarg) { void *readTable(void *sarg) {
info *rinfo = (info *)sarg; info *rinfo = (info *)sarg;
TAOS *taos = rinfo->taos; TAOS *taos = rinfo->taos;
......
...@@ -69,6 +69,7 @@ static int32_t mnodeDnodeActionInsert(SSdbOper *pOper) { ...@@ -69,6 +69,7 @@ static int32_t mnodeDnodeActionInsert(SSdbOper *pOper) {
SDnodeObj *pDnode = pOper->pObj; SDnodeObj *pDnode = pOper->pObj;
if (pDnode->status != TAOS_DN_STATUS_DROPPING) { if (pDnode->status != TAOS_DN_STATUS_DROPPING) {
pDnode->status = TAOS_DN_STATUS_OFFLINE; pDnode->status = TAOS_DN_STATUS_OFFLINE;
pDnode->lastAccess = tsAccessSquence;
} }
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
......
...@@ -68,7 +68,7 @@ int32_t mnodeInitProfile() { ...@@ -68,7 +68,7 @@ int32_t mnodeInitProfile() {
mnodeAddWriteMsgHandle(TSDB_MSG_TYPE_CM_KILL_STREAM, mnodeProcessKillStreamMsg); mnodeAddWriteMsgHandle(TSDB_MSG_TYPE_CM_KILL_STREAM, mnodeProcessKillStreamMsg);
mnodeAddWriteMsgHandle(TSDB_MSG_TYPE_CM_KILL_CONN, mnodeProcessKillConnectionMsg); mnodeAddWriteMsgHandle(TSDB_MSG_TYPE_CM_KILL_CONN, mnodeProcessKillConnectionMsg);
tsMnodeConnCache = taosCacheInitWithCb(TSDB_DATA_TYPE_INT, CONN_CHECK_TIME, false, mnodeFreeConn, "conn"); tsMnodeConnCache = taosCacheInit(TSDB_DATA_TYPE_INT, CONN_CHECK_TIME, false, mnodeFreeConn, "conn");
return 0; return 0;
} }
......
...@@ -65,7 +65,7 @@ int32_t mnodeInitShow() { ...@@ -65,7 +65,7 @@ int32_t mnodeInitShow() {
mnodeAddReadMsgHandle(TSDB_MSG_TYPE_CM_CONNECT, mnodeProcessConnectMsg); mnodeAddReadMsgHandle(TSDB_MSG_TYPE_CM_CONNECT, mnodeProcessConnectMsg);
mnodeAddReadMsgHandle(TSDB_MSG_TYPE_CM_USE_DB, mnodeProcessUseMsg); mnodeAddReadMsgHandle(TSDB_MSG_TYPE_CM_USE_DB, mnodeProcessUseMsg);
tsMnodeShowCache = taosCacheInitWithCb(TSDB_DATA_TYPE_INT, 5, false, mnodeFreeShowObj, "show"); tsMnodeShowCache = taosCacheInit(TSDB_DATA_TYPE_INT, 5, false, mnodeFreeShowObj, "show");
return 0; return 0;
} }
......
...@@ -58,7 +58,7 @@ static void httpDestroyContext(void *data) { ...@@ -58,7 +58,7 @@ static void httpDestroyContext(void *data) {
} }
bool httpInitContexts() { bool httpInitContexts() {
tsHttpServer.contextCache = taosCacheInitWithCb(TSDB_DATA_TYPE_BIGINT, 2, false, httpDestroyContext, "restc"); tsHttpServer.contextCache = taosCacheInit(TSDB_DATA_TYPE_BIGINT, 2, false, httpDestroyContext, "restc");
if (tsHttpServer.contextCache == NULL) { if (tsHttpServer.contextCache == NULL) {
httpError("failed to init context cache"); httpError("failed to init context cache");
return false; return false;
......
...@@ -115,7 +115,7 @@ void httpCleanUpSessions() { ...@@ -115,7 +115,7 @@ void httpCleanUpSessions() {
} }
bool httpInitSessions() { bool httpInitSessions() {
tsHttpServer.sessionCache = taosCacheInitWithCb(TSDB_DATA_TYPE_BINARY, 5, false, httpDestroySession, "rests"); tsHttpServer.sessionCache = taosCacheInit(TSDB_DATA_TYPE_BINARY, 5, false, httpDestroySession, "rests");
if (tsHttpServer.sessionCache == NULL) { if (tsHttpServer.sessionCache == NULL) {
httpError("failed to init session cache"); httpError("failed to init session cache");
return false; return false;
......
...@@ -111,7 +111,7 @@ void mqttStopSystem() { ...@@ -111,7 +111,7 @@ void mqttStopSystem() {
} }
void mqttCleanUpSystem() { void mqttCleanUpSystem() {
mqttInfo("starting to clean up mqtt"); mqttInfo("starting to cleanup mqtt");
free(recntStatus.user_name); free(recntStatus.user_name);
free(recntStatus.password); free(recntStatus.password);
free(recntStatus.hostname); free(recntStatus.hostname);
......
...@@ -13,8 +13,10 @@ ...@@ -13,8 +13,10 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>. * along with this program. If not, see <http://www.gnu.org/licenses/>.
*/ */
#include "os.h" #include "os.h"
#include "taosmsg.h" #include "tcache.h"
#include "tglobal.h"
#include "qfill.h" #include "qfill.h"
#include "taosmsg.h"
#include "hash.h" #include "hash.h"
#include "qExecutor.h" #include "qExecutor.h"
...@@ -27,6 +29,7 @@ ...@@ -27,6 +29,7 @@
#include "exception.h" #include "exception.h"
#include "tscompression.h" #include "tscompression.h"
#include "ttime.h" #include "ttime.h"
#include "tfile.h"
/** /**
* check if the primary column is load by default, otherwise, the program will * check if the primary column is load by default, otherwise, the program will
...@@ -87,16 +90,18 @@ typedef struct { ...@@ -87,16 +90,18 @@ typedef struct {
STSCursor cur; STSCursor cur;
} SQueryStatusInfo; } SQueryStatusInfo;
#if 0
static UNUSED_FUNC void *u_malloc (size_t __size) { static UNUSED_FUNC void *u_malloc (size_t __size) {
// uint32_t v = rand(); uint32_t v = rand();
// if (v % 5 <= 1) { if (v % 5 <= 1) {
// return NULL; return NULL;
// } else { } else {
return malloc(__size); return malloc(__size);
// } }
} }
#define malloc u_malloc #define malloc u_malloc
#endif
#define CLEAR_QUERY_STATUS(q, st) ((q)->status &= (~(st))) #define CLEAR_QUERY_STATUS(q, st) ((q)->status &= (~(st)))
#define GET_NUM_OF_TABLEGROUP(q) taosArrayGetSize((q)->tableqinfoGroupInfo.pGroupList) #define GET_NUM_OF_TABLEGROUP(q) taosArrayGetSize((q)->tableqinfoGroupInfo.pGroupList)
...@@ -1520,7 +1525,6 @@ static void teardownQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv) { ...@@ -1520,7 +1525,6 @@ static void teardownQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv) {
} }
static bool isQueryKilled(SQInfo *pQInfo) { static bool isQueryKilled(SQInfo *pQInfo) {
return false;
return (pQInfo->code == TSDB_CODE_TSC_QUERY_CANCELLED); return (pQInfo->code == TSDB_CODE_TSC_QUERY_CANCELLED);
} }
...@@ -5910,9 +5914,16 @@ static int32_t doDumpQueryResult(SQInfo *pQInfo, char *data) { ...@@ -5910,9 +5914,16 @@ static int32_t doDumpQueryResult(SQInfo *pQInfo, char *data) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
typedef struct SQueryMgmt {
SCacheObj *qinfoPool; // query handle pool
int32_t vgId;
bool closed;
pthread_mutex_t lock;
} SQueryMgmt;
int32_t qCreateQueryInfo(void* tsdb, int32_t vgId, SQueryTableMsg* pQueryMsg, void* param, _qinfo_free_fn_t fn, int32_t qCreateQueryInfo(void* tsdb, int32_t vgId, SQueryTableMsg* pQueryMsg, void* param, _qinfo_free_fn_t fn,
qinfo_t* pQInfo) { qinfo_t* pQInfo) {
assert(pQueryMsg != NULL); assert(pQueryMsg != NULL && tsdb != NULL);
int32_t code = TSDB_CODE_SUCCESS; int32_t code = TSDB_CODE_SUCCESS;
...@@ -6356,3 +6367,112 @@ static void buildTagQueryResult(SQInfo* pQInfo) { ...@@ -6356,3 +6367,112 @@ static void buildTagQueryResult(SQInfo* pQInfo) {
setQueryStatus(pQuery, QUERY_COMPLETED); setQueryStatus(pQuery, QUERY_COMPLETED);
} }
void freeqinfoFn(void *qhandle) {
void** handle = qhandle;
if (handle == NULL || *handle == NULL) {
return;
}
qKillQuery(*handle);
}
void* qOpenQueryMgmt(int32_t vgId) {
const int32_t REFRESH_HANDLE_INTERVAL = 2; // every 2 seconds, refresh handle pool
char cacheName[128] = {0};
sprintf(cacheName, "qhandle_%d", vgId);
SQueryMgmt* pQueryHandle = calloc(1, sizeof(SQueryMgmt));
pQueryHandle->qinfoPool = taosCacheInit(TSDB_DATA_TYPE_BIGINT, REFRESH_HANDLE_INTERVAL, true, freeqinfoFn, cacheName);
pQueryHandle->closed = false;
pthread_mutex_init(&pQueryHandle->lock, NULL);
qDebug("vgId:%d, open querymgmt success", vgId);
return pQueryHandle;
}
void qSetQueryMgmtClosed(void* pQMgmt) {
if (pQMgmt == NULL) {
return;
}
SQueryMgmt* pQueryMgmt = pQMgmt;
qDebug("vgId:%d, set querymgmt closed, wait for all queries cancelled", pQueryMgmt->vgId);
pthread_mutex_lock(&pQueryMgmt->lock);
pQueryMgmt->closed = true;
pthread_mutex_unlock(&pQueryMgmt->lock);
taosCacheEmpty(pQueryMgmt->qinfoPool, true);
}
void qCleanupQueryMgmt(void* pQMgmt) {
if (pQMgmt == NULL) {
return;
}
SQueryMgmt* pQueryMgmt = pQMgmt;
int32_t vgId = pQueryMgmt->vgId;
assert(pQueryMgmt->closed);
SCacheObj* pqinfoPool = pQueryMgmt->qinfoPool;
pQueryMgmt->qinfoPool = NULL;
taosCacheCleanup(pqinfoPool);
pthread_mutex_destroy(&pQueryMgmt->lock);
tfree(pQueryMgmt);
qDebug("vgId:%d querymgmt cleanup completed", vgId);
}
void** qRegisterQInfo(void* pMgmt, void* qInfo) {
if (pMgmt == NULL) {
return NULL;
}
SQueryMgmt *pQueryMgmt = pMgmt;
if (pQueryMgmt->qinfoPool == NULL) {
return NULL;
}
pthread_mutex_lock(&pQueryMgmt->lock);
if (pQueryMgmt->closed) {
pthread_mutex_unlock(&pQueryMgmt->lock);
return NULL;
} else {
void** handle = taosCachePut(pQueryMgmt->qinfoPool, qInfo, POINTER_BYTES, &qInfo, POINTER_BYTES, tsShellActivityTimer*2);
pthread_mutex_unlock(&pQueryMgmt->lock);
return handle;
}
}
void** qAcquireQInfo(void* pMgmt, void** key) {
SQueryMgmt *pQueryMgmt = pMgmt;
if (pQueryMgmt->qinfoPool == NULL || pQueryMgmt->closed) {
return NULL;
}
void** handle = taosCacheAcquireByKey(pQueryMgmt->qinfoPool, key, POINTER_BYTES);
if (handle == NULL || *handle == NULL) {
return NULL;
} else {
return handle;
}
}
void** qReleaseQInfo(void* pMgmt, void* pQInfo, bool needFree) {
SQueryMgmt *pQueryMgmt = pMgmt;
if (pQueryMgmt->qinfoPool == NULL) {
return NULL;
}
taosCacheRelease(pQueryMgmt->qinfoPool, pQInfo, needFree);
return 0;
}
...@@ -28,6 +28,7 @@ ...@@ -28,6 +28,7 @@
#include "tsdbMain.h" #include "tsdbMain.h"
#include "tutil.h" #include "tutil.h"
#include "ttime.h" #include "ttime.h"
#include "tfile.h"
const char *tsdbFileSuffix[] = {".head", ".data", ".last", "", ".h", ".l"}; const char *tsdbFileSuffix[] = {".head", ".data", ".last", "", ".h", ".l"};
......
...@@ -19,6 +19,7 @@ ...@@ -19,6 +19,7 @@
#include "tcoding.h" #include "tcoding.h"
#include "tscompression.h" #include "tscompression.h"
#include "tsdbMain.h" #include "tsdbMain.h"
#include "tfile.h"
#define TSDB_GET_COMPCOL_LEN(nCols) (sizeof(SCompData) + sizeof(SCompCol) * (nCols) + sizeof(TSCKSUM)) #define TSDB_GET_COMPCOL_LEN(nCols) (sizeof(SCompData) + sizeof(SCompCol) * (nCols) + sizeof(TSCKSUM))
......
...@@ -65,7 +65,7 @@ typedef struct { ...@@ -65,7 +65,7 @@ typedef struct {
int64_t totalSize; // total allocated buffer in this hash table, SCacheObj is not included. int64_t totalSize; // total allocated buffer in this hash table, SCacheObj is not included.
int64_t refreshTime; int64_t refreshTime;
STrashElem * pTrash; STrashElem * pTrash;
const char * cacheName; char* name;
// void * tmrCtrl; // void * tmrCtrl;
// void * pTimer; // void * pTimer;
SCacheStatis statistics; SCacheStatis statistics;
...@@ -163,8 +163,9 @@ void taosCacheRelease(SCacheObj *pCacheObj, void **data, bool _remove); ...@@ -163,8 +163,9 @@ void taosCacheRelease(SCacheObj *pCacheObj, void **data, bool _remove);
/** /**
* move all data node into trash, clear node in trash can if it is not referenced by any clients * move all data node into trash, clear node in trash can if it is not referenced by any clients
* @param handle * @param handle
* @param _remove remove the data or not if refcount is greater than 0
*/ */
void taosCacheEmpty(SCacheObj *pCacheObj); void taosCacheEmpty(SCacheObj *pCacheObj, bool _remove);
/** /**
* release all allocated memory and destroy the cache object. * release all allocated memory and destroy the cache object.
......
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef TDENGINE_TFILE_H
#define TDENGINE_TFILE_H
#ifdef TAOS_RANDOM_FILE_FAIL
ssize_t taos_tread(int fd, void *buf, size_t count);
ssize_t taos_twrite(int fd, void *buf, size_t count);
off_t taos_lseek(int fd, off_t offset, int whence);
#define tread(fd, buf, count) taos_tread(fd, buf, count)
#define twrite(fd, buf, count) taos_twrite(fd, buf, count)
#define lseek(fd, offset, whence) taos_lseek(fd, offset, whence)
#endif // TAOS_RANDOM_FILE_FAIL
#endif // TDENGINE_TFILE_H
...@@ -119,9 +119,8 @@ static FORCE_INLINE void taosCacheReleaseNode(SCacheObj *pCacheObj, SCacheDataNo ...@@ -119,9 +119,8 @@ static FORCE_INLINE void taosCacheReleaseNode(SCacheObj *pCacheObj, SCacheDataNo
int32_t size = pNode->size; int32_t size = pNode->size;
taosHashRemove(pCacheObj->pHashTable, pNode->key, pNode->keySize); taosHashRemove(pCacheObj->pHashTable, pNode->key, pNode->keySize);
uDebug("key:%p, %p is destroyed from cache, totalNum:%d totalSize:%" PRId64 "bytes size:%dbytes, cacheName:%s", uDebug("cache:%s, key:%p, %p is destroyed from cache, totalNum:%d totalSize:%" PRId64 "bytes size:%dbytes",
pNode->key, pNode->data, (int32_t)taosHashGetSize(pCacheObj->pHashTable), pCacheObj->totalSize, size, pCacheObj->name, pNode->key, pNode->data, (int32_t)taosHashGetSize(pCacheObj->pHashTable), pCacheObj->totalSize, size);
pCacheObj->cacheName);
if (pCacheObj->freeFp) pCacheObj->freeFp(pNode->data); if (pCacheObj->freeFp) pCacheObj->freeFp(pNode->data);
free(pNode); free(pNode);
} }
...@@ -226,7 +225,7 @@ static void doCleanupDataCache(SCacheObj *pCacheObj); ...@@ -226,7 +225,7 @@ static void doCleanupDataCache(SCacheObj *pCacheObj);
*/ */
static void* taosCacheRefresh(void *handle); static void* taosCacheRefresh(void *handle);
SCacheObj *taosCacheInitWithCb(int32_t keyType, int64_t refreshTimeInSeconds, bool extendLifespan, __cache_freeres_fn_t fn, const char* cacheName) { SCacheObj *taosCacheInit(int32_t keyType, int64_t refreshTimeInSeconds, bool extendLifespan, __cache_freeres_fn_t fn, const char* cacheName) {
if (refreshTimeInSeconds <= 0) { if (refreshTimeInSeconds <= 0) {
return NULL; return NULL;
} }
...@@ -238,7 +237,7 @@ SCacheObj *taosCacheInitWithCb(int32_t keyType, int64_t refreshTimeInSeconds, bo ...@@ -238,7 +237,7 @@ SCacheObj *taosCacheInitWithCb(int32_t keyType, int64_t refreshTimeInSeconds, bo
} }
pCacheObj->pHashTable = taosHashInit(128, taosGetDefaultHashFunction(keyType), false); pCacheObj->pHashTable = taosHashInit(128, taosGetDefaultHashFunction(keyType), false);
pCacheObj->cacheName = cacheName; pCacheObj->name = strdup(cacheName);
if (pCacheObj->pHashTable == NULL) { if (pCacheObj->pHashTable == NULL) {
free(pCacheObj); free(pCacheObj);
uError("failed to allocate memory, reason:%s", strerror(errno)); uError("failed to allocate memory, reason:%s", strerror(errno));
...@@ -268,10 +267,6 @@ SCacheObj *taosCacheInitWithCb(int32_t keyType, int64_t refreshTimeInSeconds, bo ...@@ -268,10 +267,6 @@ SCacheObj *taosCacheInitWithCb(int32_t keyType, int64_t refreshTimeInSeconds, bo
return pCacheObj; return pCacheObj;
} }
SCacheObj *taosCacheInit(int32_t keyType, int64_t refreshTimeInSeconds, bool extendLifespan, __cache_freeres_fn_t fn, const char* cacheName) {
return taosCacheInitWithCb(keyType, refreshTimeInSeconds, extendLifespan, fn, cacheName);
}
void *taosCachePut(SCacheObj *pCacheObj, const void *key, size_t keyLen, const void *pData, size_t dataSize, int duration) { void *taosCachePut(SCacheObj *pCacheObj, const void *key, size_t keyLen, const void *pData, size_t dataSize, int duration) {
SCacheDataNode *pNode; SCacheDataNode *pNode;
...@@ -288,16 +283,16 @@ void *taosCachePut(SCacheObj *pCacheObj, const void *key, size_t keyLen, const v ...@@ -288,16 +283,16 @@ void *taosCachePut(SCacheObj *pCacheObj, const void *key, size_t keyLen, const v
if (NULL != pNode) { if (NULL != pNode) {
pCacheObj->totalSize += pNode->size; pCacheObj->totalSize += pNode->size;
uDebug("key:%p, %p added into cache, added:%" PRIu64 ", expire:%" PRIu64 ", totalNum:%d totalSize:%" PRId64 uDebug("cache:%s, key:%p, %p added into cache, added:%" PRIu64 ", expire:%" PRIu64 ", totalNum:%d totalSize:%" PRId64
"bytes size:%" PRId64 "bytes, cacheName:%s", "bytes size:%" PRId64 "bytes",
key, pNode->data, pNode->addedTime, (pNode->lifespan * pNode->extendFactor + pNode->addedTime), pCacheObj->name, key, pNode->data, pNode->addedTime, (pNode->lifespan * pNode->extendFactor + pNode->addedTime),
(int32_t)taosHashGetSize(pCacheObj->pHashTable), pCacheObj->totalSize, dataSize, pCacheObj->cacheName); (int32_t)taosHashGetSize(pCacheObj->pHashTable), pCacheObj->totalSize, dataSize);
} else { } else {
uError("key:%p, failed to added into cache, out of memory, cacheName:%s", key, pCacheObj->cacheName); uError("cache:%s, key:%p, failed to added into cache, out of memory", pCacheObj->name, key);
} }
} else { // old data exists, update the node } else { // old data exists, update the node
pNode = taosUpdateCacheImpl(pCacheObj, pOld, key, keyLen, pData, dataSize, duration * 1000L); pNode = taosUpdateCacheImpl(pCacheObj, pOld, key, keyLen, pData, dataSize, duration * 1000L);
uDebug("key:%p, %p exist in cache, updated, cacheName:%s", key, pNode->data, pCacheObj->cacheName); uDebug("cache:%s, key:%p, %p exist in cache, updated", pCacheObj->name, key, pNode->data);
} }
__cache_unlock(pCacheObj); __cache_unlock(pCacheObj);
...@@ -332,10 +327,10 @@ void *taosCacheAcquireByKey(SCacheObj *pCacheObj, const void *key, size_t keyLen ...@@ -332,10 +327,10 @@ void *taosCacheAcquireByKey(SCacheObj *pCacheObj, const void *key, size_t keyLen
if (ptNode != NULL) { if (ptNode != NULL) {
atomic_add_fetch_32(&pCacheObj->statistics.hitCount, 1); atomic_add_fetch_32(&pCacheObj->statistics.hitCount, 1);
uDebug("key:%p, %p is retrieved from cache, refcnt:%d, cacheName:%s", key, (*ptNode)->data, ref, pCacheObj->cacheName); uDebug("cache:%s, key:%p, %p is retrieved from cache, refcnt:%d", pCacheObj->name, key, (*ptNode)->data, ref);
} else { } else {
atomic_add_fetch_32(&pCacheObj->statistics.missCount, 1); atomic_add_fetch_32(&pCacheObj->statistics.missCount, 1);
uDebug("key:%p, not in cache, retrieved failed, cacheName:%s", key, pCacheObj->cacheName); uDebug("cache:%s, key:%p, not in cache, retrieved failed", pCacheObj->name, key);
} }
atomic_add_fetch_32(&pCacheObj->statistics.totalAccess, 1); atomic_add_fetch_32(&pCacheObj->statistics.totalAccess, 1);
...@@ -360,11 +355,11 @@ void* taosCacheUpdateExpireTimeByName(SCacheObj *pCacheObj, void *key, size_t ke ...@@ -360,11 +355,11 @@ void* taosCacheUpdateExpireTimeByName(SCacheObj *pCacheObj, void *key, size_t ke
if (ptNode != NULL) { if (ptNode != NULL) {
atomic_add_fetch_32(&pCacheObj->statistics.hitCount, 1); atomic_add_fetch_32(&pCacheObj->statistics.hitCount, 1);
uDebug("key:%p, %p expireTime is updated in cache, refcnt:%d, cacheName:%s", key, (*ptNode)->data, uDebug("cache:%s, key:%p, %p expireTime is updated in cache, refcnt:%d", pCacheObj->name, key,
T_REF_VAL_GET(*ptNode), pCacheObj->cacheName); (*ptNode)->data, T_REF_VAL_GET(*ptNode));
} else { } else {
atomic_add_fetch_32(&pCacheObj->statistics.missCount, 1); atomic_add_fetch_32(&pCacheObj->statistics.missCount, 1);
uDebug("key:%p, not in cache, retrieved failed, cacheName:%s", key, pCacheObj->cacheName); uDebug("cache:%s, key:%p, not in cache, retrieved failed", pCacheObj->name, key);
} }
atomic_add_fetch_32(&pCacheObj->statistics.totalAccess, 1); atomic_add_fetch_32(&pCacheObj->statistics.totalAccess, 1);
...@@ -383,7 +378,7 @@ void *taosCacheAcquireByData(SCacheObj *pCacheObj, void *data) { ...@@ -383,7 +378,7 @@ void *taosCacheAcquireByData(SCacheObj *pCacheObj, void *data) {
} }
int32_t ref = T_REF_INC(ptNode); int32_t ref = T_REF_INC(ptNode);
uDebug("%p acquired by data in cache, refcnt:%d, cacheName:%s", ptNode->data, ref, pCacheObj->cacheName); uDebug("cache:%s, data: %p acquired by data in cache, refcnt:%d", pCacheObj->name, ptNode->data, ref);
// if the remained life span is less then the (*ptNode)->lifeSpan, add up one lifespan // if the remained life span is less then the (*ptNode)->lifeSpan, add up one lifespan
if (pCacheObj->extendLifespan) { if (pCacheObj->extendLifespan) {
...@@ -391,7 +386,7 @@ void *taosCacheAcquireByData(SCacheObj *pCacheObj, void *data) { ...@@ -391,7 +386,7 @@ void *taosCacheAcquireByData(SCacheObj *pCacheObj, void *data) {
if ((now - ptNode->addedTime) < ptNode->lifespan * ptNode->extendFactor) { if ((now - ptNode->addedTime) < ptNode->lifespan * ptNode->extendFactor) {
ptNode->extendFactor += 1; ptNode->extendFactor += 1;
uDebug("%p extend life time to %" PRId64, ptNode->data, uDebug("cache:%s, %p extend life time to %" PRId64, pCacheObj->name, ptNode->data,
ptNode->lifespan * ptNode->extendFactor + ptNode->addedTime); ptNode->lifespan * ptNode->extendFactor + ptNode->addedTime);
} }
} }
...@@ -437,7 +432,7 @@ void taosCacheRelease(SCacheObj *pCacheObj, void **data, bool _remove) { ...@@ -437,7 +432,7 @@ void taosCacheRelease(SCacheObj *pCacheObj, void **data, bool _remove) {
*data = NULL; *data = NULL;
int16_t ref = T_REF_DEC(pNode); int16_t ref = T_REF_DEC(pNode);
uDebug("key:%p, %p is released, refcnt:%d, cacheName:%s", pNode->key, pNode->data, ref, pCacheObj->cacheName); uDebug("cache:%s, key:%p, %p is released, refcnt:%d", pCacheObj->name, pNode->key, pNode->data, ref);
if (_remove && (!pNode->inTrashCan)) { if (_remove && (!pNode->inTrashCan)) {
__cache_wr_lock(pCacheObj); __cache_wr_lock(pCacheObj);
...@@ -455,7 +450,7 @@ void taosCacheRelease(SCacheObj *pCacheObj, void **data, bool _remove) { ...@@ -455,7 +450,7 @@ void taosCacheRelease(SCacheObj *pCacheObj, void **data, bool _remove) {
} }
} }
void taosCacheEmpty(SCacheObj *pCacheObj) { void taosCacheEmpty(SCacheObj *pCacheObj, bool _remove) {
SHashMutableIterator *pIter = taosHashCreateIter(pCacheObj->pHashTable); SHashMutableIterator *pIter = taosHashCreateIter(pCacheObj->pHashTable);
__cache_wr_lock(pCacheObj); __cache_wr_lock(pCacheObj);
...@@ -465,12 +460,16 @@ void taosCacheEmpty(SCacheObj *pCacheObj) { ...@@ -465,12 +460,16 @@ void taosCacheEmpty(SCacheObj *pCacheObj) {
} }
SCacheDataNode *pNode = *(SCacheDataNode **)taosHashIterGet(pIter); SCacheDataNode *pNode = *(SCacheDataNode **)taosHashIterGet(pIter);
taosCacheMoveToTrash(pCacheObj, pNode); if (T_REF_VAL_GET(pNode) == 0 || _remove) {
taosCacheReleaseNode(pCacheObj, pNode);
} else {
taosCacheMoveToTrash(pCacheObj, pNode);
}
} }
__cache_unlock(pCacheObj); __cache_unlock(pCacheObj);
taosHashDestroyIter(pIter); taosHashDestroyIter(pIter);
taosTrashCanEmpty(pCacheObj, false); taosTrashCanEmpty(pCacheObj, _remove);
} }
void taosCacheCleanup(SCacheObj *pCacheObj) { void taosCacheCleanup(SCacheObj *pCacheObj) {
...@@ -481,7 +480,7 @@ void taosCacheCleanup(SCacheObj *pCacheObj) { ...@@ -481,7 +480,7 @@ void taosCacheCleanup(SCacheObj *pCacheObj) {
pCacheObj->deleting = 1; pCacheObj->deleting = 1;
pthread_join(pCacheObj->refreshWorker, NULL); pthread_join(pCacheObj->refreshWorker, NULL);
uInfo("cacheName:%s, will be cleanuped", pCacheObj->cacheName); uInfo("cache:%s will be cleaned up", pCacheObj->name);
doCleanupDataCache(pCacheObj); doCleanupDataCache(pCacheObj);
} }
...@@ -601,22 +600,25 @@ void doCleanupDataCache(SCacheObj *pCacheObj) { ...@@ -601,22 +600,25 @@ void doCleanupDataCache(SCacheObj *pCacheObj) {
SHashMutableIterator *pIter = taosHashCreateIter(pCacheObj->pHashTable); SHashMutableIterator *pIter = taosHashCreateIter(pCacheObj->pHashTable);
while (taosHashIterNext(pIter)) { while (taosHashIterNext(pIter)) {
SCacheDataNode *pNode = *(SCacheDataNode **)taosHashIterGet(pIter); SCacheDataNode *pNode = *(SCacheDataNode **)taosHashIterGet(pIter);
// if (pNode->expiredTime <= expiredTime && T_REF_VAL_GET(pNode) <= 0) {
if (T_REF_VAL_GET(pNode) <= 0) { int32_t c = T_REF_VAL_GET(pNode);
if (c <= 0) {
taosCacheReleaseNode(pCacheObj, pNode); taosCacheReleaseNode(pCacheObj, pNode);
} else { } else {
uDebug("key:%p, %p will not remove from cache, refcnt:%d, cacheName:%s", pNode->key, pNode->data, uDebug("cache:%s key:%p, %p will not remove from cache, refcnt:%d", pCacheObj->name, pNode->key,
T_REF_VAL_GET(pNode), pCacheObj->cacheName); pNode->data, T_REF_VAL_GET(pNode));
} }
} }
taosHashDestroyIter(pIter); taosHashDestroyIter(pIter);
taosHashCleanup(pCacheObj->pHashTable); // todo memory leak if there are object with refcount greater than 0 in hash table?
taosHashCleanup(pCacheObj->pHashTable);
__cache_unlock(pCacheObj); __cache_unlock(pCacheObj);
taosTrashCanEmpty(pCacheObj, true); taosTrashCanEmpty(pCacheObj, true);
__cache_lock_destroy(pCacheObj); __cache_lock_destroy(pCacheObj);
tfree(pCacheObj->name);
memset(pCacheObj, 0, sizeof(SCacheObj)); memset(pCacheObj, 0, sizeof(SCacheObj));
free(pCacheObj); free(pCacheObj);
} }
......
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include <stdio.h>
#include <stdlib.h>
#include <error.h>
#include <errno.h>
#include <stdarg.h>
#include <unistd.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <fcntl.h>
#include "os.h"
#define RANDOM_FACTOR 5
ssize_t taos_tread(int fd, void *buf, size_t count)
{
#ifdef TAOS_RANDOM_FILE_FAIL
if (rand() % RANDOM_FACTOR == 0) {
errno = EIO;
return -1;
}
#endif
return tread(fd, buf, count);
}
ssize_t taos_twrite(int fd, void *buf, size_t count)
{
#ifdef TAOS_RANDOM_FILE_FAIL
if (rand() % RANDOM_FACTOR == 0) {
errno = EIO;
return -1;
}
#endif
return twrite(fd, buf, count);
}
off_t taos_lseek(int fd, off_t offset, int whence)
{
#ifdef TAOS_RANDOM_FILE_FAIL
if (rand() % RANDOM_FACTOR == 0) {
errno = EIO;
return -1;
}
#endif
return lseek(fd, offset, whence);
}
...@@ -27,6 +27,7 @@ ...@@ -27,6 +27,7 @@
#include "tcoding.h" #include "tcoding.h"
#include "tkvstore.h" #include "tkvstore.h"
#include "tulog.h" #include "tulog.h"
#include "tfile.h"
#define TD_KVSTORE_HEADER_SIZE 512 #define TD_KVSTORE_HEADER_SIZE 512
#define TD_KVSTORE_MAJOR_VERSION 1 #define TD_KVSTORE_MAJOR_VERSION 1
...@@ -581,4 +582,4 @@ _err: ...@@ -581,4 +582,4 @@ _err:
taosHashDestroyIter(pIter); taosHashDestroyIter(pIter);
tfree(buf); tfree(buf);
return -1; return -1;
} }
\ No newline at end of file
...@@ -53,7 +53,7 @@ typedef struct { ...@@ -53,7 +53,7 @@ typedef struct {
STsdbCfg tsdbCfg; STsdbCfg tsdbCfg;
SSyncCfg syncCfg; SSyncCfg syncCfg;
SWalCfg walCfg; SWalCfg walCfg;
void *qHandlePool; // query handle pool void *qMgmt;
char *rootDir; char *rootDir;
char db[TSDB_DB_NAME_LEN]; char db[TSDB_DB_NAME_LEN];
} SVnodeObj; } SVnodeObj;
......
...@@ -46,7 +46,6 @@ static uint32_t vnodeGetFileInfo(void *ahandle, char *name, uint32_t *index, uin ...@@ -46,7 +46,6 @@ static uint32_t vnodeGetFileInfo(void *ahandle, char *name, uint32_t *index, uin
static int vnodeGetWalInfo(void *ahandle, char *name, uint32_t *index); static int vnodeGetWalInfo(void *ahandle, char *name, uint32_t *index);
static void vnodeNotifyRole(void *ahandle, int8_t role); static void vnodeNotifyRole(void *ahandle, int8_t role);
static void vnodeNotifyFileSynced(void *ahandle, uint64_t fversion); static void vnodeNotifyFileSynced(void *ahandle, uint64_t fversion);
static void vnodeFreeqHandle(void* phandle);
static pthread_once_t vnodeModuleInit = PTHREAD_ONCE_INIT; static pthread_once_t vnodeModuleInit = PTHREAD_ONCE_INIT;
...@@ -69,6 +68,12 @@ static void vnodeInit() { ...@@ -69,6 +68,12 @@ static void vnodeInit() {
} }
} }
void vnodeCleanupResources() {
taosHashCleanup(tsDnodeVnodesHash);
vnodeModuleInit = PTHREAD_ONCE_INIT;
tsDnodeVnodesHash = NULL;
}
int32_t vnodeCreate(SMDCreateVnodeMsg *pVnodeCfg) { int32_t vnodeCreate(SMDCreateVnodeMsg *pVnodeCfg) {
int32_t code; int32_t code;
pthread_once(&vnodeModuleInit, vnodeInit); pthread_once(&vnodeModuleInit, vnodeInit);
...@@ -283,9 +288,7 @@ int32_t vnodeOpen(int32_t vnode, char *rootDir) { ...@@ -283,9 +288,7 @@ int32_t vnodeOpen(int32_t vnode, char *rootDir) {
if (pVnode->role == TAOS_SYNC_ROLE_MASTER) if (pVnode->role == TAOS_SYNC_ROLE_MASTER)
cqStart(pVnode->cq); cqStart(pVnode->cq);
const int32_t REFRESH_HANDLE_INTERVAL = 2; // every 2 seconds, rfresh handle pool pVnode->qMgmt = qOpenQueryMgmt(pVnode->vgId);
pVnode->qHandlePool = taosCacheInit(TSDB_DATA_TYPE_BIGINT, REFRESH_HANDLE_INTERVAL, true, vnodeFreeqHandle, "qhandle");
pVnode->events = NULL; pVnode->events = NULL;
pVnode->status = TAOS_VN_STATUS_READY; pVnode->status = TAOS_VN_STATUS_READY;
vDebug("vgId:%d, vnode is opened in %s, pVnode:%p", pVnode->vgId, rootDir, pVnode); vDebug("vgId:%d, vnode is opened in %s, pVnode:%p", pVnode->vgId, rootDir, pVnode);
...@@ -296,7 +299,7 @@ int32_t vnodeOpen(int32_t vnode, char *rootDir) { ...@@ -296,7 +299,7 @@ int32_t vnodeOpen(int32_t vnode, char *rootDir) {
} }
int32_t vnodeStartStream(int32_t vnode) { int32_t vnodeStartStream(int32_t vnode) {
SVnodeObj* pVnode = vnodeAccquireVnode(vnode); SVnodeObj* pVnode = vnodeAcquireVnode(vnode);
if (pVnode != NULL) { if (pVnode != NULL) {
tsdbStartStream(pVnode->tsdb); tsdbStartStream(pVnode->tsdb);
vnodeRelease(pVnode); vnodeRelease(pVnode);
...@@ -328,6 +331,9 @@ void vnodeRelease(void *pVnodeRaw) { ...@@ -328,6 +331,9 @@ void vnodeRelease(void *pVnodeRaw) {
return; return;
} }
qCleanupQueryMgmt(pVnode->qMgmt);
pVnode->qMgmt = NULL;
if (pVnode->tsdb) if (pVnode->tsdb)
tsdbCloseRepo(pVnode->tsdb, 1); tsdbCloseRepo(pVnode->tsdb, 1);
pVnode->tsdb = NULL; pVnode->tsdb = NULL;
...@@ -362,12 +368,6 @@ void vnodeRelease(void *pVnodeRaw) { ...@@ -362,12 +368,6 @@ void vnodeRelease(void *pVnodeRaw) {
int32_t count = atomic_sub_fetch_32(&tsOpennedVnodes, 1); int32_t count = atomic_sub_fetch_32(&tsOpennedVnodes, 1);
vDebug("vgId:%d, vnode is released, vnodes:%d", vgId, count); vDebug("vgId:%d, vnode is released, vnodes:%d", vgId, count);
if (count <= 0) {
taosHashCleanup(tsDnodeVnodesHash);
vnodeModuleInit = PTHREAD_ONCE_INIT;
tsDnodeVnodesHash = NULL;
}
} }
void *vnodeGetVnode(int32_t vgId) { void *vnodeGetVnode(int32_t vgId) {
...@@ -383,7 +383,7 @@ void *vnodeGetVnode(int32_t vgId) { ...@@ -383,7 +383,7 @@ void *vnodeGetVnode(int32_t vgId) {
return *ppVnode; return *ppVnode;
} }
void *vnodeAccquireVnode(int32_t vgId) { void *vnodeAcquireVnode(int32_t vgId) {
SVnodeObj *pVnode = vnodeGetVnode(vgId); SVnodeObj *pVnode = vnodeGetVnode(vgId);
if (pVnode == NULL) return pVnode; if (pVnode == NULL) return pVnode;
...@@ -393,12 +393,21 @@ void *vnodeAccquireVnode(int32_t vgId) { ...@@ -393,12 +393,21 @@ void *vnodeAccquireVnode(int32_t vgId) {
return pVnode; return pVnode;
} }
void *vnodeAcquireRqueue(void *param) {
SVnodeObj *pVnode = param;
if (pVnode == NULL) return NULL;
atomic_add_fetch_32(&pVnode->refCount, 1);
vDebug("vgId:%d, get vnode rqueue, refCount:%d", pVnode->vgId, pVnode->refCount);
return ((SVnodeObj *)pVnode)->rqueue;
}
void *vnodeGetRqueue(void *pVnode) { void *vnodeGetRqueue(void *pVnode) {
return ((SVnodeObj *)pVnode)->rqueue; return ((SVnodeObj *)pVnode)->rqueue;
} }
void *vnodeGetWqueue(int32_t vgId) { void *vnodeGetWqueue(int32_t vgId) {
SVnodeObj *pVnode = vnodeAccquireVnode(vgId); SVnodeObj *pVnode = vnodeAcquireVnode(vgId);
if (pVnode == NULL) return NULL; if (pVnode == NULL) return NULL;
return pVnode->wqueue; return pVnode->wqueue;
} }
...@@ -424,6 +433,28 @@ static void vnodeBuildVloadMsg(SVnodeObj *pVnode, SDMStatusMsg *pStatus) { ...@@ -424,6 +433,28 @@ static void vnodeBuildVloadMsg(SVnodeObj *pVnode, SDMStatusMsg *pStatus) {
pLoad->replica = pVnode->syncCfg.replica; pLoad->replica = pVnode->syncCfg.replica;
} }
int32_t vnodeGetVnodeList(int32_t vnodeList[], int32_t *numOfVnodes) {
if (tsDnodeVnodesHash == NULL) return TSDB_CODE_SUCCESS;
SHashMutableIterator *pIter = taosHashCreateIter(tsDnodeVnodesHash);
while (taosHashIterNext(pIter)) {
SVnodeObj **pVnode = taosHashIterGet(pIter);
if (pVnode == NULL) continue;
if (*pVnode == NULL) continue;
(*numOfVnodes)++;
if (*numOfVnodes >= TSDB_MAX_VNODES) {
vError("vgId:%d, too many open vnodes, exist:%d max:%d", (*pVnode)->vgId, *numOfVnodes, TSDB_MAX_VNODES);
continue;
} else {
vnodeList[*numOfVnodes - 1] = (*pVnode)->vgId;
}
}
taosHashDestroyIter(pIter);
return TSDB_CODE_SUCCESS;
}
void vnodeBuildStatusMsg(void *param) { void vnodeBuildStatusMsg(void *param) {
SDMStatusMsg *pStatus = param; SDMStatusMsg *pStatus = param;
SHashMutableIterator *pIter = taosHashCreateIter(tsDnodeVnodesHash); SHashMutableIterator *pIter = taosHashCreateIter(tsDnodeVnodesHash);
...@@ -442,7 +473,7 @@ void vnodeBuildStatusMsg(void *param) { ...@@ -442,7 +473,7 @@ void vnodeBuildStatusMsg(void *param) {
void vnodeSetAccess(SDMVgroupAccess *pAccess, int32_t numOfVnodes) { void vnodeSetAccess(SDMVgroupAccess *pAccess, int32_t numOfVnodes) {
for (int32_t i = 0; i < numOfVnodes; ++i) { for (int32_t i = 0; i < numOfVnodes; ++i) {
pAccess[i].vgId = htonl(pAccess[i].vgId); pAccess[i].vgId = htonl(pAccess[i].vgId);
SVnodeObj *pVnode = vnodeAccquireVnode(pAccess[i].vgId); SVnodeObj *pVnode = vnodeAcquireVnode(pAccess[i].vgId);
if (pVnode != NULL) { if (pVnode != NULL) {
pVnode->accessState = pAccess[i].accessState; pVnode->accessState = pAccess[i].accessState;
if (pVnode->accessState != TSDB_VN_ALL_ACCCESS) { if (pVnode->accessState != TSDB_VN_ALL_ACCCESS) {
...@@ -466,7 +497,7 @@ static void vnodeCleanUp(SVnodeObj *pVnode) { ...@@ -466,7 +497,7 @@ static void vnodeCleanUp(SVnodeObj *pVnode) {
vTrace("vgId:%d, vnode will cleanup, refCount:%d", pVnode->vgId, pVnode->refCount); vTrace("vgId:%d, vnode will cleanup, refCount:%d", pVnode->vgId, pVnode->refCount);
// release local resources only after cutting off outside connections // release local resources only after cutting off outside connections
taosCacheCleanup(pVnode->qHandlePool); qSetQueryMgmtClosed(pVnode->qMgmt);
vnodeRelease(pVnode); vnodeRelease(pVnode);
} }
...@@ -872,12 +903,3 @@ PARSE_OVER: ...@@ -872,12 +903,3 @@ PARSE_OVER:
if(fp) fclose(fp); if(fp) fclose(fp);
return terrno; return terrno;
} }
void vnodeFreeqHandle(void *qHandle) {
void** handle = qHandle;
if (handle == NULL || *handle == NULL) {
return;
}
qKillQuery(*handle);
}
\ No newline at end of file
...@@ -14,6 +14,7 @@ ...@@ -14,6 +14,7 @@
*/ */
#define _DEFAULT_SOURCE #define _DEFAULT_SOURCE
#include <dnode.h>
#include "os.h" #include "os.h"
#include "tglobal.h" #include "tglobal.h"
...@@ -73,18 +74,22 @@ static int32_t vnodeProcessQueryMsg(SVnodeObj *pVnode, SReadMsg *pReadMsg) { ...@@ -73,18 +74,22 @@ static int32_t vnodeProcessQueryMsg(SVnodeObj *pVnode, SReadMsg *pReadMsg) {
killQueryMsg->free = htons(killQueryMsg->free); killQueryMsg->free = htons(killQueryMsg->free);
killQueryMsg->qhandle = htobe64(killQueryMsg->qhandle); killQueryMsg->qhandle = htobe64(killQueryMsg->qhandle);
vWarn("QInfo:%p connection %p broken, kill query", (void*)killQueryMsg->qhandle, pReadMsg->rpcMsg.handle); void* handle = NULL;
if ((void**) killQueryMsg->qhandle != NULL) {
handle = *(void**) killQueryMsg->qhandle;
}
vWarn("QInfo:%p connection %p broken, kill query", handle, pReadMsg->rpcMsg.handle);
assert(pReadMsg->rpcMsg.contLen > 0 && killQueryMsg->free == 1); assert(pReadMsg->rpcMsg.contLen > 0 && killQueryMsg->free == 1);
// this message arrived here by means of the *query* message, so release the vnode is necessary void** qhandle = qAcquireQInfo(pVnode->qMgmt, (void**) killQueryMsg->qhandle);
void** qhandle = taosCacheAcquireByKey(pVnode->qHandlePool, (void*) &killQueryMsg->qhandle, sizeof(killQueryMsg->qhandle));
if (qhandle == NULL || *qhandle == NULL) { if (qhandle == NULL || *qhandle == NULL) {
vWarn("QInfo:%p invalid qhandle, no matched query handle, conn:%p", (void*) killQueryMsg->qhandle, pReadMsg->rpcMsg.handle); vWarn("QInfo:%p invalid qhandle, no matched query handle, conn:%p", (void*) killQueryMsg->qhandle, pReadMsg->rpcMsg.handle);
} else { } else {
taosCacheRelease(pVnode->qHandlePool, (void**) &qhandle, true); assert(qhandle == (void**) killQueryMsg->qhandle);
qReleaseQInfo(pVnode->qMgmt, (void**) &qhandle, true);
} }
vnodeRelease(pVnode);
return TSDB_CODE_TSC_QUERY_CANCELLED; return TSDB_CODE_TSC_QUERY_CANCELLED;
} }
...@@ -93,7 +98,7 @@ static int32_t vnodeProcessQueryMsg(SVnodeObj *pVnode, SReadMsg *pReadMsg) { ...@@ -93,7 +98,7 @@ static int32_t vnodeProcessQueryMsg(SVnodeObj *pVnode, SReadMsg *pReadMsg) {
void** handle = NULL; void** handle = NULL;
if (contLen != 0) { if (contLen != 0) {
code = qCreateQueryInfo(pVnode->tsdb, pVnode->vgId, pQueryTableMsg, pVnode, vnodeRelease, &pQInfo); code = qCreateQueryInfo(pVnode->tsdb, pVnode->vgId, pQueryTableMsg, pVnode, NULL, &pQInfo);
SQueryTableRsp *pRsp = (SQueryTableRsp *) rpcMallocCont(sizeof(SQueryTableRsp)); SQueryTableRsp *pRsp = (SQueryTableRsp *) rpcMallocCont(sizeof(SQueryTableRsp));
pRsp->code = code; pRsp->code = code;
...@@ -105,25 +110,30 @@ static int32_t vnodeProcessQueryMsg(SVnodeObj *pVnode, SReadMsg *pReadMsg) { ...@@ -105,25 +110,30 @@ static int32_t vnodeProcessQueryMsg(SVnodeObj *pVnode, SReadMsg *pReadMsg) {
// current connect is broken // current connect is broken
if (code == TSDB_CODE_SUCCESS) { if (code == TSDB_CODE_SUCCESS) {
if (vnodeNotifyCurrentQhandle(pReadMsg->rpcMsg.handle, pQInfo, pVnode->vgId) != TSDB_CODE_SUCCESS) { // add lock here
vError("vgId:%d, QInfo:%p, dnode query discarded since link is broken, %p", pVnode->vgId, pQInfo, handle = qRegisterQInfo(pVnode->qMgmt, pQInfo);
pReadMsg->rpcMsg.handle); if (handle == NULL) { // failed to register qhandle
pRsp->code = TSDB_CODE_RPC_NETWORK_UNAVAIL; pRsp->code = TSDB_CODE_QRY_INVALID_QHANDLE;
// NOTE: there two refcount, needs to kill twice, todo refactor
// query has not been put into qhandle pool, kill it directly.
qKillQuery(pQInfo); qKillQuery(pQInfo);
qKillQuery(pQInfo); qKillQuery(pQInfo);
} else {
assert(*handle == pQInfo);
pRsp->qhandle = htobe64((uint64_t) (handle));
}
if (handle != NULL && vnodeNotifyCurrentQhandle(pReadMsg->rpcMsg.handle, handle, pVnode->vgId) != TSDB_CODE_SUCCESS) {
vError("vgId:%d, QInfo:%p, query discarded since link is broken, %p", pVnode->vgId, pQInfo, pReadMsg->rpcMsg.handle);
pRsp->code = TSDB_CODE_RPC_NETWORK_UNAVAIL;
// NOTE: there two refcount, needs to kill twice
// query has not been put into qhandle pool, kill it directly.
qKillQuery(pQInfo);
qReleaseQInfo(pVnode->qMgmt, (void**) &handle, true);
return pRsp->code; return pRsp->code;
} }
handle = taosCachePut(pVnode->qHandlePool, pQInfo, sizeof(pQInfo), &pQInfo, sizeof(pQInfo), tsShellActivityTimer * 2);
assert(*handle == pQInfo);
pRsp->qhandle = htobe64((uint64_t) (handle));
} else { } else {
assert(pQInfo == NULL); assert(pQInfo == NULL);
vnodeRelease(pVnode);
} }
vDebug("vgId:%d, QInfo:%p, dnode query msg disposed", vgId, pQInfo); vDebug("vgId:%d, QInfo:%p, dnode query msg disposed", vgId, pQInfo);
...@@ -138,9 +148,8 @@ static int32_t vnodeProcessQueryMsg(SVnodeObj *pVnode, SReadMsg *pReadMsg) { ...@@ -138,9 +148,8 @@ static int32_t vnodeProcessQueryMsg(SVnodeObj *pVnode, SReadMsg *pReadMsg) {
if (pQInfo != NULL) { if (pQInfo != NULL) {
qTableQuery(pQInfo); // do execute query qTableQuery(pQInfo); // do execute query
assert(handle != NULL); assert(handle != NULL);
taosCacheRelease(pVnode->qHandlePool, (void**) &handle, false); qReleaseQInfo(pVnode->qMgmt, (void**) &handle, false);
} }
return code; return code;
...@@ -159,7 +168,7 @@ static int32_t vnodeProcessFetchMsg(SVnodeObj *pVnode, SReadMsg *pReadMsg) { ...@@ -159,7 +168,7 @@ static int32_t vnodeProcessFetchMsg(SVnodeObj *pVnode, SReadMsg *pReadMsg) {
memset(pRet, 0, sizeof(SRspRet)); memset(pRet, 0, sizeof(SRspRet));
int32_t ret = 0; int32_t ret = 0;
void** handle = taosCacheAcquireByKey(pVnode->qHandlePool, pQInfo, sizeof(pQInfo)); void** handle = qAcquireQInfo(pVnode->qMgmt, pQInfo);
if (handle == NULL || handle != pQInfo) { if (handle == NULL || handle != pQInfo) {
ret = TSDB_CODE_QRY_INVALID_QHANDLE; ret = TSDB_CODE_QRY_INVALID_QHANDLE;
} }
...@@ -167,8 +176,8 @@ static int32_t vnodeProcessFetchMsg(SVnodeObj *pVnode, SReadMsg *pReadMsg) { ...@@ -167,8 +176,8 @@ static int32_t vnodeProcessFetchMsg(SVnodeObj *pVnode, SReadMsg *pReadMsg) {
if (pRetrieve->free == 1) { if (pRetrieve->free == 1) {
if (ret == TSDB_CODE_SUCCESS) { if (ret == TSDB_CODE_SUCCESS) {
vDebug("vgId:%d, QInfo:%p, retrieve msg received to kill query and free qhandle", pVnode->vgId, pQInfo); vDebug("vgId:%d, QInfo:%p, retrieve msg received to kill query and free qhandle", pVnode->vgId, pQInfo);
qReleaseQInfo(pVnode->qMgmt, (void**) &handle, true);
taosCacheRelease(pVnode->qHandlePool, (void**) &handle, true);
pRet->rsp = (SRetrieveTableRsp *)rpcMallocCont(sizeof(SRetrieveTableRsp)); pRet->rsp = (SRetrieveTableRsp *)rpcMallocCont(sizeof(SRetrieveTableRsp));
pRet->len = sizeof(SRetrieveTableRsp); pRet->len = sizeof(SRetrieveTableRsp);
...@@ -178,30 +187,30 @@ static int32_t vnodeProcessFetchMsg(SVnodeObj *pVnode, SReadMsg *pReadMsg) { ...@@ -178,30 +187,30 @@ static int32_t vnodeProcessFetchMsg(SVnodeObj *pVnode, SReadMsg *pReadMsg) {
pRsp->completed = true; pRsp->completed = true;
pRsp->useconds = 0; pRsp->useconds = 0;
} else { // todo handle error } else { // todo handle error
qReleaseQInfo(pVnode->qMgmt, (void**) &handle, true);
} }
return ret; return ret;
} }
vDebug("vgId:%d, QInfo:%p, retrieve msg is received", pVnode->vgId, *pQInfo);
int32_t code = qRetrieveQueryResultInfo(*pQInfo); int32_t code = qRetrieveQueryResultInfo(*pQInfo);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS || ret != TSDB_CODE_SUCCESS) {
//TODO //TODO
pRet->rsp = (SRetrieveTableRsp *)rpcMallocCont(sizeof(SRetrieveTableRsp)); pRet->rsp = (SRetrieveTableRsp *)rpcMallocCont(sizeof(SRetrieveTableRsp));
memset(pRet->rsp, 0, sizeof(SRetrieveTableRsp)); memset(pRet->rsp, 0, sizeof(SRetrieveTableRsp));
} else { } else {
// todo check code and handle error in build result set // todo check code and handle error in build result set
code = qDumpRetrieveResult(*pQInfo, (SRetrieveTableRsp **)&pRet->rsp, &pRet->len); code = qDumpRetrieveResult(*pQInfo, (SRetrieveTableRsp **)&pRet->rsp, &pRet->len);
if (qHasMoreResultsToRetrieve(*pQInfo)) { if (qHasMoreResultsToRetrieve(*handle)) {
dnodePutItemIntoReadQueue(pVnode, handle);
pRet->qhandle = handle; pRet->qhandle = handle;
code = TSDB_CODE_VND_ACTION_NEED_REPROCESSED; code = TSDB_CODE_SUCCESS;
} else { // no further execution invoked, release the ref to vnode } else { // no further execution invoked, release the ref to vnode
taosCacheRelease(pVnode->qHandlePool, (void**) &handle, true); qReleaseQInfo(pVnode->qMgmt, (void**) &handle, true);
} }
} }
return code; return code;
} }
......
...@@ -365,3 +365,7 @@ cd ../../../debug; make ...@@ -365,3 +365,7 @@ cd ../../../debug; make
./test.sh -f unique/arbitrator/sync_replica3_dropDb.sim ./test.sh -f unique/arbitrator/sync_replica3_dropDb.sim
./test.sh -f unique/arbitrator/sync_replica3_dropTable.sim ./test.sh -f unique/arbitrator/sync_replica3_dropTable.sim
./test.sh -f unique/migrate/mn2_vn2_repl2_rmMnodeDir.sim
./test.sh -f unique/migrate/mn2_vn2_repl2_rmMnodeVnodeDir.sim
./test.sh -f unique/migrate/mn2_vn2_repl2_rmMnodeVnodeDir_stopAll_starAll.sim
./test.sh -f unique/migrate/mn2_vn2_repl2_rmVnodeDir.sim
...@@ -133,3 +133,7 @@ cd ../../../debug; make ...@@ -133,3 +133,7 @@ cd ../../../debug; make
./test.sh -f unique/arbitrator/sync_replica3_dropDb.sim ./test.sh -f unique/arbitrator/sync_replica3_dropDb.sim
./test.sh -f unique/arbitrator/sync_replica3_dropTable.sim ./test.sh -f unique/arbitrator/sync_replica3_dropTable.sim
./test.sh -f unique/migrate/mn2_vn2_repl2_rmMnodeDir.sim
./test.sh -f unique/migrate/mn2_vn2_repl2_rmMnodeVnodeDir.sim
./test.sh -f unique/migrate/mn2_vn2_repl2_rmMnodeVnodeDir_stopAll_starAll.sim
./test.sh -f unique/migrate/mn2_vn2_repl2_rmVnodeDir.sim
...@@ -23,6 +23,12 @@ system sh/cfg.sh -n dnode3 -c balanceInterval -v 10 ...@@ -23,6 +23,12 @@ system sh/cfg.sh -n dnode3 -c balanceInterval -v 10
system sh/cfg.sh -n dnode4 -c balanceInterval -v 10 system sh/cfg.sh -n dnode4 -c balanceInterval -v 10
system sh/cfg.sh -n dnode5 -c balanceInterval -v 10 system sh/cfg.sh -n dnode5 -c balanceInterval -v 10
system sh/cfg.sh -n dnode1 -c maxVgroupsPerDb -v 16
system sh/cfg.sh -n dnode2 -c maxVgroupsPerDb -v 16
system sh/cfg.sh -n dnode3 -c maxVgroupsPerDb -v 16
system sh/cfg.sh -n dnode4 -c maxVgroupsPerDb -v 16
system sh/cfg.sh -n dnode5 -c maxVgroupsPerDb -v 16
system sh/cfg.sh -n dnode1 -c numOfTotalVnodes -v 4 system sh/cfg.sh -n dnode1 -c numOfTotalVnodes -v 4
system sh/cfg.sh -n dnode2 -c numOfTotalVnodes -v 4 system sh/cfg.sh -n dnode2 -c numOfTotalVnodes -v 4
system sh/cfg.sh -n dnode3 -c numOfTotalVnodes -v 4 system sh/cfg.sh -n dnode3 -c numOfTotalVnodes -v 4
...@@ -215,6 +221,7 @@ system sh/cfg.sh -n dnode3 -c alternativeRole -v 2 ...@@ -215,6 +221,7 @@ system sh/cfg.sh -n dnode3 -c alternativeRole -v 2
system sh/cfg.sh -n dnode3 -c arbitrator -v $arbitrator system sh/cfg.sh -n dnode3 -c arbitrator -v $arbitrator
system sh/cfg.sh -n dnode3 -c offlineThreshold -v 10 system sh/cfg.sh -n dnode3 -c offlineThreshold -v 10
system sh/cfg.sh -n dnode3 -c enableCoreFile -v 1 system sh/cfg.sh -n dnode3 -c enableCoreFile -v 1
system sh/cfg.sh -n dnode3 -c maxVgroupsPerDb -v 16
system sh/exec.sh -n dnode3 -s start system sh/exec.sh -n dnode3 -s start
sql create dnode $hostname3 sql create dnode $hostname3
......
...@@ -12,6 +12,12 @@ system sh/cfg.sh -n dnode3 -c numOfMnodes -v 3 ...@@ -12,6 +12,12 @@ system sh/cfg.sh -n dnode3 -c numOfMnodes -v 3
system sh/cfg.sh -n dnode1 -c numOfTotalVnodes -v 10 system sh/cfg.sh -n dnode1 -c numOfTotalVnodes -v 10
system sh/cfg.sh -n dnode2 -c numOfTotalVnodes -v 10 system sh/cfg.sh -n dnode2 -c numOfTotalVnodes -v 10
system sh/cfg.sh -n dnode3 -c numOfTotalVnodes -v 10 system sh/cfg.sh -n dnode3 -c numOfTotalVnodes -v 10
system sh/cfg.sh -n dnode1 -c maxVgroupsPerDb -v 2
system sh/cfg.sh -n dnode2 -c maxVgroupsPerDb -v 2
system sh/cfg.sh -n dnode3 -c maxVgroupsPerDb -v 2
system sh/cfg.sh -n dnode1 -c maxTablesPerVnode -v 1000
system sh/cfg.sh -n dnode2 -c maxTablesPerVnode -v 1000
system sh/cfg.sh -n dnode3 -c maxTablesPerVnode -v 1000
print ========= start dnodes print ========= start dnodes
system sh/exec.sh -n dnode1 -s start system sh/exec.sh -n dnode1 -s start
...@@ -23,7 +29,7 @@ sql create dnode $hostname3 ...@@ -23,7 +29,7 @@ sql create dnode $hostname3
system sh/exec.sh -n dnode3 -s start system sh/exec.sh -n dnode3 -s start
print ======== step1 print ======== step1
sql create database db replica 3 blocks 2 maxtables 1000 sql create database db replica 3 blocks 2
sql create table db.mt (ts timestamp, tbcol int) TAGS(tgcol int) sql create table db.mt (ts timestamp, tbcol int) TAGS(tgcol int)
$tbPrefix = db.t $tbPrefix = db.t
......
...@@ -24,10 +24,15 @@ system sh/cfg.sh -n dnode2 -c numOfTotalVnodes -v 4 ...@@ -24,10 +24,15 @@ system sh/cfg.sh -n dnode2 -c numOfTotalVnodes -v 4
system sh/cfg.sh -n dnode3 -c numOfTotalVnodes -v 4 system sh/cfg.sh -n dnode3 -c numOfTotalVnodes -v 4
system sh/cfg.sh -n dnode4 -c numOfTotalVnodes -v 4 system sh/cfg.sh -n dnode4 -c numOfTotalVnodes -v 4
system sh/cfg.sh -n dnode1 -c maxtablesPerVnode -v 4 system sh/cfg.sh -n dnode1 -c maxVgroupsPerDb -v 8
system sh/cfg.sh -n dnode2 -c maxtablesPerVnode -v 4 system sh/cfg.sh -n dnode2 -c maxVgroupsPerDb -v 8
system sh/cfg.sh -n dnode3 -c maxtablesPerVnode -v 4 system sh/cfg.sh -n dnode3 -c maxVgroupsPerDb -v 8
system sh/cfg.sh -n dnode4 -c maxtablesPerVnode -v 4 system sh/cfg.sh -n dnode4 -c maxVgroupsPerDb -v 8
system sh/cfg.sh -n dnode1 -c maxTablesPerVnode -v 4
system sh/cfg.sh -n dnode2 -c maxTablesPerVnode -v 4
system sh/cfg.sh -n dnode3 -c maxTablesPerVnode -v 4
system sh/cfg.sh -n dnode4 -c maxTablesPerVnode -v 4
print ========= start dnodes print ========= start dnodes
system sh/exec.sh -n dnode1 -s start system sh/exec.sh -n dnode1 -s start
......
# Test case describe: dnode1/dnode2 include mnode and vnode roles
# step 1: start dnode1/dnode2, and added into cluster
# step 2: create db(repl = 2), table, insert data,
# step 4: stop dnode1, remove its mnode dir, and copy mnode dir of dnode2 to dnode1
# step 5: restart dnode1, waiting sync end
# step 6: stop dnode2, reset query cache, and query
system sh/stop_dnodes.sh
system sh/deploy.sh -n dnode1 -i 1
system sh/deploy.sh -n dnode2 -i 2
#system sh/deploy.sh -n dnode3 -i 3
#system sh/deploy.sh -n dnode4 -i 4
system sh/cfg.sh -n dnode1 -c numOfMnodes -v 2
system sh/cfg.sh -n dnode2 -c numOfMnodes -v 2
#system sh/cfg.sh -n dnode3 -c numOfMnodes -v 1
#system sh/cfg.sh -n dnode4 -c numOfMnodes -v 1
system sh/cfg.sh -n dnode1 -c walLevel -v 2
system sh/cfg.sh -n dnode2 -c walLevel -v 2
#system sh/cfg.sh -n dnode3 -c walLevel -v 2
#system sh/cfg.sh -n dnode4 -c walLevel -v 2
system sh/cfg.sh -n dnode1 -c balanceInterval -v 10
system sh/cfg.sh -n dnode2 -c balanceInterval -v 10
#system sh/cfg.sh -n dnode3 -c balanceInterval -v 10
#system sh/cfg.sh -n dnode4 -c balanceInterval -v 10
system sh/cfg.sh -n dnode1 -c numOfTotalVnodes -v 4
system sh/cfg.sh -n dnode2 -c numOfTotalVnodes -v 4
#system sh/cfg.sh -n dnode3 -c numOfTotalVnodes -v 4
#system sh/cfg.sh -n dnode4 -c numOfTotalVnodes -v 4
system sh/cfg.sh -n dnode1 -c alternativeRole -v 0
system sh/cfg.sh -n dnode2 -c alternativeRole -v 0
#system sh/cfg.sh -n dnode3 -c alternativeRole -v 2
#system sh/cfg.sh -n dnode4 -c alternativeRole -v 2
system sh/cfg.sh -n dnode1 -c maxtablesPerVnode -v 4
system sh/cfg.sh -n dnode2 -c maxtablesPerVnode -v 4
#system sh/cfg.sh -n dnode3 -c maxtablesPerVnode -v 4
#system sh/cfg.sh -n dnode4 -c maxtablesPerVnode -v 4
#system sh/cfg.sh -n dnode5 -c maxtablesPerVnode -v 4
system sh/cfg.sh -n dnode1 -c arbitrator -v $arbitrator
system sh/cfg.sh -n dnode2 -c arbitrator -v $arbitrator
#system sh/cfg.sh -n dnode3 -c arbitrator -v $arbitrator
print ============== step0: start tarbitrator
system sh/exec_tarbitrator.sh -s start
print ============== step1: start dnode1/dnode2 and add into cluster
system sh/exec.sh -n dnode1 -s start
system sh/exec.sh -n dnode2 -s start
sleep 1000
sql connect
sleep 1000
sql create dnode $hostname2
sleep 1000
print ============== step2: create database with replica 2, and create table, insert data
$totalTableNum = 10
$sleepTimer = 3000
$db = db
sql create database $db replica 2 cache 1
sql use $db
# create table , insert data
$stb = stb
sql create table $stb (ts timestamp, c1 double) tags(t1 int)
$rowNum = 1200
$tblNum = $totalTableNum
$totalRows = 0
$tsStart = 1577808000000 # 2020-01-01 00:00:00.000
$i = 0
while $i < $tblNum
$tb = tb . $i
sql create table $tb using $stb tags( $i )
$x = 0
while $x < $rowNum
$ts = $tsStart + $x
sql insert into $tb values ( $ts + 0a , $x ) ( $ts + 1a , $x ) ( $ts + 2a , $x ) ( $ts + 3a , $x ) ( $ts + 4a , $x ) ( $ts + 5a , $x ) ( $ts + 6a , $x ) ( $ts + 7a , $x ) ( $ts + 8a , $x ) ( $ts + 9a , $x ) ( $ts + 10a , $x ) ( $ts + 11a , $x ) ( $ts + 12a , $x ) ( $ts + 13a , $x ) ( $ts + 14a , $x ) ( $ts + 15a , $x ) ( $ts + 16a , $x ) ( $ts + 17a , $x ) ( $ts + 18a , $x ) ( $ts + 19a , $x ) ( $ts + 20a , $x ) ( $ts + 21a , $x ) ( $ts + 22a , $x ) ( $ts + 23a , $x ) ( $ts + 24a , $x ) ( $ts + 25a , $x ) ( $ts + 26a , $x ) ( $ts + 27a , $x ) ( $ts + 28a , $x ) ( $ts + 29a , $x ) ( $ts + 30a , $x ) ( $ts + 31a , $x ) ( $ts + 32a , $x ) ( $ts + 33a , $x ) ( $ts + 34a , $x ) ( $ts + 25a , $x ) ( $ts + 26a , $x ) ( $ts + 27a , $x ) ( $ts + 28a , $x ) ( $ts + 29a , $x ) ( $ts + 30a , $x ) ( $ts + 31a , $x ) ( $ts + 32a , $x ) ( $ts + 33a , $x ) ( $ts + 34a , $x ) ( $ts + 35a , $x ) ( $ts + 36a , $x ) ( $ts + 37a , $x ) ( $ts + 38a , $x ) ( $ts + 39a , $x ) ( $ts + 40a , $x ) ( $ts + 41a , $x ) ( $ts + 42a , $x ) ( $ts + 43a , $x ) ( $ts + 44a , $x ) ( $ts + 45a , $x ) ( $ts + 46a , $x ) ( $ts + 47a , $x ) ( $ts + 48a , $x ) ( $ts + 49a , $x ) ( $ts + 50a , $x ) ( $ts + 51a , $x ) ( $ts + 52a , $x ) ( $ts + 53a , $x ) ( $ts + 54a , $x ) ( $ts + 55a , $x ) ( $ts + 56a , $x ) ( $ts + 57a , $x ) ( $ts + 58a , $x ) ( $ts + 59a , $x )
$x = $x + 60
endw
$totalRows = $totalRows + $x
print info: inserted $x rows into $tb and totalRows: $totalRows
$i = $i + 1
endw
sql select count(*) from $stb
print rows:$rows data00:$data00 totalRows:$totalRows
if $rows != 1 then
return -1
endi
if $data00 != $totalRows then
return -1
endi
print ============== step3: insert old data(now-15d) and new data(now+15d), control data rows in order to save in cache, not falling disc
sql insert into $tb values ( now - 20d , -20 )
sql insert into $tb values ( now - 40d , -40 )
$totalRows = $totalRows + 2
print ============== step4: stop dnode1
system sh/exec.sh -n dnode1 -s stop -x SIGINT
$loopCnt = 0
wait_dnode1_offline:
$loopCnt = $loopCnt + 1
if $loopCnt == 10 then
return -1
endi
sql show dnodes
if $rows != 2 then
sleep 2000
goto wait_dnode1_offline
endi
print $data0_1 $data1_1 $data2_1 $data3_1 $data4_1
print $data0_2 $data1_2 $data2_2 $data3_2 $data4_2
$dnode1Status = $data4_1
$dnode2Status = $data4_2
if $dnode1Status != offline then
sleep 2000
goto wait_dnode1_offline
endi
if $dnode2Status != ready then
sleep 2000
goto wait_dnode1_offline
endi
# check using select
sql select count(*) from $stb
print data00 $data00
if $data00 != $totalRows then
return -1
endi
#sql show vgroups
#print show vgroups:
#print $data0_1 $data1_1 $data2_1 $data3_1 $data4_1 $data5_1 $data6_1 $data7_1 $data8_1 $data9_1
#print $data0_2 $data1_2 $data2_2 $data3_2 $data4_2 $data5_2 $data6_2 $data7_2 $data8_2 $data9_2
#print $data0_3 $data1_3 $data2_3 $data3_3 $data4_3 $data5_3 $data6_3 $data7_3 $data8_3 $data9_3
print ============== step5: remove the mnode dir of dnode1, then copy the monde dir of dnode2
system_content rm -rf ../../../sim/dnode1/data/mnode
system_content cp -rf ../../../sim/dnode2/data/mnode ../../../sim/dnode1/data/
print ============== step6: restart dnode1, waiting sync end
system sh/exec.sh -n dnode1 -s start
sleep 1000
$loopCnt = 0
wait_dnode1_ready:
$loopCnt = $loopCnt + 1
if $loopCnt == 20 then
return -1
endi
sql show dnodes -x wait_dnode1_ready
if $rows != 2 then
sleep 2000
goto wait_dnode1_ready
endi
print $data0_1 $data1_1 $data2_1 $data3_1 $data4_1
print $data0_2 $data1_2 $data2_2 $data3_2 $data4_2
$dnode1Status = $data4_1
$dnode2Status = $data4_2
if $dnode1Status != ready then
sleep 2000
goto wait_dnode1_ready
endi
if $dnode2Status != ready then
sleep 2000
goto wait_dnode1_ready
endi
$loopCnt = 0
wait_dnode1_vgroup_slave:
$loopCnt = $loopCnt + 1
if $loopCnt == 10 then
return -1
endi
sql show vgroups
if $rows != 3 then
sleep 2000
goto wait_dnode1_vgroup_slave
endi
print show vgroups:
print $data0_1 $data1_1 $data2_1 $data3_1 $data4_1 $data5_1 $data6_1 $data7_1 $data8_1 $data9_1
print $data0_2 $data1_2 $data2_2 $data3_2 $data4_2 $data5_2 $data6_2 $data7_2 $data8_2 $data9_2
print $data0_3 $data1_3 $data2_3 $data3_3 $data4_3 $data5_3 $data6_3 $data7_3 $data8_3 $data9_3
print $data0_4 $data1_4 $data2_4 $data3_4 $data4_4 $data5_4 $data6_4 $data7_4 $data8_4 $data9_4
$d2v2status = $data4_4
$d2v3status = $data4_2
$d2v4status = $data4_3
$d1v2status = $data7_4
$d1v3status = $data7_2
$d1v4status = $data7_3
if $d2v2status != master then
sleep 2000
goto wait_dnode1_vgroup_slave
endi
if $d2v3status != master then
sleep 2000
goto wait_dnode1_vgroup_slave
endi
if $d2v4status != master then
sleep 2000
goto wait_dnode1_vgroup_slave
endi
if $d1v2status != slave then
sleep 2000
goto wait_dnode1_vgroup_slave
endi
if $d1v3status != slave then
sleep 2000
goto wait_dnode1_vgroup_slave
endi
if $d1v4status != slave then
sleep 2000
goto wait_dnode1_vgroup_slave
endi
print ============== step7: stop dnode2
system sh/exec.sh -n dnode2 -s stop -x SIGINT
$loopCnt = 0
wait_dnode2_offline:
$loopCnt = $loopCnt + 1
if $loopCnt == 10 then
return -1
endi
sql show dnodes
if $rows != 2 then
sleep 2000
goto wait_dnode2_offline
endi
print $data0_1 $data1_1 $data2_1 $data3_1 $data4_1
print $data0_2 $data1_2 $data2_2 $data3_2 $data4_2
$dnode1Status = $data4_1
$dnode2Status = $data4_2
if $dnode1Status != ready then
sleep 2000
goto wait_dnode2_offline
endi
if $dnode2Status != offline then
sleep 2000
goto wait_dnode2_offline
endi
sql reset query cache
# check using select
sql select count(*) from $stb
print data00 $data00
if $data00 != $totalRows then
return -1
endi
\ No newline at end of file
# Test case describe: dnode1/dnode2 include mnode and vnode roles
# step 1: start dnode1/dnode2, and added into cluster
# step 2: create db(repl = 2), table, insert data,
# step 4: stop dnode1, remove its mnode and vnode dir, and copy mnode and vnode dir of dnode2 to dnode1
# step 5: restart dnode1, waiting sync end
# step 6: stop dnode2, reset query cache, and query
system sh/stop_dnodes.sh
system sh/deploy.sh -n dnode1 -i 1
system sh/deploy.sh -n dnode2 -i 2
#system sh/deploy.sh -n dnode3 -i 3
#system sh/deploy.sh -n dnode4 -i 4
system sh/cfg.sh -n dnode1 -c numOfMnodes -v 2
system sh/cfg.sh -n dnode2 -c numOfMnodes -v 2
#system sh/cfg.sh -n dnode3 -c numOfMnodes -v 1
#system sh/cfg.sh -n dnode4 -c numOfMnodes -v 1
system sh/cfg.sh -n dnode1 -c walLevel -v 2
system sh/cfg.sh -n dnode2 -c walLevel -v 2
#system sh/cfg.sh -n dnode3 -c walLevel -v 2
#system sh/cfg.sh -n dnode4 -c walLevel -v 2
system sh/cfg.sh -n dnode1 -c balanceInterval -v 10
system sh/cfg.sh -n dnode2 -c balanceInterval -v 10
#system sh/cfg.sh -n dnode3 -c balanceInterval -v 10
#system sh/cfg.sh -n dnode4 -c balanceInterval -v 10
system sh/cfg.sh -n dnode1 -c numOfTotalVnodes -v 4
system sh/cfg.sh -n dnode2 -c numOfTotalVnodes -v 4
#system sh/cfg.sh -n dnode3 -c numOfTotalVnodes -v 4
#system sh/cfg.sh -n dnode4 -c numOfTotalVnodes -v 4
system sh/cfg.sh -n dnode1 -c alternativeRole -v 0
system sh/cfg.sh -n dnode2 -c alternativeRole -v 0
#system sh/cfg.sh -n dnode3 -c alternativeRole -v 2
#system sh/cfg.sh -n dnode4 -c alternativeRole -v 2
system sh/cfg.sh -n dnode1 -c maxtablesPerVnode -v 4
system sh/cfg.sh -n dnode2 -c maxtablesPerVnode -v 4
#system sh/cfg.sh -n dnode3 -c maxtablesPerVnode -v 4
#system sh/cfg.sh -n dnode4 -c maxtablesPerVnode -v 4
#system sh/cfg.sh -n dnode5 -c maxtablesPerVnode -v 4
system sh/cfg.sh -n dnode1 -c arbitrator -v $arbitrator
system sh/cfg.sh -n dnode2 -c arbitrator -v $arbitrator
#system sh/cfg.sh -n dnode3 -c arbitrator -v $arbitrator
print ============== step0: start tarbitrator
system sh/exec_tarbitrator.sh -s start
print ============== step1: start dnode1/dnode2 and add into cluster
system sh/exec.sh -n dnode1 -s start
system sh/exec.sh -n dnode2 -s start
sleep 1000
sql connect
sleep 1000
sql create dnode $hostname2
sleep 1000
print ============== step2: create database with replica 2, and create table, insert data
$totalTableNum = 10
$sleepTimer = 3000
$db = db
sql create database $db replica 2 cache 1
sql use $db
# create table , insert data
$stb = stb
sql create table $stb (ts timestamp, c1 double) tags(t1 int)
$rowNum = 1200
$tblNum = $totalTableNum
$totalRows = 0
$tsStart = 1577808000000 # 2020-01-01 00:00:00.000
$i = 0
while $i < $tblNum
$tb = tb . $i
sql create table $tb using $stb tags( $i )
$x = 0
while $x < $rowNum
$ts = $tsStart + $x
sql insert into $tb values ( $ts + 0a , $x ) ( $ts + 1a , $x ) ( $ts + 2a , $x ) ( $ts + 3a , $x ) ( $ts + 4a , $x ) ( $ts + 5a , $x ) ( $ts + 6a , $x ) ( $ts + 7a , $x ) ( $ts + 8a , $x ) ( $ts + 9a , $x ) ( $ts + 10a , $x ) ( $ts + 11a , $x ) ( $ts + 12a , $x ) ( $ts + 13a , $x ) ( $ts + 14a , $x ) ( $ts + 15a , $x ) ( $ts + 16a , $x ) ( $ts + 17a , $x ) ( $ts + 18a , $x ) ( $ts + 19a , $x ) ( $ts + 20a , $x ) ( $ts + 21a , $x ) ( $ts + 22a , $x ) ( $ts + 23a , $x ) ( $ts + 24a , $x ) ( $ts + 25a , $x ) ( $ts + 26a , $x ) ( $ts + 27a , $x ) ( $ts + 28a , $x ) ( $ts + 29a , $x ) ( $ts + 30a , $x ) ( $ts + 31a , $x ) ( $ts + 32a , $x ) ( $ts + 33a , $x ) ( $ts + 34a , $x ) ( $ts + 25a , $x ) ( $ts + 26a , $x ) ( $ts + 27a , $x ) ( $ts + 28a , $x ) ( $ts + 29a , $x ) ( $ts + 30a , $x ) ( $ts + 31a , $x ) ( $ts + 32a , $x ) ( $ts + 33a , $x ) ( $ts + 34a , $x ) ( $ts + 35a , $x ) ( $ts + 36a , $x ) ( $ts + 37a , $x ) ( $ts + 38a , $x ) ( $ts + 39a , $x ) ( $ts + 40a , $x ) ( $ts + 41a , $x ) ( $ts + 42a , $x ) ( $ts + 43a , $x ) ( $ts + 44a , $x ) ( $ts + 45a , $x ) ( $ts + 46a , $x ) ( $ts + 47a , $x ) ( $ts + 48a , $x ) ( $ts + 49a , $x ) ( $ts + 50a , $x ) ( $ts + 51a , $x ) ( $ts + 52a , $x ) ( $ts + 53a , $x ) ( $ts + 54a , $x ) ( $ts + 55a , $x ) ( $ts + 56a , $x ) ( $ts + 57a , $x ) ( $ts + 58a , $x ) ( $ts + 59a , $x )
$x = $x + 60
endw
$totalRows = $totalRows + $x
print info: inserted $x rows into $tb and totalRows: $totalRows
$i = $i + 1
endw
sql select count(*) from $stb
print rows:$rows data00:$data00 totalRows:$totalRows
if $rows != 1 then
return -1
endi
if $data00 != $totalRows then
return -1
endi
print ============== step3: insert old data(now-15d) and new data(now+15d), control data rows in order to save in cache, not falling disc
sql insert into $tb values ( now - 20d , -20 )
sql insert into $tb values ( now - 40d , -40 )
$totalRows = $totalRows + 2
print ============== step4: stop dnode1
system sh/exec.sh -n dnode1 -s stop -x SIGINT
$loopCnt = 0
wait_dnode1_offline:
$loopCnt = $loopCnt + 1
if $loopCnt == 10 then
return -1
endi
sql show dnodes
if $rows != 2 then
sleep 2000
goto wait_dnode1_offline
endi
print $data0_1 $data1_1 $data2_1 $data3_1 $data4_1
print $data0_2 $data1_2 $data2_2 $data3_2 $data4_2
$dnode1Status = $data4_1
$dnode2Status = $data4_2
if $dnode1Status != offline then
sleep 2000
goto wait_dnode1_offline
endi
if $dnode2Status != ready then
sleep 2000
goto wait_dnode1_offline
endi
# check using select
sql select count(*) from $stb
print data00 $data00
if $data00 != $totalRows then
return -1
endi
#sql show vgroups
#print show vgroups:
#print $data0_1 $data1_1 $data2_1 $data3_1 $data4_1 $data5_1 $data6_1 $data7_1 $data8_1 $data9_1
#print $data0_2 $data1_2 $data2_2 $data3_2 $data4_2 $data5_2 $data6_2 $data7_2 $data8_2 $data9_2
#print $data0_3 $data1_3 $data2_3 $data3_3 $data4_3 $data5_3 $data6_3 $data7_3 $data8_3 $data9_3
print ============== step5: remove the mnode dir of dnode1, then copy the monde dir of dnode2
system_content rm -rf ../../../sim/dnode1/data/vnode
system_content rm -rf ../../../sim/dnode1/data/mnode
system_content cp -rf ../../../sim/dnode2/data/vnode ../../../sim/dnode1/data/
system_content cp -rf ../../../sim/dnode2/data/mnode ../../../sim/dnode1/data/
print ============== step6: restart dnode1, waiting sync end
system sh/exec.sh -n dnode1 -s start
sleep 1000
$loopCnt = 0
wait_dnode1_ready:
$loopCnt = $loopCnt + 1
if $loopCnt == 20 then
return -1
endi
sql show dnodes -x wait_dnode1_ready
if $rows != 2 then
sleep 2000
goto wait_dnode1_ready
endi
print $data0_1 $data1_1 $data2_1 $data3_1 $data4_1
print $data0_2 $data1_2 $data2_2 $data3_2 $data4_2
$dnode1Status = $data4_1
$dnode2Status = $data4_2
if $dnode1Status != ready then
sleep 2000
goto wait_dnode1_ready
endi
if $dnode2Status != ready then
sleep 2000
goto wait_dnode1_ready
endi
$loopCnt = 0
wait_dnode1_vgroup_slave:
$loopCnt = $loopCnt + 1
if $loopCnt == 10 then
return -1
endi
sql show vgroups
if $rows != 3 then
sleep 2000
goto wait_dnode1_vgroup_slave
endi
print show vgroups:
print $data0_1 $data1_1 $data2_1 $data3_1 $data4_1 $data5_1 $data6_1 $data7_1 $data8_1 $data9_1
print $data0_2 $data1_2 $data2_2 $data3_2 $data4_2 $data5_2 $data6_2 $data7_2 $data8_2 $data9_2
print $data0_3 $data1_3 $data2_3 $data3_3 $data4_3 $data5_3 $data6_3 $data7_3 $data8_3 $data9_3
print $data0_4 $data1_4 $data2_4 $data3_4 $data4_4 $data5_4 $data6_4 $data7_4 $data8_4 $data9_4
$d2v2status = $data4_4
$d2v3status = $data4_2
$d2v4status = $data4_3
$d1v2status = $data7_4
$d1v3status = $data7_2
$d1v4status = $data7_3
if $d2v2status != master then
sleep 2000
goto wait_dnode1_vgroup_slave
endi
if $d2v3status != master then
sleep 2000
goto wait_dnode1_vgroup_slave
endi
if $d2v4status != master then
sleep 2000
goto wait_dnode1_vgroup_slave
endi
if $d1v2status != slave then
sleep 2000
goto wait_dnode1_vgroup_slave
endi
if $d1v3status != slave then
sleep 2000
goto wait_dnode1_vgroup_slave
endi
if $d1v4status != slave then
sleep 2000
goto wait_dnode1_vgroup_slave
endi
print ============== step7: stop dnode2
system sh/exec.sh -n dnode2 -s stop -x SIGINT
$loopCnt = 0
wait_dnode2_offline:
$loopCnt = $loopCnt + 1
if $loopCnt == 10 then
return -1
endi
sql show dnodes
if $rows != 2 then
sleep 2000
goto wait_dnode2_offline
endi
print $data0_1 $data1_1 $data2_1 $data3_1 $data4_1
print $data0_2 $data1_2 $data2_2 $data3_2 $data4_2
$dnode1Status = $data4_1
$dnode2Status = $data4_2
if $dnode1Status != ready then
sleep 2000
goto wait_dnode2_offline
endi
if $dnode2Status != offline then
sleep 2000
goto wait_dnode2_offline
endi
sql reset query cache
# check using select
sql select count(*) from $stb
print data00 $data00
if $data00 != $totalRows then
return -1
endi
\ No newline at end of file
# Test case describe: dnode1/dnode2 include mnode and vnode roles
# step 1: start dnode1/dnode2, and added into cluster
# step 2: create db(repl = 2), table, insert data,
# step 4: stop dnode1, remove its mnode and vnode dir, and copy mnode and vnode dir of dnode2 to dnode1
# step 5: restart dnode1, waiting sync end
# step 6: stop dnode2, reset query cache, and query
system sh/stop_dnodes.sh
system sh/deploy.sh -n dnode1 -i 1
system sh/deploy.sh -n dnode2 -i 2
#system sh/deploy.sh -n dnode3 -i 3
#system sh/deploy.sh -n dnode4 -i 4
system sh/cfg.sh -n dnode1 -c numOfMnodes -v 2
system sh/cfg.sh -n dnode2 -c numOfMnodes -v 2
#system sh/cfg.sh -n dnode3 -c numOfMnodes -v 1
#system sh/cfg.sh -n dnode4 -c numOfMnodes -v 1
system sh/cfg.sh -n dnode1 -c walLevel -v 2
system sh/cfg.sh -n dnode2 -c walLevel -v 2
#system sh/cfg.sh -n dnode3 -c walLevel -v 2
#system sh/cfg.sh -n dnode4 -c walLevel -v 2
system sh/cfg.sh -n dnode1 -c balanceInterval -v 10
system sh/cfg.sh -n dnode2 -c balanceInterval -v 10
#system sh/cfg.sh -n dnode3 -c balanceInterval -v 10
#system sh/cfg.sh -n dnode4 -c balanceInterval -v 10
system sh/cfg.sh -n dnode1 -c numOfTotalVnodes -v 4
system sh/cfg.sh -n dnode2 -c numOfTotalVnodes -v 4
#system sh/cfg.sh -n dnode3 -c numOfTotalVnodes -v 4
#system sh/cfg.sh -n dnode4 -c numOfTotalVnodes -v 4
system sh/cfg.sh -n dnode1 -c alternativeRole -v 0
system sh/cfg.sh -n dnode2 -c alternativeRole -v 0
#system sh/cfg.sh -n dnode3 -c alternativeRole -v 2
#system sh/cfg.sh -n dnode4 -c alternativeRole -v 2
system sh/cfg.sh -n dnode1 -c maxtablesPerVnode -v 4
system sh/cfg.sh -n dnode2 -c maxtablesPerVnode -v 4
#system sh/cfg.sh -n dnode3 -c maxtablesPerVnode -v 4
#system sh/cfg.sh -n dnode4 -c maxtablesPerVnode -v 4
#system sh/cfg.sh -n dnode5 -c maxtablesPerVnode -v 4
system sh/cfg.sh -n dnode1 -c arbitrator -v $arbitrator
system sh/cfg.sh -n dnode2 -c arbitrator -v $arbitrator
#system sh/cfg.sh -n dnode3 -c arbitrator -v $arbitrator
print ============== step0: start tarbitrator
system sh/exec_tarbitrator.sh -s start
print ============== step1: start dnode1/dnode2 and add into cluster
system sh/exec.sh -n dnode1 -s start
system sh/exec.sh -n dnode2 -s start
sleep 1000
sql connect
sleep 1000
sql create dnode $hostname2
sleep 1000
print ============== step2: create database with replica 2, and create table, insert data
$totalTableNum = 10
$sleepTimer = 3000
$db = db
sql create database $db replica 2 cache 1
sql use $db
# create table , insert data
$stb = stb
sql create table $stb (ts timestamp, c1 double) tags(t1 int)
$rowNum = 1200
$tblNum = $totalTableNum
$totalRows = 0
$tsStart = 1577808000000 # 2020-01-01 00:00:00.000
$i = 0
while $i < $tblNum
$tb = tb . $i
sql create table $tb using $stb tags( $i )
$x = 0
while $x < $rowNum
$ts = $tsStart + $x
sql insert into $tb values ( $ts + 0a , $x ) ( $ts + 1a , $x ) ( $ts + 2a , $x ) ( $ts + 3a , $x ) ( $ts + 4a , $x ) ( $ts + 5a , $x ) ( $ts + 6a , $x ) ( $ts + 7a , $x ) ( $ts + 8a , $x ) ( $ts + 9a , $x ) ( $ts + 10a , $x ) ( $ts + 11a , $x ) ( $ts + 12a , $x ) ( $ts + 13a , $x ) ( $ts + 14a , $x ) ( $ts + 15a , $x ) ( $ts + 16a , $x ) ( $ts + 17a , $x ) ( $ts + 18a , $x ) ( $ts + 19a , $x ) ( $ts + 20a , $x ) ( $ts + 21a , $x ) ( $ts + 22a , $x ) ( $ts + 23a , $x ) ( $ts + 24a , $x ) ( $ts + 25a , $x ) ( $ts + 26a , $x ) ( $ts + 27a , $x ) ( $ts + 28a , $x ) ( $ts + 29a , $x ) ( $ts + 30a , $x ) ( $ts + 31a , $x ) ( $ts + 32a , $x ) ( $ts + 33a , $x ) ( $ts + 34a , $x ) ( $ts + 25a , $x ) ( $ts + 26a , $x ) ( $ts + 27a , $x ) ( $ts + 28a , $x ) ( $ts + 29a , $x ) ( $ts + 30a , $x ) ( $ts + 31a , $x ) ( $ts + 32a , $x ) ( $ts + 33a , $x ) ( $ts + 34a , $x ) ( $ts + 35a , $x ) ( $ts + 36a , $x ) ( $ts + 37a , $x ) ( $ts + 38a , $x ) ( $ts + 39a , $x ) ( $ts + 40a , $x ) ( $ts + 41a , $x ) ( $ts + 42a , $x ) ( $ts + 43a , $x ) ( $ts + 44a , $x ) ( $ts + 45a , $x ) ( $ts + 46a , $x ) ( $ts + 47a , $x ) ( $ts + 48a , $x ) ( $ts + 49a , $x ) ( $ts + 50a , $x ) ( $ts + 51a , $x ) ( $ts + 52a , $x ) ( $ts + 53a , $x ) ( $ts + 54a , $x ) ( $ts + 55a , $x ) ( $ts + 56a , $x ) ( $ts + 57a , $x ) ( $ts + 58a , $x ) ( $ts + 59a , $x )
$x = $x + 60
endw
$totalRows = $totalRows + $x
print info: inserted $x rows into $tb and totalRows: $totalRows
$i = $i + 1
endw
sql select count(*) from $stb
print rows:$rows data00:$data00 totalRows:$totalRows
if $rows != 1 then
return -1
endi
if $data00 != $totalRows then
return -1
endi
print ============== step3: insert old data(now-15d) and new data(now+15d), control data rows in order to save in cache, not falling disc
sql insert into $tb values ( now - 20d , -20 )
sql insert into $tb values ( now - 40d , -40 )
$totalRows = $totalRows + 2
print ============== step4: stop dnode1
system sh/exec.sh -n dnode1 -s stop -x SIGINT
system sh/exec.sh -n dnode2 -s stop -x SIGINT
print ============== step5: remove the mnode dir of dnode1, then copy the monde dir of dnode2
system_content rm -rf ../../../sim/dnode1/data/vnode
system_content rm -rf ../../../sim/dnode1/data/mnode
system_content cp -rf ../../../sim/dnode2/data/vnode ../../../sim/dnode1/data/
system_content cp -rf ../../../sim/dnode2/data/mnode ../../../sim/dnode1/data/
print ============== step6: restart dnode1/dnode2
system sh/exec.sh -n dnode1 -s start
system sh/exec.sh -n dnode2 -s start
sleep 1000
sql connect
sql use $db
$loopCnt = 0
wait_dnode1_ready:
$loopCnt = $loopCnt + 1
if $loopCnt == 20 then
return -1
endi
sql show dnodes -x wait_dnode1_ready
if $rows != 2 then
sleep 2000
goto wait_dnode1_ready
endi
print $data0_1 $data1_1 $data2_1 $data3_1 $data4_1
print $data0_2 $data1_2 $data2_2 $data3_2 $data4_2
$dnode1Status = $data4_1
$dnode2Status = $data4_2
if $dnode1Status != ready then
sleep 2000
goto wait_dnode1_ready
endi
if $dnode2Status != ready then
sleep 2000
goto wait_dnode1_ready
endi
$loopCnt = 0
wait_dnode1_vgroup_slave:
$loopCnt = $loopCnt + 1
if $loopCnt == 10 then
return -1
endi
sql show vgroups
if $rows != 3 then
sleep 2000
goto wait_dnode1_vgroup_slave
endi
print show vgroups:
print $data0_1 $data1_1 $data2_1 $data3_1 $data4_1 $data5_1 $data6_1 $data7_1 $data8_1 $data9_1
print $data0_2 $data1_2 $data2_2 $data3_2 $data4_2 $data5_2 $data6_2 $data7_2 $data8_2 $data9_2
print $data0_3 $data1_3 $data2_3 $data3_3 $data4_3 $data5_3 $data6_3 $data7_3 $data8_3 $data9_3
print $data0_4 $data1_4 $data2_4 $data3_4 $data4_4 $data5_4 $data6_4 $data7_4 $data8_4 $data9_4
$d2v2status = $data4_4
$d2v3status = $data4_2
$d2v4status = $data4_3
$d1v2status = $data7_4
$d1v3status = $data7_2
$d1v4status = $data7_3
if $d2v2status != master then
sleep 2000
goto wait_dnode1_vgroup_slave
endi
if $d2v3status != master then
sleep 2000
goto wait_dnode1_vgroup_slave
endi
if $d2v4status != master then
sleep 2000
goto wait_dnode1_vgroup_slave
endi
if $d1v2status != slave then
sleep 2000
goto wait_dnode1_vgroup_slave
endi
if $d1v3status != slave then
sleep 2000
goto wait_dnode1_vgroup_slave
endi
if $d1v4status != slave then
sleep 2000
goto wait_dnode1_vgroup_slave
endi
sql reset query cache
# check using select
sql select count(*) from $stb
print data00 $data00
if $data00 != $totalRows then
return -1
endi
\ No newline at end of file
# Test case describe: dnode1/dnode2 include mnode and vnode roles
# step 1: start dnode1/dnode2, and added into cluster
# step 2: create db(repl = 2), table, insert data,
# step 4: stop dnode1, remove its vnode dir, and copy vnode dir of dnode2 to dnode1
# step 5: restart dnode1, waiting sync end
# step 6: stop dnode2, reset query cache, and query
system sh/stop_dnodes.sh
system sh/deploy.sh -n dnode1 -i 1
system sh/deploy.sh -n dnode2 -i 2
#system sh/deploy.sh -n dnode3 -i 3
#system sh/deploy.sh -n dnode4 -i 4
system sh/cfg.sh -n dnode1 -c numOfMnodes -v 2
system sh/cfg.sh -n dnode2 -c numOfMnodes -v 2
#system sh/cfg.sh -n dnode3 -c numOfMnodes -v 1
#system sh/cfg.sh -n dnode4 -c numOfMnodes -v 1
system sh/cfg.sh -n dnode1 -c walLevel -v 2
system sh/cfg.sh -n dnode2 -c walLevel -v 2
#system sh/cfg.sh -n dnode3 -c walLevel -v 2
#system sh/cfg.sh -n dnode4 -c walLevel -v 2
system sh/cfg.sh -n dnode1 -c balanceInterval -v 10
system sh/cfg.sh -n dnode2 -c balanceInterval -v 10
#system sh/cfg.sh -n dnode3 -c balanceInterval -v 10
#system sh/cfg.sh -n dnode4 -c balanceInterval -v 10
system sh/cfg.sh -n dnode1 -c numOfTotalVnodes -v 4
system sh/cfg.sh -n dnode2 -c numOfTotalVnodes -v 4
#system sh/cfg.sh -n dnode3 -c numOfTotalVnodes -v 4
#system sh/cfg.sh -n dnode4 -c numOfTotalVnodes -v 4
system sh/cfg.sh -n dnode1 -c alternativeRole -v 0
system sh/cfg.sh -n dnode2 -c alternativeRole -v 0
#system sh/cfg.sh -n dnode3 -c alternativeRole -v 2
#system sh/cfg.sh -n dnode4 -c alternativeRole -v 2
system sh/cfg.sh -n dnode1 -c maxtablesPerVnode -v 4
system sh/cfg.sh -n dnode2 -c maxtablesPerVnode -v 4
#system sh/cfg.sh -n dnode3 -c maxtablesPerVnode -v 4
#system sh/cfg.sh -n dnode4 -c maxtablesPerVnode -v 4
#system sh/cfg.sh -n dnode5 -c maxtablesPerVnode -v 4
system sh/cfg.sh -n dnode1 -c arbitrator -v $arbitrator
system sh/cfg.sh -n dnode2 -c arbitrator -v $arbitrator
#system sh/cfg.sh -n dnode3 -c arbitrator -v $arbitrator
print ============== step0: start tarbitrator
system sh/exec_tarbitrator.sh -s start
print ============== step1: start dnode1/dnode2 and add into cluster
system sh/exec.sh -n dnode1 -s start
system sh/exec.sh -n dnode2 -s start
sleep 1000
sql connect
sleep 1000
sql create dnode $hostname2
sleep 1000
print ============== step2: create database with replica 2, and create table, insert data
$totalTableNum = 10
$sleepTimer = 3000
$db = db
sql create database $db replica 2 cache 1
sql use $db
# create table , insert data
$stb = stb
sql create table $stb (ts timestamp, c1 double) tags(t1 int)
$rowNum = 1200
$tblNum = $totalTableNum
$totalRows = 0
$tsStart = 1577808000000 # 2020-01-01 00:00:00.000
$i = 0
while $i < $tblNum
$tb = tb . $i
sql create table $tb using $stb tags( $i )
$x = 0
while $x < $rowNum
$ts = $tsStart + $x
sql insert into $tb values ( $ts + 0a , $x ) ( $ts + 1a , $x ) ( $ts + 2a , $x ) ( $ts + 3a , $x ) ( $ts + 4a , $x ) ( $ts + 5a , $x ) ( $ts + 6a , $x ) ( $ts + 7a , $x ) ( $ts + 8a , $x ) ( $ts + 9a , $x ) ( $ts + 10a , $x ) ( $ts + 11a , $x ) ( $ts + 12a , $x ) ( $ts + 13a , $x ) ( $ts + 14a , $x ) ( $ts + 15a , $x ) ( $ts + 16a , $x ) ( $ts + 17a , $x ) ( $ts + 18a , $x ) ( $ts + 19a , $x ) ( $ts + 20a , $x ) ( $ts + 21a , $x ) ( $ts + 22a , $x ) ( $ts + 23a , $x ) ( $ts + 24a , $x ) ( $ts + 25a , $x ) ( $ts + 26a , $x ) ( $ts + 27a , $x ) ( $ts + 28a , $x ) ( $ts + 29a , $x ) ( $ts + 30a , $x ) ( $ts + 31a , $x ) ( $ts + 32a , $x ) ( $ts + 33a , $x ) ( $ts + 34a , $x ) ( $ts + 25a , $x ) ( $ts + 26a , $x ) ( $ts + 27a , $x ) ( $ts + 28a , $x ) ( $ts + 29a , $x ) ( $ts + 30a , $x ) ( $ts + 31a , $x ) ( $ts + 32a , $x ) ( $ts + 33a , $x ) ( $ts + 34a , $x ) ( $ts + 35a , $x ) ( $ts + 36a , $x ) ( $ts + 37a , $x ) ( $ts + 38a , $x ) ( $ts + 39a , $x ) ( $ts + 40a , $x ) ( $ts + 41a , $x ) ( $ts + 42a , $x ) ( $ts + 43a , $x ) ( $ts + 44a , $x ) ( $ts + 45a , $x ) ( $ts + 46a , $x ) ( $ts + 47a , $x ) ( $ts + 48a , $x ) ( $ts + 49a , $x ) ( $ts + 50a , $x ) ( $ts + 51a , $x ) ( $ts + 52a , $x ) ( $ts + 53a , $x ) ( $ts + 54a , $x ) ( $ts + 55a , $x ) ( $ts + 56a , $x ) ( $ts + 57a , $x ) ( $ts + 58a , $x ) ( $ts + 59a , $x )
$x = $x + 60
endw
$totalRows = $totalRows + $x
print info: inserted $x rows into $tb and totalRows: $totalRows
$i = $i + 1
endw
sql select count(*) from $stb
print rows:$rows data00:$data00 totalRows:$totalRows
if $rows != 1 then
return -1
endi
if $data00 != $totalRows then
return -1
endi
print ============== step3: insert old data(now-15d) and new data(now+15d), control data rows in order to save in cache, not falling disc
sql insert into $tb values ( now - 20d , -20 )
sql insert into $tb values ( now - 40d , -40 )
$totalRows = $totalRows + 2
print ============== step4: stop dnode1
system sh/exec.sh -n dnode1 -s stop -x SIGINT
$loopCnt = 0
wait_dnode1_offline:
$loopCnt = $loopCnt + 1
if $loopCnt == 10 then
return -1
endi
sql show dnodes
if $rows != 2 then
sleep 2000
goto wait_dnode1_offline
endi
print $data0_1 $data1_1 $data2_1 $data3_1 $data4_1
print $data0_2 $data1_2 $data2_2 $data3_2 $data4_2
$dnode1Status = $data4_1
$dnode2Status = $data4_2
if $dnode1Status != offline then
sleep 2000
goto wait_dnode1_offline
endi
if $dnode2Status != ready then
sleep 2000
goto wait_dnode1_offline
endi
# check using select
sql select count(*) from $stb
print data00 $data00
if $data00 != $totalRows then
return -1
endi
#sql show vgroups
#print show vgroups:
#print $data0_1 $data1_1 $data2_1 $data3_1 $data4_1 $data5_1 $data6_1 $data7_1 $data8_1 $data9_1
#print $data0_2 $data1_2 $data2_2 $data3_2 $data4_2 $data5_2 $data6_2 $data7_2 $data8_2 $data9_2
#print $data0_3 $data1_3 $data2_3 $data3_3 $data4_3 $data5_3 $data6_3 $data7_3 $data8_3 $data9_3
print ============== step5: remove the mnode dir of dnode1, then copy the monde dir of dnode2
system_content rm -rf ../../../sim/dnode1/data/vnode
system_content cp -rf ../../../sim/dnode2/data/vnode ../../../sim/dnode1/data/
print ============== step6: restart dnode1, waiting sync end
system sh/exec.sh -n dnode1 -s start
sleep 1000
$loopCnt = 0
wait_dnode1_ready:
$loopCnt = $loopCnt + 1
if $loopCnt == 20 then
return -1
endi
sql show dnodes -x wait_dnode1_ready
if $rows != 2 then
sleep 2000
goto wait_dnode1_ready
endi
print $data0_1 $data1_1 $data2_1 $data3_1 $data4_1
print $data0_2 $data1_2 $data2_2 $data3_2 $data4_2
$dnode1Status = $data4_1
$dnode2Status = $data4_2
if $dnode1Status != ready then
sleep 2000
goto wait_dnode1_ready
endi
if $dnode2Status != ready then
sleep 2000
goto wait_dnode1_ready
endi
$loopCnt = 0
wait_dnode1_vgroup_slave:
$loopCnt = $loopCnt + 1
if $loopCnt == 10 then
return -1
endi
sql show vgroups
if $rows != 3 then
sleep 2000
goto wait_dnode1_vgroup_slave
endi
print show vgroups:
print $data0_1 $data1_1 $data2_1 $data3_1 $data4_1 $data5_1 $data6_1 $data7_1 $data8_1 $data9_1
print $data0_2 $data1_2 $data2_2 $data3_2 $data4_2 $data5_2 $data6_2 $data7_2 $data8_2 $data9_2
print $data0_3 $data1_3 $data2_3 $data3_3 $data4_3 $data5_3 $data6_3 $data7_3 $data8_3 $data9_3
print $data0_4 $data1_4 $data2_4 $data3_4 $data4_4 $data5_4 $data6_4 $data7_4 $data8_4 $data9_4
$d2v2status = $data4_4
$d2v3status = $data4_2
$d2v4status = $data4_3
$d1v2status = $data7_4
$d1v3status = $data7_2
$d1v4status = $data7_3
if $d2v2status != master then
sleep 2000
goto wait_dnode1_vgroup_slave
endi
if $d2v3status != master then
sleep 2000
goto wait_dnode1_vgroup_slave
endi
if $d2v4status != master then
sleep 2000
goto wait_dnode1_vgroup_slave
endi
if $d1v2status != slave then
sleep 2000
goto wait_dnode1_vgroup_slave
endi
if $d1v3status != slave then
sleep 2000
goto wait_dnode1_vgroup_slave
endi
if $d1v4status != slave then
sleep 2000
goto wait_dnode1_vgroup_slave
endi
print ============== step7: stop dnode2
system sh/exec.sh -n dnode2 -s stop -x SIGINT
$loopCnt = 0
wait_dnode2_offline:
$loopCnt = $loopCnt + 1
if $loopCnt == 10 then
return -1
endi
sql show dnodes
if $rows != 2 then
sleep 2000
goto wait_dnode2_offline
endi
print $data0_1 $data1_1 $data2_1 $data3_1 $data4_1
print $data0_2 $data1_2 $data2_2 $data3_2 $data4_2
$dnode1Status = $data4_1
$dnode2Status = $data4_2
if $dnode1Status != ready then
sleep 2000
goto wait_dnode2_offline
endi
if $dnode2Status != offline then
sleep 2000
goto wait_dnode2_offline
endi
sql reset query cache
# check using select
sql select count(*) from $stb
print data00 $data00
if $data00 != $totalRows then
return -1
endi
\ No newline at end of file
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册