提交 1373e3cf 编写于 作者: S Shengliang Guan

refact vnode queue

上级 33a60b60
...@@ -42,18 +42,13 @@ static SBnode *dndAcquireBnode(SDnode *pDnode) { ...@@ -42,18 +42,13 @@ static SBnode *dndAcquireBnode(SDnode *pDnode) {
} }
static void dndReleaseBnode(SDnode *pDnode, SBnode *pBnode) { static void dndReleaseBnode(SDnode *pDnode, SBnode *pBnode) {
SBnodeMgmt *pMgmt = &pDnode->bmgmt; if (pBnode == NULL) return;
int32_t refCount = 0;
SBnodeMgmt *pMgmt = &pDnode->bmgmt;
taosRLockLatch(&pMgmt->latch); taosRLockLatch(&pMgmt->latch);
if (pBnode != NULL) { int32_t refCount = atomic_sub_fetch_32(&pMgmt->refCount, 1);
refCount = atomic_sub_fetch_32(&pMgmt->refCount, 1);
}
taosRUnLockLatch(&pMgmt->latch); taosRUnLockLatch(&pMgmt->latch);
if (pBnode != NULL) {
dTrace("release bnode, refCount:%d", refCount); dTrace("release bnode, refCount:%d", refCount);
}
} }
static int32_t dndReadBnodeFile(SDnode *pDnode) { static int32_t dndReadBnodeFile(SDnode *pDnode) {
......
...@@ -43,18 +43,13 @@ static SMnode *dndAcquireMnode(SDnode *pDnode) { ...@@ -43,18 +43,13 @@ static SMnode *dndAcquireMnode(SDnode *pDnode) {
} }
static void dndReleaseMnode(SDnode *pDnode, SMnode *pMnode) { static void dndReleaseMnode(SDnode *pDnode, SMnode *pMnode) {
SMnodeMgmt *pMgmt = &pDnode->mmgmt; if (pMnode == NULL) return;
int32_t refCount = 0;
SMnodeMgmt *pMgmt = &pDnode->mmgmt;
taosRLockLatch(&pMgmt->latch); taosRLockLatch(&pMgmt->latch);
if (pMnode != NULL) { int32_t refCount = atomic_sub_fetch_32(&pMgmt->refCount, 1);
refCount = atomic_sub_fetch_32(&pMgmt->refCount, 1);
}
taosRUnLockLatch(&pMgmt->latch); taosRUnLockLatch(&pMgmt->latch);
if (pMnode != NULL) {
dTrace("release mnode, refCount:%d", refCount); dTrace("release mnode, refCount:%d", refCount);
}
} }
static int32_t dndReadMnodeFile(SDnode *pDnode) { static int32_t dndReadMnodeFile(SDnode *pDnode) {
......
...@@ -42,18 +42,13 @@ static SQnode *dndAcquireQnode(SDnode *pDnode) { ...@@ -42,18 +42,13 @@ static SQnode *dndAcquireQnode(SDnode *pDnode) {
} }
static void dndReleaseQnode(SDnode *pDnode, SQnode *pQnode) { static void dndReleaseQnode(SDnode *pDnode, SQnode *pQnode) {
SQnodeMgmt *pMgmt = &pDnode->qmgmt; if (pQnode == NULL) return;
int32_t refCount = 0;
SQnodeMgmt *pMgmt = &pDnode->qmgmt;
taosRLockLatch(&pMgmt->latch); taosRLockLatch(&pMgmt->latch);
if (pQnode != NULL) { int32_t refCount = atomic_sub_fetch_32(&pMgmt->refCount, 1);
refCount = atomic_sub_fetch_32(&pMgmt->refCount, 1);
}
taosRUnLockLatch(&pMgmt->latch); taosRUnLockLatch(&pMgmt->latch);
if (pQnode != NULL) {
dTrace("release qnode, refCount:%d", refCount); dTrace("release qnode, refCount:%d", refCount);
}
} }
static int32_t dndReadQnodeFile(SDnode *pDnode) { static int32_t dndReadQnodeFile(SDnode *pDnode) {
......
...@@ -42,18 +42,13 @@ static SSnode *dndAcquireSnode(SDnode *pDnode) { ...@@ -42,18 +42,13 @@ static SSnode *dndAcquireSnode(SDnode *pDnode) {
} }
static void dndReleaseSnode(SDnode *pDnode, SSnode *pSnode) { static void dndReleaseSnode(SDnode *pDnode, SSnode *pSnode) {
SSnodeMgmt *pMgmt = &pDnode->smgmt; if (pSnode == NULL) return;
int32_t refCount = 0;
SSnodeMgmt *pMgmt = &pDnode->smgmt;
taosRLockLatch(&pMgmt->latch); taosRLockLatch(&pMgmt->latch);
if (pSnode != NULL) { int32_t refCount = atomic_sub_fetch_32(&pMgmt->refCount, 1);
refCount = atomic_sub_fetch_32(&pMgmt->refCount, 1);
}
taosRUnLockLatch(&pMgmt->latch); taosRUnLockLatch(&pMgmt->latch);
if (pSnode != NULL) {
dTrace("release snode, refCount:%d", refCount); dTrace("release snode, refCount:%d", refCount);
}
} }
static int32_t dndReadSnodeFile(SDnode *pDnode) { static int32_t dndReadSnodeFile(SDnode *pDnode) {
......
...@@ -40,7 +40,7 @@ typedef struct { ...@@ -40,7 +40,7 @@ typedef struct {
STaosQueue *pSyncQ; STaosQueue *pSyncQ;
STaosQueue *pApplyQ; STaosQueue *pApplyQ;
STaosQueue *pQueryQ; STaosQueue *pQueryQ;
STaosQueue* pFetchQ; STaosQueue *pFetchQ;
} SVnodeObj; } SVnodeObj;
typedef struct { typedef struct {
...@@ -53,22 +53,8 @@ typedef struct { ...@@ -53,22 +53,8 @@ typedef struct {
SWrapperCfg *pCfgs; SWrapperCfg *pCfgs;
} SVnodeThread; } SVnodeThread;
static int32_t dndInitVnodeReadWorker(SDnode *pDnode); static int32_t dndAllocVnodeQueue(SDnode *pDnode, SVnodeObj *pVnode);
static int32_t dndInitVnodeWriteWorker(SDnode *pDnode); static void dndFreeVnodeQueue(SDnode *pDnode, SVnodeObj *pVnode);
static int32_t dndInitVnodeSyncWorker(SDnode *pDnode);
static void dndCleanupVnodeReadWorker(SDnode *pDnode);
static void dndCleanupVnodeWriteWorker(SDnode *pDnode);
static void dndCleanupVnodeSyncWorker(SDnode *pDnode);
static int32_t dndAllocVnodeQueryQueue(SDnode *pDnode, SVnodeObj *pVnode);
static int32_t dndAllocVnodeFetchQueue(SDnode *pDnode, SVnodeObj *pVnode);
static int32_t dndAllocVnodeWriteQueue(SDnode *pDnode, SVnodeObj *pVnode);
static int32_t dndAllocVnodeApplyQueue(SDnode *pDnode, SVnodeObj *pVnode);
static int32_t dndAllocVnodeSyncQueue(SDnode *pDnode, SVnodeObj *pVnode);
static void dndFreeVnodeQueryQueue(SDnode *pDnode, SVnodeObj *pVnode);
static void dndFreeVnodeFetchQueue(SDnode *pDnode, SVnodeObj *pVnode);
static void dndFreeVnodeWriteQueue(SDnode *pDnode, SVnodeObj *pVnode);
static void dndFreeVnodeApplyQueue(SDnode *pDnode, SVnodeObj *pVnode);
static void dndFreeVnodeSyncQueue(SDnode *pDnode, SVnodeObj *pVnode);
static void dndProcessVnodeQueryQueue(SVnodeObj *pVnode, SRpcMsg *pMsg); static void dndProcessVnodeQueryQueue(SVnodeObj *pVnode, SRpcMsg *pMsg);
static void dndProcessVnodeFetchQueue(SVnodeObj *pVnode, SRpcMsg *pMsg); static void dndProcessVnodeFetchQueue(SVnodeObj *pVnode, SRpcMsg *pMsg);
...@@ -117,11 +103,9 @@ static void dndReleaseVnode(SDnode *pDnode, SVnodeObj *pVnode) { ...@@ -117,11 +103,9 @@ static void dndReleaseVnode(SDnode *pDnode, SVnodeObj *pVnode) {
if (pVnode == NULL) return; if (pVnode == NULL) return;
SVnodesMgmt *pMgmt = &pDnode->vmgmt; SVnodesMgmt *pMgmt = &pDnode->vmgmt;
taosRLockLatch(&pMgmt->latch); taosRLockLatch(&pMgmt->latch);
int32_t refCount = atomic_sub_fetch_32(&pVnode->refCount, 1); int32_t refCount = atomic_sub_fetch_32(&pVnode->refCount, 1);
taosRUnLockLatch(&pMgmt->latch); taosRUnLockLatch(&pMgmt->latch);
dTrace("vgId:%d, release vnode, refCount:%d", pVnode->vgId, refCount); dTrace("vgId:%d, release vnode, refCount:%d", pVnode->vgId, refCount);
} }
...@@ -134,7 +118,7 @@ static int32_t dndOpenVnode(SDnode *pDnode, SWrapperCfg *pCfg, SVnode *pImpl) { ...@@ -134,7 +118,7 @@ static int32_t dndOpenVnode(SDnode *pDnode, SWrapperCfg *pCfg, SVnode *pImpl) {
} }
pVnode->vgId = pCfg->vgId; pVnode->vgId = pCfg->vgId;
pVnode->refCount = 1; pVnode->refCount = 0;
pVnode->dropped = 0; pVnode->dropped = 0;
pVnode->accessState = TSDB_VN_ALL_ACCCESS; pVnode->accessState = TSDB_VN_ALL_ACCCESS;
pVnode->pImpl = pImpl; pVnode->pImpl = pImpl;
...@@ -148,23 +132,8 @@ static int32_t dndOpenVnode(SDnode *pDnode, SWrapperCfg *pCfg, SVnode *pImpl) { ...@@ -148,23 +132,8 @@ static int32_t dndOpenVnode(SDnode *pDnode, SWrapperCfg *pCfg, SVnode *pImpl) {
return -1; return -1;
} }
if (dndAllocVnodeQueryQueue(pDnode, pVnode) != 0) { if (dndAllocVnodeQueue(pDnode, pVnode) != 0) {
return -1; terrno = TSDB_CODE_OUT_OF_MEMORY;
}
if (dndAllocVnodeFetchQueue(pDnode, pVnode) != 0) {
return -1;
}
if (dndAllocVnodeWriteQueue(pDnode, pVnode) != 0) {
return -1;
}
if (dndAllocVnodeApplyQueue(pDnode, pVnode) != 0) {
return -1;
}
if (dndAllocVnodeSyncQueue(pDnode, pVnode) != 0) {
return -1; return -1;
} }
...@@ -192,12 +161,7 @@ static void dndCloseVnode(SDnode *pDnode, SVnodeObj *pVnode) { ...@@ -192,12 +161,7 @@ static void dndCloseVnode(SDnode *pDnode, SVnodeObj *pVnode) {
while (!taosQueueEmpty(pVnode->pQueryQ)) taosMsleep(10); while (!taosQueueEmpty(pVnode->pQueryQ)) taosMsleep(10);
while (!taosQueueEmpty(pVnode->pFetchQ)) taosMsleep(10); while (!taosQueueEmpty(pVnode->pFetchQ)) taosMsleep(10);
dndFreeVnodeQueryQueue(pDnode, pVnode); dndFreeVnodeQueue(pDnode, pVnode);
dndFreeVnodeFetchQueue(pDnode, pVnode);
dndFreeVnodeWriteQueue(pDnode, pVnode);
dndFreeVnodeApplyQueue(pDnode, pVnode);
dndFreeVnodeSyncQueue(pDnode, pVnode);
vnodeClose(pVnode->pImpl); vnodeClose(pVnode->pImpl);
pVnode->pImpl = NULL; pVnode->pImpl = NULL;
...@@ -527,8 +491,8 @@ static void dndCloseVnodes(SDnode *pDnode) { ...@@ -527,8 +491,8 @@ static void dndCloseVnodes(SDnode *pDnode) {
dInfo("total vnodes:%d are all closed", numOfVnodes); dInfo("total vnodes:%d are all closed", numOfVnodes);
} }
static SCreateVnodeReq *dndParseCreateVnodeReq(SRpcMsg *rpcMsg) { static SCreateVnodeReq *dndParseCreateVnodeReq(SRpcMsg *pReq) {
SCreateVnodeReq *pCreate = rpcMsg->pCont; SCreateVnodeReq *pCreate = pReq->pCont;
pCreate->vgId = htonl(pCreate->vgId); pCreate->vgId = htonl(pCreate->vgId);
pCreate->dnodeId = htonl(pCreate->dnodeId); pCreate->dnodeId = htonl(pCreate->dnodeId);
pCreate->dbUid = htobe64(pCreate->dbUid); pCreate->dbUid = htobe64(pCreate->dbUid);
...@@ -585,14 +549,14 @@ static void dndGenerateWrapperCfg(SDnode *pDnode, SCreateVnodeReq *pCreate, SWra ...@@ -585,14 +549,14 @@ static void dndGenerateWrapperCfg(SDnode *pDnode, SCreateVnodeReq *pCreate, SWra
pCfg->vgVersion = pCreate->vgVersion; pCfg->vgVersion = pCreate->vgVersion;
} }
static SDropVnodeReq *vnodeParseDropVnodeReq(SRpcMsg *rpcMsg) { static SDropVnodeReq *vnodeParseDropVnodeReq(SRpcMsg *pReq) {
SDropVnodeReq *pDrop = rpcMsg->pCont; SDropVnodeReq *pDrop = pReq->pCont;
pDrop->vgId = htonl(pDrop->vgId); pDrop->vgId = htonl(pDrop->vgId);
return pDrop; return pDrop;
} }
static SAuthVnodeReq *vnodeParseAuthVnodeReq(SRpcMsg *rpcMsg) { static SAuthVnodeReq *vnodeParseAuthVnodeReq(SRpcMsg *pReq) {
SAuthVnodeReq *pAuth = rpcMsg->pCont; SAuthVnodeReq *pAuth = pReq->pCont;
pAuth->vgId = htonl(pAuth->vgId); pAuth->vgId = htonl(pAuth->vgId);
return pAuth; return pAuth;
} }
...@@ -612,7 +576,7 @@ int32_t dndProcessCreateVnodeReq(SDnode *pDnode, SRpcMsg *pReq) { ...@@ -612,7 +576,7 @@ int32_t dndProcessCreateVnodeReq(SDnode *pDnode, SRpcMsg *pReq) {
dDebug("vgId:%d, already exist, return success", pCreate->vgId); dDebug("vgId:%d, already exist, return success", pCreate->vgId);
dndReleaseVnode(pDnode, pVnode); dndReleaseVnode(pDnode, pVnode);
terrno = TSDB_CODE_DND_VNODE_ALREADY_DEPLOYED; terrno = TSDB_CODE_DND_VNODE_ALREADY_DEPLOYED;
return -1; return 0;
} }
SVnode *pImpl = vnodeOpen(wrapperCfg.path, &vnodeCfg); SVnode *pImpl = vnodeOpen(wrapperCfg.path, &vnodeCfg);
...@@ -648,16 +612,13 @@ int32_t dndProcessAlterVnodeReq(SDnode *pDnode, SRpcMsg *pReq) { ...@@ -648,16 +612,13 @@ int32_t dndProcessAlterVnodeReq(SDnode *pDnode, SRpcMsg *pReq) {
SVnodeCfg vnodeCfg = {0}; SVnodeCfg vnodeCfg = {0};
dndGenerateVnodeCfg(pAlter, &vnodeCfg); dndGenerateVnodeCfg(pAlter, &vnodeCfg);
SWrapperCfg wrapperCfg = {0};
dndGenerateWrapperCfg(pDnode, pAlter, &wrapperCfg);
SVnodeObj *pVnode = dndAcquireVnode(pDnode, pAlter->vgId); SVnodeObj *pVnode = dndAcquireVnode(pDnode, pAlter->vgId);
if (pVnode == NULL) { if (pVnode == NULL) {
dDebug("vgId:%d, failed to alter vnode since %s", pAlter->vgId, terrstr()); dDebug("vgId:%d, failed to alter vnode since %s", pAlter->vgId, terrstr());
return -1; return -1;
} }
if (wrapperCfg.vgVersion == pVnode->vgVersion) { if (pAlter->vgVersion == pVnode->vgVersion) {
dndReleaseVnode(pDnode, pVnode); dndReleaseVnode(pDnode, pVnode);
dDebug("vgId:%d, no need to alter vnode cfg for version unchanged ", pAlter->vgId); dDebug("vgId:%d, no need to alter vnode cfg for version unchanged ", pAlter->vgId);
return 0; return 0;
...@@ -670,7 +631,7 @@ int32_t dndProcessAlterVnodeReq(SDnode *pDnode, SRpcMsg *pReq) { ...@@ -670,7 +631,7 @@ int32_t dndProcessAlterVnodeReq(SDnode *pDnode, SRpcMsg *pReq) {
} }
int32_t oldVersion = pVnode->vgVersion; int32_t oldVersion = pVnode->vgVersion;
pVnode->vgVersion = wrapperCfg.vgVersion; pVnode->vgVersion = pAlter->vgVersion;
int32_t code = dndWriteVnodesToFile(pDnode); int32_t code = dndWriteVnodesToFile(pDnode);
if (code != 0) { if (code != 0) {
pVnode->vgVersion = oldVersion; pVnode->vgVersion = oldVersion;
...@@ -689,7 +650,7 @@ int32_t dndProcessDropVnodeReq(SDnode *pDnode, SRpcMsg *pReq) { ...@@ -689,7 +650,7 @@ int32_t dndProcessDropVnodeReq(SDnode *pDnode, SRpcMsg *pReq) {
SVnodeObj *pVnode = dndAcquireVnode(pDnode, vgId); SVnodeObj *pVnode = dndAcquireVnode(pDnode, vgId);
if (pVnode == NULL) { if (pVnode == NULL) {
dDebug("vgId:%d, failed to drop since %s", vgId, terrstr()); dDebug("vgId:%d, failed to drop since %s", vgId, terrstr());
return -1; return 0;
} }
pVnode->dropped = 1; pVnode->dropped = 1;
...@@ -717,7 +678,7 @@ int32_t dndProcessAuthVnodeReq(SDnode *pDnode, SRpcMsg *pReq) { ...@@ -717,7 +678,7 @@ int32_t dndProcessAuthVnodeReq(SDnode *pDnode, SRpcMsg *pReq) {
SVnodeObj *pVnode = dndAcquireVnode(pDnode, vgId); SVnodeObj *pVnode = dndAcquireVnode(pDnode, vgId);
if (pVnode == NULL) { if (pVnode == NULL) {
dDebug("vgId:%d, failed to auth since %s", vgId, terrstr()); dDebug("vgId:%d, failed to auth since %s", vgId, terrstr());
return terrno; return -1;
} }
pVnode->accessState = pAuth->accessState; pVnode->accessState = pAuth->accessState;
...@@ -821,6 +782,7 @@ static void dndProcessVnodeApplyQueue(SVnodeObj *pVnode, STaosQall *qall, int32_ ...@@ -821,6 +782,7 @@ static void dndProcessVnodeApplyQueue(SVnodeObj *pVnode, STaosQall *qall, int32_
for (int32_t i = 0; i < numOfMsgs; ++i) { for (int32_t i = 0; i < numOfMsgs; ++i) {
taosGetQitem(qall, (void **)&pMsg); taosGetQitem(qall, (void **)&pMsg);
// todo
SRpcMsg *pRsp = NULL; SRpcMsg *pRsp = NULL;
(void)vnodeApplyWMsg(pVnode->pImpl, pMsg, &pRsp); (void)vnodeApplyWMsg(pVnode->pImpl, pMsg, &pRsp);
} }
...@@ -832,6 +794,7 @@ static void dndProcessVnodeSyncQueue(SVnodeObj *pVnode, STaosQall *qall, int32_t ...@@ -832,6 +794,7 @@ static void dndProcessVnodeSyncQueue(SVnodeObj *pVnode, STaosQall *qall, int32_t
for (int32_t i = 0; i < numOfMsgs; ++i) { for (int32_t i = 0; i < numOfMsgs; ++i) {
taosGetQitem(qall, (void **)&pMsg); taosGetQitem(qall, (void **)&pMsg);
// todo
SRpcMsg *pRsp = NULL; SRpcMsg *pRsp = NULL;
(void)vnodeProcessSyncReq(pVnode->pImpl, pMsg, &pRsp); (void)vnodeProcessSyncReq(pVnode->pImpl, pMsg, &pRsp);
} }
...@@ -855,21 +818,25 @@ static int32_t dndWriteRpcMsgToVnodeQueue(STaosQueue *pQueue, SRpcMsg *pRpcMsg) ...@@ -855,21 +818,25 @@ static int32_t dndWriteRpcMsgToVnodeQueue(STaosQueue *pQueue, SRpcMsg *pRpcMsg)
} }
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
if (pRpcMsg->msgType & 1u) {
SRpcMsg rsp = {.handle = pRpcMsg->handle, .code = code}; SRpcMsg rsp = {.handle = pRpcMsg->handle, .code = code};
rpcSendResponse(&rsp); rpcSendResponse(&rsp);
}
rpcFreeCont(pRpcMsg->pCont); rpcFreeCont(pRpcMsg->pCont);
} }
} }
static SVnodeObj *dndAcquireVnodeFromMsg(SDnode *pDnode, SRpcMsg *pMsg) { static SVnodeObj *dndAcquireVnodeFromMsg(SDnode *pDnode, SRpcMsg *pMsg) {
SMsgHead *pHead = (SMsgHead *)pMsg->pCont; SMsgHead *pHead = pMsg->pCont;
pHead->contLen = htonl(pHead->contLen); pHead->contLen = htonl(pHead->contLen);
pHead->vgId = htonl(pHead->vgId); pHead->vgId = htonl(pHead->vgId);
SVnodeObj *pVnode = dndAcquireVnode(pDnode, pHead->vgId); SVnodeObj *pVnode = dndAcquireVnode(pDnode, pHead->vgId);
if (pVnode == NULL) { if (pVnode == NULL) {
if (pMsg->msgType & 1u) {
SRpcMsg rsp = {.handle = pMsg->handle, .code = TSDB_CODE_VND_INVALID_VGROUP_ID}; SRpcMsg rsp = {.handle = pMsg->handle, .code = TSDB_CODE_VND_INVALID_VGROUP_ID};
rpcSendResponse(&rsp); rpcSendResponse(&rsp);
}
rpcFreeCont(pMsg->pCont); rpcFreeCont(pMsg->pCont);
} }
...@@ -910,142 +877,69 @@ void dndProcessVnodeFetchMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet) { ...@@ -910,142 +877,69 @@ void dndProcessVnodeFetchMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet) {
static int32_t dndPutMsgIntoVnodeApplyQueue(SDnode *pDnode, int32_t vgId, SRpcMsg *pMsg) { static int32_t dndPutMsgIntoVnodeApplyQueue(SDnode *pDnode, int32_t vgId, SRpcMsg *pMsg) {
SVnodeObj *pVnode = dndAcquireVnode(pDnode, vgId); SVnodeObj *pVnode = dndAcquireVnode(pDnode, vgId);
if (pVnode == NULL) { if (pVnode == NULL) return -1;
return -1;
}
int32_t code = taosWriteQitem(pVnode->pApplyQ, pMsg); int32_t code = taosWriteQitem(pVnode->pApplyQ, pMsg);
dndReleaseVnode(pDnode, pVnode); dndReleaseVnode(pDnode, pVnode);
return code; return code;
} }
static int32_t dndAllocVnodeQueryQueue(SDnode *pDnode, SVnodeObj *pVnode) { static int32_t dndInitVnodeWorkers(SDnode *pDnode) {
SVnodesMgmt *pMgmt = &pDnode->vmgmt;
pVnode->pQueryQ = tWorkerAllocQueue(&pMgmt->queryPool, pVnode, (FProcessItem)dndProcessVnodeQueryQueue);
if (pVnode->pQueryQ == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
}
return 0;
}
static void dndFreeVnodeQueryQueue(SDnode *pDnode, SVnodeObj *pVnode) {
SVnodesMgmt *pMgmt = &pDnode->vmgmt;
tWorkerFreeQueue(&pMgmt->queryPool, pVnode->pQueryQ);
pVnode->pQueryQ = NULL;
}
static int32_t dndAllocVnodeFetchQueue(SDnode *pDnode, SVnodeObj *pVnode) {
SVnodesMgmt *pMgmt = &pDnode->vmgmt;
pVnode->pFetchQ = tWorkerAllocQueue(&pMgmt->fetchPool, pVnode, (FProcessItem)dndProcessVnodeFetchQueue);
if (pVnode->pFetchQ == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
}
return 0;
}
static void dndFreeVnodeFetchQueue(SDnode *pDnode, SVnodeObj *pVnode) {
SVnodesMgmt *pMgmt = &pDnode->vmgmt;
tWorkerFreeQueue(&pMgmt->fetchPool, pVnode->pFetchQ);
pVnode->pFetchQ = NULL;
}
static int32_t dndInitVnodeReadWorker(SDnode *pDnode) {
SVnodesMgmt *pMgmt = &pDnode->vmgmt; SVnodesMgmt *pMgmt = &pDnode->vmgmt;
int32_t maxFetchThreads = 4; int32_t maxFetchThreads = 4;
float threadsForQuery = MAX(pDnode->opt.numOfCores * pDnode->opt.ratioOfQueryCores, 1); int32_t minFetchThreads = MIN(maxFetchThreads, pDnode->opt.numOfCores);
int32_t minQueryThreads = MAX((int32_t)(pDnode->opt.numOfCores * pDnode->opt.ratioOfQueryCores), 1);
int32_t maxQueryThreads = minQueryThreads;
int32_t maxWriteThreads = MAX(pDnode->opt.numOfCores, 1);
int32_t maxSyncThreads = MAX(pDnode->opt.numOfCores / 2, 1);
SWorkerPool *pPool = &pMgmt->queryPool; SWorkerPool *pPool = &pMgmt->queryPool;
pPool->name = "vnode-query"; pPool->name = "vnode-query";
pPool->min = (int32_t)threadsForQuery; pPool->min = minQueryThreads;
pPool->max = pPool->min; pPool->max = maxQueryThreads;
if (tWorkerInit(pPool) != 0) { if (tWorkerInit(pPool) != 0) return -1;
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
}
pPool = &pMgmt->fetchPool; pPool = &pMgmt->fetchPool;
pPool->name = "vnode-fetch"; pPool->name = "vnode-fetch";
pPool->min = MIN(maxFetchThreads, pDnode->opt.numOfCores); pPool->min = minFetchThreads;
pPool->max = pPool->min; pPool->max = maxFetchThreads;
if (tWorkerInit(pPool) != 0) { if (tWorkerInit(pPool) != 0) return -1;
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1; SMWorkerPool *pMPool = &pMgmt->writePool;
} pMPool->name = "vnode-write";
pMPool->max = maxWriteThreads;
if (tMWorkerInit(pMPool) != 0) return -1;
pMPool = &pMgmt->syncPool;
pMPool->name = "vnode-sync";
pMPool->max = maxSyncThreads;
if (tMWorkerInit(pMPool) != 0) return -1;
dDebug("vnode read worker is initialized"); dDebug("vnode workers is initialized");
return 0; return 0;
} }
static void dndCleanupVnodeReadWorker(SDnode *pDnode) { static void dndCleanupVnodeWorkers(SDnode *pDnode) {
SVnodesMgmt *pMgmt = &pDnode->vmgmt; SVnodesMgmt *pMgmt = &pDnode->vmgmt;
tWorkerCleanup(&pMgmt->fetchPool); tWorkerCleanup(&pMgmt->fetchPool);
tWorkerCleanup(&pMgmt->queryPool); tWorkerCleanup(&pMgmt->queryPool);
dDebug("vnode close worker is initialized"); tMWorkerCleanup(&pMgmt->writePool);
} tMWorkerCleanup(&pMgmt->syncPool);
dDebug("vnode workers is closed");
static int32_t dndAllocVnodeWriteQueue(SDnode *pDnode, SVnodeObj *pVnode) {
SVnodesMgmt *pMgmt = &pDnode->vmgmt;
pVnode->pWriteQ = tMWorkerAllocQueue(&pMgmt->writePool, pVnode, (FProcessItems)dndProcessVnodeWriteQueue);
if (pVnode->pWriteQ == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
}
return 0;
} }
static void dndFreeVnodeWriteQueue(SDnode *pDnode, SVnodeObj *pVnode) { static int32_t dndAllocVnodeQueue(SDnode *pDnode, SVnodeObj *pVnode) {
SVnodesMgmt *pMgmt = &pDnode->vmgmt; SVnodesMgmt *pMgmt = &pDnode->vmgmt;
tMWorkerFreeQueue(&pMgmt->writePool, pVnode->pWriteQ);
pVnode->pWriteQ = NULL;
}
static int32_t dndAllocVnodeApplyQueue(SDnode *pDnode, SVnodeObj *pVnode) { pVnode->pWriteQ = tMWorkerAllocQueue(&pMgmt->writePool, pVnode, (FProcessItems)dndProcessVnodeWriteQueue);
SVnodesMgmt *pMgmt = &pDnode->vmgmt;
pVnode->pApplyQ = tMWorkerAllocQueue(&pMgmt->writePool, pVnode, (FProcessItems)dndProcessVnodeApplyQueue); pVnode->pApplyQ = tMWorkerAllocQueue(&pMgmt->writePool, pVnode, (FProcessItems)dndProcessVnodeApplyQueue);
if (pVnode->pApplyQ == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
}
return 0;
}
static void dndFreeVnodeApplyQueue(SDnode *pDnode, SVnodeObj *pVnode) {
SVnodesMgmt *pMgmt = &pDnode->vmgmt;
tMWorkerFreeQueue(&pMgmt->writePool, pVnode->pApplyQ);
pVnode->pApplyQ = NULL;
}
static int32_t dndInitVnodeWriteWorker(SDnode *pDnode) {
SVnodesMgmt *pMgmt = &pDnode->vmgmt;
SMWorkerPool *pPool = &pMgmt->writePool;
pPool->name = "vnode-write";
pPool->max = pDnode->opt.numOfCores;
if (tMWorkerInit(pPool) != 0) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
}
dDebug("vnode write worker is initialized");
return 0;
}
static void dndCleanupVnodeWriteWorker(SDnode *pDnode) {
SVnodesMgmt *pMgmt = &pDnode->vmgmt;
tMWorkerCleanup(&pMgmt->writePool);
dDebug("vnode write worker is closed");
}
static int32_t dndAllocVnodeSyncQueue(SDnode *pDnode, SVnodeObj *pVnode) {
SVnodesMgmt *pMgmt = &pDnode->vmgmt;
pVnode->pSyncQ = tMWorkerAllocQueue(&pMgmt->syncPool, pVnode, (FProcessItems)dndProcessVnodeSyncQueue); pVnode->pSyncQ = tMWorkerAllocQueue(&pMgmt->syncPool, pVnode, (FProcessItems)dndProcessVnodeSyncQueue);
if (pVnode->pSyncQ == NULL) { pVnode->pFetchQ = tWorkerAllocQueue(&pMgmt->fetchPool, pVnode, (FProcessItem)dndProcessVnodeFetchQueue);
pVnode->pQueryQ = tWorkerAllocQueue(&pMgmt->queryPool, pVnode, (FProcessItem)dndProcessVnodeQueryQueue);
if (pVnode->pApplyQ == NULL || pVnode->pWriteQ == NULL || pVnode->pSyncQ == NULL || pVnode->pFetchQ == NULL ||
pVnode->pQueryQ == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY; terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1; return -1;
} }
...@@ -1053,50 +947,26 @@ static int32_t dndAllocVnodeSyncQueue(SDnode *pDnode, SVnodeObj *pVnode) { ...@@ -1053,50 +947,26 @@ static int32_t dndAllocVnodeSyncQueue(SDnode *pDnode, SVnodeObj *pVnode) {
return 0; return 0;
} }
static void dndFreeVnodeSyncQueue(SDnode *pDnode, SVnodeObj *pVnode) { static void dndFreeVnodeQueue(SDnode *pDnode, SVnodeObj *pVnode) {
SVnodesMgmt *pMgmt = &pDnode->vmgmt; SVnodesMgmt *pMgmt = &pDnode->vmgmt;
tWorkerFreeQueue(&pMgmt->queryPool, pVnode->pQueryQ);
tWorkerFreeQueue(&pMgmt->fetchPool, pVnode->pFetchQ);
tMWorkerFreeQueue(&pMgmt->writePool, pVnode->pWriteQ);
tMWorkerFreeQueue(&pMgmt->writePool, pVnode->pApplyQ);
tMWorkerFreeQueue(&pMgmt->syncPool, pVnode->pSyncQ); tMWorkerFreeQueue(&pMgmt->syncPool, pVnode->pSyncQ);
pVnode->pWriteQ = NULL;
pVnode->pApplyQ = NULL;
pVnode->pSyncQ = NULL; pVnode->pSyncQ = NULL;
} pVnode->pFetchQ = NULL;
pVnode->pQueryQ = NULL;
static int32_t dndInitVnodeSyncWorker(SDnode *pDnode) {
int32_t maxThreads = pDnode->opt.numOfCores / 2;
if (maxThreads < 1) maxThreads = 1;
SVnodesMgmt *pMgmt = &pDnode->vmgmt;
SMWorkerPool *pPool = &pMgmt->syncPool;
pPool->name = "vnode-sync";
pPool->max = maxThreads;
if (tMWorkerInit(pPool) != 0) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
}
dDebug("vnode sync worker is initialized");
return 0;
}
static void dndCleanupVnodeSyncWorker(SDnode *pDnode) {
SVnodesMgmt *pMgmt = &pDnode->vmgmt;
tMWorkerCleanup(&pMgmt->syncPool);
dDebug("vnode sync worker is closed");
} }
int32_t dndInitVnodes(SDnode *pDnode) { int32_t dndInitVnodes(SDnode *pDnode) {
dInfo("dnode-vnodes start to init"); dInfo("dnode-vnodes start to init");
if (dndInitVnodeReadWorker(pDnode) != 0) { if (dndInitVnodeWorkers(pDnode) != 0) {
dError("failed to init vnodes read worker since %s", terrstr()); terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1; dError("failed to init vnode workers since %s", terrstr());
}
if (dndInitVnodeWriteWorker(pDnode) != 0) {
dError("failed to init vnodes write worker since %s", terrstr());
return -1;
}
if (dndInitVnodeSyncWorker(pDnode) != 0) {
dError("failed to init vnodes sync worker since %s", terrstr());
return -1; return -1;
} }
...@@ -1112,9 +982,7 @@ int32_t dndInitVnodes(SDnode *pDnode) { ...@@ -1112,9 +982,7 @@ int32_t dndInitVnodes(SDnode *pDnode) {
void dndCleanupVnodes(SDnode *pDnode) { void dndCleanupVnodes(SDnode *pDnode) {
dInfo("dnode-vnodes start to clean up"); dInfo("dnode-vnodes start to clean up");
dndCloseVnodes(pDnode); dndCloseVnodes(pDnode);
dndCleanupVnodeReadWorker(pDnode); dndCleanupVnodeWorkers(pDnode);
dndCleanupVnodeWriteWorker(pDnode);
dndCleanupVnodeSyncWorker(pDnode);
dInfo("dnode-vnodes is cleaned up"); dInfo("dnode-vnodes is cleaned up");
} }
......
...@@ -20,12 +20,13 @@ static void vnodeFree(SVnode *pVnode); ...@@ -20,12 +20,13 @@ static void vnodeFree(SVnode *pVnode);
static int vnodeOpenImpl(SVnode *pVnode); static int vnodeOpenImpl(SVnode *pVnode);
static void vnodeCloseImpl(SVnode *pVnode); static void vnodeCloseImpl(SVnode *pVnode);
SVnode *vnodeOpen(const char *path, const SVnodeCfg *pVnodeCfgInput) { SVnode *vnodeOpen(const char *path, const SVnodeCfg *pVnodeCfg) {
SVnode *pVnode = NULL; SVnode *pVnode = NULL;
// Set default options // Set default options
SVnodeCfg *pVnodeCfg = &defaultVnodeOptions; if (pVnodeCfg == NULL) {
pVnodeCfg->vgId = pVnodeCfg->vgId; pVnodeCfg = &defaultVnodeOptions;
}
// Validate options // Validate options
if (vnodeValidateOptions(pVnodeCfg) < 0) { if (vnodeValidateOptions(pVnodeCfg) < 0) {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册