From 90c133a12cebef01c45588cdfb5bfa124fd9245e Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Tue, 1 Dec 2020 15:10:01 +0800 Subject: [PATCH] TD-2289 --- src/vnode/inc/vnodeSync.h | 36 +++++++++ src/vnode/src/vnodeMain.c | 137 +-------------------------------- src/vnode/src/vnodeSync.c | 156 ++++++++++++++++++++++++++++++++++++++ 3 files changed, 195 insertions(+), 134 deletions(-) create mode 100644 src/vnode/inc/vnodeSync.h create mode 100644 src/vnode/src/vnodeSync.c diff --git a/src/vnode/inc/vnodeSync.h b/src/vnode/inc/vnodeSync.h new file mode 100644 index 0000000000..7189d4a572 --- /dev/null +++ b/src/vnode/inc/vnodeSync.h @@ -0,0 +1,36 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#ifndef TDENGINE_VNODE_SYNC_H +#define TDENGINE_VNODE_SYNC_H + +#ifdef __cplusplus +extern "C" { +#endif + +uint32_t vnodeGetFileInfo(int32_t vgId, char *name, uint32_t *index, uint32_t eindex, int64_t *size, uint64_t *fver); +int32_t vnodeGetWalInfo(int32_t vgId, char *fileName, int64_t *fileId); +void vnodeNotifyRole(int32_t vgId, int8_t role); +void vnodeCtrlFlow(int32_t vgId, int32_t level); +int32_t vnodeNotifyFileSynced(int32_t vgId, uint64_t fversion); +void vnodeConfirmForard(int32_t vgId, void *wparam, int32_t code); +int32_t vnodeWriteToCache(int32_t vgId, void *wparam, int32_t qtype, void *rparam); +int32_t vnodeGetVersion(int32_t vgId, uint64_t *fver, uint64_t *wver); + +#ifdef __cplusplus +} +#endif + +#endif diff --git a/src/vnode/src/vnodeMain.c b/src/vnode/src/vnodeMain.c index 91b2cce564..ea5a655a62 100644 --- a/src/vnode/src/vnodeMain.c +++ b/src/vnode/src/vnodeMain.c @@ -24,6 +24,7 @@ #include "vnodeInt.h" #include "vnodeCfg.h" #include "vnodeStatus.h" +#include "vnodeSync.h" #include "vnodeVersion.h" #include "query.h" #include "dnode.h" @@ -34,14 +35,6 @@ static SHashObj*tsVnodesHash; static void vnodeCleanUp(SVnodeObj *pVnode); static int32_t vnodeProcessTsdbStatus(void *arg, int32_t status, int32_t eno); -static uint32_t vnodeGetFileInfo(int32_t vgId, char *name, uint32_t *index, uint32_t eindex, int64_t *size, uint64_t *fversion); -static int32_t vnodeGetWalInfo(int32_t vgId, char *fileName, int64_t *fileId); -static void vnodeNotifyRole(int32_t vgId, int8_t role); -static void vnodeCtrlFlow(int32_t vgId, int32_t level); -static int32_t vnodeNotifyFileSynced(int32_t vgId, uint64_t fversion); -static void vnodeConfirmForard(int32_t vgId, void *wparam, int32_t code); -static int32_t vnodeWriteToCache(int32_t vgId, void *wparam, int32_t qtype, void *rparam); -static int32_t vnodeGetVersion(int32_t vgId, uint64_t *fver, uint64_t *wver); int32_t vnodeInitResources() { int32_t code = syncInit(); @@ -609,70 +602,8 @@ static int32_t vnodeProcessTsdbStatus(void *arg, int32_t status, int32_t eno) { return 0; } -static uint32_t vnodeGetFileInfo(int32_t vgId, char *name, uint32_t *index, uint32_t eindex, int64_t *size, - uint64_t *fversion) { - SVnodeObj *pVnode = vnodeAcquire(vgId); - if (pVnode == NULL) { - vError("vgId:%d, vnode not found while get file info", vgId); - return 0; - } - - *fversion = pVnode->fversion; - uint32_t ret = tsdbGetFileInfo(pVnode->tsdb, name, index, eindex, size); - vnodeRelease(pVnode); - return ret; -} - -static int32_t vnodeGetWalInfo(int32_t vgId, char *fileName, int64_t *fileId) { - SVnodeObj *pVnode = vnodeAcquire(vgId); - if (pVnode == NULL) { - vError("vgId:%d, vnode not found while get wal info", vgId); - return -1; - } - - int32_t code = walGetWalFile(pVnode->wal, fileName, fileId); - - vnodeRelease(pVnode); - return code; -} - -static void vnodeNotifyRole(int32_t vgId, int8_t role) { - SVnodeObj *pVnode = vnodeAcquire(vgId); - if (pVnode == NULL) { - vTrace("vgId:%d, vnode not found while notify role", vgId); - return; - } - - vInfo("vgId:%d, sync role changed from %s to %s", pVnode->vgId, syncRole[pVnode->role], syncRole[role]); - pVnode->role = role; - dnodeSendStatusMsgToMnode(); - - if (pVnode->role == TAOS_SYNC_ROLE_MASTER) { - cqStart(pVnode->cq); - } else { - cqStop(pVnode->cq); - } - - vnodeRelease(pVnode); -} - -static void vnodeCtrlFlow(int32_t vgId, int32_t level) { - SVnodeObj *pVnode = vnodeAcquire(vgId); - if (pVnode == NULL) { - vTrace("vgId:%d, vnode not found while flow ctrl", vgId); - return; - } - - if (pVnode->flowctrlLevel != level) { - vDebug("vgId:%d, set flowctrl level from %d to %d", pVnode->vgId, pVnode->flowctrlLevel, level); - pVnode->flowctrlLevel = level; - } - - vnodeRelease(pVnode); -} - -static int32_t vnodeResetTsdb(SVnodeObj *pVnode) { +int32_t vnodeResetTsdb(SVnodeObj *pVnode) { char rootDir[128] = "\0"; sprintf(rootDir, "vnode/vnode%d/tsdb", pVnode->vgId); @@ -704,66 +635,4 @@ static int32_t vnodeResetTsdb(SVnodeObj *pVnode) { vnodeRelease(pVnode); return 0; -} - -static int32_t vnodeNotifyFileSynced(int32_t vgId, uint64_t fversion) { - SVnodeObj *pVnode = vnodeAcquire(vgId); - if (pVnode == NULL) { - vError("vgId:%d, vnode not found while notify file synced", vgId); - return 0; - } - - pVnode->fversion = fversion; - pVnode->version = fversion; - vnodeSaveVersion(pVnode); - - vDebug("vgId:%d, data file is synced, fver:%" PRIu64 " vver:%" PRIu64, vgId, fversion, fversion); - int32_t code = vnodeResetTsdb(pVnode); - - vnodeRelease(pVnode); - return code; -} - -void vnodeConfirmForard(int32_t vgId, void *wparam, int32_t code) { - void *pVnode = vnodeAcquire(vgId); - if (pVnode == NULL) { - vError("vgId:%d, vnode not found while confirm forward", vgId); - return; - } - - dnodeSendRpcVWriteRsp(pVnode, wparam, code); - vnodeRelease(pVnode); -} - -static int32_t vnodeWriteToCache(int32_t vgId, void *wparam, int32_t qtype, void *rparam) { - SVnodeObj *pVnode = vnodeAcquire(vgId); - if (pVnode == NULL) { - vError("vgId:%d, vnode not found while write to cache", vgId); - return TSDB_CODE_VND_INVALID_VGROUP_ID; - } - - int32_t code = vnodeWriteToWQueue(pVnode, wparam, qtype, rparam); - - vnodeRelease(pVnode); - return code; -} - -static int32_t vnodeGetVersion(int32_t vgId, uint64_t *fver, uint64_t *wver) { - SVnodeObj *pVnode = vnodeAcquire(vgId); - if (pVnode == NULL) { - vError("vgId:%d, vnode not found while write to cache", vgId); - return -1; - } - - int32_t code = 0; - if (pVnode->isCommiting) { - vDebug("vgId:%d, vnode is commiting while get version", vgId); - code = -1; - } else { - *fver = pVnode->fversion; - *wver = pVnode->version; - } - - vnodeRelease(pVnode); - return code; -} +} \ No newline at end of file diff --git a/src/vnode/src/vnodeSync.c b/src/vnode/src/vnodeSync.c new file mode 100644 index 0000000000..7858820bd8 --- /dev/null +++ b/src/vnode/src/vnodeSync.c @@ -0,0 +1,156 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#define _DEFAULT_SOURCE +#include "os.h" +#include "taoserror.h" +#include "taosmsg.h" +#include "tglobal.h" +#include "trpc.h" +#include "tutil.h" +#include "vnode.h" +#include "vnodeInt.h" +#include "vnodeCfg.h" +#include "vnodeStatus.h" +#include "vnodeVersion.h" +#include "query.h" +#include "dnode.h" +#include "dnodeVWrite.h" +#include "dnodeVRead.h" +#include "tfs.h" + +uint32_t vnodeGetFileInfo(int32_t vgId, char *name, uint32_t *index, uint32_t eindex, int64_t *size, uint64_t *fver) { + SVnodeObj *pVnode = vnodeAcquire(vgId); + if (pVnode == NULL) { + vError("vgId:%d, vnode not found while get file info", vgId); + return 0; + } + + *fver = pVnode->fversion; + uint32_t ret = tsdbGetFileInfo(pVnode->tsdb, name, index, eindex, size); + + vnodeRelease(pVnode); + return ret; +} + +int32_t vnodeGetWalInfo(int32_t vgId, char *fileName, int64_t *fileId) { + SVnodeObj *pVnode = vnodeAcquire(vgId); + if (pVnode == NULL) { + vError("vgId:%d, vnode not found while get wal info", vgId); + return -1; + } + + int32_t code = walGetWalFile(pVnode->wal, fileName, fileId); + + vnodeRelease(pVnode); + return code; +} + +void vnodeNotifyRole(int32_t vgId, int8_t role) { + SVnodeObj *pVnode = vnodeAcquire(vgId); + if (pVnode == NULL) { + vTrace("vgId:%d, vnode not found while notify role", vgId); + return; + } + + vInfo("vgId:%d, sync role changed from %s to %s", pVnode->vgId, syncRole[pVnode->role], syncRole[role]); + pVnode->role = role; + dnodeSendStatusMsgToMnode(); + + if (pVnode->role == TAOS_SYNC_ROLE_MASTER) { + cqStart(pVnode->cq); + } else { + cqStop(pVnode->cq); + } + + vnodeRelease(pVnode); +} + +void vnodeCtrlFlow(int32_t vgId, int32_t level) { + SVnodeObj *pVnode = vnodeAcquire(vgId); + if (pVnode == NULL) { + vTrace("vgId:%d, vnode not found while flow ctrl", vgId); + return; + } + + if (pVnode->flowctrlLevel != level) { + vDebug("vgId:%d, set flowctrl level from %d to %d", pVnode->vgId, pVnode->flowctrlLevel, level); + pVnode->flowctrlLevel = level; + } + + vnodeRelease(pVnode); +} + +int32_t vnodeNotifyFileSynced(int32_t vgId, uint64_t fversion) { + SVnodeObj *pVnode = vnodeAcquire(vgId); + if (pVnode == NULL) { + vError("vgId:%d, vnode not found while notify file synced", vgId); + return 0; + } + + pVnode->fversion = fversion; + pVnode->version = fversion; + vnodeSaveVersion(pVnode); + + vDebug("vgId:%d, data file is synced, fver:%" PRIu64 " vver:%" PRIu64, vgId, fversion, fversion); + int32_t code = vnodeResetTsdb(pVnode); + + vnodeRelease(pVnode); + return code; +} + +void vnodeConfirmForard(int32_t vgId, void *wparam, int32_t code) { + void *pVnode = vnodeAcquire(vgId); + if (pVnode == NULL) { + vError("vgId:%d, vnode not found while confirm forward", vgId); + return; + } + + dnodeSendRpcVWriteRsp(pVnode, wparam, code); + vnodeRelease(pVnode); +} + +int32_t vnodeWriteToCache(int32_t vgId, void *wparam, int32_t qtype, void *rparam) { + SVnodeObj *pVnode = vnodeAcquire(vgId); + if (pVnode == NULL) { + vError("vgId:%d, vnode not found while write to cache", vgId); + return TSDB_CODE_VND_INVALID_VGROUP_ID; + } + + int32_t code = vnodeWriteToWQueue(pVnode, wparam, qtype, rparam); + + vnodeRelease(pVnode); + return code; +} + +int32_t vnodeGetVersion(int32_t vgId, uint64_t *fver, uint64_t *wver) { + SVnodeObj *pVnode = vnodeAcquire(vgId); + if (pVnode == NULL) { + vError("vgId:%d, vnode not found while write to cache", vgId); + return -1; + } + + int32_t code = 0; + if (pVnode->isCommiting) { + vDebug("vgId:%d, vnode is commiting while get version", vgId); + code = -1; + } else { + *fver = pVnode->fversion; + *wver = pVnode->version; + } + + vnodeRelease(pVnode); + return code; +} -- GitLab