diff --git a/include/common/taosmsg.h b/include/common/taosmsg.h index 49e8892c7da8e2a95b1531d3050a87c6e9453c2a..822f850018c313c0c9f2cb1358ec082b98ff25ba 100644 --- a/include/common/taosmsg.h +++ b/include/common/taosmsg.h @@ -655,8 +655,8 @@ typedef struct { } SVnodeLoad; typedef struct { - int32_t vnodeNum; - SVnodeLoad vnodeLoads[]; + int32_t num; + SVnodeLoad data[]; } SVnodeLoads; typedef struct SStatusMsg { diff --git a/include/server/vnode/vnode.h b/include/server/vnode/vnode.h index a20a7fd410bf29cf9fa12d02592c5642850cc335..ca0706f0b952d46da09f9d806cdc800309115a98 100644 --- a/include/server/vnode/vnode.h +++ b/include/server/vnode/vnode.h @@ -47,12 +47,18 @@ typedef struct { SVnodeDesc replicas[TSDB_MAX_REPLICA]; } SVnodeCfg; +typedef enum { + VN_MSG_TYPE_WRITE = 1, + VN_MSG_TYPE_APPLY, + VN_MSG_TYPE_SYNC, + VN_MSG_TYPE_QUERY, + VN_MSG_TYPE_FETCH +} EVMType; + typedef struct SVnodeMsg { - int32_t msgType; - int32_t code; - SRpcMsg rpcMsg; // original message from rpc - int32_t contLen; - char pCont[]; + int32_t curNum; + int32_t allocNum; + SRpcMsg rpcMsg[]; } SVnodeMsg; int32_t vnodeInit(); @@ -67,7 +73,11 @@ int32_t vnodeCompact(SVnode *pVnode); int32_t vnodeSync(SVnode *pVnode); void vnodeGetLoad(SVnode *pVnode, SVnodeLoad *pLoad); -void vnodeProcessMsg(SVnode *pVnode, SVnodeMsg *pMsg); + +SVnodeMsg *vnodeInitMsg(int32_t msgNum); +int32_t vnodeAppendMsg(SVnodeMsg *pMsg, SRpcMsg *pRpcMsg); +void vnodeCleanupMsg(SVnodeMsg *pMsg); +void vnodeProcessMsg(SVnode *pVnode, SVnodeMsg *pMsg, EVMType msgType); #ifdef __cplusplus } diff --git a/include/util/tqueue.h b/include/util/tqueue.h index faac1afe705c95218cceb17117d93fd01c06e0e7..24c56ea6a3bda993eb34f4b9b2e1a087f94b32dc 100644 --- a/include/util/tqueue.h +++ b/include/util/tqueue.h @@ -40,8 +40,8 @@ shall be used to set up the protection. typedef void *taos_queue; typedef void *taos_qset; typedef void *taos_qall; -typedef void *(*FProcessItem)(void *pItem, void *ahandle); -typedef void *(*FProcessItems)(taos_qall qall, int numOfItems, void *ahandle); +typedef void (*FProcessItem)(void *ahandle, void *pItem); +typedef void (*FProcessItems)(void *ahandle, taos_qall qall, int numOfItems); taos_queue taosOpenQueue(); void taosCloseQueue(taos_queue); diff --git a/source/dnode/mgmt/inc/dnodeVnodes.h b/source/dnode/mgmt/inc/dnodeVnodes.h index 2b72ba5d597a593888a9ab05a26972fe8978b8b4..31eae049abd392dc4e1105732fa1a2120769a07b 100644 --- a/source/dnode/mgmt/inc/dnodeVnodes.h +++ b/source/dnode/mgmt/inc/dnodeVnodes.h @@ -23,7 +23,7 @@ extern "C" { int32_t dnodeInitVnodes(); void dnodeCleanupVnodes(); -void dnodeGetVnodes(SVnodeLoads *pVloads); +void dnodeGetVnodeLoads(SVnodeLoads *pVloads); void dnodeProcessVnodeMgmtMsg(SRpcMsg *pMsg, SEpSet *pEpSet); void dnodeProcessVnodeWriteMsg(SRpcMsg *pMsg, SEpSet *pEpSet); diff --git a/source/dnode/mgmt/src/dnodeDnode.c b/source/dnode/mgmt/src/dnodeDnode.c index 5bf5b1d56a0af9e3c5462f1bd76e959ef662dfa2..63de2b940d87d027da4f84e5eab58544443ac698 100644 --- a/source/dnode/mgmt/src/dnodeDnode.c +++ b/source/dnode/mgmt/src/dnodeDnode.c @@ -372,8 +372,8 @@ static void dnodeSendStatusMsg() { char timestr[32] = "1970-01-01 00:00:00.00"; (void)taosParseTime(timestr, &pStatus->clusterCfg.checkTime, (int32_t)strlen(timestr), TSDB_TIME_PRECISION_MILLI, 0); - dnodeGetVnodes(&pStatus->vnodeLoads); - contLen = sizeof(SStatusMsg) + pStatus->vnodeLoads.vnodeNum * sizeof(SVnodeLoad); + dnodeGetVnodeLoads(&pStatus->vnodeLoads); + contLen = sizeof(SStatusMsg) + pStatus->vnodeLoads.num * sizeof(SVnodeLoad); SRpcMsg rpcMsg = {.pCont = pStatus, .contLen = contLen, .msgType = TSDB_MSG_TYPE_STATUS}; dnodeSendMsgToMnode(&rpcMsg); diff --git a/source/dnode/mgmt/src/dnodeVnodes.c b/source/dnode/mgmt/src/dnodeVnodes.c index c2e143da807e97fd9cb2fb5903d9132c80d45199..72048a15a5893007766fc521fd20a21d6b43140b 100644 --- a/source/dnode/mgmt/src/dnodeVnodes.c +++ b/source/dnode/mgmt/src/dnodeVnodes.c @@ -45,27 +45,66 @@ typedef struct { } SVThread; static struct { - SHashObj *hash; - SWorkerPool mgmtPool; - taos_queue pMgmtQ; - SSteps *pSteps; - int32_t openVnodes; - int32_t totalVnodes; - char file[PATH_MAX + 20]; + SHashObj *hash; + SWorkerPool mgmtPool; + SWorkerPool queryPool; + SWorkerPool fetchPool; + SMWorkerPool syncPool; + SMWorkerPool writePool; + taos_queue pMgmtQ; + SSteps *pSteps; + int32_t openVnodes; + int32_t totalVnodes; + char file[PATH_MAX + 20]; } tsVnodes; +static int32_t dnodeAllocVnodeQueryQueue(SVnodeObj *pVnode); +static void dnodeFreeVnodeQueryQueue(SVnodeObj *pVnode); +static int32_t dnodeAllocVnodeFetchQueue(SVnodeObj *pVnode); +static void dnodeFreeVnodeFetchQueue(SVnodeObj *pVnode); +static int32_t dnodeAllocVnodeWriteQueue(SVnodeObj *pVnode); +static void dnodeFreeVnodeWriteQueue(SVnodeObj *pVnode); +static int32_t dnodeAllocVnodeApplyQueue(SVnodeObj *pVnode); +static void dnodeFreeVnodeApplyQueue(SVnodeObj *pVnode); +static int32_t dnodeAllocVnodeSyncQueue(SVnodeObj *pVnode); +static void dnodeFreeVnodeSyncQueue(SVnodeObj *pVnode); + static int32_t dnodeCreateVnodeWrapper(int32_t vgId, SVnode *pImpl) { SVnodeObj *pVnode = calloc(1, sizeof(SVnodeObj)); + if (pVnode == NULL) { + return TSDB_CODE_DND_OUT_OF_MEMORY; + } + pVnode->vgId = vgId; pVnode->refCount = 0; pVnode->dropped = 0; pVnode->accessState = TSDB_VN_ALL_ACCCESS; pVnode->pImpl = pImpl; - pVnode->pWriteQ = NULL; - pVnode->pSyncQ = NULL; - pVnode->pApplyQ = NULL; - pVnode->pQueryQ = NULL; - pVnode->pFetchQ = NULL; + + int32_t code = dnodeAllocVnodeQueryQueue(pVnode); + if (code != 0) { + return code; + } + + code = dnodeAllocVnodeFetchQueue(pVnode); + if (code != 0) { + return code; + } + + code = dnodeAllocVnodeWriteQueue(pVnode); + if (code != 0) { + return code; + } + + code = dnodeAllocVnodeApplyQueue(pVnode); + if (code != 0) { + return code; + } + + code = dnodeAllocVnodeSyncQueue(pVnode); + if (code != 0) { + return code; + } return taosHashPut(tsVnodes.hash, &vgId, sizeof(int32_t), &pVnode, sizeof(SVnodeObj *)); } @@ -74,11 +113,11 @@ static void dnodeDropVnodeWrapper(SVnodeObj *pVnode) { taosHashRemove(tsVnodes.hash, &pVnode->vgId, sizeof(int32_t)); //todo wait all queue empty - pVnode->pWriteQ = NULL; - pVnode->pSyncQ = NULL; - pVnode->pApplyQ = NULL; - pVnode->pQueryQ = NULL; - pVnode->pFetchQ = NULL; + dnodeFreeVnodeQueryQueue(pVnode); + dnodeFreeVnodeFetchQueue(pVnode); + dnodeFreeVnodeWriteQueue(pVnode); + dnodeFreeVnodeApplyQueue(pVnode); + dnodeFreeVnodeSyncQueue(pVnode); } static int32_t dnodeGetVnodesFromHash(SVnodeObj *pVnodes[], int32_t *numOfVnodes) { @@ -465,7 +504,7 @@ static int32_t vnodeProcessCompactVnodeReq(SRpcMsg *rpcMsg) { return code; } -static void dnodeProcessVnodeMgmtReq(SRpcMsg *pMsg, void *unused) { +static void dnodeProcessVnodeMgmtQueue(void *unused, SRpcMsg *pMsg) { int32_t code = 0; switch (pMsg->msgType) { @@ -498,7 +537,44 @@ static void dnodeProcessVnodeMgmtReq(SRpcMsg *pMsg, void *unused) { taosFreeQitem(pMsg); } -static int32_t dnodeWriteToVnodeQueue(taos_queue pQueue, SRpcMsg *pRpcMsg) { +static void dnodeProcessVnodeQueryQueue(SVnodeObj *pVnode, SVnodeMsg *pMsg) { + vnodeProcessMsg(pVnode->pImpl, pMsg, VN_MSG_TYPE_QUERY); +} + +static void dnodeProcessVnodeFetchQueue(SVnodeObj *pVnode, SVnodeMsg *pMsg) { + vnodeProcessMsg(pVnode->pImpl, pMsg, VN_MSG_TYPE_FETCH); +} + +static void dnodeProcessVnodeWriteQueue(SVnodeObj *pVnode, taos_qall qall, int32_t numOfMsgs) { + SVnodeMsg *pMsg = vnodeInitMsg(numOfMsgs); + SRpcMsg *pRpcMsg = NULL; + + for (int32_t i = 0; i < numOfMsgs; ++i) { + taosGetQitem(qall, (void **)&pRpcMsg); + vnodeAppendMsg(pMsg, pRpcMsg); + taosFreeQitem(pRpcMsg); + } + + vnodeProcessMsg(pVnode->pImpl, pMsg, VN_MSG_TYPE_WRITE); +} + +static void dnodeProcessVnodeApplyQueue(SVnodeObj *pVnode, taos_qall qall, int32_t numOfMsgs) { + SVnodeMsg *pMsg = NULL; + for (int32_t i = 0; i < numOfMsgs; ++i) { + taosGetQitem(qall, (void **)&pMsg); + vnodeProcessMsg(pVnode->pImpl, pMsg, VN_MSG_TYPE_APPLY); + } +} + +static void dnodeProcessVnodeSyncQueue(SVnodeObj *pVnode, taos_qall qall, int32_t numOfMsgs) { + SVnodeMsg *pMsg = NULL; + for (int32_t i = 0; i < numOfMsgs; ++i) { + taosGetQitem(qall, (void **)&pMsg); + vnodeProcessMsg(pVnode->pImpl, pMsg, VN_MSG_TYPE_SYNC); + } +} + +static int32_t dnodeWriteRpcMsgToVnodeQueue(taos_queue pQueue, SRpcMsg *pRpcMsg) { int32_t code = 0; if (pQueue == NULL) { @@ -520,6 +596,28 @@ static int32_t dnodeWriteToVnodeQueue(taos_queue pQueue, SRpcMsg *pRpcMsg) { } } +static int32_t dnodeWriteVnodeMsgToVnodeQueue(taos_queue pQueue, SRpcMsg *pRpcMsg) { + int32_t code = 0; + + if (pQueue == NULL) { + code = TSDB_CODE_DND_MSG_NOT_PROCESSED; + } else { + SVnodeMsg *pMsg = vnodeInitMsg(1); + if (pMsg == NULL) { + code = TSDB_CODE_DND_OUT_OF_MEMORY; + } else { + vnodeAppendMsg(pMsg, pRpcMsg); + code = taosWriteQitem(pQueue, pMsg); + } + } + + if (code != TSDB_CODE_SUCCESS) { + SRpcMsg rsp = {.handle = pRpcMsg->handle, .code = code}; + rpcSendResponse(&rsp); + rpcFreeCont(pRpcMsg->pCont); + } +} + static SVnodeObj *dnodeAcquireVnodeFromMsg(SRpcMsg *pMsg) { SMsgHead *pHead = (SMsgHead *)pMsg->pCont; pHead->vgId = htonl(pHead->vgId); @@ -534,12 +632,12 @@ static SVnodeObj *dnodeAcquireVnodeFromMsg(SRpcMsg *pMsg) { return pVnode; } -void dnodeProcessVnodeMgmtMsg(SRpcMsg *pMsg, SEpSet *pEpSet) { dnodeWriteToVnodeQueue(tsVnodes.pMgmtQ, pMsg); } +void dnodeProcessVnodeMgmtMsg(SRpcMsg *pMsg, SEpSet *pEpSet) { dnodeWriteRpcMsgToVnodeQueue(tsVnodes.pMgmtQ, pMsg); } void dnodeProcessVnodeWriteMsg(SRpcMsg *pMsg, SEpSet *pEpSet) { SVnodeObj *pVnode = dnodeAcquireVnodeFromMsg(pMsg); if (pVnode != NULL) { - dnodeWriteToVnodeQueue(pVnode->pWriteQ, pMsg); + dnodeWriteRpcMsgToVnodeQueue(pVnode->pWriteQ, pMsg); dnodeReleaseVnode(pVnode); } } @@ -547,7 +645,7 @@ void dnodeProcessVnodeWriteMsg(SRpcMsg *pMsg, SEpSet *pEpSet) { void dnodeProcessVnodeSyncMsg(SRpcMsg *pMsg, SEpSet *pEpSet) { SVnodeObj *pVnode = dnodeAcquireVnodeFromMsg(pMsg); if (pVnode != NULL) { - dnodeWriteToVnodeQueue(pVnode->pSyncQ, pMsg); + dnodeWriteVnodeMsgToVnodeQueue(pVnode->pSyncQ, pMsg); dnodeReleaseVnode(pVnode); } } @@ -555,7 +653,7 @@ void dnodeProcessVnodeSyncMsg(SRpcMsg *pMsg, SEpSet *pEpSet) { void dnodeProcessVnodeQueryMsg(SRpcMsg *pMsg, SEpSet *pEpSet) { SVnodeObj *pVnode = dnodeAcquireVnodeFromMsg(pMsg); if (pVnode != NULL) { - dnodeWriteToVnodeQueue(pVnode->pQueryQ, pMsg); + dnodeWriteVnodeMsgToVnodeQueue(pVnode->pQueryQ, pMsg); dnodeReleaseVnode(pVnode); } } @@ -563,7 +661,7 @@ void dnodeProcessVnodeQueryMsg(SRpcMsg *pMsg, SEpSet *pEpSet) { void dnodeProcessVnodeFetchMsg(SRpcMsg *pMsg, SEpSet *pEpSet) { SVnodeObj *pVnode = dnodeAcquireVnodeFromMsg(pMsg); if (pVnode != NULL) { - dnodeWriteToVnodeQueue(pVnode->pFetchQ, pMsg); + dnodeWriteVnodeMsgToVnodeQueue(pVnode->pFetchQ, pMsg); dnodeReleaseVnode(pVnode); } } @@ -577,7 +675,7 @@ static int32_t dnodeInitVnodeMgmtWorker() { return TSDB_CODE_VND_OUT_OF_MEMORY; } - tsVnodes.pMgmtQ = tWorkerAllocQueue(pPool, NULL, (FProcessItem)dnodeProcessVnodeMgmtReq); + tsVnodes.pMgmtQ = tWorkerAllocQueue(pPool, NULL, (FProcessItem)dnodeProcessVnodeMgmtQueue); if (tsVnodes.pMgmtQ == NULL) { return TSDB_CODE_VND_OUT_OF_MEMORY; } @@ -591,12 +689,137 @@ static void dnodeCleanupVnodeMgmtWorker() { tsVnodes.pMgmtQ = NULL; } +static int32_t dnodeAllocVnodeQueryQueue(SVnodeObj *pVnode) { + pVnode->pQueryQ = tWorkerAllocQueue(&tsVnodes.queryPool, pVnode, (FProcessItem)dnodeProcessVnodeQueryQueue); + if (pVnode->pQueryQ == NULL) { + return TSDB_CODE_DND_OUT_OF_MEMORY; + } + return 0; +} + +static void dnodeFreeVnodeQueryQueue(SVnodeObj *pVnode) { + tWorkerFreeQueue(&tsVnodes.queryPool, pVnode->pQueryQ); + pVnode->pQueryQ = NULL; +} + +static int32_t dnodeAllocVnodeFetchQueue(SVnodeObj *pVnode) { + pVnode->pFetchQ = tWorkerAllocQueue(&tsVnodes.fetchPool, pVnode, (FProcessItem)dnodeProcessVnodeFetchQueue); + if (pVnode->pFetchQ == NULL) { + return TSDB_CODE_DND_OUT_OF_MEMORY; + } + return 0; +} + +static void dnodeFreeVnodeFetchQueue(SVnodeObj *pVnode) { + tWorkerFreeQueue(&tsVnodes.fetchPool, pVnode->pFetchQ); + pVnode->pFetchQ = NULL; +} + +static int32_t dnodeInitVnodeReadWorker() { + int32_t maxFetchThreads = 4; + float threadsForQuery = MAX(tsNumOfCores * tsRatioOfQueryCores, 1); + + SWorkerPool *pPool = &tsVnodes.queryPool; + pPool->name = "vnode-query"; + pPool->min = (int32_t)threadsForQuery; + pPool->max = pPool->min; + if (tWorkerInit(pPool) != 0) { + return TSDB_CODE_VND_OUT_OF_MEMORY; + } + + pPool = &tsVnodes.fetchPool; + pPool->name = "vnode-fetch"; + pPool->min = MIN(maxFetchThreads, tsNumOfCores); + pPool->max = pPool->min; + if (tWorkerInit(pPool) != 0) { + TSDB_CODE_VND_OUT_OF_MEMORY; + } + + return 0; +} + +static void dnodeCleanupVnodeReadWorker() { + tWorkerCleanup(&tsVnodes.fetchPool); + tWorkerCleanup(&tsVnodes.queryPool); +} + +static int32_t dnodeAllocVnodeWriteQueue(SVnodeObj *pVnode) { + pVnode->pWriteQ = tMWorkerAllocQueue(&tsVnodes.writePool, pVnode, (FProcessItems)dnodeProcessVnodeWriteQueue); + if (pVnode->pWriteQ == NULL) { + return TSDB_CODE_DND_OUT_OF_MEMORY; + } + return 0; +} + +static void dnodeFreeVnodeWriteQueue(SVnodeObj *pVnode) { + tMWorkerFreeQueue(&tsVnodes.writePool, pVnode->pWriteQ); + pVnode->pWriteQ = NULL; +} + +static int32_t dnodeAllocVnodeApplyQueue(SVnodeObj *pVnode) { + pVnode->pApplyQ = tMWorkerAllocQueue(&tsVnodes.writePool, pVnode, (FProcessItems)dnodeProcessVnodeApplyQueue); + if (pVnode->pApplyQ == NULL) { + return TSDB_CODE_DND_OUT_OF_MEMORY; + } + return 0; +} + +static void dnodeFreeVnodeApplyQueue(SVnodeObj *pVnode) { + tMWorkerFreeQueue(&tsVnodes.writePool, pVnode->pApplyQ); + pVnode->pApplyQ = NULL; +} + +static int32_t dnodeInitVnodeWriteWorker() { + SMWorkerPool *pPool = &tsVnodes.writePool; + pPool->name = "vnode-write"; + pPool->max = tsNumOfCores; + if (tMWorkerInit(pPool) != 0) { + return TSDB_CODE_VND_OUT_OF_MEMORY; + } + + return 0; +} + +static void dnodeCleanupVnodeWriteWorker() { tMWorkerCleanup(&tsVnodes.writePool); } + +static int32_t dnodeAllocVnodeSyncQueue(SVnodeObj *pVnode) { + pVnode->pSyncQ = tMWorkerAllocQueue(&tsVnodes.writePool, pVnode, (FProcessItems)dnodeProcessVnodeSyncQueue); + if (pVnode->pSyncQ == NULL) { + return TSDB_CODE_DND_OUT_OF_MEMORY; + } + return 0; +} + +static void dnodeFreeVnodeSyncQueue(SVnodeObj *pVnode) { + tMWorkerFreeQueue(&tsVnodes.writePool, pVnode->pSyncQ); + pVnode->pSyncQ = NULL; +} + +static int32_t dnodeInitVnodeSyncWorker() { + int32_t maxThreads = tsNumOfCores / 2; + if (maxThreads < 1) maxThreads = 1; + + SMWorkerPool *pPool = &tsVnodes.writePool; + pPool->name = "vnode-sync"; + pPool->max = maxThreads; + if (tMWorkerInit(pPool) != 0) { + return TSDB_CODE_VND_OUT_OF_MEMORY; + } + + return 0; +} + +static void dnodeCleanupVnodeSyncWorker() { tMWorkerCleanup(&tsVnodes.syncPool); } + int32_t dnodeInitVnodes() { dInfo("dnode-vnodes start to init"); SSteps *pSteps = taosStepInit(3, dnodeReportStartup); taosStepAdd(pSteps, "dnode-vnode-env", vnodeInit, vnodeCleanup); taosStepAdd(pSteps, "dnode-vnode-mgmt", dnodeInitVnodeMgmtWorker, dnodeCleanupVnodeMgmtWorker); + taosStepAdd(pSteps, "dnode-vnode-read", dnodeInitVnodeReadWorker, dnodeCleanupVnodeReadWorker); + taosStepAdd(pSteps, "dnode-vnode-write", dnodeInitVnodeWriteWorker, dnodeCleanupVnodeWriteWorker); + taosStepAdd(pSteps, "dnode-vnode-sync", dnodeInitVnodeSyncWorker, dnodeCleanupVnodeSyncWorker); taosStepAdd(pSteps, "dnode-vnodes", dnodeOpenVnodes, dnodeCleanupVnodes); tsVnodes.pSteps = pSteps; @@ -612,24 +835,26 @@ void dnodeCleanupVnodes() { } } -void dnodeGetVnodes(SVnodeLoads *pLoads) { - pLoads->vnodeNum = taosHashGetSize(tsVnodes.hash); +void dnodeGetVnodeLoads(SVnodeLoads *pLoads) { + pLoads->num = taosHashGetSize(tsVnodes.hash); int32_t v = 0; void *pIter = taosHashIterate(tsVnodes.hash, NULL); while (pIter) { SVnodeObj **ppVnode = pIter; if (ppVnode == NULL) continue; + SVnodeObj *pVnode = *ppVnode; - if (pVnode) { - SVnodeLoad *pLoad = &pLoads->vnodeLoads[v++]; - vnodeGetLoad(pVnode->pImpl, pLoad); - pLoad->vgId = htonl(pLoad->vgId); - pLoad->totalStorage = htobe64(pLoad->totalStorage); - pLoad->compStorage = htobe64(pLoad->compStorage); - pLoad->pointsWritten = htobe64(pLoad->pointsWritten); - pLoad->tablesNum = htobe64(pLoad->tablesNum); - } + if (pVnode == NULL) continue; + + SVnodeLoad *pLoad = &pLoads->data[v++]; + vnodeGetLoad(pVnode->pImpl, pLoad); + pLoad->vgId = htonl(pLoad->vgId); + pLoad->totalStorage = htobe64(pLoad->totalStorage); + pLoad->compStorage = htobe64(pLoad->compStorage); + pLoad->pointsWritten = htobe64(pLoad->pointsWritten); + pLoad->tablesNum = htobe64(pLoad->tablesNum); + pIter = taosHashIterate(tsVnodes.hash, pIter); } -} \ No newline at end of file +} diff --git a/source/dnode/vnode/impl/src/vnodeInt.c b/source/dnode/vnode/impl/src/vnodeInt.c index 1df3d3774969f71b7d984c5d74dddb8b497178f4..c345f2e1b9b3b7b448046dc2a14b84dec3a6a7ee 100644 --- a/source/dnode/vnode/impl/src/vnodeInt.c +++ b/source/dnode/vnode/impl/src/vnodeInt.c @@ -15,6 +15,7 @@ #define _DEFAULT_SOURCE #include "vnodeInt.h" +#include "tqueue.h" int32_t vnodeInit() { return 0; } void vnodeCleanup() {} @@ -27,5 +28,45 @@ int32_t vnodeDrop(SVnode *pVnode) { return 0; } int32_t vnodeCompact(SVnode *pVnode) { return 0; } int32_t vnodeSync(SVnode *pVnode) { return 0; } -void vnodeProcessMsg(SVnode *pVnode, SVnodeMsg *pMsg) {} void vnodeGetLoad(SVnode *pVnode, SVnodeLoad *pLoad) {} + +SVnodeMsg *vnodeInitMsg(int32_t msgNum) { + SVnodeMsg *pMsg = taosAllocateQitem(msgNum * sizeof(SRpcMsg *) + sizeof(SVnodeMsg)); + if (pMsg == NULL) { + terrno = TSDB_CODE_VND_OUT_OF_MEMORY; + return NULL; + } else { + pMsg->allocNum = msgNum; + return pMsg; + } +} + +int32_t vnodeAppendMsg(SVnodeMsg *pMsg, SRpcMsg *pRpcMsg) { + if (pMsg->curNum >= pMsg->allocNum) { + return TSDB_CODE_VND_OUT_OF_MEMORY; + } + + pMsg->rpcMsg[pMsg->curNum++] = *pRpcMsg; +} + +void vnodeCleanupMsg(SVnodeMsg *pMsg) { + for (int32_t i = 0; i < pMsg->curNum; ++i) { + rpcFreeCont(pMsg->rpcMsg[i].pCont); + } + taosFreeQitem(pMsg); +} + +void vnodeProcessMsg(SVnode *pVnode, SVnodeMsg *pMsg, EVMType msgType) { + switch (msgType) { + case VN_MSG_TYPE_WRITE: + break; + case VN_MSG_TYPE_APPLY: + break; + case VN_MSG_TYPE_SYNC: + break; + case VN_MSG_TYPE_QUERY: + break; + case VN_MSG_TYPE_FETCH: + break; + } +} diff --git a/source/util/src/tworker.c b/source/util/src/tworker.c index 7df12089b7d3c9b571f0c3b0b12461b6fb6e3190..136bc40482670510a99dc2008e8f04b585ee94f7 100644 --- a/source/util/src/tworker.c +++ b/source/util/src/tworker.c @@ -76,7 +76,7 @@ static void *tWorkerThreadFp(SWorker *worker) { } if (fp) { - (*fp)(msg, ahandle); + (*fp)(ahandle, msg); } } @@ -186,7 +186,7 @@ static void *tWriteWorkerThreadFp(SMWorker *worker) { } if (fp) { - (*fp)(worker->qall, numOfMsgs, ahandle); + (*fp)(ahandle, worker->qall, numOfMsgs); } }