diff --git a/src/client/src/tscAsync.c b/src/client/src/tscAsync.c index 8c194c031d81d899af22b0be90be87072d8a8b51..6ab4eeaa8a3dd54eb61f14d75d97e34e64a2390d 100644 --- a/src/client/src/tscAsync.c +++ b/src/client/src/tscAsync.c @@ -54,7 +54,7 @@ void doAsyncQuery(STscObj* pObj, SSqlObj* pSql, void (*fp)(), void* param, const return; } - strtolower(pSql->sqlstr, sqlstr); + strntolower(pSql->sqlstr, sqlstr, sqlLen); tscDebugL("%p SQL: %s", pSql, pSql->sqlstr); pSql->cmd.curSql = pSql->sqlstr; @@ -564,8 +564,8 @@ void tscTableMetaCallBack(void *param, TAOS_RES *res, int code) { tscDebug("%p stream:%p meta is updated, start new query, command:%d", pSql, pSql->pStream, pSql->cmd.command); if (!pSql->cmd.parseFinished) { tsParseSql(pSql, false); - sem_post(&pSql->rspSem); } + (*pSql->fp)(pSql->param, pSql, code); return; } diff --git a/src/client/src/tscSQLParser.c b/src/client/src/tscSQLParser.c index 9d80c7ed50944f14a5687d3034d5368552861cb1..f76a1341d24beec8295011f01fe4bd5e57e3b0e8 100644 --- a/src/client/src/tscSQLParser.c +++ b/src/client/src/tscSQLParser.c @@ -4642,21 +4642,24 @@ typedef struct SDNodeDynConfOption { } SDNodeDynConfOption; -int32_t validateEp(char* ep) { +int32_t validateEp(char* ep) { char buf[TSDB_EP_LEN + 1] = {0}; tstrncpy(buf, ep, TSDB_EP_LEN); - char *pos = strchr(buf, ':'); - if (NULL == pos) { - return TSDB_CODE_TSC_INVALID_SQL; + char* pos = strchr(buf, ':'); + if (NULL == pos) { + int32_t val = strtol(ep, NULL, 10); + if (val <= 0 || val > 65536) { + return TSDB_CODE_TSC_INVALID_SQL; + } + } else { + uint16_t port = atoi(pos + 1); + if (0 == port) { + return TSDB_CODE_TSC_INVALID_SQL; + } } - - uint16_t port = atoi(pos+1); - if (0 == port) { - return TSDB_CODE_TSC_INVALID_SQL; - } - return TSDB_CODE_SUCCESS; + return TSDB_CODE_SUCCESS; } int32_t validateDNodeConfig(tDCLSQL* pOptions) { @@ -4664,13 +4667,13 @@ int32_t validateDNodeConfig(tDCLSQL* pOptions) { return TSDB_CODE_TSC_INVALID_SQL; } - const int DNODE_DYNAMIC_CFG_OPTIONS_SIZE = 17; + const int DNODE_DYNAMIC_CFG_OPTIONS_SIZE = 19; const SDNodeDynConfOption DNODE_DYNAMIC_CFG_OPTIONS[] = { {"resetLog", 8}, {"resetQueryCache", 15}, {"debugFlag", 9}, {"mDebugFlag", 10}, {"dDebugFlag", 10}, {"sdbDebugFlag", 12}, {"vDebugFlag", 10}, {"cDebugFlag", 10}, {"httpDebugFlag", 13}, {"monitorDebugFlag", 16}, {"rpcDebugFlag", 12}, {"uDebugFlag", 10}, {"tmrDebugFlag", 12}, {"qDebugflag", 10}, {"sDebugflag", 10}, {"tsdbDebugFlag", 13}, - {"monitor", 7}}; + {"mqttDebugFlag", 13}, {"wDebugFlag", 10}, {"monitor", 7}}; SSQLToken* pOptionToken = &pOptions->a[1]; @@ -4694,7 +4697,7 @@ int32_t validateDNodeConfig(tDCLSQL* pOptions) { SSQLToken* pValToken = &pOptions->a[2]; int32_t val = strtol(pValToken->z, NULL, 10); - if (val < 131 || val > 199) { + if (val < 0 || val > 256) { /* options value is out of valid range */ return TSDB_CODE_TSC_INVALID_SQL; } diff --git a/src/client/src/tscSql.c b/src/client/src/tscSql.c index 5d5e5469430721079753ff3d4badd0e40e77d1d2..0677463d8d01df899a377c7390a22743aa9b22ed 100644 --- a/src/client/src/tscSql.c +++ b/src/client/src/tscSql.c @@ -263,12 +263,29 @@ TAOS_RES* taos_query(TAOS *taos, const char *sqlstr) { return pSql; } TAOS_RES* taos_query_c(TAOS *taos, const char *sqlstr, uint32_t sqlLen) { - char* buf = malloc(sqlLen + 1); - buf[sqlLen] = 0; - strncpy(buf, sqlstr, sqlLen); - TAOS_RES *res = taos_query(taos, buf); - free(buf); - return res; + STscObj *pObj = (STscObj *)taos; + if (pObj == NULL || pObj->signature != pObj) { + terrno = TSDB_CODE_TSC_DISCONNECTED; + return NULL; + } + + if (sqlLen > tsMaxSQLStringLen) { + tscError("sql string exceeds max length:%d", tsMaxSQLStringLen); + terrno = TSDB_CODE_TSC_INVALID_SQL; + return NULL; + } + + SSqlObj* pSql = calloc(1, sizeof(SSqlObj)); + if (pSql == NULL) { + tscError("failed to malloc sqlObj"); + terrno = TSDB_CODE_TSC_OUT_OF_MEMORY; + return NULL; + } + + doAsyncQuery(pObj, pSql, waitForQueryRsp, taos, sqlstr, sqlLen); + + tsem_wait(&pSql->rspSem); + return pSql; } int taos_result_precision(TAOS_RES *res) { SSqlObj *pSql = (SSqlObj *)res; diff --git a/src/client/src/tscStream.c b/src/client/src/tscStream.c index 6cc27a4cfe0991072bee92b7e831fb0736923b38..7c188ec96975f28e1bc5d692ed4e2c98ac505afb 100644 --- a/src/client/src/tscStream.c +++ b/src/client/src/tscStream.c @@ -70,6 +70,7 @@ static void tscProcessStreamLaunchQuery(SSchedMsg *pMsg) { SSqlObj * pSql = pStream->pSql; pSql->fp = tscProcessStreamQueryCallback; + pSql->fetchFp = tscProcessStreamQueryCallback; pSql->param = pStream; pSql->res.completed = false; @@ -471,6 +472,41 @@ static void setErrorInfo(SSqlObj* pSql, int32_t code, char* info) { } } +static void tscCreateStream(void *param, TAOS_RES *res, int code) { + SSqlStream* pStream = (SSqlStream*)param; + SSqlObj* pSql = pStream->pSql; + SSqlCmd* pCmd = &pSql->cmd; + + if (code != TSDB_CODE_SUCCESS) { + setErrorInfo(pSql, code, pCmd->payload); + tscError("%p open stream failed, sql:%s, reason:%s, code:0x%08x", pSql, pSql->sqlstr, pCmd->payload, code); + pStream->fp(pStream->param, NULL, NULL); + return; + } + + SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, 0); + STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0); + STableComInfo tinfo = tscGetTableInfo(pTableMetaInfo->pTableMeta); + + pStream->isProject = isProjectStream(pQueryInfo); + pStream->precision = tinfo.precision; + + pStream->ctime = taosGetTimestamp(pStream->precision); + pStream->etime = pQueryInfo->window.ekey; + + tscAddIntoStreamList(pStream); + + tscSetSlidingWindowInfo(pSql, pStream); + pStream->stime = tscGetStreamStartTimestamp(pSql, pStream, pStream->stime); + + int64_t starttime = tscGetLaunchTimestamp(pStream); + pCmd->command = TSDB_SQL_SELECT; + taosTmrReset(tscProcessStreamTimer, starttime, pStream, tscTmr, &pStream->pTimer); + + tscDebug("%p stream:%p is opened, query on:%s, interval:%" PRId64 ", sliding:%" PRId64 ", first launched in:%" PRId64 ", sql:%s", pSql, + pStream, pTableMetaInfo->name, pStream->interval, pStream->slidingTime, starttime, pSql->sqlstr); +} + TAOS_STREAM *taos_open_stream(TAOS *taos, const char *sqlstr, void (*fp)(void *param, TAOS_RES *, TAOS_ROW row), int64_t stime, void *param, void (*callback)(void *)) { STscObj *pObj = (STscObj *)taos; @@ -482,7 +518,6 @@ TAOS_STREAM *taos_open_stream(TAOS *taos, const char *sqlstr, void (*fp)(void *p } pSql->signature = pSql; - pSql->param = pSql; pSql->pTscObj = pObj; SSqlCmd *pCmd = &pSql->cmd; @@ -494,7 +529,14 @@ TAOS_STREAM *taos_open_stream(TAOS *taos, const char *sqlstr, void (*fp)(void *p tscFreeSqlObj(pSql); return NULL; } + + pStream->stime = stime; + pStream->fp = fp; + pStream->callback = callback; + pStream->param = param; + pStream->pSql = pSql; pSql->pStream = pStream; + pSql->param = pStream; pSql->sqlstr = calloc(1, strlen(sqlstr) + 1); if (pSql->sqlstr == NULL) { @@ -507,45 +549,18 @@ TAOS_STREAM *taos_open_stream(TAOS *taos, const char *sqlstr, void (*fp)(void *p tscDebugL("%p SQL: %s", pSql, pSql->sqlstr); tsem_init(&pSql->rspSem, 0, 0); + pSql->fp = tscCreateStream; + pSql->fetchFp = tscCreateStream; int32_t code = tsParseSql(pSql, true); - if (code == TSDB_CODE_TSC_ACTION_IN_PROGRESS) { - sem_wait(&pSql->rspSem); - } - - if (pRes->code != TSDB_CODE_SUCCESS) { - setErrorInfo(pSql, pRes->code, pCmd->payload); - - tscError("%p open stream failed, sql:%s, reason:%s, code:0x%08x", pSql, sqlstr, pCmd->payload, pRes->code); + if (code == TSDB_CODE_SUCCESS) { + tscCreateStream(pStream, pSql, code); + } else if (code != TSDB_CODE_TSC_ACTION_IN_PROGRESS) { + tscError("%p open stream failed, sql:%s, code:%s", pSql, sqlstr, tstrerror(pRes->code)); tscFreeSqlObj(pSql); + free(pStream); return NULL; } - SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, 0); - STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0); - STableComInfo tinfo = tscGetTableInfo(pTableMetaInfo->pTableMeta); - - pStream->isProject = isProjectStream(pQueryInfo); - pStream->fp = fp; - pStream->callback = callback; - pStream->param = param; - pStream->pSql = pSql; - pStream->precision = tinfo.precision; - - pStream->ctime = taosGetTimestamp(pStream->precision); - pStream->etime = pQueryInfo->window.ekey; - - tscAddIntoStreamList(pStream); - - tscSetSlidingWindowInfo(pSql, pStream); - pStream->stime = tscGetStreamStartTimestamp(pSql, pStream, stime); - - int64_t starttime = tscGetLaunchTimestamp(pStream); - pCmd->command = TSDB_SQL_SELECT; - taosTmrReset(tscProcessStreamTimer, starttime, pStream, tscTmr, &pStream->pTimer); - - tscDebug("%p stream:%p is opened, query on:%s, interval:%" PRId64 ", sliding:%" PRId64 ", first launched in:%" PRId64 ", sql:%s", pSql, - pStream, pTableMetaInfo->name, pStream->interval, pStream->slidingTime, starttime, sqlstr); - return pStream; } diff --git a/src/common/src/tdataformat.c b/src/common/src/tdataformat.c index 12ea4ad78d18a5f2b09daabb71093c153d30aca7..719d80aa775bf3b5abe3065ef7eca799d452e315 100644 --- a/src/common/src/tdataformat.c +++ b/src/common/src/tdataformat.c @@ -318,7 +318,7 @@ SDataCols *tdNewDataCols(int maxRowSize, int maxCols, int maxRows) { pCols->maxPoints = maxRows; pCols->bufSize = maxRowSize * maxRows; - pCols->buf = malloc(pCols->bufSize); + pCols->buf = calloc(1, pCols->bufSize); if (pCols->buf == NULL) { free(pCols); return NULL; diff --git a/src/cq/src/cqMain.c b/src/cq/src/cqMain.c index 04d3a6fd6dd40382f7e9bc03820e64df59a04950..51cd471a6b66526228b85be1c8dbc6f033d2515f 100644 --- a/src/cq/src/cqMain.c +++ b/src/cq/src/cqMain.c @@ -23,6 +23,7 @@ #include "taos.h" #include "taosdef.h" #include "taosmsg.h" +#include "ttimer.h" #include "tcq.h" #include "tdataformat.h" #include "tglobal.h" @@ -45,10 +46,12 @@ typedef struct { struct SCqObj *pHead; void *dbConn; int master; + void *tmrCtrl; pthread_mutex_t mutex; } SCqContext; typedef struct SCqObj { + tmr_h tmrId; uint64_t uid; int32_t tid; // table ID int rowSize; // bytes of a row @@ -66,13 +69,14 @@ static void cqProcessStreamRes(void *param, TAOS_RES *tres, TAOS_ROW row); static void cqCreateStream(SCqContext *pContext, SCqObj *pObj); void *cqOpen(void *ahandle, const SCqCfg *pCfg) { - SCqContext *pContext = calloc(sizeof(SCqContext), 1); if (pContext == NULL) { terrno = TAOS_SYSTEM_ERROR(errno); return NULL; } + pContext->tmrCtrl = taosTmrInit(0, 0, 0, "CQ"); + tstrncpy(pContext->user, pCfg->user, sizeof(pContext->user)); tstrncpy(pContext->pass, pCfg->pass, sizeof(pContext->pass)); const char* db = pCfg->db; @@ -99,6 +103,9 @@ void cqClose(void *handle) { SCqContext *pContext = handle; if (handle == NULL) return; + taosTmrCleanUp(pContext->tmrCtrl); + pContext->tmrCtrl = NULL; + // stop all CQs cqStop(pContext); @@ -154,8 +161,10 @@ void cqStop(void *handle) { taos_close_stream(pObj->pStream); pObj->pStream = NULL; cTrace("vgId:%d, id:%d CQ:%s is closed", pContext->vgId, pObj->tid, pObj->sqlStr); + } else { + taosTmrStop(pObj->tmrId); + pObj->tmrId = 0; } - pObj = pObj->next; } @@ -211,8 +220,13 @@ void cqDrop(void *handle) { } // free the resources associated - if (pObj->pStream) taos_close_stream(pObj->pStream); - pObj->pStream = NULL; + if (pObj->pStream) { + taos_close_stream(pObj->pStream); + pObj->pStream = NULL; + } else { + taosTmrStop(pObj->tmrId); + pObj->tmrId = 0; + } cTrace("vgId:%d, id:%d CQ:%s is dropped", pContext->vgId, pObj->tid, pObj->sqlStr); tdFreeSchema(pObj->pSchema); @@ -222,18 +236,30 @@ void cqDrop(void *handle) { pthread_mutex_unlock(&pContext->mutex); } -static void cqCreateStream(SCqContext *pContext, SCqObj *pObj) { +static void cqProcessCreateTimer(void *param, void *tmrId) { + SCqObj* pObj = (SCqObj*)param; + SCqContext* pContext = pObj->pContext; + if (pContext->dbConn == NULL) { pContext->dbConn = taos_connect("localhost", pContext->user, pContext->pass, pContext->db, 0); if (pContext->dbConn == NULL) { cError("vgId:%d, failed to connect to TDengine(%s)", pContext->vgId, tstrerror(terrno)); - return; } } + + cqCreateStream(pContext, pObj); +} - int64_t lastKey = 0; +static void cqCreateStream(SCqContext *pContext, SCqObj *pObj) { pObj->pContext = pContext; - pObj->pStream = taos_open_stream(pContext->dbConn, pObj->sqlStr, cqProcessStreamRes, lastKey, pObj, NULL); + + if (pContext->dbConn == NULL) { + pObj->tmrId = taosTmrStart(cqProcessCreateTimer, 1000, pObj, pContext->tmrCtrl); + return; + } + pObj->tmrId = 0; + + pObj->pStream = taos_open_stream(pContext->dbConn, pObj->sqlStr, cqProcessStreamRes, 0, pObj, NULL); if (pObj->pStream) { pContext->num++; cTrace("vgId:%d, id:%d CQ:%s is openned", pContext->vgId, pObj->tid, pObj->sqlStr); diff --git a/src/dnode/inc/dnodeModule.h b/src/dnode/inc/dnodeModule.h index 6a6da0a2a52844e4a2c42bad64658c93b48fffae..8618de324446ca3f2504d15a6422bcca3a4b51b0 100644 --- a/src/dnode/inc/dnodeModule.h +++ b/src/dnode/inc/dnodeModule.h @@ -22,7 +22,6 @@ extern "C" { int32_t dnodeInitModules(); void dnodeStartModules(); -void dnodeStartStream(); void dnodeCleanupModules(); void dnodeProcessModuleStatus(uint32_t moduleStatus); diff --git a/src/dnode/src/dnodeMain.c b/src/dnode/src/dnodeMain.c index 987a1899597f495730a391e6bdbde7fb862731c3..6476bb78314f76ae02cbaae0dd04547bc7565313 100644 --- a/src/dnode/src/dnodeMain.c +++ b/src/dnode/src/dnodeMain.c @@ -123,7 +123,6 @@ int32_t dnodeInitSystem() { dnodeStartModules(); dnodeSetRunStatus(TSDB_DNODE_RUN_STATUS_RUNING); - dnodeStartStream(); dInfo("TDengine is initialized successfully"); diff --git a/src/dnode/src/dnodeMgmt.c b/src/dnode/src/dnodeMgmt.c index 338fcd287cefe8813e29ff8da7f40a60cf59fad5..8e1696c80207de54b57c9c497e589d2b7b6137ac 100644 --- a/src/dnode/src/dnodeMgmt.c +++ b/src/dnode/src/dnodeMgmt.c @@ -354,23 +354,6 @@ static int32_t dnodeOpenVnodes() { return TSDB_CODE_SUCCESS; } -void dnodeStartStream() { - int32_t vnodeList[TSDB_MAX_VNODES] = {0}; - int32_t numOfVnodes = 0; - int32_t status = vnodeGetVnodeList(vnodeList, &numOfVnodes); - - if (status != TSDB_CODE_SUCCESS) { - dInfo("get dnode list failed"); - return; - } - - for (int32_t i = 0; i < numOfVnodes; ++i) { - vnodeStartStream(vnodeList[i]); - } - - dInfo("streams started"); -} - static void dnodeCloseVnodes() { int32_t vnodeList[TSDB_MAX_VNODES]= {0}; int32_t numOfVnodes = 0; @@ -416,7 +399,7 @@ static void* dnodeParseVnodeMsg(SRpcMsg *rpcMsg) { static int32_t dnodeProcessCreateVnodeMsg(SRpcMsg *rpcMsg) { SMDCreateVnodeMsg *pCreate = dnodeParseVnodeMsg(rpcMsg); - void *pVnode = vnodeAcquireVnode(pCreate->cfg.vgId); + void *pVnode = vnodeAcquire(pCreate->cfg.vgId); if (pVnode != NULL) { dDebug("vgId:%d, already exist, return success", pCreate->cfg.vgId); vnodeRelease(pVnode); @@ -430,7 +413,7 @@ static int32_t dnodeProcessCreateVnodeMsg(SRpcMsg *rpcMsg) { static int32_t dnodeProcessAlterVnodeMsg(SRpcMsg *rpcMsg) { SMDAlterVnodeMsg *pAlter = dnodeParseVnodeMsg(rpcMsg); - void *pVnode = vnodeAcquireVnode(pAlter->cfg.vgId); + void *pVnode = vnodeAcquire(pAlter->cfg.vgId); if (pVnode != NULL) { dDebug("vgId:%d, alter vnode msg is received", pAlter->cfg.vgId); int32_t code = vnodeAlter(pVnode, pAlter); @@ -723,6 +706,7 @@ static void dnodeSendStatusMsg(void *handle, void *tmrId) { // fill cluster cfg parameters pStatus->clusterCfg.numOfMnodes = htonl(tsNumOfMnodes); + pStatus->clusterCfg.enableBalance = htonl(tsEnableBalance); pStatus->clusterCfg.mnodeEqualVnodeNum = htonl(tsMnodeEqualVnodeNum); pStatus->clusterCfg.offlineThreshold = htonl(tsOfflineThreshold); pStatus->clusterCfg.statusInterval = htonl(tsStatusInterval); diff --git a/src/dnode/src/dnodeVRead.c b/src/dnode/src/dnodeVRead.c index 66135a93e9e6c824ce5219a4f5ef65b230bbce95..bbea1a5e0bb52a56cfbf5a23dd29a17e094586a9 100644 --- a/src/dnode/src/dnodeVRead.c +++ b/src/dnode/src/dnodeVRead.c @@ -91,23 +91,21 @@ void dnodeDispatchToVnodeReadQueue(SRpcMsg *pMsg) { int32_t queuedMsgNum = 0; int32_t leftLen = pMsg->contLen; char *pCont = (char *) pMsg->pCont; - void *pVnode; while (leftLen > 0) { SMsgHead *pHead = (SMsgHead *) pCont; pHead->vgId = htonl(pHead->vgId); pHead->contLen = htonl(pHead->contLen); - pVnode = vnodeAcquireVnode(pHead->vgId); + taos_queue queue = vnodeAcquireRqueue(pHead->vgId); - if (pVnode == NULL) { + if (queue == NULL) { leftLen -= pHead->contLen; pCont -= pHead->contLen; continue; } // put message into queue - taos_queue queue = vnodeGetRqueue(pVnode); SReadMsg *pRead = (SReadMsg *)taosAllocateQitem(sizeof(SReadMsg)); pRead->rpcMsg = *pMsg; pRead->pCont = pCont; @@ -175,18 +173,6 @@ void dnodeFreeVnodeRqueue(void *rqueue) { // dynamically adjust the number of threads } -void dnodePutItemIntoReadQueue(void *pVnode, void *qhandle) { - SReadMsg *pRead = (SReadMsg *)taosAllocateQitem(sizeof(SReadMsg)); - pRead->rpcMsg.msgType = TSDB_MSG_TYPE_QUERY; - pRead->pCont = qhandle; - pRead->contLen = 0; - - assert(pVnode != NULL); - taos_queue queue = vnodeAcquireRqueue(pVnode); - - taosWriteQitem(queue, TAOS_QTYPE_QUERY, pRead); -} - void dnodeSendRpcReadRsp(void *pVnode, SReadMsg *pRead, int32_t code) { SRpcMsg rpcRsp = { .handle = pRead->rpcMsg.handle, diff --git a/src/dnode/src/dnodeVWrite.c b/src/dnode/src/dnodeVWrite.c index ba36e537a6c2b7a6327ef17ed0be83b880009310..dc09a03e1497cb52718279b542e3318fb2789d08 100644 --- a/src/dnode/src/dnodeVWrite.c +++ b/src/dnode/src/dnodeVWrite.c @@ -104,7 +104,7 @@ void dnodeDispatchToVnodeWriteQueue(SRpcMsg *pMsg) { pHead->vgId = htonl(pHead->vgId); pHead->contLen = htonl(pHead->contLen); - taos_queue queue = vnodeGetWqueue(pHead->vgId); + taos_queue queue = vnodeAcquireWqueue(pHead->vgId); if (queue) { // put message into queue SWriteMsg *pWrite = (SWriteMsg *)taosAllocateQitem(sizeof(SWriteMsg)); diff --git a/src/inc/dnode.h b/src/inc/dnode.h index 096aae58f2ad9cb157ba5b700581bdf52a23f6eb..b561c407a3415d7db27333d96e21a72d4f159d8b 100644 --- a/src/inc/dnode.h +++ b/src/inc/dnode.h @@ -53,7 +53,6 @@ void *dnodeAllocateVnodeWqueue(void *pVnode); void dnodeFreeVnodeWqueue(void *queue); void *dnodeAllocateVnodeRqueue(void *pVnode); void dnodeFreeVnodeRqueue(void *rqueue); -void dnodePutItemIntoReadQueue(void *pVnode, void *qhandle); void dnodeSendRpcVnodeWriteRsp(void *pVnode, void *param, int32_t code); int32_t dnodeAllocateMnodePqueue(); diff --git a/src/inc/taosmsg.h b/src/inc/taosmsg.h index e30efcfd372866c3133e99847143c6a590e72bb7..b7afaf1e065074018d9725796434776dae1a9158 100644 --- a/src/inc/taosmsg.h +++ b/src/inc/taosmsg.h @@ -54,12 +54,11 @@ TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_MD_CREATE_TABLE, "create-table" ) TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_MD_DROP_TABLE, "drop-table" ) TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_MD_ALTER_TABLE, "alter-table" ) TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_MD_CREATE_VNODE, "create-vnode" ) -TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_MD_ALTER_VNODE, "alter-vnode" ) TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_MD_DROP_VNODE, "drop-vnode" ) TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_MD_DROP_STABLE, "drop-stable" ) TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_MD_ALTER_STREAM, "alter-stream" ) TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_MD_CONFIG_DNODE, "config-dnode" ) -TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DUMMY4, "dummy4" ) +TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_MD_ALTER_VNODE, "alter-vnode" ) TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DUMMY5, "dummy5" ) TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DUMMY6, "dummy6" ) TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DUMMY7, "dummy7" ) @@ -564,15 +563,16 @@ typedef struct { typedef struct { int32_t numOfMnodes; // tsNumOfMnodes + int32_t enableBalance; // tsEnableBalance int32_t mnodeEqualVnodeNum; // tsMnodeEqualVnodeNum int32_t offlineThreshold; // tsOfflineThreshold int32_t statusInterval; // tsStatusInterval + int32_t maxtablesPerVnode; + int32_t maxVgroupsPerDb; char arbitrator[TSDB_EP_LEN]; // tsArbitrator char timezone[64]; // tsTimezone char locale[TSDB_LOCALE_LEN]; // tsLocale char charset[TSDB_LOCALE_LEN]; // tsCharset - int32_t maxtablesPerVnode; - int32_t maxVgroupsPerDb; } SClusterCfg; typedef struct { diff --git a/src/inc/tsdb.h b/src/inc/tsdb.h index 3059fd2435e8db125305cc0db1f0d74fd5d4302c..e46ea39a59495991238cde777f7c31ea72aa87ac 100644 --- a/src/inc/tsdb.h +++ b/src/inc/tsdb.h @@ -116,7 +116,6 @@ int tsdbCreateTable(TSDB_REPO_T *repo, STableCfg *pCfg); int tsdbDropTable(TSDB_REPO_T *pRepo, STableId tableId); int tsdbUpdateTableTagValue(TSDB_REPO_T *repo, SUpdateTableTagValMsg *pMsg); TSKEY tsdbGetTableLastKey(TSDB_REPO_T *repo, uint64_t uid); -void tsdbStartStream(TSDB_REPO_T *repo); uint32_t tsdbGetFileInfo(TSDB_REPO_T *repo, char *name, uint32_t *index, uint32_t eindex, int32_t *size); diff --git a/src/inc/tsync.h b/src/inc/tsync.h index 972db294f62a35ba7910576534f4291c021b6272..65b91d87e479abd4c0b497cf4b95bc561db8fae5 100644 --- a/src/inc/tsync.h +++ b/src/inc/tsync.h @@ -79,7 +79,7 @@ typedef void (*FConfirmForward)(void *ahandle, void *mhandle, int32_t code); typedef void (*FNotifyRole)(void *ahandle, int8_t role); // when data file is synced successfully, notity app -typedef void (*FNotifyFileSynced)(void *ahandle, uint64_t fversion); +typedef int (*FNotifyFileSynced)(void *ahandle, uint64_t fversion); typedef struct { int32_t vgId; // vgroup ID diff --git a/src/inc/vnode.h b/src/inc/vnode.h index 1e6cfa97006e110f5ce1e36073f165540adeda26..15ddb6afee7c3ac2914df8133df24f6ef80a0a8a 100644 --- a/src/inc/vnode.h +++ b/src/inc/vnode.h @@ -22,10 +22,10 @@ extern "C" { typedef enum _VN_STATUS { TAOS_VN_STATUS_INIT, - TAOS_VN_STATUS_UPDATING, TAOS_VN_STATUS_READY, TAOS_VN_STATUS_CLOSING, - TAOS_VN_STATUS_DELETING, + TAOS_VN_STATUS_UPDATING, + TAOS_VN_STATUS_RESET, } EVnStatus; typedef struct { @@ -44,17 +44,13 @@ typedef struct { int32_t vnodeCreate(SMDCreateVnodeMsg *pVnodeCfg); int32_t vnodeDrop(int32_t vgId); int32_t vnodeOpen(int32_t vgId, char *rootDir); -int32_t vnodeStartStream(int32_t vgId); int32_t vnodeAlter(void *pVnode, SMDCreateVnodeMsg *pVnodeCfg); int32_t vnodeClose(int32_t vgId); -void vnodeRelease(void *pVnode); -void* vnodeAcquireVnode(int32_t vgId); // add refcount -void* vnodeGetVnode(int32_t vgId); // keep refcount unchanged - -void* vnodeAcquireRqueue(void *); -void* vnodeGetRqueue(void *); -void* vnodeGetWqueue(int32_t vgId); +void* vnodeAcquire(int32_t vgId); // add refcount +void* vnodeAcquireRqueue(int32_t vgId); // add refCount, get read queue +void* vnodeAcquireWqueue(int32_t vgId); // add recCount, get write queue +void vnodeRelease(void *pVnode); // dec refCount void* vnodeGetWal(void *pVnode); int32_t vnodeProcessWrite(void *pVnode, int qtype, void *pHead, void *item); diff --git a/src/mnode/src/mnodeDnode.c b/src/mnode/src/mnodeDnode.c index a65f54cd8bd186adaf67bc5c45ebe3ea933a4c75..06d79bd7e11eca1f8381777667e5d4844be8b440 100644 --- a/src/mnode/src/mnodeDnode.c +++ b/src/mnode/src/mnodeDnode.c @@ -269,18 +269,37 @@ void mnodeUpdateDnode(SDnodeObj *pDnode) { } static int32_t mnodeProcessCfgDnodeMsg(SMnodeMsg *pMsg) { + if (strcmp(pMsg->pUser->user, TSDB_DEFAULT_USER) != 0) { + mError("failed to cfg dnode, no rights"); + return TSDB_CODE_MND_NO_RIGHTS; + } + SCMCfgDnodeMsg *pCmCfgDnode = pMsg->rpcMsg.pCont; if (pCmCfgDnode->ep[0] == 0) { - strcpy(pCmCfgDnode->ep, tsLocalEp); - } else { - // TODO temporary disabled for compiling: strcpy(pCmCfgDnode->ep, pCmCfgDnode->ep); + tstrncpy(pCmCfgDnode->ep, tsLocalEp, TSDB_EP_LEN); + } + + int32_t dnodeId = 0; + char* pos = strchr(pCmCfgDnode->ep, ':'); + if (NULL == pos) { + dnodeId = strtol(pCmCfgDnode->ep, NULL, 10); + if (dnodeId <= 0 || dnodeId > 65536) { + mError("failed to cfg dnode, invalid dnodeId:%s", pCmCfgDnode->ep); + return TSDB_CODE_MND_DNODE_NOT_EXIST; + } } - if (strcmp(pMsg->pUser->user, TSDB_DEFAULT_USER) != 0) { - return TSDB_CODE_MND_NO_RIGHTS; + SRpcIpSet ipSet = mnodeGetIpSetFromIp(pCmCfgDnode->ep); + if (dnodeId != 0) { + SDnodeObj *pDnode = mnodeGetDnode(dnodeId); + if (pDnode == NULL) { + mError("failed to cfg dnode, invalid dnodeId:%d", dnodeId); + return TSDB_CODE_MND_DNODE_NOT_EXIST; + } + ipSet = mnodeGetIpSetFromIp(pDnode->dnodeEp); + mnodeDecDnodeRef(pDnode); } - SRpcIpSet ipSet = mnodeGetIpSetFromIp(pCmCfgDnode->ep); SMDCfgDnodeMsg *pMdCfgDnode = rpcMallocCont(sizeof(SMDCfgDnodeMsg)); strcpy(pMdCfgDnode->ep, pCmCfgDnode->ep); strcpy(pMdCfgDnode->config, pCmCfgDnode->config); @@ -292,9 +311,9 @@ static int32_t mnodeProcessCfgDnodeMsg(SMnodeMsg *pMsg) { .pCont = pMdCfgDnode, .contLen = sizeof(SMDCfgDnodeMsg) }; - dnodeSendMsgToDnode(&ipSet, &rpcMdCfgDnodeMsg); mInfo("dnode:%s, is configured by %s", pCmCfgDnode->ep, pMsg->pUser->user); + dnodeSendMsgToDnode(&ipSet, &rpcMdCfgDnodeMsg); return TSDB_CODE_SUCCESS; } @@ -305,6 +324,7 @@ static void mnodeProcessCfgDnodeMsgRsp(SRpcMsg *rpcMsg) { static bool mnodeCheckClusterCfgPara(const SClusterCfg *clusterCfg) { if (clusterCfg->numOfMnodes != htonl(tsNumOfMnodes)) return false; + if (clusterCfg->enableBalance != htonl(tsEnableBalance)) return false; if (clusterCfg->mnodeEqualVnodeNum != htonl(tsMnodeEqualVnodeNum)) return false; if (clusterCfg->offlineThreshold != htonl(tsOfflineThreshold)) return false; if (clusterCfg->statusInterval != htonl(tsStatusInterval)) return false; diff --git a/src/mnode/src/mnodeVgroup.c b/src/mnode/src/mnodeVgroup.c index 8b4d62a8b0711b7ddf44bc370f279df113a9e884..8c8aa5fb31977851b8928593639bd095f9cb9f55 100644 --- a/src/mnode/src/mnodeVgroup.c +++ b/src/mnode/src/mnodeVgroup.c @@ -593,7 +593,7 @@ static int32_t mnodeGetVgroupMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *p pShow->bytes[cols] = 4; pSchema[cols].type = TSDB_DATA_TYPE_INT; - strcpy(pSchema[cols].name, "maxTables"); + strcpy(pSchema[cols].name, "onlineVnodes"); pSchema[cols].bytes = htons(pShow->bytes[cols]); cols++; @@ -692,8 +692,15 @@ static int32_t mnodeRetrieveVgroups(SShowObj *pShow, char *data, int32_t rows, v *(int32_t *)pWrite = taosIdPoolMaxSize(pVgroup->idPool); cols++; + int32_t onlineVnodes = 0; + for (int32_t i = 0; i < pShow->maxReplica; ++i) { + if (pVgroup->vnodeGid[i].role == TAOS_SYNC_ROLE_SLAVE || pVgroup->vnodeGid[i].role == TAOS_SYNC_ROLE_MASTER) { + onlineVnodes++; + } + } + pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows; - *(int32_t *)pWrite = tsMaxTablePerVnode; + *(int32_t *)pWrite = onlineVnodes; cols++; for (int32_t i = 0; i < pShow->maxReplica; ++i) { diff --git a/src/tsdb/src/tsdbMain.c b/src/tsdb/src/tsdbMain.c index 6b3160070514b13d8b3b961854346d98d2b400a3..ffaab375a3bfc305cbedecae6df8881e28772aa2 100644 --- a/src/tsdb/src/tsdbMain.c +++ b/src/tsdb/src/tsdbMain.c @@ -69,6 +69,8 @@ static int tsdbEncodeCfg(void **buf, STsdbCfg *pCfg); static void * tsdbDecodeCfg(void *buf, STsdbCfg *pCfg); static int tsdbCheckTableSchema(STsdbRepo *pRepo, SSubmitBlk *pBlock, STable *pTable); static int tsdbScanAndConvertSubmitMsg(STsdbRepo *pRepo, SSubmitMsg *pMsg); +static void tsdbStartStream(STsdbRepo *pRepo); +static void tsdbStopStream(STsdbRepo *pRepo); // Function declaration int32_t tsdbCreateRepo(char *rootDir, STsdbCfg *pCfg) { @@ -127,6 +129,7 @@ TSDB_REPO_T *tsdbOpenRepo(char *rootDir, STsdbAppH *pAppH) { goto _err; } + tsdbStartStream(pRepo); // pRepo->state = TSDB_REPO_STATE_ACTIVE; tsdbDebug("vgId:%d open tsdb repository succeed!", REPO_ID(pRepo)); @@ -145,6 +148,8 @@ void tsdbCloseRepo(TSDB_REPO_T *repo, int toCommit) { STsdbRepo *pRepo = (STsdbRepo *)repo; int vgId = REPO_ID(pRepo); + tsdbStopStream(repo); + if (toCommit) { tsdbAsyncCommit(pRepo); if (pRepo->commit) pthread_join(pRepo->commitThread, NULL); @@ -265,19 +270,6 @@ uint32_t tsdbGetFileInfo(TSDB_REPO_T *repo, char *name, uint32_t *index, uint32_ return magic; } -void tsdbStartStream(TSDB_REPO_T *repo) { - STsdbRepo *pRepo = (STsdbRepo *)repo; - STsdbMeta *pMeta = pRepo->tsdbMeta; - - for (int i = 0; i < pRepo->config.maxTables; i++) { - STable *pTable = pMeta->tables[i]; - if (pTable && pTable->type == TSDB_STREAM_TABLE) { - pTable->cqhandle = (*pRepo->appH.cqCreateFunc)(pRepo->appH.cqH, TABLE_UID(pTable), TABLE_TID(pTable), pTable->sql, - tsdbGetTableSchemaImpl(pTable, false, false, -1)); - } - } -} - STsdbCfg *tsdbGetCfg(const TSDB_REPO_T *repo) { ASSERT(repo != NULL); return &((STsdbRepo *)repo)->config; @@ -1120,4 +1112,27 @@ TSKEY tsdbGetTableLastKey(TSDB_REPO_T *repo, uint64_t uid) { return TSDB_GET_TABLE_LAST_KEY(pTable); } -#endif \ No newline at end of file +#endif + +static void tsdbStartStream(STsdbRepo *pRepo) { + STsdbMeta *pMeta = pRepo->tsdbMeta; + + for (int i = 0; i < pRepo->config.maxTables; i++) { + STable *pTable = pMeta->tables[i]; + if (pTable && pTable->type == TSDB_STREAM_TABLE) { + pTable->cqhandle = (*pRepo->appH.cqCreateFunc)(pRepo->appH.cqH, TABLE_UID(pTable), TABLE_TID(pTable), pTable->sql, + tsdbGetTableSchemaImpl(pTable, false, false, -1)); + } + } +} + +static void tsdbStopStream(STsdbRepo *pRepo) { + STsdbMeta *pMeta = pRepo->tsdbMeta; + + for (int i = 0; i < pRepo->config.maxTables; i++) { + STable *pTable = pMeta->tables[i]; + if (pTable && pTable->type == TSDB_STREAM_TABLE) { + (*pRepo->appH.cqDropFunc)(pTable->cqhandle); + } + } +} diff --git a/src/util/inc/tutil.h b/src/util/inc/tutil.h index 1ba57bbaaa670cbd9aca7b19252a3bd745187130..f7c69e3973ad6fdefd4ef33f4a813221eedebe6d 100644 --- a/src/util/inc/tutil.h +++ b/src/util/inc/tutil.h @@ -133,6 +133,8 @@ char **strsplit(char *src, const char *delim, int32_t *num); char* strtolower(char *dst, const char *src); +char* strntolower(char *dst, const char *src, int32_t n); + int64_t strnatoi(char *num, int32_t len); //char* strreplace(const char* str, const char* pattern, const char* rep); diff --git a/src/util/src/tsocket.c b/src/util/src/tsocket.c index b225dfa36a4572ff3f56649e22ffdc81879d4d3c..5de61a3d57a0cc480781ec581bdc76d54419bf87 100644 --- a/src/util/src/tsocket.c +++ b/src/util/src/tsocket.c @@ -270,6 +270,14 @@ int taosOpenTcpClientSocket(uint32_t destIp, uint16_t destPort, uint32_t clientI return -1; } + /* set REUSEADDR option, so the portnumber can be re-used */ + int reuse = 1; + if (taosSetSockOpt(sockFd, SOL_SOCKET, SO_REUSEADDR, (void *)&reuse, sizeof(reuse)) < 0) { + uError("setsockopt SO_REUSEADDR failed: %d (%s)", errno, strerror(errno)); + close(sockFd); + return -1; + }; + if ( clientIp != 0) { memset((char *)&clientAddr, 0, sizeof(clientAddr)); clientAddr.sin_family = AF_INET; diff --git a/src/util/src/tutil.c b/src/util/src/tutil.c index 1a74359f47af23d0073f41901f86282e3654e212..c8df34e1cd4e71954bf9255cc1507b5bb82d519b 100644 --- a/src/util/src/tutil.c +++ b/src/util/src/tutil.c @@ -234,6 +234,32 @@ char* strtolower(char *dst, const char *src) { *p = 0; return dst; } +char* strntolower(char *dst, const char *src, int32_t n) { + int esc = 0; + char quote = 0, *p = dst, c; + + assert(dst != NULL); + + for (c = *src++; n-- > 0; c = *src++) { + if (esc) { + esc = 0; + } else if (quote) { + if (c == '\\') { + esc = 1; + } else if (c == quote) { + quote = 0; + } + } else if (c >= 'A' && c <= 'Z') { + c -= 'A' - 'a'; + } else if (c == '\'' || c == '"') { + quote = c; + } + *p++ = c; + } + + *p = 0; + return dst; +} char *paGetToken(char *string, char **token, int32_t *tokenLen) { char quote = 0; diff --git a/src/vnode/inc/vnodeInt.h b/src/vnode/inc/vnodeInt.h index 77db4fd04c0f18569aec21bd459b4342a6ab568c..74cfbf1e731fd67c47f7375198a8353740429060 100644 --- a/src/vnode/inc/vnodeInt.h +++ b/src/vnode/inc/vnodeInt.h @@ -37,7 +37,7 @@ extern int32_t vDebugFlag; typedef struct { int32_t vgId; // global vnode group ID int32_t refCount; // reference count - int status; + int8_t status; int8_t role; int8_t accessState; int64_t version; // current version @@ -55,6 +55,8 @@ typedef struct { SWalCfg walCfg; void *qMgmt; char *rootDir; + tsem_t sem; + int8_t dropped; char db[TSDB_DB_NAME_LEN]; } SVnodeObj; diff --git a/src/vnode/src/vnodeMain.c b/src/vnode/src/vnodeMain.c index f316efcfdb2fda0426c47abb89801e35c713b82e..98882e4c3ccb4c1d964f41137b438f74c193853f 100644 --- a/src/vnode/src/vnodeMain.c +++ b/src/vnode/src/vnodeMain.c @@ -44,7 +44,7 @@ static int vnodeProcessTsdbStatus(void *arg, int status); static uint32_t vnodeGetFileInfo(void *ahandle, char *name, uint32_t *index, uint32_t eindex, int32_t *size, uint64_t *fversion); static int vnodeGetWalInfo(void *ahandle, char *name, uint32_t *index); static void vnodeNotifyRole(void *ahandle, int8_t role); -static void vnodeNotifyFileSynced(void *ahandle, uint64_t fversion); +static int vnodeNotifyFileSynced(void *ahandle, uint64_t fversion); #ifndef _SYNC tsync_h syncStart(const SSyncInfo *info) { return NULL; } @@ -153,7 +153,7 @@ int32_t vnodeDrop(int32_t vgId) { SVnodeObj *pVnode = *ppVnode; vTrace("vgId:%d, vnode will be dropped, refCount:%d", pVnode->vgId, pVnode->refCount); - pVnode->status = TAOS_VN_STATUS_DELETING; + pVnode->dropped = 1; vnodeCleanUp(pVnode); return TSDB_CODE_SUCCESS; @@ -164,18 +164,11 @@ int32_t vnodeAlter(void *param, SMDCreateVnodeMsg *pVnodeCfg) { // vnode in non-ready state and still needs to return success instead of TSDB_CODE_VND_INVALID_STATUS // cfgVersion can be corrected by status msg - if (pVnode->status != TAOS_VN_STATUS_READY) { + if (atomic_val_compare_exchange_8(&pVnode->status, TAOS_VN_STATUS_READY, TAOS_VN_STATUS_UPDATING) != TAOS_VN_STATUS_READY) { vDebug("vgId:%d, vnode is not ready, do alter operation later", pVnode->vgId); return TSDB_CODE_SUCCESS; } - // the vnode may always fail to synchronize because of it in low cfgVersion - // so cannot use the following codes - // if (pVnode->syncCfg.replica > 1 && pVnode->role == TAOS_SYNC_ROLE_UNSYNCED) - // return TSDB_CODE_VND_NOT_SYNCED; - - pVnode->status = TAOS_VN_STATUS_UPDATING; - int32_t code = vnodeSaveCfg(pVnodeCfg); if (code != TSDB_CODE_SUCCESS) { pVnode->status = TAOS_VN_STATUS_READY; @@ -194,10 +187,12 @@ int32_t vnodeAlter(void *param, SMDCreateVnodeMsg *pVnodeCfg) { return code; } - code = tsdbConfigRepo(pVnode->tsdb, &pVnode->tsdbCfg); - if (code != TSDB_CODE_SUCCESS) { - pVnode->status = TAOS_VN_STATUS_READY; - return code; + if (pVnode->tsdb) { + code = tsdbConfigRepo(pVnode->tsdb, &pVnode->tsdbCfg); + if (code != TSDB_CODE_SUCCESS) { + pVnode->status = TAOS_VN_STATUS_READY; + return code; + } } pVnode->status = TAOS_VN_STATUS_READY; @@ -223,6 +218,7 @@ int32_t vnodeOpen(int32_t vnode, char *rootDir) { pVnode->tsdbCfg.tsdbId = pVnode->vgId; pVnode->rootDir = strdup(rootDir); pVnode->accessState = TSDB_VN_ALL_ACCCESS; + tsem_init(&pVnode->sem, 0, 0); int32_t code = vnodeReadCfg(pVnode); if (code != TSDB_CODE_SUCCESS) { @@ -302,10 +298,6 @@ int32_t vnodeOpen(int32_t vnode, char *rootDir) { } #endif - // start continuous query - if (pVnode->role == TAOS_SYNC_ROLE_MASTER) - cqStart(pVnode->cq); - pVnode->qMgmt = qOpenQueryMgmt(pVnode->vgId); pVnode->events = NULL; pVnode->status = TAOS_VN_STATUS_READY; @@ -316,22 +308,12 @@ int32_t vnodeOpen(int32_t vnode, char *rootDir) { return TSDB_CODE_SUCCESS; } -int32_t vnodeStartStream(int32_t vnode) { - SVnodeObj* pVnode = vnodeAcquireVnode(vnode); - if (pVnode != NULL) { - tsdbStartStream(pVnode->tsdb); - vnodeRelease(pVnode); - } - return TSDB_CODE_SUCCESS; -} - int32_t vnodeClose(int32_t vgId) { SVnodeObj **ppVnode = (SVnodeObj **)taosHashGet(tsDnodeVnodesHash, (const char *)&vgId, sizeof(int32_t)); if (ppVnode == NULL || *ppVnode == NULL) return 0; SVnodeObj *pVnode = *ppVnode; vDebug("vgId:%d, vnode will be closed", pVnode->vgId); - pVnode->status = TAOS_VN_STATUS_CLOSING; vnodeCleanUp(pVnode); return 0; @@ -346,6 +328,8 @@ void vnodeRelease(void *pVnodeRaw) { if (refCount > 0) { vDebug("vgId:%d, release vnode, refCount:%d", vgId, refCount); + if (pVnode->status == TAOS_VN_STATUS_RESET && refCount == 2) + tsem_post(&pVnode->sem); return; } @@ -356,11 +340,6 @@ void vnodeRelease(void *pVnodeRaw) { tsdbCloseRepo(pVnode->tsdb, 1); pVnode->tsdb = NULL; - // stop continuous query - if (pVnode->cq) - cqClose(pVnode->cq); - pVnode->cq = NULL; - if (pVnode->wal) walClose(pVnode->wal); pVnode->wal = NULL; @@ -375,20 +354,21 @@ void vnodeRelease(void *pVnodeRaw) { tfree(pVnode->rootDir); - if (pVnode->status == TAOS_VN_STATUS_DELETING) { + if (pVnode->dropped) { char rootDir[TSDB_FILENAME_LEN] = {0}; sprintf(rootDir, "%s/vnode%d", tsVnodeDir, vgId); taosMvDir(tsVnodeBakDir, rootDir); taosRemoveDir(rootDir); } + tsem_destroy(&pVnode->sem); free(pVnode); int32_t count = taosHashGetSize(tsDnodeVnodesHash); vDebug("vgId:%d, vnode is released, vnodes:%d", vgId, count); } -void *vnodeGetVnode(int32_t vgId) { +void *vnodeAcquire(int32_t vgId) { SVnodeObj **ppVnode = (SVnodeObj **)taosHashGet(tsDnodeVnodesHash, (const char *)&vgId, sizeof(int32_t)); if (ppVnode == NULL || *ppVnode == NULL) { terrno = TSDB_CODE_VND_INVALID_VGROUP_ID; @@ -396,35 +376,38 @@ void *vnodeGetVnode(int32_t vgId) { return NULL; } - return *ppVnode; -} - -void *vnodeAcquireVnode(int32_t vgId) { - SVnodeObj *pVnode = vnodeGetVnode(vgId); - if (pVnode == NULL) return pVnode; - + SVnodeObj *pVnode = *ppVnode; atomic_add_fetch_32(&pVnode->refCount, 1); vDebug("vgId:%d, get vnode, refCount:%d", pVnode->vgId, pVnode->refCount); return pVnode; } -void *vnodeAcquireRqueue(void *param) { - SVnodeObj *pVnode = param; +void *vnodeAcquireRqueue(int32_t vgId) { + SVnodeObj *pVnode = vnodeAcquire(vgId); if (pVnode == NULL) return NULL; - atomic_add_fetch_32(&pVnode->refCount, 1); - vDebug("vgId:%d, get vnode rqueue, refCount:%d", pVnode->vgId, pVnode->refCount); - return ((SVnodeObj *)pVnode)->rqueue; -} + if (pVnode->status == TAOS_VN_STATUS_RESET) { + terrno = TSDB_CODE_VND_INVALID_STATUS; + vInfo("vgId:%d, status is in reset", vgId); + vnodeRelease(pVnode); + return NULL; + } -void *vnodeGetRqueue(void *pVnode) { - return ((SVnodeObj *)pVnode)->rqueue; + return pVnode->rqueue; } -void *vnodeGetWqueue(int32_t vgId) { - SVnodeObj *pVnode = vnodeAcquireVnode(vgId); +void *vnodeAcquireWqueue(int32_t vgId) { + SVnodeObj *pVnode = vnodeAcquire(vgId); if (pVnode == NULL) return NULL; + + if (pVnode->status == TAOS_VN_STATUS_RESET) { + terrno = TSDB_CODE_VND_INVALID_STATUS; + vInfo("vgId:%d, status is in reset", vgId); + vnodeRelease(pVnode); + return NULL; + } + return pVnode->wqueue; } @@ -496,7 +479,7 @@ void vnodeBuildStatusMsg(void *param) { void vnodeSetAccess(SDMVgroupAccess *pAccess, int32_t numOfVnodes) { for (int32_t i = 0; i < numOfVnodes; ++i) { pAccess[i].vgId = htonl(pAccess[i].vgId); - SVnodeObj *pVnode = vnodeAcquireVnode(pAccess[i].vgId); + SVnodeObj *pVnode = vnodeAcquire(pAccess[i].vgId); if (pVnode != NULL) { pVnode->accessState = pAccess[i].accessState; if (pVnode->accessState != TSDB_VN_ALL_ACCCESS) { @@ -510,11 +493,29 @@ void vnodeSetAccess(SDMVgroupAccess *pAccess, int32_t numOfVnodes) { static void vnodeCleanUp(SVnodeObj *pVnode) { // remove from hash, so new messages wont be consumed taosHashRemove(tsDnodeVnodesHash, (const char *)&pVnode->vgId, sizeof(int32_t)); + int i = 0; + + if (pVnode->status != TAOS_VN_STATUS_INIT) { + // it may be in updateing or reset state, then it shall wait + while (atomic_val_compare_exchange_8(&pVnode->status, TAOS_VN_STATUS_READY, TAOS_VN_STATUS_CLOSING) != TAOS_VN_STATUS_READY) { + if (++i % 1000 == 0) { + sched_yield(); + } + } + } // stop replication module if (pVnode->sync) { - syncStop(pVnode->sync); + void *sync = pVnode->sync; pVnode->sync = NULL; + syncStop(sync); + } + + // stop continuous query + if (pVnode->cq) { + void *cq = pVnode->cq; + pVnode->cq = NULL; + cqClose(cq); } vTrace("vgId:%d, vnode will cleanup, refCount:%d", pVnode->vgId, pVnode->refCount); @@ -561,18 +562,25 @@ static void vnodeNotifyRole(void *ahandle, int8_t role) { cqStop(pVnode->cq); } -static void vnodeNotifyFileSynced(void *ahandle, uint64_t fversion) { - SVnodeObj *pVnode = ahandle; - vDebug("vgId:%d, data file is synced, fversion:%" PRId64, pVnode->vgId, fversion); - - pVnode->fversion = fversion; - pVnode->version = fversion; - vnodeSaveVersion(pVnode); - +static int vnodeResetTsdb(SVnodeObj *pVnode) +{ char rootDir[128] = "\0"; sprintf(rootDir, "%s/tsdb", pVnode->rootDir); - // clsoe tsdb, then open tsdb - tsdbCloseRepo(pVnode->tsdb, 0); + + if (atomic_val_compare_exchange_8(&pVnode->status, TAOS_VN_STATUS_READY, TAOS_VN_STATUS_RESET) != TAOS_VN_STATUS_READY) + return -1; + + void *tsdb = pVnode->tsdb; + pVnode->tsdb = NULL; + + // acquire vnode + int32_t refCount = atomic_add_fetch_32(&pVnode->refCount, 1); + + if (refCount > 2) + tsem_wait(&pVnode->sem); + + // close tsdb, then open tsdb + tsdbCloseRepo(tsdb, 0); STsdbAppH appH = {0}; appH.appH = (void *)pVnode; appH.notifyStatus = vnodeProcessTsdbStatus; @@ -580,6 +588,22 @@ static void vnodeNotifyFileSynced(void *ahandle, uint64_t fversion) { appH.cqCreateFunc = cqCreate; appH.cqDropFunc = cqDrop; pVnode->tsdb = tsdbOpenRepo(rootDir, &appH); + + pVnode->status = TAOS_VN_STATUS_READY; + vnodeRelease(pVnode); + + return 0; +} + +static int vnodeNotifyFileSynced(void *ahandle, uint64_t fversion) { + SVnodeObj *pVnode = ahandle; + vDebug("vgId:%d, data file is synced, fversion:%" PRId64, pVnode->vgId, fversion); + + pVnode->fversion = fversion; + pVnode->version = fversion; + vnodeSaveVersion(pVnode); + + return vnodeResetTsdb(pVnode); } static int32_t vnodeSaveCfg(SMDCreateVnodeMsg *pVnodeCfg) { diff --git a/src/vnode/src/vnodeRead.c b/src/vnode/src/vnodeRead.c index f529b713cf659d923e64ba8bf49798a0dde0f195..973df7c5a10f2ba84a77718602c8ffddd4b14134 100644 --- a/src/vnode/src/vnodeRead.c +++ b/src/vnode/src/vnodeRead.c @@ -26,6 +26,7 @@ #include "tsdb.h" #include "vnode.h" #include "vnodeInt.h" +#include "tqueue.h" static int32_t (*vnodeProcessReadMsgFp[TSDB_MSG_TYPE_MAX])(SVnodeObj *pVnode, SReadMsg *pReadMsg); static int32_t vnodeProcessQueryMsg(SVnodeObj *pVnode, SReadMsg *pReadMsg); @@ -51,6 +52,11 @@ int32_t vnodeProcessRead(void *param, SReadMsg *pReadMsg) { return TSDB_CODE_VND_INVALID_STATUS; } + // tsdb may be in reset state + if (pVnode->tsdb == NULL) return TSDB_CODE_RPC_NOT_READY; + if (pVnode->status == TAOS_VN_STATUS_CLOSING) + return TSDB_CODE_RPC_NOT_READY; + // TODO: Later, let slave to support query if (pVnode->syncCfg.replica > 1 && pVnode->role != TAOS_SYNC_ROLE_MASTER) { vDebug("vgId:%d, msgType:%s not processed, replica:%d role:%d", pVnode->vgId, taosMsg[msgType], pVnode->syncCfg.replica, pVnode->role); @@ -60,6 +66,16 @@ int32_t vnodeProcessRead(void *param, SReadMsg *pReadMsg) { return (*vnodeProcessReadMsgFp[msgType])(pVnode, pReadMsg); } +static void vnodePutItemIntoReadQueue(SVnodeObj *pVnode, void *qhandle) { + SReadMsg *pRead = (SReadMsg *)taosAllocateQitem(sizeof(SReadMsg)); + pRead->rpcMsg.msgType = TSDB_MSG_TYPE_QUERY; + pRead->pCont = qhandle; + pRead->contLen = 0; + + atomic_add_fetch_32(&pVnode->refCount, 1); + taosWriteQitem(pVnode->rqueue, TAOS_QTYPE_QUERY, pRead); +} + static int32_t vnodeProcessQueryMsg(SVnodeObj *pVnode, SReadMsg *pReadMsg) { void *pCont = pReadMsg->pCont; int32_t contLen = pReadMsg->contLen; @@ -131,7 +147,7 @@ static int32_t vnodeProcessQueryMsg(SVnodeObj *pVnode, SReadMsg *pReadMsg) { if (handle != NULL) { vDebug("vgId:%d, QInfo:%p, dnode query msg disposed, register qhandle and return to app", vgId, *handle); - dnodePutItemIntoReadQueue(pVnode, *handle); + vnodePutItemIntoReadQueue(pVnode, *handle); qReleaseQInfo(pVnode->qMgmt, (void**) &handle, false); } @@ -208,7 +224,7 @@ static int32_t vnodeProcessFetchMsg(SVnodeObj *pVnode, SReadMsg *pReadMsg) { } else { // if failed to dump result, free qhandle immediately if ((code = qDumpRetrieveResult(*handle, (SRetrieveTableRsp **)&pRet->rsp, &pRet->len)) == TSDB_CODE_SUCCESS) { if (qHasMoreResultsToRetrieve(*handle)) { - dnodePutItemIntoReadQueue(pVnode, *handle); + vnodePutItemIntoReadQueue(pVnode, *handle); pRet->qhandle = *handle; freeHandle = false; } else { diff --git a/src/vnode/src/vnodeWrite.c b/src/vnode/src/vnodeWrite.c index 09e4b43ed38091247da0a51a56a41c52f2bb010e..6b9b8ca4fd5a5c42028936e48b53f45ffa811903 100644 --- a/src/vnode/src/vnodeWrite.c +++ b/src/vnode/src/vnodeWrite.c @@ -59,13 +59,18 @@ int32_t vnodeProcessWrite(void *param1, int qtype, void *param2, void *item) { return TSDB_CODE_VND_NO_WRITE_AUTH; } + // tsdb may be in reset state + if (pVnode->tsdb == NULL) return TSDB_CODE_RPC_NOT_READY; + if (pVnode->status == TAOS_VN_STATUS_CLOSING) + return TSDB_CODE_RPC_NOT_READY; + if (pHead->version == 0) { // from client or CQ if (pVnode->status != TAOS_VN_STATUS_READY) { vDebug("vgId:%d, msgType:%s not processed, vnode status is %d", pVnode->vgId, taosMsg[pHead->msgType], pVnode->status); return TSDB_CODE_VND_INVALID_STATUS; // it may be in deleting or closing state } - if (pVnode->syncCfg.replica > 1 && pVnode->role != TAOS_SYNC_ROLE_MASTER) { + if (pVnode->role != TAOS_SYNC_ROLE_MASTER) { vDebug("vgId:%d, msgType:%s not processed, replica:%d role:%d", pVnode->vgId, taosMsg[pHead->msgType], pVnode->syncCfg.replica, pVnode->role); return TSDB_CODE_RPC_NOT_READY; } diff --git a/tests/pytest/fulltest.sh b/tests/pytest/fulltest.sh index 5ee33c421ea1a2da6949d057f6670353a09abd3d..9d1aef0dc5cc2cf266931ff6e8ba88794649686c 100755 --- a/tests/pytest/fulltest.sh +++ b/tests/pytest/fulltest.sh @@ -143,6 +143,7 @@ python3 ./test.py -f query/filterOtherTypes.py python3 ./test.py -f query/querySort.py python3 ./test.py -f query/queryJoin.py python3 ./test.py -f query/select_last_crash.py +python3 ./test.py -f query/queryNullValueTest.py #stream python3 ./test.py -f stream/metric_1.py diff --git a/tests/pytest/query/queryInsertValue.py b/tests/pytest/query/queryInsertValue.py new file mode 100644 index 0000000000000000000000000000000000000000..856801b4ee162a35e1e4e4b864860180960a1432 --- /dev/null +++ b/tests/pytest/query/queryInsertValue.py @@ -0,0 +1,65 @@ +################################################################### +# Copyright (c) 2016 by TAOS Technologies, Inc. +# All rights reserved. +# +# This file is proprietary and confidential to TAOS Technologies. +# No part of this file may be reproduced, stored, transmitted, +# disclosed or used in any form or by any means other than as +# expressly provided by the written permission from Jianhui Tao +# +################################################################### + +# -*- coding: utf-8 -*- + +import sys +import taos +from util.log import * +from util.cases import * +from util.sql import * +import numpy as np +from util.dnodes import * + + +class TDTestCase: + def init(self, conn, logSql): + tdLog.debug("start to execute %s" % __file__) + tdSql.init(conn.cursor(), logSql) + + self.numOfRecords = 10 + self.ts = 1537146000000 + + def restartTaosd(self): + tdDnodes.stop(1) + tdDnodes.start(1) + tdSql.execute("use db") + + def run(self): + tdSql.prepare() + + print("==============step1") + + tdSql.execute( + "create table st (ts timestamp, speed int) tags(areaid int, loc nchar(20))") + tdSql.execute("create table t1 using st tags(1, 'beijing')") + tdSql.execute("insert into t1 values(now, 1)") + tdSql.query("select * from st") + tdSql.checkRows(1) + + tdSql.execute("alter table st add column length int") + tdSql.execute("insert into t1 values(now, 1, 2)") + tdSql.query("select last(*) from st") + tdSql.checkData(0, 2, 2); + + self.restartTaosd(); + + tdSql.query("select last(*) from st") + tdSql.checkData(0, 2, 2); + + + def stop(self): + tdSql.close() + tdLog.success("%s successfully executed" % __file__) + + +tdCases.addWindows(__file__, TDTestCase()) +tdCases.addLinux(__file__, TDTestCase()) diff --git a/tests/pytest/query/queryNullValueTest.py b/tests/pytest/query/queryNullValueTest.py new file mode 100644 index 0000000000000000000000000000000000000000..2ad1979e0bfb33246d087e85132341fdc6e0bdcd --- /dev/null +++ b/tests/pytest/query/queryNullValueTest.py @@ -0,0 +1,181 @@ +################################################################### +# Copyright (c) 2016 by TAOS Technologies, Inc. +# All rights reserved. +# +# This file is proprietary and confidential to TAOS Technologies. +# No part of this file may be reproduced, stored, transmitted, +# disclosed or used in any form or by any means other than as +# expressly provided by the written permission from Jianhui Tao +# +################################################################### + +# -*- coding: utf-8 -*- + +import sys +import taos +from util.log import * +from util.cases import * +from util.sql import * +import numpy as np +from util.dnodes import * + + +class TDTestCase: + def init(self, conn, logSql): + tdLog.debug("start to execute %s" % __file__) + tdSql.init(conn.cursor(), logSql) + + self.numOfRecords = 10 + self.ts = 1537146000000 + + def checkNullValue(self, result): + mx = np.array(result) + [rows, cols] = mx.shape + for i in range(rows): + for j in range(cols): + if j + 1 < cols and mx[i, j + 1] is not None: + print(mx[i, j + 1]) + return False + return True + + def restartTaosd(self): + tdDnodes.stop(1) + tdDnodes.start(1) + tdSql.execute("use db") + + def run(self): + tdSql.prepare() + + print("==============step1") + + tdSql.execute( + "create table meters (ts timestamp, col1 int) tags(tgcol1 int)") + tdSql.execute("create table t0 using meters tags(NULL)") + + for i in range (self.numOfRecords): + tdSql.execute("insert into t0 values (%d, %d)" % (self.ts + i, i)); + + tdSql.query("select * from meters") + tdSql.checkRows(10) + + tdSql.execute("alter table meters add column col2 tinyint") + tdSql.execute("alter table meters drop column col1") + tdSql.query("select * from meters") + tdSql.checkRows(10) + tdSql.query("select col2 from meters") + tdSql.checkRows(10) + + tdSql.execute("alter table meters add column col1 int") + tdSql.query("select * from meters") + tdSql.checkRows(10) + tdSql.query("select col1 from meters") + tdSql.checkRows(10) + + tdSql.execute("alter table meters add column col3 smallint") + tdSql.query("select * from meters") + tdSql.checkRows(10) + tdSql.query("select col3 from meters") + tdSql.checkRows(10) + + tdSql.execute("alter table meters add column col4 bigint") + tdSql.query("select * from meters") + tdSql.checkRows(10) + tdSql.query("select col4 from meters") + tdSql.checkRows(10) + + tdSql.execute("alter table meters add column col5 float") + tdSql.query("select * from meters") + tdSql.checkRows(10) + tdSql.query("select col5 from meters") + tdSql.checkRows(10) + + tdSql.execute("alter table meters add column col6 double") + tdSql.query("select * from meters") + tdSql.checkRows(10) + tdSql.query("select col6 from meters") + tdSql.checkRows(10) + + tdSql.execute("alter table meters add column col7 bool") + tdSql.query("select * from meters") + tdSql.checkRows(10) + tdSql.query("select col7 from meters") + tdSql.checkRows(10) + + tdSql.execute("alter table meters add column col8 binary(20)") + tdSql.query("select * from meters") + tdSql.checkRows(10) + tdSql.query("select col8 from meters") + tdSql.checkRows(10) + + tdSql.execute("alter table meters add column col9 nchar(20)") + tdSql.query("select * from meters") + tdSql.checkRows(10) + tdSql.query("select col9 from meters") + tdSql.checkRows(10) + + tdSql.execute("alter table meters add tag tgcol2 tinyint") + tdSql.query("select * from meters") + tdSql.checkRows(10) + tdSql.query("select tgcol2 from meters") + tdSql.checkRows(1) + + + tdSql.execute("alter table meters add tag tgcol3 smallint") + tdSql.query("select * from meters") + tdSql.checkRows(10) + tdSql.query("select tgcol3 from meters") + tdSql.checkRows(1) + + + tdSql.execute("alter table meters add tag tgcol4 bigint") + tdSql.query("select * from meters") + tdSql.checkRows(10) + tdSql.query("select tgcol4 from meters") + tdSql.checkRows(1) + + tdSql.execute("alter table meters add tag tgcol5 float") + tdSql.query("select * from meters") + tdSql.checkRows(10) + tdSql.query("select tgcol5 from meters") + tdSql.checkRows(1) + + tdSql.execute("alter table meters add tag tgcol6 double") + tdSql.query("select * from meters") + tdSql.checkRows(10) + tdSql.query("select tgcol6 from meters") + tdSql.checkRows(1) + + tdSql.execute("alter table meters add tag tgcol7 bool") + tdSql.query("select * from meters") + tdSql.checkRows(10) + tdSql.query("select tgcol7 from meters") + tdSql.checkRows(1) + + tdSql.execute("alter table meters add tag tgcol8 binary(20)") + tdSql.query("select * from meters") + tdSql.checkRows(10) + tdSql.query("select tgcol8 from meters") + tdSql.checkRows(1) + + tdSql.execute("alter table meters add tag tgcol9 nchar(20)") + tdSql.query("select * from meters") + tdSql.checkRows(10) + tdSql.query("select tgcol9 from meters") + tdSql.checkRows(1) + + self.restartTaosd() + tdSql.query("select * from meters") + tdSql.checkRows(10) + if self.checkNullValue(tdSql.queryResult) is False: + tdLog.exit("non None value is detected") + + + + + def stop(self): + tdSql.close() + tdLog.success("%s successfully executed" % __file__) + + +tdCases.addWindows(__file__, TDTestCase()) +tdCases.addLinux(__file__, TDTestCase()) diff --git a/tests/pytest/regressiontest.sh b/tests/pytest/regressiontest.sh index ccc6635ced9dd532bb62149a00608ba5a849d04f..24cd93f0fc58772e2d1741d9ee0119d9d5247357 100755 --- a/tests/pytest/regressiontest.sh +++ b/tests/pytest/regressiontest.sh @@ -140,6 +140,7 @@ python3 ./test.py -f query/queryJoin.py python3 ./test.py -f query/filterCombo.py python3 ./test.py -f query/queryNormal.py python3 ./test.py -f query/select_last_crash.py +python3 ./test.py -f query/queryNullValueTest.py #stream python3 ./test.py -f stream/stream1.py diff --git a/tests/script/sh/deploy.sh b/tests/script/sh/deploy.sh index 445baa9e45326e895d75489262217f9d3f938606..b9a9e4f02460717e04d4ba4f6347f9bab666207b 100755 --- a/tests/script/sh/deploy.sh +++ b/tests/script/sh/deploy.sh @@ -136,6 +136,8 @@ echo "defaultPass taosdata" >> $TAOS_CFG echo "numOfLogLines 20000000" >> $TAOS_CFG echo "mnodeEqualVnodeNum 0" >> $TAOS_CFG echo "clog 2" >> $TAOS_CFG +#echo "cache 1" >> $TAOS_CFG +#echo "block 2" >> $TAOS_CFG echo "statusInterval 1" >> $TAOS_CFG echo "numOfTotalVnodes 4" >> $TAOS_CFG echo "maxVgroupsPerDb 4" >> $TAOS_CFG diff --git a/tests/script/sh/exec.sh b/tests/script/sh/exec.sh index 6928039be11642b82e9a50d2f32a38d442abfbce..2f294075a1695298cb52b53bc52e11d9dce9fe07 100755 --- a/tests/script/sh/exec.sh +++ b/tests/script/sh/exec.sh @@ -88,7 +88,9 @@ if [ "$EXEC_OPTON" = "start" ]; then echo "ExcuteCmd:" $EXE_DIR/taosd -c $CFG_DIR if [ "$SHELL_OPTION" = "true" ]; then - nohup valgrind --log-file=${LOG_DIR}/valgrind.log --tool=memcheck --leak-check=full --show-reachable=no --track-origins=yes --show-leak-kinds=all -v --workaround-gcc296-bugs=yes $EXE_DIR/taosd -c $CFG_DIR > /dev/null 2>&1 & + TT=`date +%s` + mkdir ${LOG_DIR}/${TT} + nohup valgrind --log-file=${LOG_DIR}/${TT}/valgrind.log --tool=memcheck --leak-check=full --show-reachable=no --track-origins=yes --show-leak-kinds=all -v --workaround-gcc296-bugs=yes $EXE_DIR/taosd -c $CFG_DIR > /dev/null 2>&1 & else nohup $EXE_DIR/taosd -c $CFG_DIR > /dev/null 2>&1 & fi diff --git a/tests/script/unique/vnode/replica2_a_large.sim b/tests/script/unique/vnode/replica2_a_large.sim new file mode 100644 index 0000000000000000000000000000000000000000..801570dd9c18daa5611d621660e88905fa73c2b3 --- /dev/null +++ b/tests/script/unique/vnode/replica2_a_large.sim @@ -0,0 +1,103 @@ +system sh/stop_dnodes.sh + +system sh/deploy.sh -n dnode1 -i 1 +system sh/deploy.sh -n dnode2 -i 2 +system sh/deploy.sh -n dnode3 -i 3 +system sh/deploy.sh -n dnode4 -i 4 +system sh/cfg.sh -n dnode1 -c wallevel -v 2 +system sh/cfg.sh -n dnode2 -c wallevel -v 2 +system sh/cfg.sh -n dnode3 -c wallevel -v 2 +system sh/cfg.sh -n dnode4 -c wallevel -v 2 +system sh/cfg.sh -n dnode1 -c numOfMnodes -v 1 +system sh/cfg.sh -n dnode2 -c numOfMnodes -v 1 +system sh/cfg.sh -n dnode3 -c numOfMnodes -v 1 +system sh/cfg.sh -n dnode4 -c numOfMnodes -v 1 +system sh/cfg.sh -n dnode1 -c mnodeEqualVnodeNum -v 4 +system sh/cfg.sh -n dnode2 -c mnodeEqualVnodeNum -v 4 +system sh/cfg.sh -n dnode3 -c mnodeEqualVnodeNum -v 4 +system sh/cfg.sh -n dnode4 -c mnodeEqualVnodeNum -v 4 +system sh/cfg.sh -n dnode1 -c debugFlag -v 131 +system sh/cfg.sh -n dnode2 -c debugFlag -v 131 +system sh/cfg.sh -n dnode3 -c debugFlag -v 131 +system sh/cfg.sh -n dnode4 -c debugFlag -v 131 +system sh/cfg.sh -n dnode1 -c arbitrator -v $arbitrator +system sh/cfg.sh -n dnode2 -c arbitrator -v $arbitrator +system sh/cfg.sh -n dnode3 -c arbitrator -v $arbitrator +system sh/cfg.sh -n dnode4 -c arbitrator -v $arbitrator + +print ============== step0: start tarbitrator +system sh/exec_tarbitrator.sh -s start + +system sh/exec.sh -n dnode1 -s start +sleep 3000 +sql connect + +sql create dnode $hostname2 +sql create dnode $hostname3 +system sh/exec.sh -n dnode2 -s start -t +system sh/exec.sh -n dnode3 -s start -t +sleep 3000 + +print ========= step1 +sql create database db replica 2 +#sql create table db.tb1 (ts timestamp, i int) +#sql create table db.tb2 (ts timestamp, i int) +#sql create table db.tb3 (ts timestamp, i int) +#sql create table db.tb4 (ts timestamp, i int) +#sql insert into db.tb1 values(now, 1) +#sql select count(*) from db.tb1 + +sql create database db replica 2 +sql create table db.tb (ts timestamp, i int) +sql insert into db.tb values(now, 1) +sql select count(*) from db.tb +$lastRows = $rows + +print ======== step2 +#run_back unique/vnode/back_insert_many.sim +run_back unique/vnode/back_insert.sim +sleep 3000 + +print ======== step3 + +$x = 0 +loop: + +print ======== step4 +system sh/exec.sh -n dnode2 -s stop -x SIGINT +sleep 10000 +system sh/exec.sh -n dnode2 -s start -t +sleep 10000 + +print ======== step5 +system sh/exec.sh -n dnode3 -s stop -x SIGINT +sleep 10000 +system sh/exec.sh -n dnode3 -s start -t +sleep 10000 + +print ======== step6 +#sql select count(*) from db.tb1 +#print select count(*) from db.tb1 ==> $data00 $lastRows +sql select count(*) from db.tb +print select count(*) from db.tb ==> $data00 $lastRows +if $data00 <= $lastRows then + return -1 +endi + +print ======== step7 +$lastRows = $data00 +print ======== loop Times $x + +if $x < 10 then + $x = $x + 1 + goto loop +endi + +system sh/exec.sh -n dnode1 -s stop -x SIGINT +system sh/exec.sh -n dnode2 -s stop -x SIGINT +system sh/exec.sh -n dnode3 -s stop -x SIGINT +system sh/exec.sh -n dnode4 -s stop -x SIGINT +system sh/exec.sh -n dnode5 -s stop -x SIGINT +system sh/exec.sh -n dnode6 -s stop -x SIGINT +system sh/exec.sh -n dnode7 -s stop -x SIGINT +system sh/exec.sh -n dnode8 -s stop -x SIGINT \ No newline at end of file