diff --git a/src/dnode/src/dnodeMgmt.c b/src/dnode/src/dnodeMgmt.c index 0d1a56cfc4516f34eed18d6a402158bb922aa6b4..9aab18672a3ee5c95b1264197bb50e708627e929 100644 --- a/src/dnode/src/dnodeMgmt.c +++ b/src/dnode/src/dnodeMgmt.c @@ -84,7 +84,7 @@ int32_t dnodeInitMgmt() { dnodeAddClientRspHandle(TSDB_MSG_TYPE_DM_STATUS_RSP, dnodeProcessStatusRsp); tsRebootTime = taosGetTimestampSec(); - int32_t code = vnodeInitResources(); + int32_t code = vnodeInitMgmt(); if (code != TSDB_CODE_SUCCESS) { dnodeCleanupMgmt(); return -1; @@ -174,7 +174,7 @@ void dnodeCleanupMgmt() { tsMgmtQset = NULL; tsMgmtQueue = NULL; - vnodeCleanupResources(); + vnodeCleanupMgmt(); } static int32_t dnodeWriteToMgmtQueue(SRpcMsg *pMsg) { diff --git a/src/inc/tsdb.h b/src/inc/tsdb.h index 58859f42bc80daa3317d789950c1625c1533cf5f..1769bd6566bbc27d75bf6b830add8ef79356f111 100644 --- a/src/inc/tsdb.h +++ b/src/inc/tsdb.h @@ -321,7 +321,7 @@ void tsdbCleanupQueryHandle(TsdbQueryHandleT queryHandle); */ void tsdbReportStat(void *repo, int64_t *totalPoints, int64_t *totalStorage, int64_t *compStorage); -int tsdbInitCommitQueue(int nthreads); +int tsdbInitCommitQueue(); void tsdbDestroyCommitQueue(); int tsdbSyncCommit(TSDB_REPO_T *repo); void tsdbIncCommitRef(int vgId); diff --git a/src/inc/vnode.h b/src/inc/vnode.h index 6b1c3a87f9b84aa53bd78f5ab3a17717bf4849e0..e463b1730bf0e48b46440fc908d06e78163336c3 100644 --- a/src/inc/vnode.h +++ b/src/inc/vnode.h @@ -19,11 +19,9 @@ #ifdef __cplusplus extern "C" { #endif - #include "trpc.h" #include "twal.h" - typedef struct { int32_t len; void * rsp; @@ -53,29 +51,35 @@ typedef struct { SWalHead pHead[]; } SVWriteMsg; +// vnodeStatus extern char *vnodeStatus[]; +// vnodeMain int32_t vnodeCreate(SCreateVnodeMsg *pVnodeCfg); int32_t vnodeDrop(int32_t vgId); int32_t vnodeOpen(int32_t vgId, char *rootDir); int32_t vnodeAlter(void *pVnode, SCreateVnodeMsg *pVnodeCfg); int32_t vnodeClose(int32_t vgId); -void* vnodeAcquire(int32_t vgId); // add refcount -void vnodeRelease(void *pVnode); // dec refCount +// vnodeMgmt +int32_t vnodeInitMgmt(); +void vnodeCleanupMgmt(); +void* vnodeAcquire(int32_t vgId); +void vnodeRelease(void *pVnode); void* vnodeGetWal(void *pVnode); +int32_t vnodeGetVnodeList(int32_t vnodeList[], int32_t *numOfVnodes); +void vnodeBuildStatusMsg(void *pStatus); +void vnodeSetAccess(SVgroupAccess *pAccess, int32_t numOfVnodes); +// vnodeWrite int32_t vnodeWriteToWQueue(void *pVnode, void *pHead, int32_t qtype, void *pRpcMsg); void vnodeFreeFromWQueue(void *pVnode, SVWriteMsg *pWrite); int32_t vnodeProcessWrite(void *pVnode, void *pHead, int32_t qtype, void *pRspRet); -int32_t vnodeGetVnodeList(int32_t vnodeList[], int32_t *numOfVnodes); -void vnodeBuildStatusMsg(void *pStatus); -void vnodeConfirmForward(void *pVnode, uint64_t version, int32_t code); -void vnodeSetAccess(SVgroupAccess *pAccess, int32_t numOfVnodes); -int32_t vnodeInitResources(); -void vnodeCleanupResources(); +// vnodeSync +void vnodeConfirmForward(void *pVnode, uint64_t version, int32_t code); +// vnodeRead int32_t vnodeWriteToRQueue(void *pVnode, void *pCont, int32_t contLen, int8_t qtype, void *rparam); void vnodeFreeFromRQueue(void *pVnode, SVReadMsg *pRead); int32_t vnodeProcessRead(void *pVnode, SVReadMsg *pRead); diff --git a/src/tsdb/src/tsdbCommitQueue.c b/src/tsdb/src/tsdbCommitQueue.c index c86b8f32b7ff6bfb30e7b734c7b38504200e44e6..75a2cbcb8deb869922295e623144569e90a0cecf 100644 --- a/src/tsdb/src/tsdbCommitQueue.c +++ b/src/tsdb/src/tsdbCommitQueue.c @@ -14,6 +14,7 @@ */ #include "os.h" +#include "tglobal.h" #include "tlist.h" #include "tref.h" #include "tsdbMain.h" @@ -36,7 +37,8 @@ static void *tsdbLoopCommit(void *arg); SCommitQueue tsCommitQueue = {0}; -int tsdbInitCommitQueue(int nthreads) { +int tsdbInitCommitQueue() { + int nthreads = tsNumOfCommitThreads; SCommitQueue *pQueue = &tsCommitQueue; if (nthreads < 1) nthreads = 1; diff --git a/src/util/inc/tstep.h b/src/util/inc/tstep.h new file mode 100644 index 0000000000000000000000000000000000000000..294fa79b6040b5ca1218b2e94c6a5df1fa19b4e6 --- /dev/null +++ b/src/util/inc/tstep.h @@ -0,0 +1,37 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#ifndef TDENGINE_TSTEP_H +#define TDENGINE_TSTEP_H + +#ifdef __cplusplus +extern "C" { +#endif + +typedef struct { + const char *const name; + int32_t (*initFp)(); + void (*cleanupFp)(); + int32_t step; +} SStep; + +int32_t taosStepInit(SStep *pSteps, int32_t stepSize); +void taosStepCleanup(SStep *pSteps, int32_t stepSize); + +#ifdef __cplusplus +} +#endif + +#endif // TDENGINE_TUTIL_H diff --git a/src/util/src/tstep.c b/src/util/src/tstep.c new file mode 100644 index 0000000000000000000000000000000000000000..e418191c8dbd18161cd8626ec13ae54005ab8fcb --- /dev/null +++ b/src/util/src/tstep.c @@ -0,0 +1,50 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#define _DEFAULT_SOURCE +#include "os.h" +#include "tstep.h" +#include "tulog.h" +#include "taoserror.h" + +void taosStepCleanupImp(SStep *pSteps, int32_t stepId) { + for (int32_t step = stepId; step >= 0; step--) { + SStep *pStep = pSteps + step; + uDebug("step:%s will cleanup", pStep->name); + if (pStep->cleanupFp != NULL) { + (*pStep->cleanupFp)(); + } + } +} + +int32_t taosStepInit(SStep *pSteps, int32_t stepSize) { + for (int32_t step = 0; step < stepSize; step++) { + SStep *pStep = pSteps + step; + if (pStep->initFp == NULL) continue; + + int32_t code = (*pStep->initFp)(); + if (code != 0) { + uDebug("step:%s will init", pStep->name); + taosStepCleanupImp(pSteps, step); + return code; + } + } + + return 0; +} + +void taosStepCleanup(SStep *pSteps, int32_t stepSize) { + return taosStepCleanupImp(pSteps, stepSize - 1); +} diff --git a/src/vnode/inc/vnodeCfg.h b/src/vnode/inc/vnodeCfg.h index c5887fef5d53ca32c215a5f2da9d182386ebc4f0..ba148c07c1e4f9451232c706fb79c0af0cad9746 100644 --- a/src/vnode/inc/vnodeCfg.h +++ b/src/vnode/inc/vnodeCfg.h @@ -19,6 +19,7 @@ #ifdef __cplusplus extern "C" { #endif +#include "vnodeInt.h" int32_t vnodeReadCfg(SVnodeObj *pVnode); int32_t vnodeWriteCfg(SCreateVnodeMsg *pVnodeCfg); diff --git a/src/vnode/inc/vnodeInt.h b/src/vnode/inc/vnodeInt.h index 7d03df5ecf57256cc2499a2130693010db36460d..401c217b9a972f0929f38b76b88aac508f106723 100644 --- a/src/vnode/inc/vnodeInt.h +++ b/src/vnode/inc/vnodeInt.h @@ -19,12 +19,11 @@ #ifdef __cplusplus extern "C" { #endif - #include "tlog.h" #include "tsync.h" -#include "twal.h" #include "tcq.h" #include "tsdb.h" +#include "vnode.h" extern int32_t vDebugFlag; @@ -36,40 +35,37 @@ extern int32_t vDebugFlag; #define vTrace(...) { if (vDebugFlag & DEBUG_TRACE) { taosPrintLog("VND ", vDebugFlag, __VA_ARGS__); }} typedef struct { - int32_t vgId; // global vnode group ID - int32_t refCount; // reference count - int32_t queuedWMsg; - int32_t queuedRMsg; - int32_t flowctrlLevel; - int8_t status; - int8_t role; - int8_t accessState; - int8_t isFull; - int8_t isCommiting; - uint64_t version; // current version - uint64_t fversion; // version on saved data file - void *wqueue; - void *rqueue; - void *wal; - void *tsdb; - int64_t sync; - void *events; - void *cq; // continuous query - int32_t cfgVersion; - STsdbCfg tsdbCfg; - SSyncCfg syncCfg; - SWalCfg walCfg; - void *qMgmt; - char *rootDir; - tsem_t sem; - int8_t dropped; - char db[TSDB_ACCT_LEN + TSDB_DB_NAME_LEN]; - pthread_mutex_t statusMutex; + int32_t vgId; // global vnode group ID + int32_t refCount; // reference count + int32_t queuedWMsg; + int32_t queuedRMsg; + int32_t flowctrlLevel; + int8_t status; + int8_t role; + int8_t accessState; + int8_t isFull; + int8_t isCommiting; + uint64_t version; // current version + uint64_t fversion; // version on saved data file + void * wqueue; + void * rqueue; + void * wal; + void * tsdb; + int64_t sync; + void * events; + void * cq; // continuous query + int32_t cfgVersion; + STsdbCfg tsdbCfg; + SSyncCfg syncCfg; + SWalCfg walCfg; + void * qMgmt; + char * rootDir; + tsem_t sem; + int8_t dropped; + char db[TSDB_ACCT_LEN + TSDB_DB_NAME_LEN]; + pthread_mutex_t statusMutex; } SVnodeObj; -void vnodeInitWriteFp(void); -void vnodeInitReadFp(void); - #ifdef __cplusplus } #endif diff --git a/src/vnode/inc/vnodeMain.h b/src/vnode/inc/vnodeMain.h new file mode 100644 index 0000000000000000000000000000000000000000..7e9ccf08a09aa21adc3238c82973e39d5a3c7b59 --- /dev/null +++ b/src/vnode/inc/vnodeMain.h @@ -0,0 +1,37 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#ifndef TDENGINE_VNODE_MAIN_H +#define TDENGINE_VNODE_MAIN_H + +#ifdef __cplusplus +extern "C" { +#endif +#include "vnodeInt.h" + +int32_t vnodeCreate(SCreateVnodeMsg *pVnodeCfg); +int32_t vnodeDrop(int32_t vgId); +int32_t vnodeOpen(int32_t vgId, char *rootDir); +int32_t vnodeAlter(void *pVnode, SCreateVnodeMsg *pVnodeCfg); +int32_t vnodeClose(int32_t vgId); + +int32_t vnodeReset(SVnodeObj *pVnode); +void vnodeDestroy(SVnodeObj *pVnode); + +#ifdef __cplusplus +} +#endif + +#endif diff --git a/src/vnode/inc/vnodeMgmt.h b/src/vnode/inc/vnodeMgmt.h new file mode 100644 index 0000000000000000000000000000000000000000..5a7e7456195f2f8a21f2c99b4d50761c83729acd --- /dev/null +++ b/src/vnode/inc/vnodeMgmt.h @@ -0,0 +1,42 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#ifndef TDENGINE_VNODE_MGMT_H +#define TDENGINE_VNODE_MGMT_H + +#ifdef __cplusplus +extern "C" { +#endif +#include "vnodeInt.h" + +int32_t vnodeInitMgmt(); +void vnodeCleanupMgmt(); + +void* vnodeAcquire(int32_t vgId); +void vnodeRelease(void *pVnode); +void* vnodeGetWal(void *pVnode); + +int32_t vnodeGetVnodeList(int32_t vnodeList[], int32_t *numOfVnodes); +void vnodeBuildStatusMsg(void *pStatus); +void vnodeSetAccess(SVgroupAccess *pAccess, int32_t numOfVnodes); + +void vnodeAddIntoHash(SVnodeObj* pVnode); +void vnodeRemoveFromHash(SVnodeObj * pVnode); + +#ifdef __cplusplus +} +#endif + +#endif diff --git a/src/vnode/inc/vnodeRead.h b/src/vnode/inc/vnodeRead.h new file mode 100644 index 0000000000000000000000000000000000000000..f2953d79f4d07c3dac821e9a086d86c53647d9c7 --- /dev/null +++ b/src/vnode/inc/vnodeRead.h @@ -0,0 +1,35 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#ifndef TDENGINE_VNODE_READ_H +#define TDENGINE_VNODE_READ_H + +#ifdef __cplusplus +extern "C" { +#endif +#include "vnodeInt.h" + +int32_t vnodeInitRead(void); +void vnodeCleanupRead(void); + +int32_t vnodeWriteToRQueue(void *pVnode, void *pCont, int32_t contLen, int8_t qtype, void *rparam); +void vnodeFreeFromRQueue(void *pVnode, SVReadMsg *pRead); +int32_t vnodeProcessRead(void *pVnode, SVReadMsg *pRead); + +#ifdef __cplusplus +} +#endif + +#endif diff --git a/src/vnode/inc/vnodeStatus.h b/src/vnode/inc/vnodeStatus.h index 911f48e3924d61b5dce19f34120967c5bff9e002..791af29c5f361855e799c1038cb55d4d4d630fba 100644 --- a/src/vnode/inc/vnodeStatus.h +++ b/src/vnode/inc/vnodeStatus.h @@ -19,6 +19,7 @@ #ifdef __cplusplus extern "C" { #endif +#include "vnodeInt.h" typedef enum _VN_STATUS { TAOS_VN_STATUS_INIT = 0, diff --git a/src/vnode/inc/vnodeSync.h b/src/vnode/inc/vnodeSync.h index 7189d4a57289311f2b1daa37f3fb56da3d9804eb..65e96a61311913d2beb416dda5cb38441628efbb 100644 --- a/src/vnode/inc/vnodeSync.h +++ b/src/vnode/inc/vnodeSync.h @@ -19,6 +19,7 @@ #ifdef __cplusplus extern "C" { #endif +#include "vnodeInt.h" uint32_t vnodeGetFileInfo(int32_t vgId, char *name, uint32_t *index, uint32_t eindex, int64_t *size, uint64_t *fver); int32_t vnodeGetWalInfo(int32_t vgId, char *fileName, int64_t *fileId); @@ -29,6 +30,8 @@ void vnodeConfirmForard(int32_t vgId, void *wparam, int32_t code); int32_t vnodeWriteToCache(int32_t vgId, void *wparam, int32_t qtype, void *rparam); int32_t vnodeGetVersion(int32_t vgId, uint64_t *fver, uint64_t *wver); +void vnodeConfirmForward(void *pVnode, uint64_t version, int32_t code); + #ifdef __cplusplus } #endif diff --git a/src/vnode/inc/vnodeVersion.h b/src/vnode/inc/vnodeVersion.h index 1d086cb21fdab0247038c7f3d32d89b38c19d871..913e3915ab3b911746440be9eab2c7f2e05dce3d 100644 --- a/src/vnode/inc/vnodeVersion.h +++ b/src/vnode/inc/vnodeVersion.h @@ -19,6 +19,7 @@ #ifdef __cplusplus extern "C" { #endif +#include "vnodeInt.h" int32_t vnodeReadVersion(SVnodeObj *pVnode); int32_t vnodeSaveVersion(SVnodeObj *pVnode); diff --git a/src/vnode/inc/vnodeWrite.h b/src/vnode/inc/vnodeWrite.h new file mode 100644 index 0000000000000000000000000000000000000000..c69da3567a7025d1e041f5fa21266491f1f92a79 --- /dev/null +++ b/src/vnode/inc/vnodeWrite.h @@ -0,0 +1,35 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#ifndef TDENGINE_VNODE_WRITE_H +#define TDENGINE_VNODE_WRITE_H + +#ifdef __cplusplus +extern "C" { +#endif +#include "vnodeInt.h" + +int32_t vnodeInitWrite(void); +void vnodeCleanupWrite(void); + +int32_t vnodeWriteToWQueue(void *pVnode, void *pHead, int32_t qtype, void *pRpcMsg); +void vnodeFreeFromWQueue(void *pVnode, SVWriteMsg *pWrite); +int32_t vnodeProcessWrite(void *pVnode, void *pHead, int32_t qtype, void *pRspRet); + +#ifdef __cplusplus +} +#endif + +#endif diff --git a/src/vnode/src/vnodeCfg.c b/src/vnode/src/vnodeCfg.c index 2d56157328714a5ae4b8ac05abe3ac4468e361cb..d4aca769db002c05bf422d03f4b2d0ebac545fda 100644 --- a/src/vnode/src/vnodeCfg.c +++ b/src/vnode/src/vnodeCfg.c @@ -15,13 +15,9 @@ #define _DEFAULT_SOURCE #include "os.h" -#include "taosmsg.h" -#include "taoserror.h" #include "cJSON.h" #include "tglobal.h" -#include "tsdb.h" #include "dnode.h" -#include "vnodeInt.h" #include "vnodeCfg.h" static void vnodeLoadCfg(SVnodeObj *pVnode, SCreateVnodeMsg* vnodeMsg) { diff --git a/src/vnode/src/vnodeMain.c b/src/vnode/src/vnodeMain.c index ea5a655a62d5de3bed08b7b682c9eb5810d350d9..35ab13e35dd6ec954c7e9c34b6d7ef7325f4c024 100644 --- a/src/vnode/src/vnodeMain.c +++ b/src/vnode/src/vnodeMain.c @@ -18,56 +18,17 @@ #include "taoserror.h" #include "taosmsg.h" #include "tglobal.h" -#include "trpc.h" -#include "tutil.h" -#include "vnode.h" -#include "vnodeInt.h" +#include "tfs.h" +#include "query.h" +#include "dnode.h" #include "vnodeCfg.h" #include "vnodeStatus.h" #include "vnodeSync.h" #include "vnodeVersion.h" -#include "query.h" -#include "dnode.h" -#include "dnodeVWrite.h" -#include "dnodeVRead.h" -#include "tfs.h" - -static SHashObj*tsVnodesHash; -static void vnodeCleanUp(SVnodeObj *pVnode); -static int32_t vnodeProcessTsdbStatus(void *arg, int32_t status, int32_t eno); - -int32_t vnodeInitResources() { - int32_t code = syncInit(); - if (code != 0) return code; - - vnodeInitWriteFp(); - vnodeInitReadFp(); +#include "vnodeMgmt.h" - tsVnodesHash = taosHashInit(TSDB_MIN_VNODES, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_ENTRY_LOCK); - if (tsVnodesHash == NULL) { - vError("failed to init vnode list"); - return TSDB_CODE_VND_OUT_OF_MEMORY; - } - - if (tsdbInitCommitQueue(tsNumOfCommitThreads) < 0) { - vError("failed to init vnode commit queue"); - return terrno; - } - - return TSDB_CODE_SUCCESS; -} - -void vnodeCleanupResources() { - tsdbDestroyCommitQueue(); - - if (tsVnodesHash != NULL) { - vDebug("vnode list is cleanup"); - taosHashCleanup(tsVnodesHash); - tsVnodesHash = NULL; - } - - syncCleanUp(); -} +static void vnodeCleanUp(SVnodeObj *pVnode); +static int32_t vnodeProcessTsdbStatus(void *arg, int32_t status, int32_t eno); int32_t vnodeCreate(SCreateVnodeMsg *pVnodeCfg) { int32_t code; @@ -313,8 +274,8 @@ int32_t vnodeOpen(int32_t vnode, char *rootDir) { vDebug("vgId:%d, vnode is opened in %s, pVnode:%p", pVnode->vgId, rootDir, pVnode); tsdbIncCommitRef(pVnode->vgId); - taosHashPut(tsVnodesHash, &pVnode->vgId, sizeof(int32_t), &pVnode, sizeof(SVnodeObj *)); - + vnodeAddIntoHash(pVnode); + SSyncInfo syncInfo; syncInfo.vgId = pVnode->vgId; syncInfo.version = pVnode->version; @@ -353,25 +314,10 @@ int32_t vnodeClose(int32_t vgId) { return 0; } -void vnodeRelease(void *vparam) { - if (vparam == NULL) return; - SVnodeObj *pVnode = vparam; - int32_t code = 0; - int32_t vgId = pVnode->vgId; - - int32_t refCount = atomic_sub_fetch_32(&pVnode->refCount, 1); - vTrace("vgId:%d, release vnode, refCount:%d pVnode:%p", vgId, refCount, pVnode); - assert(refCount >= 0); - - if (refCount > 0) { - if (vnodeInResetStatus(pVnode) && refCount <= 3) { - tsem_post(&pVnode->sem); - } - return; - } - - vDebug("vgId:%d, vnode will be destroyed, refCount:%d pVnode:%p", vgId, refCount, pVnode); - +void vnodeDestroy(SVnodeObj *pVnode) { + int32_t code = 0; + int32_t vgId = pVnode->vgId; + if (pVnode->qMgmt) { qCleanupQueryMgmt(pVnode->qMgmt); pVnode->qMgmt = NULL; @@ -436,113 +382,11 @@ void vnodeRelease(void *vparam) { pthread_mutex_destroy(&pVnode->statusMutex); free(pVnode); tsdbDecCommitRef(vgId); - - int32_t count = taosHashGetSize(tsVnodesHash); - vDebug("vgId:%d, vnode is destroyed, vnodes:%d", vgId, count); -} - -static void vnodeIncRef(void *ptNode) { - assert(ptNode != NULL); - - SVnodeObj **ppVnode = (SVnodeObj **)ptNode; - assert(ppVnode); - assert(*ppVnode); - - SVnodeObj *pVnode = *ppVnode; - atomic_add_fetch_32(&pVnode->refCount, 1); - vTrace("vgId:%d, get vnode, refCount:%d pVnode:%p", pVnode->vgId, pVnode->refCount, pVnode); -} - -void *vnodeAcquire(int32_t vgId) { - SVnodeObj **ppVnode = taosHashGetCB(tsVnodesHash, &vgId, sizeof(int32_t), vnodeIncRef, NULL, sizeof(void *)); - - if (ppVnode == NULL || *ppVnode == NULL) { - terrno = TSDB_CODE_VND_INVALID_VGROUP_ID; - vDebug("vgId:%d, not exist", vgId); - return NULL; - } - - return *ppVnode; -} - -void *vnodeGetWal(void *pVnode) { - return ((SVnodeObj *)pVnode)->wal; -} - -static void vnodeBuildVloadMsg(SVnodeObj *pVnode, SStatusMsg *pStatus) { - int64_t totalStorage = 0; - int64_t compStorage = 0; - int64_t pointsWritten = 0; - - if (!vnodeInReadyStatus(pVnode)) return; - if (pStatus->openVnodes >= TSDB_MAX_VNODES) return; - - if (pVnode->tsdb) { - tsdbReportStat(pVnode->tsdb, &pointsWritten, &totalStorage, &compStorage); - } - - SVnodeLoad *pLoad = &pStatus->load[pStatus->openVnodes++]; - pLoad->vgId = htonl(pVnode->vgId); - pLoad->cfgVersion = htonl(pVnode->cfgVersion); - pLoad->totalStorage = htobe64(totalStorage); - pLoad->compStorage = htobe64(compStorage); - pLoad->pointsWritten = htobe64(pointsWritten); - pLoad->status = pVnode->status; - pLoad->role = pVnode->role; - pLoad->replica = pVnode->syncCfg.replica; -} - -int32_t vnodeGetVnodeList(int32_t vnodeList[], int32_t *numOfVnodes) { - void *pIter = taosHashIterate(tsVnodesHash, NULL); - while (pIter) { - SVnodeObj **pVnode = pIter; - if (*pVnode) { - - (*numOfVnodes)++; - if (*numOfVnodes >= TSDB_MAX_VNODES) { - vError("vgId:%d, too many open vnodes, exist:%d max:%d", (*pVnode)->vgId, *numOfVnodes, TSDB_MAX_VNODES); - continue; - } else { - vnodeList[*numOfVnodes - 1] = (*pVnode)->vgId; - } - - } - - pIter = taosHashIterate(tsVnodesHash, pIter); - } - return TSDB_CODE_SUCCESS; -} - -void vnodeBuildStatusMsg(void *param) { - SStatusMsg *pStatus = param; - - void *pIter = taosHashIterate(tsVnodesHash, NULL); - while (pIter) { - SVnodeObj **pVnode = pIter; - if (*pVnode) { - vnodeBuildVloadMsg(*pVnode, pStatus); - } - pIter = taosHashIterate(tsVnodesHash, pIter); - } -} - -void vnodeSetAccess(SVgroupAccess *pAccess, int32_t numOfVnodes) { - for (int32_t i = 0; i < numOfVnodes; ++i) { - pAccess[i].vgId = htonl(pAccess[i].vgId); - SVnodeObj *pVnode = vnodeAcquire(pAccess[i].vgId); - if (pVnode != NULL) { - pVnode->accessState = pAccess[i].accessState; - if (pVnode->accessState != TSDB_VN_ALL_ACCCESS) { - vDebug("vgId:%d, access state is set to %d", pAccess[i].vgId, pVnode->accessState); - } - vnodeRelease(pVnode); - } - } } static void vnodeCleanUp(SVnodeObj *pVnode) { // remove from hash, so new messages wont be consumed - taosHashRemove(tsVnodesHash, &pVnode->vgId, sizeof(int32_t)); + vnodeRemoveFromHash(pVnode); if (!vnodeInInitStatus(pVnode)) { // it may be in updateing or reset state, then it shall wait @@ -602,8 +446,7 @@ static int32_t vnodeProcessTsdbStatus(void *arg, int32_t status, int32_t eno) { return 0; } - -int32_t vnodeResetTsdb(SVnodeObj *pVnode) { +int32_t vnodeReset(SVnodeObj *pVnode) { char rootDir[128] = "\0"; sprintf(rootDir, "vnode/vnode%d/tsdb", pVnode->vgId); diff --git a/src/vnode/src/vnodeMgmt.c b/src/vnode/src/vnodeMgmt.c new file mode 100644 index 0000000000000000000000000000000000000000..542ea7b2cea75ab0468da0519c81706508fffb13 --- /dev/null +++ b/src/vnode/src/vnodeMgmt.c @@ -0,0 +1,191 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#define _DEFAULT_SOURCE +#include "os.h" +#include "tstep.h" +#include "vnodeStatus.h" +#include "vnodeRead.h" +#include "vnodeWrite.h" +#include "vnodeMain.h" + +static SHashObj *tsVnodesHash = NULL; + +static int32_t vnodeInitHash(void); +static void vnodeCleanupHash(void); +static void vnodeIncRef(void *ptNode); + +static SStep tsVnodeSteps[] = { + {"vsync", syncInit, syncCleanUp, 0}, + {"vwrite", vnodeInitWrite, vnodeCleanupWrite, 1}, + {"vread", vnodeInitRead, vnodeCleanupRead, 2}, + {"vhash", vnodeInitHash, vnodeCleanupHash, 3}, + {"vqueue", tsdbInitCommitQueue, tsdbDestroyCommitQueue, 4} +}; + +int32_t vnodeInitMgmt() { + int32_t stepSize = sizeof(tsVnodeSteps) / sizeof(SStep); + return taosStepInit(tsVnodeSteps, stepSize); +} + +void vnodeCleanupMgmt() { + int32_t stepSize = sizeof(tsVnodeSteps) / sizeof(SStep); + taosStepCleanup(tsVnodeSteps, stepSize); +} + +static int32_t vnodeInitHash() { + tsVnodesHash = taosHashInit(TSDB_MIN_VNODES, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_ENTRY_LOCK); + if (tsVnodesHash == NULL) { + vError("failed to init vnode mgmt"); + return -1; + } + + return 0; +} + +static void vnodeCleanupHash() { + if (tsVnodesHash != NULL) { + vDebug("vnode mgmt is cleanup"); + taosHashCleanup(tsVnodesHash); + tsVnodesHash = NULL; + } +} + +void *vnodeGetWal(void *pVnode) { + return ((SVnodeObj *)pVnode)->wal; +} + +void vnodeAddIntoHash(SVnodeObj *pVnode) { + taosHashPut(tsVnodesHash, &pVnode->vgId, sizeof(int32_t), &pVnode, sizeof(SVnodeObj *)); +} + +void vnodeRemoveFromHash(SVnodeObj *pVnode) { + taosHashRemove(tsVnodesHash, &pVnode->vgId, sizeof(int32_t)); +} + +static void vnodeIncRef(void *ptNode) { + assert(ptNode != NULL); + + SVnodeObj **ppVnode = (SVnodeObj **)ptNode; + assert(ppVnode); + assert(*ppVnode); + + SVnodeObj *pVnode = *ppVnode; + atomic_add_fetch_32(&pVnode->refCount, 1); + vTrace("vgId:%d, get vnode, refCount:%d pVnode:%p", pVnode->vgId, pVnode->refCount, pVnode); +} + +void *vnodeAcquire(int32_t vgId) { + SVnodeObj **ppVnode = taosHashGetCB(tsVnodesHash, &vgId, sizeof(int32_t), vnodeIncRef, NULL, sizeof(void *)); + + if (ppVnode == NULL || *ppVnode == NULL) { + terrno = TSDB_CODE_VND_INVALID_VGROUP_ID; + vDebug("vgId:%d, not exist", vgId); + return NULL; + } + + return *ppVnode; +} + +void vnodeRelease(void *vparam) { + SVnodeObj *pVnode = vparam; + if (vparam == NULL) return; + + int32_t refCount = atomic_sub_fetch_32(&pVnode->refCount, 1); + vTrace("vgId:%d, release vnode, refCount:%d pVnode:%p", pVnode->vgId, refCount, pVnode); + assert(refCount >= 0); + + if (refCount > 0) { + if (vnodeInResetStatus(pVnode) && refCount <= 3) { + tsem_post(&pVnode->sem); + } + } else { + vDebug("vgId:%d, vnode will be destroyed, refCount:%d pVnode:%p", pVnode->vgId, refCount, pVnode); + vnodeDestroy(pVnode); + int32_t count = taosHashGetSize(tsVnodesHash); + vDebug("vgId:%d, vnode is destroyed, vnodes:%d", pVnode->vgId, count); + } +} + +static void vnodeBuildVloadMsg(SVnodeObj *pVnode, SStatusMsg *pStatus) { + int64_t totalStorage = 0; + int64_t compStorage = 0; + int64_t pointsWritten = 0; + + if (!vnodeInReadyStatus(pVnode)) return; + if (pStatus->openVnodes >= TSDB_MAX_VNODES) return; + + if (pVnode->tsdb) { + tsdbReportStat(pVnode->tsdb, &pointsWritten, &totalStorage, &compStorage); + } + + SVnodeLoad *pLoad = &pStatus->load[pStatus->openVnodes++]; + pLoad->vgId = htonl(pVnode->vgId); + pLoad->cfgVersion = htonl(pVnode->cfgVersion); + pLoad->totalStorage = htobe64(totalStorage); + pLoad->compStorage = htobe64(compStorage); + pLoad->pointsWritten = htobe64(pointsWritten); + pLoad->status = pVnode->status; + pLoad->role = pVnode->role; + pLoad->replica = pVnode->syncCfg.replica; +} + +int32_t vnodeGetVnodeList(int32_t vnodeList[], int32_t *numOfVnodes) { + void *pIter = taosHashIterate(tsVnodesHash, NULL); + while (pIter) { + SVnodeObj **pVnode = pIter; + if (*pVnode) { + + (*numOfVnodes)++; + if (*numOfVnodes >= TSDB_MAX_VNODES) { + vError("vgId:%d, too many open vnodes, exist:%d max:%d", (*pVnode)->vgId, *numOfVnodes, TSDB_MAX_VNODES); + continue; + } else { + vnodeList[*numOfVnodes - 1] = (*pVnode)->vgId; + } + + } + + pIter = taosHashIterate(tsVnodesHash, pIter); + } + return TSDB_CODE_SUCCESS; +} + +void vnodeBuildStatusMsg(void *param) { + SStatusMsg *pStatus = param; + + void *pIter = taosHashIterate(tsVnodesHash, NULL); + while (pIter) { + SVnodeObj **pVnode = pIter; + if (*pVnode) { + vnodeBuildVloadMsg(*pVnode, pStatus); + } + pIter = taosHashIterate(tsVnodesHash, pIter); + } +} + +void vnodeSetAccess(SVgroupAccess *pAccess, int32_t numOfVnodes) { + for (int32_t i = 0; i < numOfVnodes; ++i) { + pAccess[i].vgId = htonl(pAccess[i].vgId); + SVnodeObj *pVnode = vnodeAcquire(pAccess[i].vgId); + if (pVnode != NULL) { + pVnode->accessState = pAccess[i].accessState; + if (pVnode->accessState != TSDB_VN_ALL_ACCCESS) { + vDebug("vgId:%d, access state is set to %d", pAccess[i].vgId, pVnode->accessState); + } + vnodeRelease(pVnode); + } + } +} diff --git a/src/vnode/src/vnodeRead.c b/src/vnode/src/vnodeRead.c index 3f7e54d46da251398b1fb042e04b196129097373..8638c2ea7b4684e76a367d508c68e76f8f583363 100644 --- a/src/vnode/src/vnodeRead.c +++ b/src/vnode/src/vnodeRead.c @@ -16,27 +16,25 @@ #define _DEFAULT_SOURCE #define _NON_BLOCKING_RETRIEVE 0 #include "os.h" -#include "tglobal.h" -#include "taoserror.h" #include "taosmsg.h" +#include "tqueue.h" #include "query.h" -#include "trpc.h" -#include "tsdb.h" -#include "vnode.h" -#include "vnodeInt.h" #include "vnodeStatus.h" -#include "tqueue.h" static int32_t (*vnodeProcessReadMsgFp[TSDB_MSG_TYPE_MAX])(SVnodeObj *pVnode, SVReadMsg *pRead); static int32_t vnodeProcessQueryMsg(SVnodeObj *pVnode, SVReadMsg *pRead); static int32_t vnodeProcessFetchMsg(SVnodeObj *pVnode, SVReadMsg *pRead); static int32_t vnodeNotifyCurrentQhandle(void* handle, void* qhandle, int32_t vgId); -void vnodeInitReadFp(void) { +int32_t vnodeInitRead(void) { vnodeProcessReadMsgFp[TSDB_MSG_TYPE_QUERY] = vnodeProcessQueryMsg; vnodeProcessReadMsgFp[TSDB_MSG_TYPE_FETCH] = vnodeProcessFetchMsg; + + return 0; } +void vnodeCleanupRead() {} + // // After the fetch request enters the vnode queue, if the vnode cannot provide services, the process function are // still required, or there will be a deadlock, so we don’t do any check here, but put the check codes before the diff --git a/src/vnode/src/vnodeStatus.c b/src/vnode/src/vnodeStatus.c index 4dce0bd9619b624799e333c8824c19960fc9f26d..d09a6a86631837db6799aca5b4df87ae22e07853 100644 --- a/src/vnode/src/vnodeStatus.c +++ b/src/vnode/src/vnodeStatus.c @@ -15,9 +15,6 @@ #define _DEFAULT_SOURCE #include "os.h" -#include "taosmsg.h" -#include "vnode.h" -#include "vnodeInt.h" #include "vnodeStatus.h" char* vnodeStatus[] = { diff --git a/src/vnode/src/vnodeSync.c b/src/vnode/src/vnodeSync.c index 7858820bd814f0a535f67afa7b5f84f44c8330bf..c67132c41f2a925c5d3224f8b59f0e94b5e8f1c3 100644 --- a/src/vnode/src/vnodeSync.c +++ b/src/vnode/src/vnodeSync.c @@ -15,21 +15,11 @@ #define _DEFAULT_SOURCE #include "os.h" -#include "taoserror.h" #include "taosmsg.h" -#include "tglobal.h" -#include "trpc.h" -#include "tutil.h" -#include "vnode.h" -#include "vnodeInt.h" -#include "vnodeCfg.h" -#include "vnodeStatus.h" -#include "vnodeVersion.h" #include "query.h" #include "dnode.h" -#include "dnodeVWrite.h" -#include "dnodeVRead.h" -#include "tfs.h" +#include "vnodeVersion.h" +#include "vnodeMain.h" uint32_t vnodeGetFileInfo(int32_t vgId, char *name, uint32_t *index, uint32_t eindex, int64_t *size, uint64_t *fver) { SVnodeObj *pVnode = vnodeAcquire(vgId); @@ -105,7 +95,7 @@ int32_t vnodeNotifyFileSynced(int32_t vgId, uint64_t fversion) { vnodeSaveVersion(pVnode); vDebug("vgId:%d, data file is synced, fver:%" PRIu64 " vver:%" PRIu64, vgId, fversion, fversion); - int32_t code = vnodeResetTsdb(pVnode); + int32_t code = vnodeReset(pVnode); vnodeRelease(pVnode); return code; @@ -154,3 +144,8 @@ int32_t vnodeGetVersion(int32_t vgId, uint64_t *fver, uint64_t *wver) { vnodeRelease(pVnode); return code; } + +void vnodeConfirmForward(void *vparam, uint64_t version, int32_t code) { + SVnodeObj *pVnode = vparam; + syncConfirmForward(pVnode->sync, version, code); +} diff --git a/src/vnode/src/vnodeVersion.c b/src/vnode/src/vnodeVersion.c index 8f6360b4f98eac8f394f5078ed2b025cea4192b0..fb3b3ebd9e9b9b4e6ac4d3f62de3b7599a13cae9 100644 --- a/src/vnode/src/vnodeVersion.c +++ b/src/vnode/src/vnodeVersion.c @@ -15,11 +15,8 @@ #define _DEFAULT_SOURCE #include "os.h" -#include "taoserror.h" #include "cJSON.h" #include "tglobal.h" -#include "tsdb.h" -#include "vnodeInt.h" #include "vnodeVersion.h" int32_t vnodeReadVersion(SVnodeObj *pVnode) { diff --git a/src/vnode/src/vnodeWrite.c b/src/vnode/src/vnodeWrite.c index 91fc6d399897b21c6cfa96eafd3e67333799a210..a826a4903fe6a498db0769a1a1d883791bbb3802 100644 --- a/src/vnode/src/vnodeWrite.c +++ b/src/vnode/src/vnodeWrite.c @@ -19,18 +19,9 @@ #include "taoserror.h" #include "tglobal.h" #include "tqueue.h" -#include "trpc.h" -#include "tsdb.h" -#include "twal.h" -#include "tsync.h" #include "ttimer.h" -#include "tdataformat.h" -#include "vnode.h" -#include "vnodeInt.h" -#include "vnodeStatus.h" -#include "syncInt.h" -#include "tcq.h" #include "dnode.h" +#include "vnodeStatus.h" #define MAX_QUEUED_MSG_NUM 10000 @@ -44,15 +35,19 @@ static int32_t vnodeProcessDropStableMsg(SVnodeObj *pVnode, void *pCont, SRspRet static int32_t vnodeProcessUpdateTagValMsg(SVnodeObj *pVnode, void *pCont, SRspRet *); static int32_t vnodePerformFlowCtrl(SVWriteMsg *pWrite); -void vnodeInitWriteFp(void) { +int32_t vnodeInitWrite(void) { vnodeProcessWriteMsgFp[TSDB_MSG_TYPE_SUBMIT] = vnodeProcessSubmitMsg; vnodeProcessWriteMsgFp[TSDB_MSG_TYPE_MD_CREATE_TABLE] = vnodeProcessCreateTableMsg; vnodeProcessWriteMsgFp[TSDB_MSG_TYPE_MD_DROP_TABLE] = vnodeProcessDropTableMsg; vnodeProcessWriteMsgFp[TSDB_MSG_TYPE_MD_ALTER_TABLE] = vnodeProcessAlterTableMsg; vnodeProcessWriteMsgFp[TSDB_MSG_TYPE_MD_DROP_STABLE] = vnodeProcessDropStableMsg; vnodeProcessWriteMsgFp[TSDB_MSG_TYPE_UPDATE_TAG_VAL] = vnodeProcessUpdateTagValMsg; + + return 0; } +void vnodeCleanupWrite() {} + int32_t vnodeProcessWrite(void *vparam, void *wparam, int32_t qtype, void *rparam) { int32_t code = 0; SVnodeObj *pVnode = vparam; @@ -133,11 +128,6 @@ static int32_t vnodeCheckWrite(void *vparam) { return TSDB_CODE_SUCCESS; } -void vnodeConfirmForward(void *vparam, uint64_t version, int32_t code) { - SVnodeObj *pVnode = vparam; - syncConfirmForward(pVnode->sync, version, code); -} - static int32_t vnodeProcessSubmitMsg(SVnodeObj *pVnode, void *pCont, SRspRet *pRet) { int32_t code = TSDB_CODE_SUCCESS;