diff --git a/src/inc/vnode.h b/src/inc/vnode.h index 670f4f599c17d120a641a985a04b126721e27468..8b3b4b6ed027b38dc44987e47a675e7d2937456f 100644 --- a/src/inc/vnode.h +++ b/src/inc/vnode.h @@ -63,11 +63,9 @@ int32_t vnodeClose(int32_t vgId); void* vnodeAcquire(int32_t vgId); // add refcount void* vnodeAcquireRqueue(int32_t vgId); // add refCount, get read queue -void* vnodeAcquireWqueue(int32_t vgId); // add recCount, get write queue void vnodeRelease(void *pVnode); // dec refCount void* vnodeGetWal(void *pVnode); - int32_t vnodeWriteToQueue(void *vparam, void *wparam, int32_t qtype, void *pMsg); int32_t vnodeProcessWrite(void *param, int32_t qtype, SVWriteMsg *pWrite); int32_t vnodeCheckWrite(void *pVnode); diff --git a/src/mnode/src/mnodeSdb.c b/src/mnode/src/mnodeSdb.c index 8fb0b330605ec6cd63a60ad9e64a5e8dc62b67d4..fc986521e62b33c7512b7213cb99ec7dfddcf705 100644 --- a/src/mnode/src/mnodeSdb.c +++ b/src/mnode/src/mnodeSdb.c @@ -98,8 +98,8 @@ static taos_qall tsSdbWriteQall; static taos_queue tsSdbWriteQueue; static SSdbWriteWorkerPool tsSdbPool; -static int sdbWrite(void *param, void *data, int type); -static int sdbWriteToQueue(void *param, void *data, int type); +static int32_t sdbWrite(void *param, void *data, int32_t type, void *pMsg); +static int32_t sdbWriteToQueue(void *param, void *data, int32_t type, void *pMsg); static void * sdbWorkerFp(void *param); static int32_t sdbInitWriteWorker(); static void sdbCleanupWriteWorker(); @@ -575,7 +575,7 @@ static int32_t sdbUpdateHash(SSdbTable *pTable, SSdbOper *pOper) { return TSDB_CODE_SUCCESS; } -static int sdbWrite(void *param, void *data, int type) { +static int sdbWrite(void *param, void *data, int32_t type, void *pMsg) { SSdbOper *pOper = param; SWalHead *pHead = data; int32_t tableId = pHead->msgType / 10; @@ -1040,13 +1040,13 @@ void sdbFreeWritequeue() { tsSdbWriteQueue = NULL; } -int sdbWriteToQueue(void *param, void *data, int type) { +int32_t sdbWriteToQueue(void *param, void *data, int32_t qtype, void *pMsg) { SWalHead *pHead = data; - int size = sizeof(SWalHead) + pHead->len; + int32_t size = sizeof(SWalHead) + pHead->len; SWalHead *pWal = (SWalHead *)taosAllocateQitem(size); memcpy(pWal, pHead, size); - taosWriteQitem(tsSdbWriteQueue, type, pWal); + taosWriteQitem(tsSdbWriteQueue, qtype, pWal); return 0; } @@ -1081,7 +1081,7 @@ static void *sdbWorkerFp(void *param) { pOper = NULL; } - int32_t code = sdbWrite(pOper, pHead, type); + int32_t code = sdbWrite(pOper, pHead, type, NULL); if (code > 0) code = 0; if (pOper) { pOper->retCode = code; diff --git a/src/vnode/src/vnodeMain.c b/src/vnode/src/vnodeMain.c index 7cbbf0feb86f2e99f058b4d5401109f0d30443d0..49b7e4e7124e6408393be40de29b43a2488b9822 100644 --- a/src/vnode/src/vnodeMain.c +++ b/src/vnode/src/vnodeMain.c @@ -483,21 +483,6 @@ void *vnodeAcquireRqueue(int32_t vgId) { return pVnode->rqueue; } -void *vnodeAcquireWqueue(int32_t vgId) { - SVnodeObj *pVnode = vnodeAcquire(vgId); - if (pVnode == NULL) return NULL; - - int32_t code = vnodeCheckWrite(pVnode); - if (code != TSDB_CODE_SUCCESS) { - terrno = code; - vInfo("vgId:%d, can not provide write service, status is %s", vgId, vnodeStatus[pVnode->status]); - vnodeRelease(pVnode); - return NULL; - } - - return pVnode->wqueue; -} - void *vnodeGetWal(void *pVnode) { return ((SVnodeObj *)pVnode)->wal; } diff --git a/src/vnode/src/vnodeWrite.c b/src/vnode/src/vnodeWrite.c index 4432f98db00b733ac5af079d2fed874c224bc40c..0d521b4d2ecf03a1ee433ee4086cf1f08f902f31 100644 --- a/src/vnode/src/vnodeWrite.c +++ b/src/vnode/src/vnodeWrite.c @@ -208,8 +208,9 @@ int32_t vnodeWriteToQueue(void *vparam, void *wparam, int32_t qtype, void *pMsg) SVnodeObj *pVnode = vparam; SWalHead * pHead = wparam; - if (qtype == TAOS_QTYPE_RPC && vnodeCheckWrite(pVnode) != TSDB_CODE_SUCCESS) { - return TSDB_CODE_VND_INVALID_VGROUP_ID; + if (qtype == TAOS_QTYPE_RPC) { + int32_t code = vnodeCheckWrite(pVnode); + if (code != TSDB_CODE_SUCCESS) return code; } int32_t size = sizeof(SVWriteMsg) + sizeof(SWalHead) + pHead->len;