diff --git a/src/dnode/src/dnodeMgmt.c b/src/dnode/src/dnodeMgmt.c index f92ba36845ab92d972fff25eb1b5676dae4775c5..0dfadd740484a80276ac8716c5c1bab5470518e7 100644 --- a/src/dnode/src/dnodeMgmt.c +++ b/src/dnode/src/dnodeMgmt.c @@ -32,6 +32,7 @@ #include "vnode.h" static int32_t dnodeOpenVnodes(); +static void dnodeCloseVnodes(); static int32_t dnodeProcessCreateVnodeMsg(SRpcMsg *pMsg); static int32_t dnodeProcessDropVnodeMsg(SRpcMsg *pMsg); static int32_t dnodeProcessAlterVnodeMsg(SRpcMsg *pMsg); @@ -64,10 +65,6 @@ int32_t dnodeInitMgmt() { return -1; } - if ( vnodeInitModule() != TSDB_CODE_SUCCESS) { - return -1; - } - int32_t code = dnodeOpenVnodes(); if (code != TSDB_CODE_SUCCESS) { return -1; @@ -88,7 +85,7 @@ void dnodeCleanupMgmt() { tsDnodeTmr = NULL; } - vnodeCleanupModule(); + dnodeCloseVnodes(); } void dnodeMgmt(SRpcMsg *pMsg) { @@ -107,7 +104,7 @@ void dnodeMgmt(SRpcMsg *pMsg) { rpcFreeCont(pMsg->pCont); } -static int32_t dnodeOpenVnodes() { +static int dnodeGetVnodeList(int32_t vnodeList[]) { DIR *dir = opendir(tsVnodeDir); if (dir == NULL) { return TSDB_CODE_NO_WRITE_ACCESS; @@ -122,18 +119,42 @@ static int32_t dnodeOpenVnodes() { int32_t vnode = atoi(de->d_name + 5); if (vnode == 0) continue; - char vnodeDir[TSDB_FILENAME_LEN * 3]; - snprintf(vnodeDir, TSDB_FILENAME_LEN * 3, "%s/%s", tsVnodeDir, de->d_name); - int32_t code = vnodeOpen(vnode, vnodeDir); - if (code == 0) { - numOfVnodes++; - } + vnodeList[numOfVnodes] = vnode; + numOfVnodes++; } } closedir(dir); - dPrint("dnode mgmt is opened, vnodes:%d", numOfVnodes); - return TSDB_CODE_SUCCESS; + return numOfVnodes; +} + +static int32_t dnodeOpenVnodes() { + char vnodeDir[TSDB_FILENAME_LEN * 3]; + int failed = 0; + + int32_t *vnodeList = (int32_t *) malloc(sizeof(int32_t) * 10000); + int numOfVnodes = dnodeGetVnodeList(vnodeList); + + for (int i=0; iconnType == TAOS_CONN_SERVER) { -// pRpc->hash = taosInitStrHash(pRpc->sessions, sizeof(pRpc), taosHashString); pRpc->hash = taosHashInit(pRpc->sessions, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true); if (pRpc->hash == NULL) { tError("%s failed to init string hash", pRpc->label); @@ -535,8 +534,7 @@ static void rpcCloseConn(void *thandle) { if ( pRpc->connType == TAOS_CONN_SERVER) { char hashstr[40] = {0}; size_t size = sprintf(hashstr, "%x:%x:%x:%d", pConn->peerIp, pConn->linkUid, pConn->peerId, pConn->connType); -// taosDeleteStrHash(pRpc->hash, hashstr); -// taosHashRemove(pRpc->hash, hashstr, size); + taosHashRemove(pRpc->hash, hashstr, size); rpcFreeMsg(pConn->pRspMsg); // it may have a response msg saved, but not request msg pConn->pRspMsg = NULL; @@ -588,7 +586,6 @@ static SRpcConn *rpcAllocateServerConn(SRpcInfo *pRpc, SRecvInfo *pRecv) { size_t size = sprintf(hashstr, "%x:%x:%x:%d", pRecv->ip, pHead->linkUid, pHead->sourceId, pRecv->connType); // check if it is already allocated -// SRpcConn **ppConn = (SRpcConn **)(taosGetStrHashData(pRpc->hash, hashstr)); SRpcConn **ppConn = (SRpcConn **)(taosHashGet(pRpc->hash, hashstr, size)); if (ppConn) pConn = *ppConn; if (pConn) return pConn; @@ -621,7 +618,6 @@ static SRpcConn *rpcAllocateServerConn(SRpcInfo *pRpc, SRecvInfo *pRecv) { pConn->localPort = (pRpc->localPort + pRpc->index); } -// taosAddStrHash(pRpc->hash, hashstr, (char *)&pConn); taosHashPut(pRpc->hash, hashstr, size, (char *)&pConn, POINTER_BYTES); tTrace("%s %p, rpc connection is allocated, sid:%d id:%s port:%u", @@ -834,13 +830,15 @@ static void rpcProcessBrokenLink(SRpcConn *pConn) { if (pConn->inType) { // if there are pending request, notify the app tTrace("%s %p, connection is gone, notify the app", pRpc->label, pConn); +/* SRpcMsg rpcMsg; rpcMsg.pCont = NULL; rpcMsg.contLen = 0; rpcMsg.handle = pConn; rpcMsg.msgType = pConn->inType; rpcMsg.code = TSDB_CODE_NETWORK_UNAVAIL; - // (*(pRpc->cfp))(&rpcMsg); + (*(pRpc->cfp))(&rpcMsg); +*/ } rpcCloseConn(pConn); @@ -1163,13 +1161,15 @@ static void rpcProcessIdleTimer(void *param, void *tmrId) { if (pConn->inType && pRpc->cfp) { // if there are pending request, notify the app tTrace("%s %p, notify the app, connection is gone", pRpc->label, pConn); +/* SRpcMsg rpcMsg; rpcMsg.pCont = NULL; rpcMsg.contLen = 0; rpcMsg.handle = pConn; rpcMsg.msgType = pConn->inType; rpcMsg.code = TSDB_CODE_NETWORK_UNAVAIL; - // (*(pRpc->cfp))(&rpcMsg); + (*(pRpc->cfp))(&rpcMsg); +*/ } rpcCloseConn(pConn); } else { diff --git a/src/vnode/main/src/vnodeMain.c b/src/vnode/main/src/vnodeMain.c index 5bb5ef55efd22c380a14c13d3602c7c2ff8dd01b..1ac57089e73b3f066ef4b6b40f2897db7bf2e56a 100644 --- a/src/vnode/main/src/vnodeMain.c +++ b/src/vnode/main/src/vnodeMain.c @@ -33,27 +33,22 @@ static void *tsDnodeVnodesHash; static void vnodeCleanUp(SVnodeObj *pVnode); static void vnodeBuildVloadMsg(char *pNode, void * param); -int32_t vnodeInitModule() { +static int tsOpennedVnodes; +static pthread_once_t vnodeModuleInit = PTHREAD_ONCE_INIT; + +static void vnodeInit() { vnodeInitWriteFp(); tsDnodeVnodesHash = taosInitIntHash(TSDB_MAX_VNODES, sizeof(SVnodeObj), taosHashInt); if (tsDnodeVnodesHash == NULL) { dError("failed to init vnode list"); - return -1; } - - return 0; -} - -typedef void (*CleanupFp)(char *); -void vnodeCleanupModule() { - taosCleanUpIntHashWithFp(tsDnodeVnodesHash, (CleanupFp)vnodeClose); - taosCleanUpIntHash(tsDnodeVnodesHash); } int32_t vnodeCreate(SMDCreateVnodeMsg *pVnodeCfg) { int32_t code; + pthread_once(&vnodeModuleInit, vnodeInit); SVnodeObj *pTemp = (SVnodeObj *) taosGetIntHashData(tsDnodeVnodesHash, pVnodeCfg->cfg.vgId); @@ -116,6 +111,7 @@ int32_t vnodeDrop(int32_t vgId) { int32_t vnodeOpen(int32_t vnode, char *rootDir) { char temp[TSDB_FILENAME_LEN]; + pthread_once(&vnodeModuleInit, vnodeInit); SVnodeObj vnodeObj = {0}; vnodeObj.vgId = vnode; @@ -147,11 +143,14 @@ int32_t vnodeOpen(int32_t vnode, char *rootDir) { pVnode->status = VN_STATUS_READY; dTrace("pVnode:%p vgId:%d, vnode is opened in %s", pVnode, pVnode->vgId, rootDir); + tsOpennedVnodes++; return TSDB_CODE_SUCCESS; } -int32_t vnodeClose(void *param) { - SVnodeObj *pVnode = (SVnodeObj *)param; +int32_t vnodeClose(int32_t vgId) { + + SVnodeObj *pVnode = (SVnodeObj *) taosGetIntHashData(tsDnodeVnodesHash, vgId); + if (pVnode == NULL) return 0; dTrace("pVnode:%p vgId:%d, vnode will be closed", pVnode, pVnode->vgId); pVnode->status = VN_STATUS_CLOSING; @@ -183,6 +182,12 @@ void vnodeRelease(void *pVnodeRaw) { } dTrace("pVnode:%p vgId:%d, vnode is released", pVnode, pVnode->vgId); + + tsOpennedVnodes--; + if (tsOpennedVnodes <= 0) { + taosCleanUpIntHash(tsDnodeVnodesHash); + vnodeModuleInit = PTHREAD_ONCE_INIT; + } } void *vnodeGetVnode(int32_t vgId) {