From 0e7ba80d9450315ed556b8a36805aaf67eb45fca Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Mon, 25 Oct 2021 18:38:31 +0800 Subject: [PATCH] TD-10432 vnodeint.h --- source/server/vnode/CMakeLists.txt | 1 + source/server/vnode/inc/vnodeInt.h | 78 ++++++++++---------------- source/server/vnode/src/vnodeInt.c | 15 +++-- source/server/vnode/src/vnodeMain.c | 50 +++++++++-------- source/server/vnode/src/vnodeRead.c | 4 +- source/server/vnode/src/vnodeReadMsg.c | 10 ++-- source/server/vnode/src/vnodeVersion.c | 7 ++- source/server/vnode/src/vnodeWrite.c | 6 +- 8 files changed, 82 insertions(+), 89 deletions(-) diff --git a/source/server/vnode/CMakeLists.txt b/source/server/vnode/CMakeLists.txt index 249f56657c..573cea79b5 100644 --- a/source/server/vnode/CMakeLists.txt +++ b/source/server/vnode/CMakeLists.txt @@ -16,6 +16,7 @@ target_link_libraries( PUBLIC tq PUBLIC tsdb PUBLIC wal + PUBLIC sync PUBLIC cjson ) diff --git a/source/server/vnode/inc/vnodeInt.h b/source/server/vnode/inc/vnodeInt.h index b2512b2892..d810f3409c 100644 --- a/source/server/vnode/inc/vnodeInt.h +++ b/source/server/vnode/inc/vnodeInt.h @@ -16,18 +16,19 @@ #ifndef _TD_VNODE_INT_H_ #define _TD_VNODE_INT_H_ -#include "os.h" #include "amalloc.h" #include "meta.h" +#include "os.h" +#include "sync.h" #include "taosmsg.h" +#include "tlog.h" #include "tq.h" +#include "tqueue.h" #include "trpc.h" #include "tsdb.h" +#include "tworker.h" #include "vnode.h" -#include "tlog.h" -#include "tqueue.h" #include "wal.h" -#include "tworker.h" #ifdef __cplusplus extern "C" { @@ -43,51 +44,28 @@ extern int32_t vDebugFlag; #define vTrace(...) { if (vDebugFlag & DEBUG_TRACE) { taosPrintLog("VND ", vDebugFlag, __VA_ARGS__); }} typedef struct { - SMeta * pMeta; - STsdb * pTsdb; - STQ * pTQ; - SMemAllocator *allocator; - - int32_t vgId; // global vnode group ID - int32_t refCount; // reference count - int64_t queuedWMsgSize; - int32_t queuedWMsg; - int32_t queuedRMsg; - int32_t numOfExistQHandle; // current initialized and existed query handle in current dnode - int32_t flowctrlLevel; - int8_t preClose; // drop and close switch - int8_t reserved[3]; - int64_t sequence; // for topic - int8_t status; - int8_t role; - int8_t accessState; - int8_t isFull; - int8_t isCommiting; - int8_t dbReplica; - int8_t dropped; - int8_t dbType; - uint64_t version; // current version - uint64_t cversion; // version while commit start - uint64_t fversion; // version on saved data file - void * wqueue; // write queue - void * qqueue; // read query queue - void * fqueue; // read fetch/cancel queue - void * wal; - void * tsdb; - int64_t sync; - void * events; - void * cq; // continuous query - int32_t dbCfgVersion; - int32_t vgCfgVersion; - // STsdbCfg tsdbCfg; -#if 0 - SSyncCfg syncCfg; -#endif - SWalCfg walCfg; - void * qMgmt; - char * rootDir; - tsem_t sem; - char db[TSDB_ACCT_ID_LEN + TSDB_DB_NAME_LEN]; + int32_t vgId; // global vnode group ID + int32_t refCount; // reference count + SMemAllocator *allocator; + SMeta *pMeta; + STsdb *pTsdb; + STQ *pTQ; + twalh pWal; + SyncNodeId syncNode; + taos_queue pWriteQ; // write queue + taos_queue pQueryQ; // read query queue + taos_queue pFetchQ; // read fetch/cancel queue + SWalCfg walCfg; + SSyncCluster syncCfg; + char db[TSDB_ACCT_ID_LEN + TSDB_DB_NAME_LEN]; + int64_t queuedWMsgSize; + int32_t queuedWMsg; + int32_t queuedRMsg; + int32_t numOfQHandle; // current initialized and existed query handle in current dnode + int8_t status; + int8_t role; + int8_t accessState; + int8_t dropped; pthread_mutex_t statusMutex; } SVnode; @@ -97,6 +75,8 @@ typedef struct { void * qhandle; // used by query and retrieve msg } SVnRsp; +void vnodeSendMsgToDnode(struct SRpcEpSet *epSet, struct SRpcMsg *rpcMsg); +void vnodeSendMsgToMnode(struct SRpcMsg *rpcMsg); void vnodeGetDnodeEp(int32_t dnodeId, char *ep, char *fqdn, uint16_t *port); #ifdef __cplusplus diff --git a/source/server/vnode/src/vnodeInt.c b/source/server/vnode/src/vnodeInt.c index a10c35fd98..dfb0ff0f05 100644 --- a/source/server/vnode/src/vnodeInt.c +++ b/source/server/vnode/src/vnodeInt.c @@ -23,8 +23,8 @@ #include "vnodeWrite.h" static struct { - struct SSteps *steps; - SVnodeFp fp; + SSteps *steps; + SVnodeFp fp; } tsVint; int32_t vnodeInit(SVnodePara para) { @@ -34,11 +34,10 @@ int32_t vnodeInit(SVnodePara para) { if (steps == NULL) return -1; taosStepAdd(steps, "vnode-main", vnodeInitMain, vnodeCleanupMain); - taosStepAdd(steps, "vnode-worker",vnodeInitWorker, vnodeCleanupWorker); + taosStepAdd(steps, "vnode-worker", vnodeInitWorker, vnodeCleanupWorker); taosStepAdd(steps, "vnode-read", vnodeInitRead, vnodeCleanupRead); taosStepAdd(steps, "vnode-mgmt", vnodeInitMgmt, vnodeCleanupMgmt); taosStepAdd(steps, "vnode-write", vnodeInitWrite, vnodeCleanupWrite); - // taosStepAdd(steps, "vnode-queue", tsdbInitCommitQueue, tsdbDestroyCommitQueue); tsVint.steps = steps; return taosStepExec(tsVint.steps); @@ -48,4 +47,10 @@ void vnodeCleanup() { taosStepCleanup(tsVint.steps); } void vnodeGetDnodeEp(int32_t dnodeId, char *ep, char *fqdn, uint16_t *port) { return (*tsVint.fp.GetDnodeEp)(dnodeId, ep, fqdn, port); -} \ No newline at end of file +} + +void vnodeSendMsgToDnode(struct SRpcEpSet *epSet, struct SRpcMsg *rpcMsg) { + (*tsVint.fp.SendMsgToDnode)(epSet, rpcMsg); +} + +void vnodeSendMsgToMnode(struct SRpcMsg *rpcMsg) { return (*tsVint.fp.SendMsgToMnode)(rpcMsg); } diff --git a/source/server/vnode/src/vnodeMain.c b/source/server/vnode/src/vnodeMain.c index da1c1d7235..7d8608f0d9 100644 --- a/source/server/vnode/src/vnodeMain.c +++ b/source/server/vnode/src/vnodeMain.c @@ -75,15 +75,16 @@ SVnode *vnodeAcquire(int32_t vgId) { } SVnode *vnodeAcquireNotClose(int32_t vgId) { - SVnode *pVnode = vnodeAcquire(vgId); - if (pVnode != NULL && pVnode->preClose == 1) { - vnodeRelease(pVnode); - terrno = TSDB_CODE_VND_INVALID_VGROUP_ID; - vDebug("vgId:%d, not exist, pre closing", vgId); - return NULL; - } - - return pVnode; + // SVnode *pVnode = vnodeAcquire(vgId); + // if (pVnode != NULL && pVnode->preClose == 1) { + // vnodeRelease(pVnode); + // terrno = TSDB_CODE_VND_INVALID_VGROUP_ID; + // vDebug("vgId:%d, not exist, pre closing", vgId); + // return NULL; + // } + + // return pVnode; + return NULL; } void vnodeRelease(SVnode *pVnode) { @@ -287,6 +288,7 @@ static int32_t vnodeAlterImp(SVnode *pVnode, SCreateVnodeMsg *pVnodeCfg) { } int32_t vnodeAlter(SVnode *pVnode, SCreateVnodeMsg *pVnodeCfg) { +#if 0 vDebug("vgId:%d, current dbCfgVersion:%d vgCfgVersion:%d, input dbCfgVersion:%d vgCfgVersion:%d", pVnode->vgId, pVnode->dbCfgVersion, pVnode->vgCfgVersion, pVnodeCfg->cfg.dbCfgVersion, pVnodeCfg->cfg.vgCfgVersion); @@ -304,6 +306,8 @@ int32_t vnodeAlter(SVnode *pVnode, SCreateVnodeMsg *pVnodeCfg) { } return code; +#endif + return 0; } static void vnodeFindWalRootDir(int32_t vgId, char *walRootDir) { @@ -371,10 +375,10 @@ int32_t vnodeOpen(int32_t vgId) { pVnode->fversion = pVnode->version; - pVnode->wqueue = vnodeAllocWriteQueue(pVnode); - pVnode->qqueue = vnodeAllocQueryQueue(pVnode); - pVnode->fqueue = vnodeAllocFetchQueue(pVnode); - if (pVnode->wqueue == NULL || pVnode->qqueue == NULL || pVnode->fqueue == NULL) { + pVnode->pWriteQ = vnodeAllocWriteQueue(pVnode); + pVnode->pQueryQ = vnodeAllocQueryQueue(pVnode); + pVnode->pFetchQ = vnodeAllocFetchQueue(pVnode); + if (pVnode->pWriteQ == NULL || pVnode->pQueryQ == NULL || pVnode->pFetchQ == NULL) { vnodeCleanUp(pVnode); return terrno; } @@ -467,7 +471,7 @@ int32_t vnodeClose(int32_t vgId) { return 0; } - pVnode->preClose = 1; + // pVnode->preClose = 1; vDebug("vgId:%d, vnode will be closed, pVnode:%p", pVnode->vgId, pVnode); vnodeRelease(pVnode); @@ -516,19 +520,19 @@ void vnodeDestroy(SVnode *pVnode) { pVnode->wal = NULL; } - if (pVnode->wqueue) { - vnodeFreeWriteQueue(pVnode->wqueue); - pVnode->wqueue = NULL; + if (pVnode->pWriteQ) { + vnodeFreeWriteQueue(pVnode->pWriteQ); + pVnode->pWriteQ = NULL; } - if (pVnode->qqueue) { - vnodeFreeQueryQueue(pVnode->qqueue); - pVnode->qqueue = NULL; + if (pVnode->pQueryQ) { + vnodeFreeQueryQueue(pVnode->pQueryQ); + pVnode->pQueryQ = NULL; } - if (pVnode->fqueue) { - vnodeFreeFetchQueue(pVnode->fqueue); - pVnode->fqueue = NULL; + if (pVnode->pFetchQ) { + vnodeFreeFetchQueue(pVnode->pFetchQ); + pVnode->pFetchQ = NULL; } tfree(pVnode->rootDir); diff --git a/source/server/vnode/src/vnodeRead.c b/source/server/vnode/src/vnodeRead.c index 0bf907c419..dac378ca30 100644 --- a/source/server/vnode/src/vnodeRead.c +++ b/source/server/vnode/src/vnodeRead.c @@ -81,9 +81,9 @@ static int32_t vnodeWriteToRQueue(SVnode *pVnode, void *pCont, int32_t contLen, atomic_add_fetch_32(&pVnode->queuedRMsg, 1); if (pRead->code == TSDB_CODE_RPC_NETWORK_UNAVAIL || pRead->msgType == TSDB_MSG_TYPE_FETCH) { - return taosWriteQitem(pVnode->fqueue, qtype, pRead); + return taosWriteQitem(pVnode->pFetchQ, qtype, pRead); } else { - return taosWriteQitem(pVnode->qqueue, qtype, pRead); + return taosWriteQitem(pVnode->pQueryQ, qtype, pRead); } } diff --git a/source/server/vnode/src/vnodeReadMsg.c b/source/server/vnode/src/vnodeReadMsg.c index b4070546c7..419a1f73b6 100644 --- a/source/server/vnode/src/vnodeReadMsg.c +++ b/source/server/vnode/src/vnodeReadMsg.c @@ -158,7 +158,7 @@ int32_t vnodeProcessQueryMsg(SVnode *pVnode, SReadMsg *pRead) { } } - int32_t remain = atomic_add_fetch_32(&pVnode->numOfExistQHandle, 1); + int32_t remain = atomic_add_fetch_32(&pVnode->numOfQHandle, 1); vTrace("vgId:%d, new qhandle created, total qhandle:%d", pVnode->vgId, remain); } else { assert(pCont != NULL); @@ -203,7 +203,7 @@ int32_t vnodeProcessQueryMsg(SVnode *pVnode, SReadMsg *pRead) { // If the building of result is not required, simply free it. Otherwise, mandatorily free the qhandle if (freehandle || (!buildRes)) { if (freehandle) { - int32_t remain = atomic_sub_fetch_32(&pVnode->numOfExistQHandle, 1); + int32_t remain = atomic_sub_fetch_32(&pVnode->numOfQHandle, 1); vTrace("vgId:%d, QInfo:%p, start to free qhandle, remain qhandle:%d", pVnode->vgId, *qhandle, remain); } @@ -282,7 +282,7 @@ int32_t vnodeProcessFetchMsg(SVnode *pVnode, SReadMsg *pRead) { // kill current query and free corresponding resources. if (pRetrieve->free == 1) { - int32_t remain = atomic_sub_fetch_32(&pVnode->numOfExistQHandle, 1); + int32_t remain = atomic_sub_fetch_32(&pVnode->numOfQHandle, 1); vWarn("vgId:%d, QInfo:%" PRIx64 "-%p, retrieve msg received to kill query and free qhandle, remain qhandle:%d", pVnode->vgId, pRetrieve->qId, *handle, remain); @@ -296,7 +296,7 @@ int32_t vnodeProcessFetchMsg(SVnode *pVnode, SReadMsg *pRead) { // register the qhandle to connect to quit query immediate if connection is broken if (vnodeNotifyCurrentQhandle(pRead->rpcHandle, pRetrieve->qId, *handle, pVnode->vgId) != TSDB_CODE_SUCCESS) { - int32_t remain = atomic_sub_fetch_32(&pVnode->numOfExistQHandle, 1); + int32_t remain = atomic_sub_fetch_32(&pVnode->numOfQHandle, 1); vError("vgId:%d, QInfo:%" PRIu64 "-%p, retrieve discarded since link is broken, conn:%p, remain qhandle:%d", pVnode->vgId, pRetrieve->qhandle, *handle, pRead->rpcHandle, remain); @@ -333,7 +333,7 @@ int32_t vnodeProcessFetchMsg(SVnode *pVnode, SReadMsg *pRead) { // If qhandle is not added into vread queue, the query should be completed already or paused with error. // Here free qhandle immediately if (freeHandle) { - int32_t remain = atomic_sub_fetch_32(&pVnode->numOfExistQHandle, 1); + int32_t remain = atomic_sub_fetch_32(&pVnode->numOfQHandle, 1); vTrace("vgId:%d, QInfo:%p, start to free qhandle, remain qhandle:%d", pVnode->vgId, *handle, remain); qReleaseQInfo(pVnode->qMgmt, (void **)&handle, true); } diff --git a/source/server/vnode/src/vnodeVersion.c b/source/server/vnode/src/vnodeVersion.c index c330533885..2b3d1b3a00 100644 --- a/source/server/vnode/src/vnodeVersion.c +++ b/source/server/vnode/src/vnodeVersion.c @@ -58,10 +58,12 @@ int32_t vnodeReadVersion(SVnode *pVnode) { vError("vgId:%d, failed to read %s, version not found", pVnode->vgId, file); goto PARSE_VER_ERROR; } +#if 0 pVnode->version = (uint64_t)ver->valueint; terrno = TSDB_CODE_SUCCESS; vInfo("vgId:%d, read %s successfully, fver:%" PRIu64, pVnode->vgId, file, pVnode->version); +#endif PARSE_VER_ERROR: if (content != NULL) free(content); @@ -85,16 +87,17 @@ int32_t vnodeSaveVersion(SVnode *pVnode) { int32_t maxLen = 100; char * content = calloc(1, maxLen + 1); +#if 0 len += snprintf(content + len, maxLen - len, "{\n"); len += snprintf(content + len, maxLen - len, " \"version\": %" PRIu64 "\n", pVnode->fversion); len += snprintf(content + len, maxLen - len, "}\n"); - +#endif fwrite(content, 1, len, fp); taosFsyncFile(fileno(fp)); fclose(fp); free(content); terrno = 0; - vInfo("vgId:%d, successed to write %s, fver:%" PRIu64, pVnode->vgId, file, pVnode->fversion); + // vInfo("vgId:%d, successed to write %s, fver:%" PRIu64, pVnode->vgId, file, pVnode->fversion); return TSDB_CODE_SUCCESS; } \ No newline at end of file diff --git a/source/server/vnode/src/vnodeWrite.c b/source/server/vnode/src/vnodeWrite.c index c103460241..c969ac0572 100644 --- a/source/server/vnode/src/vnodeWrite.c +++ b/source/server/vnode/src/vnodeWrite.c @@ -96,7 +96,7 @@ static int32_t vnodeWriteToWQueue(SVnode *pVnode, SWalHead *pHead, int32_t qtype atomic_add_fetch_32(&tsVwrite.queuedMsgs, 1); atomic_add_fetch_32(&pVnode->refCount, 1); atomic_add_fetch_32(&pVnode->queuedWMsg, 1); - taosWriteQitem(pVnode->wqueue, pWrite->qtype, pWrite); + taosWriteQitem(pVnode->pWriteQ, pWrite->qtype, pWrite); return TSDB_CODE_SUCCESS; } @@ -153,10 +153,10 @@ static bool vnodeProcessWriteStart(SVnode *pVnode, SVnWriteMsg *pWrite, int32_t #if 0 pWrite->code = walWrite(pVnode->wal, pHead); if (pWrite->code < 0) return false; -#endif - pVnode->version = pHead->version; + pVnode->version = pHead->version; +#endif // write data locally switch (msgType) { case TSDB_MSG_TYPE_SUBMIT: -- GitLab