diff --git a/src/dnode/src/dnodeMgmt.c b/src/dnode/src/dnodeMgmt.c index f92ba36845ab92d972fff25eb1b5676dae4775c5..0dfadd740484a80276ac8716c5c1bab5470518e7 100644 --- a/src/dnode/src/dnodeMgmt.c +++ b/src/dnode/src/dnodeMgmt.c @@ -32,6 +32,7 @@ #include "vnode.h" static int32_t dnodeOpenVnodes(); +static void dnodeCloseVnodes(); static int32_t dnodeProcessCreateVnodeMsg(SRpcMsg *pMsg); static int32_t dnodeProcessDropVnodeMsg(SRpcMsg *pMsg); static int32_t dnodeProcessAlterVnodeMsg(SRpcMsg *pMsg); @@ -64,10 +65,6 @@ int32_t dnodeInitMgmt() { return -1; } - if ( vnodeInitModule() != TSDB_CODE_SUCCESS) { - return -1; - } - int32_t code = dnodeOpenVnodes(); if (code != TSDB_CODE_SUCCESS) { return -1; @@ -88,7 +85,7 @@ void dnodeCleanupMgmt() { tsDnodeTmr = NULL; } - vnodeCleanupModule(); + dnodeCloseVnodes(); } void dnodeMgmt(SRpcMsg *pMsg) { @@ -107,7 +104,7 @@ void dnodeMgmt(SRpcMsg *pMsg) { rpcFreeCont(pMsg->pCont); } -static int32_t dnodeOpenVnodes() { +static int dnodeGetVnodeList(int32_t vnodeList[]) { DIR *dir = opendir(tsVnodeDir); if (dir == NULL) { return TSDB_CODE_NO_WRITE_ACCESS; @@ -122,18 +119,42 @@ static int32_t dnodeOpenVnodes() { int32_t vnode = atoi(de->d_name + 5); if (vnode == 0) continue; - char vnodeDir[TSDB_FILENAME_LEN * 3]; - snprintf(vnodeDir, TSDB_FILENAME_LEN * 3, "%s/%s", tsVnodeDir, de->d_name); - int32_t code = vnodeOpen(vnode, vnodeDir); - if (code == 0) { - numOfVnodes++; - } + vnodeList[numOfVnodes] = vnode; + numOfVnodes++; } } closedir(dir); - dPrint("dnode mgmt is opened, vnodes:%d", numOfVnodes); - return TSDB_CODE_SUCCESS; + return numOfVnodes; +} + +static int32_t dnodeOpenVnodes() { + char vnodeDir[TSDB_FILENAME_LEN * 3]; + int failed = 0; + + int32_t *vnodeList = (int32_t *) malloc(sizeof(int32_t) * 10000); + int numOfVnodes = dnodeGetVnodeList(vnodeList); + + for (int i=0; iinType) { // if there are pending request, notify the app tTrace("%s %p, connection is gone, notify the app", pRpc->label, pConn); +/* SRpcMsg rpcMsg; rpcMsg.pCont = NULL; rpcMsg.contLen = 0; rpcMsg.handle = pConn; rpcMsg.msgType = pConn->inType; rpcMsg.code = TSDB_CODE_NETWORK_UNAVAIL; - // (*(pRpc->cfp))(&rpcMsg); + (*(pRpc->cfp))(&rpcMsg); +*/ } rpcCloseConn(pConn); @@ -1157,13 +1159,15 @@ static void rpcProcessIdleTimer(void *param, void *tmrId) { if (pConn->inType && pRpc->cfp) { // if there are pending request, notify the app tTrace("%s %p, notify the app, connection is gone", pRpc->label, pConn); +/* SRpcMsg rpcMsg; rpcMsg.pCont = NULL; rpcMsg.contLen = 0; rpcMsg.handle = pConn; rpcMsg.msgType = pConn->inType; rpcMsg.code = TSDB_CODE_NETWORK_UNAVAIL; - // (*(pRpc->cfp))(&rpcMsg); + (*(pRpc->cfp))(&rpcMsg); +*/ } rpcCloseConn(pConn); } else { diff --git a/src/vnode/main/src/vnodeMain.c b/src/vnode/main/src/vnodeMain.c index 5bb5ef55efd22c380a14c13d3602c7c2ff8dd01b..1ac57089e73b3f066ef4b6b40f2897db7bf2e56a 100644 --- a/src/vnode/main/src/vnodeMain.c +++ b/src/vnode/main/src/vnodeMain.c @@ -33,27 +33,22 @@ static void *tsDnodeVnodesHash; static void vnodeCleanUp(SVnodeObj *pVnode); static void vnodeBuildVloadMsg(char *pNode, void * param); -int32_t vnodeInitModule() { +static int tsOpennedVnodes; +static pthread_once_t vnodeModuleInit = PTHREAD_ONCE_INIT; + +static void vnodeInit() { vnodeInitWriteFp(); tsDnodeVnodesHash = taosInitIntHash(TSDB_MAX_VNODES, sizeof(SVnodeObj), taosHashInt); if (tsDnodeVnodesHash == NULL) { dError("failed to init vnode list"); - return -1; } - - return 0; -} - -typedef void (*CleanupFp)(char *); -void vnodeCleanupModule() { - taosCleanUpIntHashWithFp(tsDnodeVnodesHash, (CleanupFp)vnodeClose); - taosCleanUpIntHash(tsDnodeVnodesHash); } int32_t vnodeCreate(SMDCreateVnodeMsg *pVnodeCfg) { int32_t code; + pthread_once(&vnodeModuleInit, vnodeInit); SVnodeObj *pTemp = (SVnodeObj *) taosGetIntHashData(tsDnodeVnodesHash, pVnodeCfg->cfg.vgId); @@ -116,6 +111,7 @@ int32_t vnodeDrop(int32_t vgId) { int32_t vnodeOpen(int32_t vnode, char *rootDir) { char temp[TSDB_FILENAME_LEN]; + pthread_once(&vnodeModuleInit, vnodeInit); SVnodeObj vnodeObj = {0}; vnodeObj.vgId = vnode; @@ -147,11 +143,14 @@ int32_t vnodeOpen(int32_t vnode, char *rootDir) { pVnode->status = VN_STATUS_READY; dTrace("pVnode:%p vgId:%d, vnode is opened in %s", pVnode, pVnode->vgId, rootDir); + tsOpennedVnodes++; return TSDB_CODE_SUCCESS; } -int32_t vnodeClose(void *param) { - SVnodeObj *pVnode = (SVnodeObj *)param; +int32_t vnodeClose(int32_t vgId) { + + SVnodeObj *pVnode = (SVnodeObj *) taosGetIntHashData(tsDnodeVnodesHash, vgId); + if (pVnode == NULL) return 0; dTrace("pVnode:%p vgId:%d, vnode will be closed", pVnode, pVnode->vgId); pVnode->status = VN_STATUS_CLOSING; @@ -183,6 +182,12 @@ void vnodeRelease(void *pVnodeRaw) { } dTrace("pVnode:%p vgId:%d, vnode is released", pVnode, pVnode->vgId); + + tsOpennedVnodes--; + if (tsOpennedVnodes <= 0) { + taosCleanUpIntHash(tsDnodeVnodesHash); + vnodeModuleInit = PTHREAD_ONCE_INIT; + } } void *vnodeGetVnode(int32_t vgId) {