提交 eb6dc7fe 编写于 作者: S Shengliang Guan

refactor: adjust vnode queue

上级 71c1291f
......@@ -34,6 +34,7 @@ typedef struct SVnodeMgmt {
SQWorkerPool fetchPool;
SWWorkerPool syncPool;
SWWorkerPool writePool;
SWWorkerPool applyPool;
SWWorkerPool mergePool;
SSingleWorker mgmtWorker;
SSingleWorker monitorWorker;
......
......@@ -16,10 +16,8 @@
#define _DEFAULT_SOURCE
#include "vmInt.h"
#include "sync.h"
#include "syncTools.h"
static inline void vmSendRsp(SRpcMsg *pMsg, int32_t code) {
if (pMsg->info.handle == NULL) return;
SRpcMsg rsp = {
.code = code,
.pCont = pMsg->info.rsp,
......@@ -55,7 +53,7 @@ static void vmProcessMgmtQueue(SQueueInfo *pInfo, SRpcMsg *pMsg) {
if (IsReq(pMsg)) {
if (code != 0) {
if (terrno != 0) code = terrno;
dError("msg:%p failed to process since %s", pMsg, terrstr());
dError("msg:%p, failed to process since %s", pMsg, terrstr());
}
vmSendRsp(pMsg, code);
}
......@@ -107,11 +105,9 @@ static void vmProcessSyncQueue(SQueueInfo *pInfo, STaosQall *qall, int32_t numOf
int32_t code = vnodeProcessSyncReq(pVnode->pImpl, pMsg, NULL);
if (code != 0) {
if (terrno != 0) code = terrno;
dError("vgId:%d, msg:%p failed to sync since %s", pVnode->vgId, pMsg, terrstr());
if (pMsg->info.handle != NULL) {
if (terrno != 0) code = terrno;
vmSendRsp(pMsg, code);
}
vmSendRsp(pMsg, code);
}
dTrace("vgId:%d, msg:%p is freed, code:0x%x", pVnode->vgId, pMsg, code);
......@@ -130,8 +126,8 @@ static void vmProcessMergeQueue(SQueueInfo *pInfo, STaosQall *qall, int32_t numO
int32_t code = vnodeProcessFetchMsg(pVnode->pImpl, pMsg, pInfo);
if (code != 0) {
dError("vgId:%d, msg:%p failed to merge since %s", pVnode->vgId, pMsg, terrstr());
if (terrno != 0) code = terrno;
dError("vgId:%d, msg:%p failed to merge since %s", pVnode->vgId, pMsg, terrstr());
vmSendRsp(pMsg, code);
}
......@@ -150,7 +146,7 @@ static int32_t vmPutMsgToQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg, EQueueType qtyp
SVnodeObj *pVnode = vmAcquireVnode(pMgmt, pHead->vgId);
if (pVnode == NULL) {
dError("vgId:%d, failed to put msg:%p into vnode queue since %s, type:%s", pHead->vgId, pMsg, terrstr(),
dError("vgId:%d, msg:%p failed to put into vnode queue since %s, type:%s", pHead->vgId, pMsg, terrstr(),
TMSG_INFO(pMsg->msgType));
return terrno != 0 ? terrno : -1;
}
......@@ -260,7 +256,7 @@ int32_t vmGetQueueSize(SVnodeMgmt *pMgmt, int32_t vgId, EQueueType qtype) {
int32_t vmAllocQueue(SVnodeMgmt *pMgmt, SVnodeObj *pVnode) {
pVnode->pWriteQ = tWWorkerAllocQueue(&pMgmt->writePool, pVnode->pImpl, (FItems)vnodeProposeMsg);
pVnode->pSyncQ = tWWorkerAllocQueue(&pMgmt->syncPool, pVnode, (FItems)vmProcessSyncQueue);
pVnode->pApplyQ = tWWorkerAllocQueue(&pMgmt->writePool, pVnode->pImpl, (FItems)vnodeApplyMsg);
pVnode->pApplyQ = tWWorkerAllocQueue(&pMgmt->applyPool, pVnode->pImpl, (FItems)vnodeApplyMsg);
pVnode->pQueryQ = tQWorkerAllocQueue(&pMgmt->queryPool, pVnode, (FItem)vmProcessQueryQueue);
pVnode->pFetchQ = tQWorkerAllocQueue(&pMgmt->fetchPool, pVnode, (FItem)vmProcessFetchQueue);
pVnode->pMergeQ = tWWorkerAllocQueue(&pMgmt->mergePool, pVnode, (FItems)vmProcessMergeQueue);
......@@ -277,7 +273,7 @@ int32_t vmAllocQueue(SVnodeMgmt *pMgmt, SVnodeObj *pVnode) {
void vmFreeQueue(SVnodeMgmt *pMgmt, SVnodeObj *pVnode) {
tWWorkerFreeQueue(&pMgmt->writePool, pVnode->pWriteQ);
tWWorkerFreeQueue(&pMgmt->writePool, pVnode->pApplyQ);
tWWorkerFreeQueue(&pMgmt->applyPool, pVnode->pApplyQ);
tWWorkerFreeQueue(&pMgmt->syncPool, pVnode->pSyncQ);
tQWorkerFreeQueue(&pMgmt->queryPool, pVnode->pQueryQ);
tQWorkerFreeQueue(&pMgmt->fetchPool, pVnode->pFetchQ);
......@@ -309,6 +305,11 @@ int32_t vmStartWorker(SVnodeMgmt *pMgmt) {
pWPool->max = tsNumOfVnodeWriteThreads;
if (tWWorkerInit(pWPool) != 0) return -1;
SWWorkerPool *pAPool = &pMgmt->applyPool;
pAPool->name = "vnode-apply";
pAPool->max = tsNumOfVnodeWriteThreads;
if (tWWorkerInit(pAPool) != 0) return -1;
SWWorkerPool *pSPool = &pMgmt->syncPool;
pSPool->name = "vnode-sync";
pSPool->max = tsNumOfVnodeSyncThreads;
......@@ -345,6 +346,7 @@ void vmStopWorker(SVnodeMgmt *pMgmt) {
tSingleWorkerCleanup(&pMgmt->monitorWorker);
tSingleWorkerCleanup(&pMgmt->mgmtWorker);
tWWorkerCleanup(&pMgmt->writePool);
tWWorkerCleanup(&pMgmt->applyPool);
tWWorkerCleanup(&pMgmt->syncPool);
tQWorkerCleanup(&pMgmt->queryPool);
tQWorkerCleanup(&pMgmt->fetchPool);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册