diff --git a/src/dnode/inc/dnodeVRead.h b/src/dnode/inc/dnodeVRead.h index a1035200475259bd91757f934d35a0dd5a69b1fe..b3c3df80b2e201ff9f4b40ebfcceafba57de1366 100644 --- a/src/dnode/inc/dnodeVRead.h +++ b/src/dnode/inc/dnodeVRead.h @@ -20,9 +20,11 @@ extern "C" { #endif -int32_t dnodeInitVnodeRead(); -void dnodeCleanupVnodeRead(); -void dnodeDispatchToVnodeReadQueue(SRpcMsg *pMsg); +int32_t dnodeInitVRead(); +void dnodeCleanupVRead(); +void dnodeDispatchToVReadQueue(SRpcMsg *pMsg); +void * dnodeAllocVReadQueue(void *pVnode); +void dnodeFreeVReadQueue(void *rqueue); #ifdef __cplusplus } diff --git a/src/dnode/src/dnodeMain.c b/src/dnode/src/dnodeMain.c index b4c174e29bc15be2796c6fab60e4ba0ab9e7b621..5fde4f972be404bc5a35255cefb6e3f8038bf6fc 100644 --- a/src/dnode/src/dnodeMain.c +++ b/src/dnode/src/dnodeMain.c @@ -61,7 +61,7 @@ static const SDnodeComponent tsDnodeComponents[] = { {"mnodeinfos",dnodeInitMInfos, dnodeCleanupMInfos}, {"wal", walInit, walCleanUp}, {"check", dnodeInitCheck, dnodeCleanupCheck}, // NOTES: dnodeInitCheck must be behind the dnodeinitStorage component !!! - {"vread", dnodeInitVnodeRead, dnodeCleanupVnodeRead}, + {"vread", dnodeInitVRead, dnodeCleanupVRead}, {"vwrite", dnodeInitVWrite, dnodeCleanupVWrite}, {"mread", dnodeInitMnodeRead, dnodeCleanupMnodeRead}, {"mwrite", dnodeInitMnodeWrite, dnodeCleanupMnodeWrite}, diff --git a/src/dnode/src/dnodeShell.c b/src/dnode/src/dnodeShell.c index 33cda607937bac0032ca202ff1e746aec401f618..2f6d844ff177e5afa5a92981006b7135500dcd32 100644 --- a/src/dnode/src/dnodeShell.c +++ b/src/dnode/src/dnodeShell.c @@ -39,8 +39,8 @@ static int32_t tsDnodeSubmitReqNum = 0; int32_t dnodeInitShell() { dnodeProcessShellMsgFp[TSDB_MSG_TYPE_SUBMIT] = dnodeDispatchToVWriteQueue; - dnodeProcessShellMsgFp[TSDB_MSG_TYPE_QUERY] = dnodeDispatchToVnodeReadQueue; - dnodeProcessShellMsgFp[TSDB_MSG_TYPE_FETCH] = dnodeDispatchToVnodeReadQueue; + dnodeProcessShellMsgFp[TSDB_MSG_TYPE_QUERY] = dnodeDispatchToVReadQueue; + dnodeProcessShellMsgFp[TSDB_MSG_TYPE_FETCH] = dnodeDispatchToVReadQueue; dnodeProcessShellMsgFp[TSDB_MSG_TYPE_UPDATE_TAG_VAL] = dnodeDispatchToVWriteQueue; // the following message shall be treated as mnode write diff --git a/src/dnode/src/dnodeVRead.c b/src/dnode/src/dnodeVRead.c index abf3cb527d3cc918161e2df10fd9f939adabfc30..f571cfda9f638a52c943cf17f53928669f243206 100644 --- a/src/dnode/src/dnodeVRead.c +++ b/src/dnode/src/dnodeVRead.c @@ -17,84 +17,79 @@ #include "os.h" #include "taoserror.h" #include "taosmsg.h" -#include "tutil.h" -#include "tqueue.h" -#include "twal.h" #include "tglobal.h" -#include "dnodeInt.h" -#include "dnodeMgmt.h" -#include "dnodeVRead.h" +#include "tqueue.h" #include "vnode.h" +#include "dnodeInt.h" typedef struct { - pthread_t thread; // thread - int32_t workerId; // worker ID -} SReadWorker; + pthread_t thread; // thread + int32_t workerId; // worker ID +} SVReadWorker; typedef struct { - int32_t max; // max number of workers - int32_t min; // min number of workers - int32_t num; // current number of workers - SReadWorker *readWorker; + int32_t max; // max number of workers + int32_t min; // min number of workers + int32_t num; // current number of workers + SVReadWorker * worker; pthread_mutex_t mutex; -} SReadWorkerPool; +} SVReadWorkerPool; static void *dnodeProcessReadQueue(void *param); -static void dnodeHandleIdleReadWorker(SReadWorker *); // module global variable -static SReadWorkerPool readPool; -static taos_qset readQset; +static SVReadWorkerPool tsVReadWP; +static taos_qset tsVReadQset; -int32_t dnodeInitVnodeRead() { - readQset = taosOpenQset(); +int32_t dnodeInitVRead() { + tsVReadQset = taosOpenQset(); - readPool.min = tsNumOfCores; - readPool.max = tsNumOfCores * tsNumOfThreadsPerCore; - if (readPool.max <= readPool.min * 2) readPool.max = 2 * readPool.min; - readPool.readWorker = (SReadWorker *)calloc(sizeof(SReadWorker), readPool.max); - pthread_mutex_init(&readPool.mutex, NULL); + tsVReadWP.min = tsNumOfCores; + tsVReadWP.max = tsNumOfCores * tsNumOfThreadsPerCore; + if (tsVReadWP.max <= tsVReadWP.min * 2) tsVReadWP.max = 2 * tsVReadWP.min; + tsVReadWP.worker = (SVReadWorker *)calloc(sizeof(SVReadWorker), tsVReadWP.max); + pthread_mutex_init(&tsVReadWP.mutex, NULL); - if (readPool.readWorker == NULL) return -1; - for (int i = 0; i < readPool.max; ++i) { - SReadWorker *pWorker = readPool.readWorker + i; + if (tsVReadWP.worker == NULL) return -1; + for (int i = 0; i < tsVReadWP.max; ++i) { + SVReadWorker *pWorker = tsVReadWP.worker + i; pWorker->workerId = i; } - dInfo("dnode read is initialized, min worker:%d max worker:%d", readPool.min, readPool.max); + dInfo("dnode vread is initialized, min worker:%d max worker:%d", tsVReadWP.min, tsVReadWP.max); return 0; } -void dnodeCleanupVnodeRead() { - for (int i = 0; i < readPool.max; ++i) { - SReadWorker *pWorker = readPool.readWorker + i; +void dnodeCleanupVRead() { + for (int i = 0; i < tsVReadWP.max; ++i) { + SVReadWorker *pWorker = tsVReadWP.worker + i; if (pWorker->thread) { - taosQsetThreadResume(readQset); + taosQsetThreadResume(tsVReadQset); } } - for (int i = 0; i < readPool.max; ++i) { - SReadWorker *pWorker = readPool.readWorker + i; + for (int i = 0; i < tsVReadWP.max; ++i) { + SVReadWorker *pWorker = tsVReadWP.worker + i; if (pWorker->thread) { pthread_join(pWorker->thread, NULL); } } - free(readPool.readWorker); - taosCloseQset(readQset); - pthread_mutex_destroy(&readPool.mutex); + free(tsVReadWP.worker); + taosCloseQset(tsVReadQset); + pthread_mutex_destroy(&tsVReadWP.mutex); - dInfo("dnode read is closed"); + dInfo("dnode vread is closed"); } -void dnodeDispatchToVnodeReadQueue(SRpcMsg *pMsg) { - int32_t queuedMsgNum = 0; - int32_t leftLen = pMsg->contLen; - char *pCont = (char *) pMsg->pCont; +void dnodeDispatchToVReadQueue(SRpcMsg *pMsg) { + int32_t queuedMsgNum = 0; + int32_t leftLen = pMsg->contLen; + char * pCont = (char *)pMsg->pCont; while (leftLen > 0) { - SMsgHead *pHead = (SMsgHead *) pCont; - pHead->vgId = htonl(pHead->vgId); + SMsgHead *pHead = (SMsgHead *)pCont; + pHead->vgId = htonl(pHead->vgId); pHead->contLen = htonl(pHead->contLen); taos_queue queue = vnodeAcquireRqueue(pHead->vgId); @@ -106,10 +101,10 @@ void dnodeDispatchToVnodeReadQueue(SRpcMsg *pMsg) { } // put message into queue - SReadMsg *pRead = (SReadMsg *)taosAllocateQitem(sizeof(SReadMsg)); - pRead->rpcMsg = *pMsg; - pRead->pCont = pCont; - pRead->contLen = pHead->contLen; + SVReadMsg *pRead = taosAllocateQitem(sizeof(SVReadMsg)); + pRead->rpcMsg = *pMsg; + pRead->pCont = pCont; + pRead->contLen = pHead->contLen; // next vnode leftLen -= pHead->contLen; @@ -120,60 +115,52 @@ void dnodeDispatchToVnodeReadQueue(SRpcMsg *pMsg) { } if (queuedMsgNum == 0) { - SRpcMsg rpcRsp = { - .handle = pMsg->handle, - .pCont = NULL, - .contLen = 0, - .code = TSDB_CODE_VND_INVALID_VGROUP_ID, - .msgType = 0 - }; + SRpcMsg rpcRsp = {.handle = pMsg->handle, .code = TSDB_CODE_VND_INVALID_VGROUP_ID}; rpcSendResponse(&rpcRsp); rpcFreeCont(pMsg->pCont); } } void *dnodeAllocVReadQueue(void *pVnode) { - pthread_mutex_lock(&readPool.mutex); + pthread_mutex_lock(&tsVReadWP.mutex); taos_queue queue = taosOpenQueue(); if (queue == NULL) { - pthread_mutex_unlock(&readPool.mutex); + pthread_mutex_unlock(&tsVReadWP.mutex); return NULL; } - taosAddIntoQset(readQset, queue, pVnode); + taosAddIntoQset(tsVReadQset, queue, pVnode); // spawn a thread to process queue - if (readPool.num < readPool.max) { + if (tsVReadWP.num < tsVReadWP.max) { do { - SReadWorker *pWorker = readPool.readWorker + readPool.num; + SVReadWorker *pWorker = tsVReadWP.worker + tsVReadWP.num; pthread_attr_t thAttr; pthread_attr_init(&thAttr); pthread_attr_setdetachstate(&thAttr, PTHREAD_CREATE_JOINABLE); if (pthread_create(&pWorker->thread, &thAttr, dnodeProcessReadQueue, pWorker) != 0) { - dError("failed to create thread to process read queue, reason:%s", strerror(errno)); + dError("failed to create thread to process vread vqueue since %s", strerror(errno)); } pthread_attr_destroy(&thAttr); - readPool.num++; - dDebug("read worker:%d is launched, total:%d", pWorker->workerId, readPool.num); - } while (readPool.num < readPool.min); + tsVReadWP.num++; + dDebug("dnode vread worker:%d is launched, total:%d", pWorker->workerId, tsVReadWP.num); + } while (tsVReadWP.num < tsVReadWP.min); } - pthread_mutex_unlock(&readPool.mutex); - dDebug("pVnode:%p, read queue:%p is allocated", pVnode, queue); + pthread_mutex_unlock(&tsVReadWP.mutex); + dDebug("pVnode:%p, dnode vread queue:%p is allocated", pVnode, queue); return queue; } void dnodeFreeVReadQueue(void *rqueue) { taosCloseQueue(rqueue); - - // dynamically adjust the number of threads } -void dnodeSendRpcReadRsp(void *pVnode, SReadMsg *pRead, int32_t code) { +void dnodeSendRpcVReadRsp(void *pVnode, SVReadMsg *pRead, int32_t code) { SRpcMsg rpcRsp = { .handle = pRead->rpcMsg.handle, .pCont = pRead->rspRet.rsp, @@ -186,33 +173,33 @@ void dnodeSendRpcReadRsp(void *pVnode, SReadMsg *pRead, int32_t code) { vnodeRelease(pVnode); } -void dnodeDispatchNonRspMsg(void *pVnode, SReadMsg *pRead, int32_t code) { +void dnodeDispatchNonRspMsg(void *pVnode, SVReadMsg *pRead, int32_t code) { rpcFreeCont(pRead->rpcMsg.pCont); vnodeRelease(pVnode); } static void *dnodeProcessReadQueue(void *param) { - SReadMsg *pReadMsg; - int type; - void *pVnode; + SVReadMsg *pReadMsg; + int32_t qtype; + void * pVnode; while (1) { - if (taosReadQitemFromQset(readQset, &type, (void **)&pReadMsg, &pVnode) == 0) { - dDebug("qset:%p dnode read got no message from qset, exiting", readQset); + if (taosReadQitemFromQset(tsVReadQset, &qtype, (void **)&pReadMsg, &pVnode) == 0) { + dDebug("qset:%p dnode vread got no message from qset, exiting", tsVReadQset); break; } dDebug("%p, msg:%s will be processed in vread queue, qtype:%d, msg:%p", pReadMsg->rpcMsg.ahandle, - taosMsg[pReadMsg->rpcMsg.msgType], type, pReadMsg); + taosMsg[pReadMsg->rpcMsg.msgType], qtype, pReadMsg); int32_t code = vnodeProcessRead(pVnode, pReadMsg); - if (type == TAOS_QTYPE_RPC && code != TSDB_CODE_QRY_NOT_READY) { - dnodeSendRpcReadRsp(pVnode, pReadMsg, code); + if (qtype == TAOS_QTYPE_RPC && code != TSDB_CODE_QRY_NOT_READY) { + dnodeSendRpcVReadRsp(pVnode, pReadMsg, code); } else { if (code == TSDB_CODE_QRY_HAS_RSP) { - dnodeSendRpcReadRsp(pVnode, pReadMsg, pReadMsg->rpcMsg.code); - } else { // code == TSDB_CODE_QRY_NOT_READY, do not return msg to client + dnodeSendRpcVReadRsp(pVnode, pReadMsg, pReadMsg->rpcMsg.code); + } else { // code == TSDB_CODE_QRY_NOT_READY, do not return msg to client assert(pReadMsg->rpcMsg.handle == NULL || (pReadMsg->rpcMsg.handle != NULL && pReadMsg->rpcMsg.msgType == 5)); dnodeDispatchNonRspMsg(pVnode, pReadMsg, code); } @@ -223,19 +210,3 @@ static void *dnodeProcessReadQueue(void *param) { return NULL; } - - -UNUSED_FUNC -static void dnodeHandleIdleReadWorker(SReadWorker *pWorker) { - int32_t num = taosGetQueueNumber(readQset); - - if (num == 0 || (num <= readPool.min && readPool.num > readPool.min)) { - readPool.num--; - dDebug("read worker:%d is released, total:%d", pWorker->workerId, readPool.num); - pthread_exit(NULL); - } else { - usleep(30000); - sched_yield(); - } -} - diff --git a/src/dnode/src/dnodeVWrite.c b/src/dnode/src/dnodeVWrite.c index 9b58d7bb6187d5dcafc40fb7287b6a4f6dd71512..b74dfbfdda987dd156f8ed10c86aa578b9720137 100644 --- a/src/dnode/src/dnodeVWrite.c +++ b/src/dnode/src/dnodeVWrite.c @@ -15,13 +15,12 @@ #define _DEFAULT_SOURCE #include "os.h" +#include "taoserror.h" +#include "taosmsg.h" #include "tglobal.h" #include "tqueue.h" -#include "tsdb.h" #include "twal.h" -#include "tsync.h" #include "vnode.h" -#include "syncInt.h" #include "dnodeInt.h" typedef struct { @@ -29,22 +28,21 @@ typedef struct { taos_qset qset; // queue set int32_t workerId; // worker ID pthread_t thread; // thread -} SWriteWorker; - +} SVWriteWorker; typedef struct { int32_t max; // max number of workers int32_t nextId; // from 0 to max-1, cyclic - SWriteWorker *worker; + SVWriteWorker * worker; pthread_mutex_t mutex; -} SWriteWorkerPool; +} SVWriteWorkerPool; -static SWriteWorkerPool tsVWriteWP; -static void *dnodeProcessWriteQueue(void *param); +static SVWriteWorkerPool tsVWriteWP; +static void *dnodeProcessVWriteQueue(void *param); int32_t dnodeInitVWrite() { tsVWriteWP.max = tsNumOfCores; - tsVWriteWP.worker = (SWriteWorker *)tcalloc(sizeof(SWriteWorker), tsVWriteWP.max); + tsVWriteWP.worker = (SVWriteWorker *)tcalloc(sizeof(SVWriteWorker), tsVWriteWP.max); if (tsVWriteWP.worker == NULL) return -1; pthread_mutex_init(&tsVWriteWP.mutex, NULL); @@ -58,14 +56,14 @@ int32_t dnodeInitVWrite() { void dnodeCleanupVWrite() { for (int32_t i = 0; i < tsVWriteWP.max; ++i) { - SWriteWorker *pWorker = tsVWriteWP.worker + i; + SVWriteWorker *pWorker = tsVWriteWP.worker + i; if (pWorker->thread) { taosQsetThreadResume(pWorker->qset); } } for (int32_t i = 0; i < tsVWriteWP.max; ++i) { - SWriteWorker *pWorker = tsVWriteWP.worker + i; + SVWriteWorker *pWorker = tsVWriteWP.worker + i; if (pWorker->thread) { pthread_join(pWorker->thread, NULL); taosFreeQall(pWorker->qall); @@ -100,7 +98,7 @@ void dnodeDispatchToVWriteQueue(SRpcMsg *pRpcMsg) { pHead->msgType = pRpcMsg->msgType; pHead->version = 0; pHead->len = pMsg->contLen; - code = vnodeWriteToQueue(pVnode, pHead, TAOS_QTYPE_RPC, pRpcMsg); + code = vnodeWriteToWQueue(pVnode, pHead, TAOS_QTYPE_RPC, pRpcMsg); } if (code != TSDB_CODE_SUCCESS) { @@ -114,7 +112,7 @@ void dnodeDispatchToVWriteQueue(SRpcMsg *pRpcMsg) { void *dnodeAllocVWriteQueue(void *pVnode) { pthread_mutex_lock(&tsVWriteWP.mutex); - SWriteWorker *pWorker = tsVWriteWP.worker + tsVWriteWP.nextId; + SVWriteWorker *pWorker = tsVWriteWP.worker + tsVWriteWP.nextId; void *queue = taosOpenQueue(); if (queue == NULL) { pthread_mutex_unlock(&tsVWriteWP.mutex); @@ -141,7 +139,7 @@ void *dnodeAllocVWriteQueue(void *pVnode) { pthread_attr_init(&thAttr); pthread_attr_setdetachstate(&thAttr, PTHREAD_CREATE_JOINABLE); - if (pthread_create(&pWorker->thread, &thAttr, dnodeProcessWriteQueue, pWorker) != 0) { + if (pthread_create(&pWorker->thread, &thAttr, dnodeProcessVWriteQueue, pWorker) != 0) { dError("failed to create thread to process vwrite queue since %s", strerror(errno)); taosFreeQall(pWorker->qall); taosCloseQset(pWorker->qset); @@ -190,12 +188,12 @@ void dnodeSendRpcVWriteRsp(void *pVnode, void *param, int32_t code) { vnodeRelease(pVnode); } -static void *dnodeProcessWriteQueue(void *param) { - SWriteWorker *pWorker = param; - SVWriteMsg * pWrite; - void * pVnode; - int32_t numOfMsgs; - int32_t qtype; +static void *dnodeProcessVWriteQueue(void *param) { + SVWriteWorker *pWorker = param; + SVWriteMsg * pWrite; + void * pVnode; + int32_t numOfMsgs; + int32_t qtype; dDebug("dnode vwrite worker:%d is running", pWorker->workerId); diff --git a/src/inc/dnode.h b/src/inc/dnode.h index b4973cc672546dc99531e3f6349d9761067f94f8..6032d8cc0a0ae18f91ffb01a59f31c731ec8318f 100644 --- a/src/inc/dnode.h +++ b/src/inc/dnode.h @@ -55,9 +55,9 @@ void *dnodeSendCfgTableToRecv(int32_t vgId, int32_t tid); void *dnodeAllocVWriteQueue(void *pVnode); void dnodeFreeVWriteQueue(void *wqueue); +void dnodeSendRpcVWriteRsp(void *pVnode, void *param, int32_t code); void *dnodeAllocVReadQueue(void *pVnode); void dnodeFreeVReadQueue(void *rqueue); -void dnodeSendRpcVWriteRsp(void *pVnode, void *param, int32_t code); int32_t dnodeAllocateMnodePqueue(); void dnodeFreeMnodePqueue(); diff --git a/src/inc/vnode.h b/src/inc/vnode.h index 09273d3907bfcdf80b06788e891a2a70112dc86a..e77bec926af65c1d6a7061c17384934ca1fddd87 100644 --- a/src/inc/vnode.h +++ b/src/inc/vnode.h @@ -37,20 +37,20 @@ typedef struct { } SRspRet; typedef struct { - SRspRet rspRet; - void *pCont; - int32_t contLen; - SRpcMsg rpcMsg; -} SReadMsg; + SRspRet rspRet; + void * pCont; + int32_t contLen; + SRpcMsg rpcMsg; +} SVReadMsg; typedef struct { - int32_t code; - int32_t processedCount; - void * rpcHandle; - void * rpcAhandle; - SRspRet rspRet; - char reserveForSync[16]; - SWalHead pHead[]; + int32_t code; + int32_t processedCount; + void * rpcHandle; + void * rpcAhandle; + SRspRet rspRet; + char reserveForSync[16]; + SWalHead pHead[]; } SVWriteMsg; extern char *vnodeStatus[]; @@ -66,7 +66,7 @@ void* vnodeAcquireRqueue(int32_t vgId); // add refCount, get read queue void vnodeRelease(void *pVnode); // dec refCount void* vnodeGetWal(void *pVnode); -int32_t vnodeWriteToQueue(void *vparam, void *wparam, int32_t qtype, void *rparam); +int32_t vnodeWriteToWQueue(void *vparam, void *wparam, int32_t qtype, void *rparam); int32_t vnodeProcessWrite(void *vparam, void *wparam, int32_t qtype, void *rparam); int32_t vnodeCheckWrite(void *pVnode); int32_t vnodeGetVnodeList(int32_t vnodeList[], int32_t *numOfVnodes); @@ -77,7 +77,7 @@ void vnodeSetAccess(SVgroupAccess *pAccess, int32_t numOfVnodes); int32_t vnodeInitResources(); void vnodeCleanupResources(); -int32_t vnodeProcessRead(void *pVnode, SReadMsg *pReadMsg); +int32_t vnodeProcessRead(void *pVnode, SVReadMsg *pReadMsg); int32_t vnodeCheckRead(void *pVnode); #ifdef __cplusplus diff --git a/src/vnode/src/vnodeMain.c b/src/vnode/src/vnodeMain.c index a13addb27cf79c1a083114fe47cfc726e6bfbb93..2dcdba5d7c544de68ec4de9b7bc4e455d88805a0 100644 --- a/src/vnode/src/vnodeMain.c +++ b/src/vnode/src/vnodeMain.c @@ -266,7 +266,7 @@ int32_t vnodeOpen(int32_t vnode, char *rootDir) { strcpy(cqCfg.pass, tsInternalPass); strcpy(cqCfg.db, pVnode->db); cqCfg.vgId = vnode; - cqCfg.cqWrite = vnodeWriteToQueue; + cqCfg.cqWrite = vnodeWriteToWQueue; pVnode->cq = cqOpen(pVnode, &cqCfg); if (pVnode->cq == NULL) { vnodeCleanUp(pVnode); @@ -320,7 +320,7 @@ int32_t vnodeOpen(int32_t vnode, char *rootDir) { syncInfo.ahandle = pVnode; syncInfo.getWalInfo = vnodeGetWalInfo; syncInfo.getFileInfo = vnodeGetFileInfo; - syncInfo.writeToCache = vnodeWriteToQueue; + syncInfo.writeToCache = vnodeWriteToWQueue; syncInfo.confirmForward = dnodeSendRpcVWriteRsp; syncInfo.notifyRole = vnodeNotifyRole; syncInfo.notifyFlowCtrl = vnodeCtrlFlow; diff --git a/src/vnode/src/vnodeRead.c b/src/vnode/src/vnodeRead.c index 99aed03e54ccd069e5879104f62eb01ff7bb3d05..faa35b0e02d783c1d0b415a60a77c6e98618041e 100644 --- a/src/vnode/src/vnodeRead.c +++ b/src/vnode/src/vnodeRead.c @@ -29,9 +29,9 @@ #include "vnodeInt.h" #include "tqueue.h" -static int32_t (*vnodeProcessReadMsgFp[TSDB_MSG_TYPE_MAX])(SVnodeObj *pVnode, SReadMsg *pReadMsg); -static int32_t vnodeProcessQueryMsg(SVnodeObj *pVnode, SReadMsg *pReadMsg); -static int32_t vnodeProcessFetchMsg(SVnodeObj *pVnode, SReadMsg *pReadMsg); +static int32_t (*vnodeProcessReadMsgFp[TSDB_MSG_TYPE_MAX])(SVnodeObj *pVnode, SVReadMsg *pReadMsg); +static int32_t vnodeProcessQueryMsg(SVnodeObj *pVnode, SVReadMsg *pReadMsg); +static int32_t vnodeProcessFetchMsg(SVnodeObj *pVnode, SVReadMsg *pReadMsg); static int32_t vnodeNotifyCurrentQhandle(void* handle, void* qhandle, int32_t vgId); void vnodeInitReadFp(void) { @@ -44,7 +44,7 @@ void vnodeInitReadFp(void) { // still required, or there will be a deadlock, so we don’t do any check here, but put the check codes before the // request enters the queue // -int32_t vnodeProcessRead(void *param, SReadMsg *pReadMsg) { +int32_t vnodeProcessRead(void *param, SVReadMsg *pReadMsg) { SVnodeObj *pVnode = (SVnodeObj *)param; int msgType = pReadMsg->rpcMsg.msgType; @@ -82,7 +82,7 @@ static int32_t vnodePutItemIntoReadQueue(SVnodeObj *pVnode, void **qhandle, void int32_t code = vnodeCheckRead(pVnode); if (code != TSDB_CODE_SUCCESS) return code; - SReadMsg *pRead = (SReadMsg *)taosAllocateQitem(sizeof(SReadMsg)); + SVReadMsg *pRead = (SVReadMsg *)taosAllocateQitem(sizeof(SVReadMsg)); pRead->rpcMsg.msgType = TSDB_MSG_TYPE_QUERY; pRead->pCont = qhandle; pRead->contLen = 0; @@ -146,7 +146,7 @@ static void vnodeBuildNoResultQueryRsp(SRspRet *pRet) { pRsp->completed = true; } -static int32_t vnodeProcessQueryMsg(SVnodeObj *pVnode, SReadMsg *pReadMsg) { +static int32_t vnodeProcessQueryMsg(SVnodeObj *pVnode, SVReadMsg *pReadMsg) { void * pCont = pReadMsg->pCont; int32_t contLen = pReadMsg->contLen; SRspRet *pRet = &pReadMsg->rspRet; @@ -274,7 +274,7 @@ static int32_t vnodeProcessQueryMsg(SVnodeObj *pVnode, SReadMsg *pReadMsg) { return code; } -static int32_t vnodeProcessFetchMsg(SVnodeObj *pVnode, SReadMsg *pReadMsg) { +static int32_t vnodeProcessFetchMsg(SVnodeObj *pVnode, SVReadMsg *pReadMsg) { void * pCont = pReadMsg->pCont; SRspRet *pRet = &pReadMsg->rspRet; diff --git a/src/vnode/src/vnodeWrite.c b/src/vnode/src/vnodeWrite.c index 417448423ae82ec9633df755c278d60c0ea8696d..80c68b09178372d30381e0e471bb2ed4fba7349c 100644 --- a/src/vnode/src/vnodeWrite.c +++ b/src/vnode/src/vnodeWrite.c @@ -205,7 +205,7 @@ static int32_t vnodeProcessUpdateTagValMsg(SVnodeObj *pVnode, void *pCont, SRspR return TSDB_CODE_SUCCESS; } -int32_t vnodeWriteToQueue(void *vparam, void *wparam, int32_t qtype, void *rparam) { +int32_t vnodeWriteToWQueue(void *vparam, void *wparam, int32_t qtype, void *rparam) { SVnodeObj *pVnode = vparam; SWalHead * pHead = wparam;