diff --git a/src/vnode/inc/vnodeCancel.h b/src/vnode/inc/vnodeCancel.h new file mode 100644 index 0000000000000000000000000000000000000000..7459e0707c9ee2f3c59e165baffe7ac770fc95cb --- /dev/null +++ b/src/vnode/inc/vnodeCancel.h @@ -0,0 +1,33 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#ifndef TDENGINE_VNODE_CANCEL_H +#define TDENGINE_VNODE_CANCEL_H + +#ifdef __cplusplus +extern "C" { +#endif +#include "vnode.h" +#include "vnodeInt.h" + +int32_t vnodeInitCWorker(); +void vnodeCleanupCWorker(); +int32_t vnodeWriteIntoCQueue(SVReadMsg *pRead); + +#ifdef __cplusplus +} +#endif + +#endif diff --git a/src/vnode/src/vnodeCancel.c b/src/vnode/src/vnodeCancel.c new file mode 100644 index 0000000000000000000000000000000000000000..2239e384745c2d65acd0b64c05c4812af626341f --- /dev/null +++ b/src/vnode/src/vnodeCancel.c @@ -0,0 +1,166 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#define _DEFAULT_SOURCE +#include "os.h" +#include "taoserror.h" +#include "taosmsg.h" +#include "tglobal.h" +#include "tqueue.h" +#include "dnode.h" +#include "tsdb.h" +#include "vnodeCancel.h" + +typedef struct { + pthread_t thread; + int32_t workerId; +} SVCWorker; + +typedef struct { + int32_t curNum; + int32_t maxNum; + SVCWorker *worker; +} SVCWorkerPool; + +static SVCWorkerPool tsVCWorkerPool; +static taos_qset tsVCWorkerQset; +static taos_queue tsVCWorkerQueue; + +static void *vnodeCWorkerFunc(void *param); + +static int32_t vnodeStartCWorker() { + tsVCWorkerQueue = taosOpenQueue(); + if (tsVCWorkerQueue == NULL) return TSDB_CODE_DND_OUT_OF_MEMORY; + + taosAddIntoQset(tsVCWorkerQset, tsVCWorkerQueue, NULL); + + for (int32_t i = tsVCWorkerPool.curNum; i < tsVCWorkerPool.maxNum; ++i) { + SVCWorker *pWorker = tsVCWorkerPool.worker + i; + pWorker->workerId = i; + + pthread_attr_t thAttr; + pthread_attr_init(&thAttr); + pthread_attr_setdetachstate(&thAttr, PTHREAD_CREATE_JOINABLE); + + if (pthread_create(&pWorker->thread, &thAttr, vnodeCWorkerFunc, pWorker) != 0) { + vError("failed to create thread to process vcworker queue, reason:%s", strerror(errno)); + } + + pthread_attr_destroy(&thAttr); + + tsVCWorkerPool.curNum = i + 1; + vDebug("vcworker:%d is launched, total:%d", pWorker->workerId, tsVCWorkerPool.maxNum); + } + + vDebug("vcworker queue:%p is allocated", tsVCWorkerQueue); + return TSDB_CODE_SUCCESS; +} + +int32_t vnodeInitCWorker() { + tsVCWorkerQset = taosOpenQset(); + + tsVCWorkerPool.maxNum = 1; + tsVCWorkerPool.curNum = 0; + tsVCWorkerPool.worker = calloc(sizeof(SVCWorker), tsVCWorkerPool.maxNum); + + if (tsVCWorkerPool.worker == NULL) return -1; + for (int32_t i = 0; i < tsVCWorkerPool.maxNum; ++i) { + SVCWorker *pWorker = tsVCWorkerPool.worker + i; + pWorker->workerId = i; + vDebug("vcworker:%d is created", i); + } + + vDebug("vcworker is initialized, num:%d qset:%p", tsVCWorkerPool.maxNum, tsVCWorkerQset); + + return vnodeStartCWorker(); +} + +static void vnodeStopCWorker() { + vDebug("vcworker queue:%p is freed", tsVCWorkerQueue); + taosCloseQueue(tsVCWorkerQueue); + tsVCWorkerQueue = NULL; +} + +void vnodeCleanupCWorker() { + for (int32_t i = 0; i < tsVCWorkerPool.maxNum; ++i) { + SVCWorker *pWorker = tsVCWorkerPool.worker + i; + if (pWorker->thread) { + taosQsetThreadResume(tsVCWorkerQset); + } + vDebug("vcworker:%d is closed", i); + } + + for (int32_t i = 0; i < tsVCWorkerPool.maxNum; ++i) { + SVCWorker *pWorker = tsVCWorkerPool.worker + i; + vDebug("vcworker:%d start to join", i); + if (pWorker->thread) { + pthread_join(pWorker->thread, NULL); + } + vDebug("vcworker:%d join success", i); + } + + vDebug("vcworker is closed, qset:%p", tsVCWorkerQset); + + taosCloseQset(tsVCWorkerQset); + tsVCWorkerQset = NULL; + tfree(tsVCWorkerPool.worker); + + vnodeStopCWorker(); +} + +int32_t vnodeWriteIntoCQueue(SVReadMsg *pRead) { + vTrace("msg:%p, write into vcqueue", pRead); + return taosWriteQitem(tsVCWorkerQueue, pRead->qtype, pRead); +} + +static void vnodeFreeFromCQueue(SVReadMsg *pRead) { + vTrace("msg:%p, free from vcqueue", pRead); + taosFreeQitem(pRead); +} + +static void vnodeSendVCancelRpcRsp(SVReadMsg *pRead, int32_t code) { + SRpcMsg rpcRsp = { + .handle = pRead->rpcHandle, + .pCont = pRead->rspRet.rsp, + .contLen = pRead->rspRet.len, + .code = code, + }; + + rpcSendResponse(&rpcRsp); + vnodeFreeFromCQueue(pRead); +} + +static void *vnodeCWorkerFunc(void *param) { + int32_t qtype; + SVReadMsg *pRead; + SVnodeObj *pVnode; + + while (1) { + if (taosReadQitemFromQset(tsVCWorkerQset, &qtype, (void **)&pRead, (void **)&pVnode) == 0) { + vDebug("qset:%p, vcworker got no message from qset, exiting", tsVCWorkerQset); + break; + } + + vTrace("msg:%p will be processed in vcworker queue", pRead); + + assert(qtype == TAOS_QTYPE_RPC); + assert(pVnode == NULL); + + int32_t code = vnodeProcessRead(NULL, pRead); + vnodeSendVCancelRpcRsp(pRead, code); + } + + return NULL; +} diff --git a/src/vnode/src/vnodeMain.c b/src/vnode/src/vnodeMain.c index b516c9d90e126d68ca502af576d6f400dedd175e..b45c0a9d57c42b75cd096df6fb3605a9a0242edb 100644 --- a/src/vnode/src/vnodeMain.c +++ b/src/vnode/src/vnodeMain.c @@ -27,6 +27,7 @@ #include "dnode.h" #include "vnodeCfg.h" #include "vnodeVersion.h" +#include "vnodeCancel.h" static SHashObj*tsVnodesHash; static void vnodeCleanUp(SVnodeObj *pVnode); @@ -63,6 +64,7 @@ int32_t vnodeInitResources() { vnodeInitWriteFp(); vnodeInitReadFp(); + vnodeInitCWorker(); tsVnodesHash = taosHashInit(TSDB_MIN_VNODES, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_ENTRY_LOCK); if (tsVnodesHash == NULL) { @@ -79,6 +81,7 @@ int32_t vnodeInitResources() { } void vnodeCleanupResources() { + vnodeCleanupCWorker(); tsdbDestroyCommitQueue(); if (tsVnodesHash != NULL) { diff --git a/src/vnode/src/vnodeRead.c b/src/vnode/src/vnodeRead.c index ed6d29505f04df25f48ffe1692d2914816001b75..f7a7afd9db8f20bf7dbf5af5cb1d7542464179d9 100644 --- a/src/vnode/src/vnodeRead.c +++ b/src/vnode/src/vnodeRead.c @@ -25,6 +25,7 @@ #include "vnode.h" #include "vnodeInt.h" #include "tqueue.h" +#include "vnodeCancel.h" static int32_t (*vnodeProcessReadMsgFp[TSDB_MSG_TYPE_MAX])(SVnodeObj *pVnode, SVReadMsg *pRead); static int32_t vnodeProcessQueryMsg(SVnodeObj *pVnode, SVReadMsg *pRead); @@ -115,13 +116,15 @@ int32_t vnodeWriteToRQueue(void *vparam, void *pCont, int32_t contLen, int8_t qt } pRead->qtype = qtype; - - atomic_add_fetch_32(&pVnode->refCount, 1); - atomic_add_fetch_32(&pVnode->queuedRMsg, 1); - vTrace("vgId:%d, write into vrqueue, refCount:%d queued:%d", pVnode->vgId, pVnode->refCount, pVnode->queuedRMsg); - - taosWriteQitem(pVnode->rqueue, qtype, pRead); - return TSDB_CODE_SUCCESS; + + if (pRead->msgType == TSDB_MSG_TYPE_CM_KILL_QUERY) { + return vnodeWriteIntoCQueue(pRead); + } else { + atomic_add_fetch_32(&pVnode->refCount, 1); + atomic_add_fetch_32(&pVnode->queuedRMsg, 1); + vTrace("vgId:%d, write into vrqueue, refCount:%d queued:%d", pVnode->vgId, pVnode->refCount, pVnode->queuedRMsg); + return taosWriteQitem(pVnode->rqueue, qtype, pRead); + } } static int32_t vnodePutItemIntoReadQueue(SVnodeObj *pVnode, void **qhandle, void *ahandle) {