From 259e2941311468bdbaa791beb9b463ea8440e6a8 Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Wed, 9 Dec 2020 14:27:46 +0800 Subject: [PATCH] TD-2393 --- src/dnode/inc/dnodeVRead.h | 6 +- src/dnode/src/dnodeVRead.c | 43 ++++++--- src/inc/dnode.h | 6 +- src/vnode/inc/vnodeCancel.h | 33 ------- src/vnode/inc/vnodeInt.h | 5 +- src/vnode/src/vnodeCancel.c | 169 ------------------------------------ src/vnode/src/vnodeMain.c | 17 ++-- src/vnode/src/vnodeRead.c | 17 ++-- 8 files changed, 60 insertions(+), 236 deletions(-) delete mode 100644 src/vnode/inc/vnodeCancel.h delete mode 100644 src/vnode/src/vnodeCancel.c diff --git a/src/dnode/inc/dnodeVRead.h b/src/dnode/inc/dnodeVRead.h index 30dfb1b3a4..9c88886f88 100644 --- a/src/dnode/inc/dnodeVRead.h +++ b/src/dnode/inc/dnodeVRead.h @@ -24,8 +24,10 @@ extern "C" { int32_t dnodeInitVRead(); void dnodeCleanupVRead(); void dnodeDispatchToVReadQueue(SRpcMsg *pMsg); -void * dnodeAllocVReadQueue(void *pVnode); -void dnodeFreeVReadQueue(void *pRqueue); +void * dnodeAllocVQueryQueue(void *pVnode); +void * dnodeAllocVFetchQueue(void *pVnode); +void dnodeFreeVQueryQueue(void *pQqueue); +void dnodeFreeVFetchQueue(void *pFqueue); #ifdef __cplusplus } diff --git a/src/dnode/src/dnodeVRead.c b/src/dnode/src/dnodeVRead.c index 88b8996831..46a21c1240 100644 --- a/src/dnode/src/dnodeVRead.c +++ b/src/dnode/src/dnodeVRead.c @@ -22,20 +22,29 @@ static void *dnodeProcessReadQueue(void *pWorker); // module global variable -static SWorkerPool tsVReadWP; +static SWorkerPool tsVQueryWP; +static SWorkerPool tsVFetchWP; int32_t dnodeInitVRead() { - tsVReadWP.name = "vquery"; - tsVReadWP.workerFp = dnodeProcessReadQueue; - tsVReadWP.min = tsNumOfCores; - tsVReadWP.max = tsNumOfCores * tsNumOfThreadsPerCore; - if (tsVReadWP.max <= tsVReadWP.min * 2) tsVReadWP.max = 2 * tsVReadWP.min; - - return tWorkerInit(&tsVReadWP); + tsVQueryWP.name = "vquery"; + tsVQueryWP.workerFp = dnodeProcessReadQueue; + tsVQueryWP.min = tsNumOfCores; + tsVQueryWP.max = tsNumOfCores * tsNumOfThreadsPerCore; + if (tsVQueryWP.max <= tsVQueryWP.min * 2) tsVQueryWP.max = 2 * tsVQueryWP.min; + if (tWorkerInit(&tsVQueryWP) != 0) return -1; + + tsVFetchWP.name = "vfetch"; + tsVFetchWP.workerFp = dnodeProcessReadQueue; + tsVFetchWP.min = 1; + tsVFetchWP.max = 1; + if (tWorkerInit(&tsVFetchWP) != 0) return -1; + + return 0; } void dnodeCleanupVRead() { - tWorkerCleanup(&tsVReadWP); + tWorkerCleanup(&tsVFetchWP); + tWorkerCleanup(&tsVQueryWP); } void dnodeDispatchToVReadQueue(SRpcMsg *pMsg) { @@ -68,12 +77,20 @@ void dnodeDispatchToVReadQueue(SRpcMsg *pMsg) { rpcFreeCont(pMsg->pCont); } -void *dnodeAllocVReadQueue(void *pVnode) { - return tWorkerAllocQueue(&tsVReadWP, pVnode); +void *dnodeAllocVQueryQueue(void *pVnode) { + return tWorkerAllocQueue(&tsVQueryWP, pVnode); +} + +void *dnodeAllocVFetchQueue(void *pVnode) { + return tWorkerAllocQueue(&tsVFetchWP, pVnode); +} + +void dnodeFreeVQueryQueue(void *pQqueue) { + tWorkerFreeQueue(&tsVQueryWP, pQqueue); } -void dnodeFreeVReadQueue(void *pRqueue) { - tWorkerFreeQueue(&tsVReadWP, pRqueue); +void dnodeFreeVFetchQueue(void *pFqueue) { + tWorkerFreeQueue(&tsVFetchWP, pFqueue); } void dnodeSendRpcVReadRsp(void *pVnode, SVReadMsg *pRead, int32_t code) { diff --git a/src/inc/dnode.h b/src/inc/dnode.h index 9dd95e32d7..cc8cdf0838 100644 --- a/src/inc/dnode.h +++ b/src/inc/dnode.h @@ -56,8 +56,10 @@ void *dnodeSendCfgTableToRecv(int32_t vgId, int32_t tid); void *dnodeAllocVWriteQueue(void *pVnode); void dnodeFreeVWriteQueue(void *pWqueue); void dnodeSendRpcVWriteRsp(void *pVnode, void *pWrite, int32_t code); -void *dnodeAllocVReadQueue(void *pVnode); -void dnodeFreeVReadQueue(void *pRqueue); +void *dnodeAllocVQueryQueue(void *pVnode); +void *dnodeAllocVFetchQueue(void *pVnode); +void dnodeFreeVQueryQueue(void *pQqueue); +void dnodeFreeVFetchQueue(void *pFqueue); int32_t dnodeAllocateMPeerQueue(); void dnodeFreeMPeerQueue(); diff --git a/src/vnode/inc/vnodeCancel.h b/src/vnode/inc/vnodeCancel.h deleted file mode 100644 index 32096739ac..0000000000 --- a/src/vnode/inc/vnodeCancel.h +++ /dev/null @@ -1,33 +0,0 @@ -/* - * Copyright (c) 2019 TAOS Data, Inc. - * - * This program is free software: you can use, redistribute, and/or modify - * it under the terms of the GNU Affero General Public License, version 3 - * or later ("AGPL"), as published by the Free Software Foundation. - * - * This program is distributed in the hope that it will be useful, but WITHOUT - * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or - * FITNESS FOR A PARTICULAR PURPOSE. - * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see . - */ - -#ifndef TDENGINE_VNODE_CANCEL_H -#define TDENGINE_VNODE_CANCEL_H - -#ifdef __cplusplus -extern "C" { -#endif -#include "vnode.h" -#include "vnodeInt.h" - -int32_t vnodeInitCWorker(); -void vnodeCleanupCWorker(); -int32_t vnodeWriteIntoCQueue(SVnodeObj *pVnode, SVReadMsg *pRead); - -#ifdef __cplusplus -} -#endif - -#endif diff --git a/src/vnode/inc/vnodeInt.h b/src/vnode/inc/vnodeInt.h index 401c217b9a..34f7d64ed1 100644 --- a/src/vnode/inc/vnodeInt.h +++ b/src/vnode/inc/vnodeInt.h @@ -47,8 +47,9 @@ typedef struct { int8_t isCommiting; uint64_t version; // current version uint64_t fversion; // version on saved data file - void * wqueue; - void * rqueue; + void * wqueue; // write queue + void * qqueue; // read query queue + void * fqueue; // read fetch/cancel queue void * wal; void * tsdb; int64_t sync; diff --git a/src/vnode/src/vnodeCancel.c b/src/vnode/src/vnodeCancel.c deleted file mode 100644 index 5f422d798c..0000000000 --- a/src/vnode/src/vnodeCancel.c +++ /dev/null @@ -1,169 +0,0 @@ -/* - * Copyright (c) 2019 TAOS Data, Inc. - * - * This program is free software: you can use, redistribute, and/or modify - * it under the terms of the GNU Affero General Public License, version 3 - * or later ("AGPL"), as published by the Free Software Foundation. - * - * This program is distributed in the hope that it will be useful, but WITHOUT - * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or - * FITNESS FOR A PARTICULAR PURPOSE. - * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see . - */ - -#define _DEFAULT_SOURCE -#include "os.h" -#include "taoserror.h" -#include "taosmsg.h" -#include "tglobal.h" -#include "tqueue.h" -#include "dnode.h" -#include "tsdb.h" -#include "vnodeCancel.h" - -typedef struct { - pthread_t thread; - int32_t workerId; -} SVCWorker; - -typedef struct { - int32_t curNum; - int32_t maxNum; - SVCWorker *worker; -} SVCWorkerPool; - -static SVCWorkerPool tsVCWorkerPool; -static taos_qset tsVCWorkerQset; -static taos_queue tsVCWorkerQueue; - -static void *vnodeCWorkerFunc(void *param); - -static int32_t vnodeStartCWorker() { - tsVCWorkerQueue = taosOpenQueue(); - if (tsVCWorkerQueue == NULL) return TSDB_CODE_DND_OUT_OF_MEMORY; - - taosAddIntoQset(tsVCWorkerQset, tsVCWorkerQueue, NULL); - - for (int32_t i = tsVCWorkerPool.curNum; i < tsVCWorkerPool.maxNum; ++i) { - SVCWorker *pWorker = tsVCWorkerPool.worker + i; - pWorker->workerId = i; - - pthread_attr_t thAttr; - pthread_attr_init(&thAttr); - pthread_attr_setdetachstate(&thAttr, PTHREAD_CREATE_JOINABLE); - - if (pthread_create(&pWorker->thread, &thAttr, vnodeCWorkerFunc, pWorker) != 0) { - vError("failed to create thread to process vcworker queue, reason:%s", strerror(errno)); - } - - pthread_attr_destroy(&thAttr); - - tsVCWorkerPool.curNum = i + 1; - vDebug("vcworker:%d is launched, total:%d", pWorker->workerId, tsVCWorkerPool.maxNum); - } - - vDebug("vcworker queue:%p is allocated", tsVCWorkerQueue); - return TSDB_CODE_SUCCESS; -} - -int32_t vnodeInitCWorker() { - tsVCWorkerQset = taosOpenQset(); - - tsVCWorkerPool.maxNum = 1; - tsVCWorkerPool.curNum = 0; - tsVCWorkerPool.worker = calloc(sizeof(SVCWorker), tsVCWorkerPool.maxNum); - - if (tsVCWorkerPool.worker == NULL) return -1; - for (int32_t i = 0; i < tsVCWorkerPool.maxNum; ++i) { - SVCWorker *pWorker = tsVCWorkerPool.worker + i; - pWorker->workerId = i; - vDebug("vcworker:%d is created", i); - } - - vDebug("vcworker is initialized, num:%d qset:%p", tsVCWorkerPool.maxNum, tsVCWorkerQset); - - return vnodeStartCWorker(); -} - -static void vnodeStopCWorker() { - vDebug("vcworker queue:%p is freed", tsVCWorkerQueue); - taosCloseQueue(tsVCWorkerQueue); - tsVCWorkerQueue = NULL; -} - -void vnodeCleanupCWorker() { - for (int32_t i = 0; i < tsVCWorkerPool.maxNum; ++i) { - SVCWorker *pWorker = tsVCWorkerPool.worker + i; - if (pWorker->thread) { - taosQsetThreadResume(tsVCWorkerQset); - } - vDebug("vcworker:%d is closed", i); - } - - for (int32_t i = 0; i < tsVCWorkerPool.maxNum; ++i) { - SVCWorker *pWorker = tsVCWorkerPool.worker + i; - vDebug("vcworker:%d start to join", i); - if (pWorker->thread) { - pthread_join(pWorker->thread, NULL); - } - vDebug("vcworker:%d join success", i); - } - - vDebug("vcworker is closed, qset:%p", tsVCWorkerQset); - - taosCloseQset(tsVCWorkerQset); - tsVCWorkerQset = NULL; - tfree(tsVCWorkerPool.worker); - - vnodeStopCWorker(); -} - -int32_t vnodeWriteIntoCQueue(SVnodeObj *pVnode, SVReadMsg *pRead) { - atomic_add_fetch_32(&pVnode->refCount, 1); - pRead->pVnode = pVnode; - - vTrace("vgId:%d, write into vcqueue, refCount:%d queued:%d", pVnode->vgId, pVnode->refCount, pVnode->queuedRMsg); - return taosWriteQitem(tsVCWorkerQueue, pRead->qtype, pRead); -} - -static void vnodeFreeFromCQueue(SVnodeObj *pVnode, SVReadMsg *pRead) { - vTrace("vgId:%d, free from vcqueue, refCount:%d queued:%d", pVnode->vgId, pVnode->refCount, pVnode->queuedRMsg); - taosFreeQitem(pRead); - vnodeRelease(pVnode); -} - -static void vnodeSendVCancelRpcRsp(SVnodeObj *pVnode, SVReadMsg *pRead, int32_t code) { - SRpcMsg rpcRsp = { - .handle = pRead->rpcHandle, - .pCont = pRead->rspRet.rsp, - .contLen = pRead->rspRet.len, - .code = code, - }; - - rpcSendResponse(&rpcRsp); - vnodeFreeFromCQueue(pVnode, pRead); -} - -static void *vnodeCWorkerFunc(void *param) { - int32_t qtype; - SVReadMsg *pRead; - SVnodeObj *pVnode; - - while (1) { - if (taosReadQitemFromQset(tsVCWorkerQset, &qtype, (void **)&pRead, (void **)&pVnode) == 0) { - vDebug("qset:%p, vcworker got no message from qset, exiting", tsVCWorkerQset); - break; - } - - assert(qtype == TAOS_QTYPE_RPC); - assert(pVnode == NULL); - assert(pRead->pVnode != NULL); - - int32_t code = vnodeProcessRead(pRead->pVnode, pRead); - vnodeSendVCancelRpcRsp(pRead->pVnode, pRead, code); - } - - return NULL; -} diff --git a/src/vnode/src/vnodeMain.c b/src/vnode/src/vnodeMain.c index 75ef39cd27..3a603466f4 100644 --- a/src/vnode/src/vnodeMain.c +++ b/src/vnode/src/vnodeMain.c @@ -28,7 +28,6 @@ #include "vnodeMgmt.h" #include "vnodeWorker.h" #include "vnodeMain.h" -#include "vnodeCancel.h" static int32_t vnodeProcessTsdbStatus(void *arg, int32_t status, int32_t eno); @@ -213,8 +212,9 @@ int32_t vnodeOpen(int32_t vgId) { pVnode->fversion = pVnode->version; pVnode->wqueue = dnodeAllocVWriteQueue(pVnode); - pVnode->rqueue = dnodeAllocVReadQueue(pVnode); - if (pVnode->wqueue == NULL || pVnode->rqueue == NULL) { + pVnode->qqueue = dnodeAllocVQueryQueue(pVnode); + pVnode->fqueue = dnodeAllocVFetchQueue(pVnode); + if (pVnode->wqueue == NULL || pVnode->qqueue == NULL || pVnode->fqueue == NULL) { vnodeCleanUp(pVnode); return terrno; } @@ -374,9 +374,14 @@ void vnodeDestroy(SVnodeObj *pVnode) { pVnode->wqueue = NULL; } - if (pVnode->rqueue) { - dnodeFreeVReadQueue(pVnode->rqueue); - pVnode->rqueue = NULL; + if (pVnode->qqueue) { + dnodeFreeVQueryQueue(pVnode->qqueue); + pVnode->qqueue = NULL; + } + + if (pVnode->fqueue) { + dnodeFreeVFetchQueue(pVnode->fqueue); + pVnode->fqueue = NULL; } tfree(pVnode->rootDir); diff --git a/src/vnode/src/vnodeRead.c b/src/vnode/src/vnodeRead.c index 43762095e8..c1caf291b4 100644 --- a/src/vnode/src/vnodeRead.c +++ b/src/vnode/src/vnodeRead.c @@ -14,11 +14,9 @@ */ #define _DEFAULT_SOURCE - #include "os.h" #include "taosmsg.h" #include "tqueue.h" -#include "vnodeCancel.h" #include "tglobal.h" #include "query.h" #include "vnodeStatus.h" @@ -119,15 +117,16 @@ int32_t vnodeWriteToRQueue(void *vparam, void *pCont, int32_t contLen, int8_t qt } pRead->qtype = qtype; + atomic_add_fetch_32(&pVnode->refCount, 1); + atomic_add_fetch_32(&pVnode->queuedRMsg, 1); - if (pRead->code == TSDB_CODE_RPC_NETWORK_UNAVAIL || pRead->msgType == TSDB_MSG_TYPE_CANCEL_QUERY) { - pRead->msgType = TSDB_MSG_TYPE_CANCEL_QUERY; - return vnodeWriteIntoCQueue(pVnode, pRead); + if (pRead->code == TSDB_CODE_RPC_NETWORK_UNAVAIL || pRead->msgType == TSDB_MSG_TYPE_CANCEL_QUERY || + pRead->msgType == TSDB_MSG_TYPE_FETCH) { + vTrace("vgId:%d, write into vfetch queue, refCount:%d queued:%d", pVnode->vgId, pVnode->refCount, pVnode->queuedRMsg); + return taosWriteQitem(pVnode->fqueue, qtype, pRead); } else { - atomic_add_fetch_32(&pVnode->refCount, 1); - atomic_add_fetch_32(&pVnode->queuedRMsg, 1); - vTrace("vgId:%d, write into vrqueue, refCount:%d queued:%d", pVnode->vgId, pVnode->refCount, pVnode->queuedRMsg); - return taosWriteQitem(pVnode->rqueue, qtype, pRead); + vTrace("vgId:%d, write into vquery queue, refCount:%d queued:%d", pVnode->vgId, pVnode->refCount, pVnode->queuedRMsg); + return taosWriteQitem(pVnode->qqueue, qtype, pRead); } } -- GitLab