From 8fb299ce4d2dfb360d9bf80ae66659f28f8542af Mon Sep 17 00:00:00 2001
From: Shengliang Guan
Date: Sat, 5 Dec 2020 14:46:58 +0800
Subject: [PATCH] TD-2324

---
 src/dnode/src/dnodeMgmt.c   |   6 +-
 src/inc/vnode.h             |   2 +-
 src/util/src/tqueue.c       |   4 +-
 src/vnode/inc/vnodeMain.h   |   2 +-
 src/vnode/inc/vnodeWorker.h |  33 ++++++
 src/vnode/src/vnodeMain.c   |  18 ++--
 src/vnode/src/vnodeWorker.c | 205 ++++++++++++++++++++++++++++++++++++
 7 files changed, 254 insertions(+), 16 deletions(-)
 create mode 100644 src/vnode/inc/vnodeWorker.h
 create mode 100644 src/vnode/src/vnodeWorker.c

diff --git a/src/dnode/src/dnodeMgmt.c b/src/dnode/src/dnodeMgmt.c
index 4c1f6837e3..bc7e5ff33c 100644
--- a/src/dnode/src/dnodeMgmt.c
+++ b/src/dnode/src/dnodeMgmt.c
@@ -265,14 +265,12 @@ static int32_t dnodeGetVnodeList(int32_t vnodeList[], int32_t *numOfVnodes) {
 
 static void *dnodeOpenVnode(void *param) {
   SOpenVnodeThread *pThread = param;
-  char vnodeDir[TSDB_FILENAME_LEN * 3];
-
+
   dDebug("thread:%d, start to open %d vnodes", pThread->threadIndex, pThread->vnodeNum);
 
   for (int32_t v = 0; v < pThread->vnodeNum; ++v) {
     int32_t vgId = pThread->vnodeList[v];
-    snprintf(vnodeDir, TSDB_FILENAME_LEN * 3, "%s/vnode%d", tsVnodeDir, vgId);
-    if (vnodeOpen(vgId, vnodeDir) < 0) {
+    if (vnodeOpen(vgId) < 0) {
       dError("vgId:%d, failed to open vnode by thread:%d", vgId, pThread->threadIndex);
       pThread->failed++;
     } else {
diff --git a/src/inc/vnode.h b/src/inc/vnode.h
index e463b1730b..ba64a3d826 100644
--- a/src/inc/vnode.h
+++ b/src/inc/vnode.h
@@ -57,7 +57,7 @@ extern char *vnodeStatus[];
 // vnodeMain
 int32_t vnodeCreate(SCreateVnodeMsg *pVnodeCfg);
 int32_t vnodeDrop(int32_t vgId);
-int32_t vnodeOpen(int32_t vgId, char *rootDir);
+int32_t vnodeOpen(int32_t vgId);
 int32_t vnodeAlter(void *pVnode, SCreateVnodeMsg *pVnodeCfg);
 int32_t vnodeClose(int32_t vgId);
 
diff --git a/src/util/src/tqueue.c b/src/util/src/tqueue.c
index 143ce7c474..c15ae729ed 100644
--- a/src/util/src/tqueue.c
+++ b/src/util/src/tqueue.c
@@ -358,8 +358,8 @@ int taosReadQitemFromQset(taos_qset param, int *type, void **pitem, void **phand
     if (queue->head) {
         pNode = queue->head;
         *pitem = pNode->item;
-        *type = pNode->type;
-        *phandle = queue->ahandle;
+        if (type) *type = pNode->type;
+        if (phandle) *phandle = queue->ahandle;
         queue->head = pNode->next;
         if (queue->head == NULL)
           queue->tail = NULL;
diff --git a/src/vnode/inc/vnodeMain.h b/src/vnode/inc/vnodeMain.h
index 7e9ccf08a0..d2a8700551 100644
--- a/src/vnode/inc/vnodeMain.h
+++ b/src/vnode/inc/vnodeMain.h
@@ -23,7 +23,7 @@ extern "C" {
 
 int32_t vnodeCreate(SCreateVnodeMsg *pVnodeCfg);
 int32_t vnodeDrop(int32_t vgId);
-int32_t vnodeOpen(int32_t vgId, char *rootDir);
+int32_t vnodeOpen(int32_t vgId);
 int32_t vnodeAlter(void *pVnode, SCreateVnodeMsg *pVnodeCfg);
 int32_t vnodeClose(int32_t vgId);
 
diff --git a/src/vnode/inc/vnodeWorker.h b/src/vnode/inc/vnodeWorker.h
new file mode 100644
index 0000000000..291943f63d
--- /dev/null
+++ b/src/vnode/inc/vnodeWorker.h
@@ -0,0 +1,33 @@
+/*
+ * Copyright (c) 2019 TAOS Data, Inc.
+ *
+ * This program is free software: you can use, redistribute, and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3
+ * or later ("AGPL"), as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful, but WITHOUT
+ * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
+ * FITNESS FOR A PARTICULAR PURPOSE.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ */
+
+#ifndef TDENGINE_VNODE_WORKER_H
+#define TDENGINE_VNODE_WORKER_H
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+#include "vnodeInt.h"
+
+int32_t vnodeInitMWorker();
+void vnodeCleanupMWorker();
+int32_t vnodeOpenInMWorker(int32_t vgId, void *rpcHandle);
+int32_t vnodeCleanupInMWorker(int32_t vgId, void *rpcHandle);
+
+#ifdef __cplusplus
+}
+#endif
+
+#endif
diff --git a/src/vnode/src/vnodeMain.c b/src/vnode/src/vnodeMain.c
index 1da6c58224..a043c655e2 100644
--- a/src/vnode/src/vnodeMain.c
+++ b/src/vnode/src/vnodeMain.c
@@ -84,7 +84,7 @@ int32_t vnodeCreate(SCreateVnodeMsg *pVnodeCfg) {
   vInfo("vgId:%d, vnode dir is created, walLevel:%d fsyncPeriod:%d", pVnodeCfg->cfg.vgId, pVnodeCfg->cfg.walLevel,
         pVnodeCfg->cfg.fsyncPeriod);
 
-  code = vnodeOpen(pVnodeCfg->cfg.vgId, rootDir);
+  code = vnodeOpen(pVnodeCfg->cfg.vgId);
 
   return code;
 }
@@ -158,18 +158,20 @@ int32_t vnodeAlter(void *vparam, SCreateVnodeMsg *pVnodeCfg) {
   return code;
 }
 
-int32_t vnodeOpen(int32_t vnode, char *rootDir) {
-  char temp[TSDB_FILENAME_LEN];
+int32_t vnodeOpen(int32_t vgId) {
+  char temp[TSDB_FILENAME_LEN * 3];
+  char rootDir[TSDB_FILENAME_LEN * 2];
+  snprintf(rootDir, TSDB_FILENAME_LEN * 2, "%s/vnode%d", tsVnodeDir, vgId);
 
   SVnodeObj *pVnode = calloc(sizeof(SVnodeObj), 1);
   if (pVnode == NULL) {
-    vError("vgId:%d, failed to open vnode since no enough memory", vnode);
+    vError("vgId:%d, failed to open vnode since no enough memory", vgId);
     return TAOS_SYSTEM_ERROR(errno);
   }
 
   atomic_add_fetch_32(&pVnode->refCount, 1);
 
-  pVnode->vgId = vnode;
+  pVnode->vgId = vgId;
   pVnode->fversion = 0;
   pVnode->version = 0;
   pVnode->tsdbCfg.tsdbId = pVnode->vgId;
@@ -206,7 +208,7 @@
   sprintf(cqCfg.user, "_root");
   strcpy(cqCfg.pass, tsInternalPass);
   strcpy(cqCfg.db, pVnode->db);
-  cqCfg.vgId = vnode;
+  cqCfg.vgId = vgId;
   cqCfg.cqWrite = vnodeWriteToCache;
   pVnode->cq = cqOpen(pVnode, &cqCfg);
   if (pVnode->cq == NULL) {
@@ -220,7 +222,7 @@
   appH.cqH = pVnode->cq;
   appH.cqCreateFunc = cqCreate;
   appH.cqDropFunc = cqDrop;
-  sprintf(temp, "vnode/vnode%d/tsdb", vnode);
+  sprintf(temp, "vnode/vnode%d/tsdb", vgId);
 
   terrno = 0;
   pVnode->tsdb = tsdbOpenRepo(temp, &appH);
@@ -280,7 +282,7 @@
   syncInfo.vgId = pVnode->vgId;
   syncInfo.version = pVnode->version;
   syncInfo.syncCfg = pVnode->syncCfg;
-  sprintf(syncInfo.path, "%s", rootDir);
+  tstrncpy(syncInfo.path, rootDir, TSDB_FILENAME_LEN);
   syncInfo.getWalInfo = vnodeGetWalInfo;
   syncInfo.getFileInfo = vnodeGetFileInfo;
   syncInfo.writeToCache = vnodeWriteToCache;
diff --git a/src/vnode/src/vnodeWorker.c b/src/vnode/src/vnodeWorker.c
new file mode 100644
index 0000000000..8c257cdc9b
--- /dev/null
+++ b/src/vnode/src/vnodeWorker.c
@@ -0,0 +1,205 @@
+/*
+ * Copyright (c) 2019 TAOS Data, Inc.
+ *
+ * This program is free software: you can use, redistribute, and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3
+ * or later ("AGPL"), as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful, but WITHOUT
+ * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
+ * FITNESS FOR A PARTICULAR PURPOSE.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ */ + +#define _DEFAULT_SOURCE +#include "os.h" +#include "taoserror.h" +#include "taosmsg.h" +#include "tutil.h" +#include "tqueue.h" +#include "tglobal.h" +#include "vnodeWorker.h" + +typedef enum { + VNODE_WORKER_ACTION_CREATE, + VNODE_WORKER_ACTION_DELETE, + VNODE_WORKER_ACTION_ALTER +} EVMWorkerAction; + +typedef struct { + int32_t vgId; + int32_t code; + void * rpcHandle; + SVnodeObj *pVnode; + EVMWorkerAction action; +} SVMWorkerMsg; + +typedef struct { + pthread_t thread; + int32_t workerId; +} SVMWorker; + +typedef struct { + int32_t curNum; + int32_t maxNum; + SVMWorker *worker; +} SVMWorkerPool; + +static SVMWorkerPool tsVMWorkerPool; +static taos_qset tsVMWorkerQset; +static taos_queue tsVMWorkerQueue; + +static void *vnodeMWorkerFunc(void *param); + +static int32_t vnodeStartMWorker() { + tsVMWorkerQueue = taosOpenQueue(); + if (tsVMWorkerQueue == NULL) return TSDB_CODE_DND_OUT_OF_MEMORY; + + taosAddIntoQset(tsVMWorkerQset, tsVMWorkerQueue, NULL); + + for (int32_t i = tsVMWorkerPool.curNum; i < tsVMWorkerPool.maxNum; ++i) { + SVMWorker *pWorker = tsVMWorkerPool.worker + i; + pWorker->workerId = i; + + pthread_attr_t thAttr; + pthread_attr_init(&thAttr); + pthread_attr_setdetachstate(&thAttr, PTHREAD_CREATE_JOINABLE); + + if (pthread_create(&pWorker->thread, &thAttr, vnodeMWorkerFunc, pWorker) != 0) { + vError("failed to create thread to process vmworker queue, reason:%s", strerror(errno)); + } + + pthread_attr_destroy(&thAttr); + + tsVMWorkerPool.curNum = i + 1; + vDebug("vmworker:%d is launched, total:%d", pWorker->workerId, tsVMWorkerPool.maxNum); + } + + vDebug("vmworker queue:%p is allocated", tsVMWorkerQueue); + return TSDB_CODE_SUCCESS; +} + +int32_t vnodeInitMWorker() { + tsVMWorkerQset = taosOpenQset(); + + tsVMWorkerPool.maxNum = 1; + tsVMWorkerPool.curNum = 0; + tsVMWorkerPool.worker = calloc(sizeof(SVMWorker), tsVMWorkerPool.maxNum); + + if (tsVMWorkerPool.worker == NULL) return -1; + for (int32_t i = 0; i < tsVMWorkerPool.maxNum; ++i) { + SVMWorker *pWorker = tsVMWorkerPool.worker + i; + pWorker->workerId = i; + vDebug("vmworker:%d is created", i); + } + + vDebug("vmworker is initialized, num:%d qset:%p", tsVMWorkerPool.maxNum, tsVMWorkerQset); + + return vnodeStartMWorker(); +} + +static void vnodeStopMWorker() { + vDebug("vmworker queue:%p is freed", tsVMWorkerQueue); + taosCloseQueue(tsVMWorkerQueue); + tsVMWorkerQueue = NULL; +} + +void vnodeCleanupMWorker() { + for (int32_t i = 0; i < tsVMWorkerPool.maxNum; ++i) { + SVMWorker *pWorker = tsVMWorkerPool.worker + i; + if (pWorker->thread) { + taosQsetThreadResume(tsVMWorkerQset); + } + vDebug("vmworker:%d is closed", i); + } + + for (int32_t i = 0; i < tsVMWorkerPool.maxNum; ++i) { + SVMWorker *pWorker = tsVMWorkerPool.worker + i; + vDebug("vmworker:%d start to join", i); + if (pWorker->thread) { + pthread_join(pWorker->thread, NULL); + } + vDebug("vmworker:%d join success", i); + } + + vDebug("vmworker is closed, qset:%p", tsVMWorkerQset); + + taosCloseQset(tsVMWorkerQset); + tsVMWorkerQset = NULL; + tfree(tsVMWorkerPool.worker); + + vnodeStopMWorker(); +} + +static int32_t vnodeWriteIntoMWorker(int32_t vgId, EVMWorkerAction action,void *rpcHandle) { + SVMWorkerMsg *pMsg = taosAllocateQitem(sizeof(SVMWorkerMsg)); + if (pMsg == NULL) return TSDB_CODE_VND_OUT_OF_MEMORY; + + SVnodeObj *pVnode = vnodeAcquire(vgId); + if (pVnode == NULL) return TSDB_CODE_VND_INVALID_VGROUP_ID; + + pMsg->vgId = vgId; + pMsg->pVnode = pVnode; + pMsg->rpcHandle = rpcHandle; + pMsg->action = action; + return 
+}
+
+int32_t vnodeOpenInMWorker(int32_t vgId, void *rpcHandle) {
+  vTrace("vgId:%d, will open in vmworker", vgId);
+  return vnodeWriteIntoMWorker(vgId, VNODE_WORKER_ACTION_CREATE, rpcHandle);
+}
+
+int32_t vnodeCleanupInMWorker(int32_t vgId, void *rpcHandle) {
+  vTrace("vgId:%d, will cleanup in vmworker", vgId);
+  return vnodeWriteIntoMWorker(vgId, VNODE_WORKER_ACTION_DELETE, rpcHandle);
+}
+
+static void vnodeFreeMWorkerMsg(SVMWorkerMsg *pMsg) {
+  vTrace("vgId:%d, disposed in vmworker", pMsg->vgId);
+  vnodeRelease(pMsg->pVnode);
+  taosFreeQitem(pMsg);
+}
+
+static void vnodeSendVMWorkerRpcRsp(SVMWorkerMsg *pMsg) {
+  SRpcMsg rpcRsp = {
+      .handle = pMsg->rpcHandle,
+      .code = pMsg->code,
+  };
+
+  rpcSendResponse(&rpcRsp);
+  vnodeFreeMWorkerMsg(pMsg);
+}
+
+static void vnodeProcessMWorkerMsg(SVMWorkerMsg *pMsg) {
+  pMsg->code = 0;
+
+  switch (pMsg->action) {
+    case VNODE_WORKER_ACTION_CREATE:
+      pMsg->code = vnodeOpen(pMsg->vgId);
+      break;
+    case VNODE_WORKER_ACTION_DELETE:
+      pMsg->code = vnodeDrop(pMsg->vgId);
+      break;
+    default:
+      break;
+  }
+}
+
+static void *vnodeMWorkerFunc(void *param) {
+  while (1) {
+    SVMWorkerMsg *pMsg = NULL;
+    if (taosReadQitemFromQset(tsVMWorkerQset, NULL, (void **)&pMsg, NULL) == 0) {
+      vDebug("qset:%p, vmworker got no message from qset, exiting", tsVMWorkerQset);
+      break;
+    }
+
+    vTrace("vgId:%d, action:%d will be processed in vmworker queue", pMsg->vgId, pMsg->action);
+    vnodeProcessMWorkerMsg(pMsg);
+    vnodeSendVMWorkerRpcRsp(pMsg);
+  }
+
+  return NULL;
+}
-- 
GitLab