diff --git a/src/client/src/tscSQLParser.c b/src/client/src/tscSQLParser.c index 4e7a8ed6bd38e0973f40d858802901f61ca448c8..40e70c247ef2d8d4a1cce6db07ac9f48271a5085 100644 --- a/src/client/src/tscSQLParser.c +++ b/src/client/src/tscSQLParser.c @@ -5280,8 +5280,8 @@ int32_t doLocalQueryProcess(SQueryInfo* pQueryInfo, SQuerySQL* pQuerySql) { int32_t tscCheckCreateDbParams(SSqlCmd* pCmd, SCMCreateDbMsg* pCreate) { char msg[512] = {0}; - if (pCreate->commitLog != -1 && (pCreate->commitLog < 0 || pCreate->commitLog > 1)) { - snprintf(msg, tListLen(msg), "invalid db option commitLog: %d, only 0 or 1 allowed", pCreate->commitLog); + if (pCreate->commitLog != -1 && (pCreate->commitLog < 0 || pCreate->commitLog > 2)) { + snprintf(msg, tListLen(msg), "invalid db option commitLog: %d, only 0-2 allowed", pCreate->commitLog); return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg); } diff --git a/src/dnode/src/dnodeMgmt.c b/src/dnode/src/dnodeMgmt.c index f92ba36845ab92d972fff25eb1b5676dae4775c5..8e523eaf46701981a5469acfc19135a3fc4013d4 100644 --- a/src/dnode/src/dnodeMgmt.c +++ b/src/dnode/src/dnodeMgmt.c @@ -32,6 +32,7 @@ #include "vnode.h" static int32_t dnodeOpenVnodes(); +static void dnodeCloseVnodes(); static int32_t dnodeProcessCreateVnodeMsg(SRpcMsg *pMsg); static int32_t dnodeProcessDropVnodeMsg(SRpcMsg *pMsg); static int32_t dnodeProcessAlterVnodeMsg(SRpcMsg *pMsg); @@ -64,10 +65,6 @@ int32_t dnodeInitMgmt() { return -1; } - if ( vnodeInitModule() != TSDB_CODE_SUCCESS) { - return -1; - } - int32_t code = dnodeOpenVnodes(); if (code != TSDB_CODE_SUCCESS) { return -1; @@ -88,7 +85,7 @@ void dnodeCleanupMgmt() { tsDnodeTmr = NULL; } - vnodeCleanupModule(); + dnodeCloseVnodes(); } void dnodeMgmt(SRpcMsg *pMsg) { @@ -107,7 +104,7 @@ void dnodeMgmt(SRpcMsg *pMsg) { rpcFreeCont(pMsg->pCont); } -static int32_t dnodeOpenVnodes() { +static int dnodeGetVnodeList(int32_t vnodeList[]) { DIR *dir = opendir(tsVnodeDir); if (dir == NULL) { return TSDB_CODE_NO_WRITE_ACCESS; @@ -122,18 +119,42 @@ static int32_t dnodeOpenVnodes() { int32_t vnode = atoi(de->d_name + 5); if (vnode == 0) continue; - char vnodeDir[TSDB_FILENAME_LEN * 3]; - snprintf(vnodeDir, TSDB_FILENAME_LEN * 3, "%s/%s", tsVnodeDir, de->d_name); - int32_t code = vnodeOpen(vnode, vnodeDir); - if (code == 0) { - numOfVnodes++; - } + vnodeList[numOfVnodes] = vnode; + numOfVnodes++; } } closedir(dir); - dPrint("dnode mgmt is opened, vnodes:%d", numOfVnodes); - return TSDB_CODE_SUCCESS; + return numOfVnodes; +} + +static int32_t dnodeOpenVnodes() { + char vnodeDir[TSDB_FILENAME_LEN * 3]; + int failed = 0; + + int32_t *vnodeList = (int32_t *) malloc(sizeof(int32_t) * 10000); + int numOfVnodes = dnodeGetVnodeList(vnodeList); + + for (int i=0; icfg.vgId = htonl(pCreate->cfg.vgId); pCreate->cfg.maxSessions = htonl(pCreate->cfg.maxSessions); pCreate->cfg.daysPerFile = htonl(pCreate->cfg.daysPerFile); + pCreate->cfg.commitLog = pCreate->cfg.commitLog; return vnodeCreate(pCreate); } diff --git a/src/inc/vnode.h b/src/inc/vnode.h index 0ca5b971804032978227327d625c17444469be3c..b459c2c5625c0424f4df0094eb2a3bc8e97a6e7e 100644 --- a/src/inc/vnode.h +++ b/src/inc/vnode.h @@ -25,13 +25,10 @@ typedef struct { void *rsp; } SRspRet; -int32_t vnodeInitModule(); -void vnodeCleanupModule(); - int32_t vnodeCreate(SMDCreateVnodeMsg *pVnodeCfg); int32_t vnodeDrop(int32_t vgId); -int32_t vnodeOpen(int32_t vnode, char *rootDir); -int32_t vnodeClose(void *pVnode); +int32_t vnodeOpen(int32_t vgId, char *rootDir); +int32_t vnodeClose(int32_t vgId); void vnodeRelease(void *pVnode); void* vnodeGetVnode(int32_t vgId); diff --git a/src/mnode/src/mgmtDb.c b/src/mnode/src/mgmtDb.c index 6a53b0d00183a18bf64cf89a88cd93896e29808f..7d13451f7ef3164becc2c2887a40039bff998ec4 100644 --- a/src/mnode/src/mgmtDb.c +++ b/src/mnode/src/mgmtDb.c @@ -166,8 +166,8 @@ SDbObj *mgmtGetDbByTableId(char *tableId) { } static int32_t mgmtCheckDBParams(SCMCreateDbMsg *pCreate) { - if (pCreate->commitLog < 0 || pCreate->commitLog > 1) { - mError("invalid db option commitLog: %d, only 0 or 1 allowed", pCreate->commitLog); + if (pCreate->commitLog < 0 || pCreate->commitLog > 2) { + mError("invalid db option commitLog: %d, only 0-2 allowed", pCreate->commitLog); return TSDB_CODE_INVALID_OPTION; } diff --git a/src/mnode/src/mgmtVgroup.c b/src/mnode/src/mgmtVgroup.c index b16b82cb4a009fb56b42864ef101f18dd47e0cb5..87f3872b0aaacba15c49a5e3678d94993518ad8a 100644 --- a/src/mnode/src/mgmtVgroup.c +++ b/src/mnode/src/mgmtVgroup.c @@ -490,6 +490,7 @@ SMDCreateVnodeMsg *mgmtBuildCreateVnodeMsg(SVgObj *pVgroup) { pCfg->daysToKeep2 = htonl(pCfg->daysToKeep2); pCfg->daysToKeep = htonl(pCfg->daysToKeep); pCfg->commitTime = htonl(pCfg->commitTime); + pCfg->commitLog = pCfg->commitLog; pCfg->blocksPerTable = htons(pCfg->blocksPerTable); pCfg->replications = (char) pVgroup->numOfVnodes; pCfg->rowsInFileBlock = htonl(pCfg->rowsInFileBlock); diff --git a/src/query/src/queryExecutor.c b/src/query/src/queryExecutor.c index ef988dd0fa0beed8ab871a2da3f8276544cd5d8d..97bfe488ae98b3970f86cb80ea6913011ca61b82 100644 --- a/src/query/src/queryExecutor.c +++ b/src/query/src/queryExecutor.c @@ -1567,8 +1567,6 @@ static void teardownQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv) { destroyResultBuf(pRuntimeEnv->pResultBuf); tsdbCleanupQueryHandle(pRuntimeEnv->pQueryHandle); - tfree(pRuntimeEnv->pQuery); - pRuntimeEnv->pTSBuf = tsBufDestory(pRuntimeEnv->pTSBuf); } @@ -2247,11 +2245,8 @@ void vnodeQueryFreeQInfoEx(SQInfo *pQInfo) { } SQuery *pQuery = pQInfo->runtimeEnv.pQuery; - teardownQueryRuntimeEnv(&pQInfo->runtimeEnv); - // tSidSetDestroy(&pQInfo->pSidSet); - if (pQInfo->pTableDataInfo != NULL) { // size_t num = taosHashGetSize(pQInfo->pTableIdList); for (int32_t j = 0; j < 0; ++j) { @@ -5974,6 +5969,8 @@ static void freeQInfo(SQInfo *pQInfo) { } tfree(pQuery->pGroupbyExpr); + tfree(pQuery); + taosArrayDestroy(pQInfo->pTableIdList); dTrace("QInfo:%p QInfo is freed", pQInfo); diff --git a/src/rpc/src/rpcMain.c b/src/rpc/src/rpcMain.c index 64f265ce6ec5b7035fe601b4e20e00f078f9452b..6f86c2dd7cf78d5cf3ff54b6daf9c360cb6d5137 100755 --- a/src/rpc/src/rpcMain.c +++ b/src/rpc/src/rpcMain.c @@ -828,13 +828,15 @@ static void rpcProcessBrokenLink(SRpcConn *pConn) { if (pConn->inType) { // if there are pending request, notify the app tTrace("%s %p, connection is gone, notify the app", pRpc->label, pConn); +/* SRpcMsg rpcMsg; rpcMsg.pCont = NULL; rpcMsg.contLen = 0; rpcMsg.handle = pConn; rpcMsg.msgType = pConn->inType; rpcMsg.code = TSDB_CODE_NETWORK_UNAVAIL; - // (*(pRpc->cfp))(&rpcMsg); + (*(pRpc->cfp))(&rpcMsg); +*/ } rpcCloseConn(pConn); @@ -1157,13 +1159,15 @@ static void rpcProcessIdleTimer(void *param, void *tmrId) { if (pConn->inType && pRpc->cfp) { // if there are pending request, notify the app tTrace("%s %p, notify the app, connection is gone", pRpc->label, pConn); +/* SRpcMsg rpcMsg; rpcMsg.pCont = NULL; rpcMsg.contLen = 0; rpcMsg.handle = pConn; rpcMsg.msgType = pConn->inType; rpcMsg.code = TSDB_CODE_NETWORK_UNAVAIL; - // (*(pRpc->cfp))(&rpcMsg); + (*(pRpc->cfp))(&rpcMsg); +*/ } rpcCloseConn(pConn); } else { diff --git a/src/util/src/tglobalcfg.c b/src/util/src/tglobalcfg.c index 0287285f4db64b611ba29cfdfc72506e4bbbf08e..8a0d66068e92e5823788f8104885205e19de6708 100644 --- a/src/util/src/tglobalcfg.c +++ b/src/util/src/tglobalcfg.c @@ -632,7 +632,7 @@ static void doInitGlobalConfig() { tsInitConfigOption(cfg++, "clog", &tsCommitLog, TSDB_CFG_VTYPE_SHORT, TSDB_CFG_CTYPE_B_CONFIG | TSDB_CFG_CTYPE_B_SHOW, - 0, 1, 0, TSDB_CFG_UTYPE_NONE); + 0, 2, 0, TSDB_CFG_UTYPE_NONE); tsInitConfigOption(cfg++, "comp", &tsCompression, TSDB_CFG_VTYPE_SHORT, TSDB_CFG_CTYPE_B_CONFIG | TSDB_CFG_CTYPE_B_SHOW, 0, 2, 0, TSDB_CFG_UTYPE_NONE); diff --git a/src/vnode/main/src/vnodeMain.c b/src/vnode/main/src/vnodeMain.c index 5bb5ef55efd22c380a14c13d3602c7c2ff8dd01b..1be9bbb64be2807c3c44d3ca83d789b82f563bb7 100644 --- a/src/vnode/main/src/vnodeMain.c +++ b/src/vnode/main/src/vnodeMain.c @@ -33,27 +33,22 @@ static void *tsDnodeVnodesHash; static void vnodeCleanUp(SVnodeObj *pVnode); static void vnodeBuildVloadMsg(char *pNode, void * param); -int32_t vnodeInitModule() { +static int tsOpennedVnodes; +static pthread_once_t vnodeModuleInit = PTHREAD_ONCE_INIT; + +static void vnodeInit() { vnodeInitWriteFp(); tsDnodeVnodesHash = taosInitIntHash(TSDB_MAX_VNODES, sizeof(SVnodeObj), taosHashInt); if (tsDnodeVnodesHash == NULL) { dError("failed to init vnode list"); - return -1; } - - return 0; -} - -typedef void (*CleanupFp)(char *); -void vnodeCleanupModule() { - taosCleanUpIntHashWithFp(tsDnodeVnodesHash, (CleanupFp)vnodeClose); - taosCleanUpIntHash(tsDnodeVnodesHash); } int32_t vnodeCreate(SMDCreateVnodeMsg *pVnodeCfg) { int32_t code; + pthread_once(&vnodeModuleInit, vnodeInit); SVnodeObj *pTemp = (SVnodeObj *) taosGetIntHashData(tsDnodeVnodesHash, pVnodeCfg->cfg.vgId); @@ -93,7 +88,7 @@ int32_t vnodeCreate(SMDCreateVnodeMsg *pVnodeCfg) { return code; } - dPrint("vgId:%d, vnode is created", pVnodeCfg->cfg.vgId); + dPrint("vgId:%d, vnode is created, clog:%d", pVnodeCfg->cfg.vgId, pVnodeCfg->cfg.commitLog); code = vnodeOpen(pVnodeCfg->cfg.vgId, rootDir); return code; @@ -116,6 +111,7 @@ int32_t vnodeDrop(int32_t vgId) { int32_t vnodeOpen(int32_t vnode, char *rootDir) { char temp[TSDB_FILENAME_LEN]; + pthread_once(&vnodeModuleInit, vnodeInit); SVnodeObj vnodeObj = {0}; vnodeObj.vgId = vnode; @@ -147,11 +143,14 @@ int32_t vnodeOpen(int32_t vnode, char *rootDir) { pVnode->status = VN_STATUS_READY; dTrace("pVnode:%p vgId:%d, vnode is opened in %s", pVnode, pVnode->vgId, rootDir); + tsOpennedVnodes++; return TSDB_CODE_SUCCESS; } -int32_t vnodeClose(void *param) { - SVnodeObj *pVnode = (SVnodeObj *)param; +int32_t vnodeClose(int32_t vgId) { + + SVnodeObj *pVnode = (SVnodeObj *) taosGetIntHashData(tsDnodeVnodesHash, vgId); + if (pVnode == NULL) return 0; dTrace("pVnode:%p vgId:%d, vnode will be closed", pVnode, pVnode->vgId); pVnode->status = VN_STATUS_CLOSING; @@ -183,6 +182,13 @@ void vnodeRelease(void *pVnodeRaw) { } dTrace("pVnode:%p vgId:%d, vnode is released", pVnode, pVnode->vgId); + + tsOpennedVnodes--; + if (tsOpennedVnodes <= 0) { + taosCleanUpIntHash(tsDnodeVnodesHash); + vnodeModuleInit = PTHREAD_ONCE_INIT; + tsDnodeVnodesHash = NULL; + } } void *vnodeGetVnode(int32_t vgId) { @@ -235,10 +241,7 @@ static void vnodeBuildVloadMsg(char *pNode, void * param) { } static void vnodeCleanUp(SVnodeObj *pVnode) { - if (pVnode->status == VN_STATUS_DELETING) { - // fix deadlock occured while close system - taosDeleteIntHash(tsDnodeVnodesHash, pVnode->vgId); - } + taosDeleteIntHash(tsDnodeVnodesHash, pVnode->vgId); //syncStop(pVnode->sync); tsdbCloseRepo(pVnode->tsdb);