From a305bb422dcaf4b036d43c5761932bfa9ed38d1e Mon Sep 17 00:00:00 2001 From: slguan Date: Mon, 9 Mar 2020 16:26:35 +0800 Subject: [PATCH] Add code for module dnodeMgmt --- src/dnode/CMakeLists.txt | 2 + src/dnode/inc/dnodeMgmt.h | 20 +-- src/dnode/src/dnodeMgmt.c | 245 ++++++++++++++++++++++------------- src/dnode/src/dnodeMnode.c | 2 +- src/inc/taoserror.h | 1 + src/inc/taosmsg.h | 14 +- src/mnode/src/mgmtDnodeInt.c | 6 +- src/util/inc/tstatus.h | 1 + 8 files changed, 181 insertions(+), 110 deletions(-) diff --git a/src/dnode/CMakeLists.txt b/src/dnode/CMakeLists.txt index 73160c074f..c27771a858 100644 --- a/src/dnode/CMakeLists.txt +++ b/src/dnode/CMakeLists.txt @@ -6,6 +6,8 @@ IF ((TD_LINUX_64) OR (TD_LINUX_32 AND TD_ARM)) INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/src/inc) INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/src/util/inc) INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/src/query/inc) + INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/src/vnode/tsdb/inc) + INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/src/vnode/common/inc) INCLUDE_DIRECTORIES(${TD_ENTERPRISE_DIR}/src/inc) INCLUDE_DIRECTORIES(inc) AUX_SOURCE_DIRECTORY(src SRC) diff --git a/src/dnode/inc/dnodeMgmt.h b/src/dnode/inc/dnodeMgmt.h index e4f0a00664..bc0ff164a2 100644 --- a/src/dnode/inc/dnodeMgmt.h +++ b/src/dnode/inc/dnodeMgmt.h @@ -20,17 +20,17 @@ extern "C" { #endif -int dnodeInitMgmt(); -void dnodeCleanupMgmt(); -void dnodeMgmt(SRpcMsg *); +int32_t dnodeInitMgmt(); +void dnodeCleanupMgmt(); +void dnodeMgmt(void *rpcMsg); -void* dnodeGetVnode(int vgId); -int dnodeGetVnodeStatus(void *); -void* dnodeGetVnodeRworker(void *); -void* dnodeGetVnodeWworker(void *); -void* dnodeGetVnodeWal(void *); -void* dnodeGetVnodeTsdb(void *); -void dnodeReleaseVnode(void *); +void* dnodeGetVnode(int32_t vgId); +int32_t dnodeGetVnodeStatus(void *pVnode); +void* dnodeGetVnodeRworker(void *pVnode); +void* dnodeGetVnodeWworker(void *pVnode); +void* dnodeGetVnodeWal(void *pVnode); +void* dnodeGetVnodeTsdb(void *pVnode); +void dnodeReleaseVnode(void *pVnode); #ifdef __cplusplus } diff --git a/src/dnode/src/dnodeMgmt.c b/src/dnode/src/dnodeMgmt.c index 8712b3a692..25055643df 100644 --- a/src/dnode/src/dnodeMgmt.c +++ b/src/dnode/src/dnodeMgmt.c @@ -15,49 +15,64 @@ #define _DEFAULT_SOURCE #include "os.h" -#include "tlog.h" +#include "ihash.h" #include "taoserror.h" #include "taosmsg.h" +#include "tlog.h" #include "trpc.h" -#include "dnodeWrite.h" -#include "dnodeRead.h" +#include "tstatus.h" +#include "tsdb.h" #include "dnodeMgmt.h" +#include "dnodeRead.h" +#include "dnodeWrite.h" typedef struct { int32_t vgId; // global vnode group ID - int status; // status: master, slave, notready, deleting - int refCount; // reference count + int32_t status; // status: master, slave, notready, deleting + int32_t refCount; // reference count int64_t version; - void *wworker; - void *rworker; - void *wal; - void *tsdb; - void *replica; - void *events; - void *cq; // continuous query + void *wworker; + void *rworker; + void *wal; + void *tsdb; + void *replica; + void *events; + void *cq; // continuous query } SVnodeObj; -static int dnodeOpenVnodes(); -static void dnodeCleanupVnodes(); -static int dnodeCreateVnode(int32_t vgId, SCreateVnodeMsg *cfg); -static int dnodeDropVnode(SVnodeObj *pVnode); -static void dnodeRemoveVnode(SVnodeObj *pVnode); - -static void dnodeProcessCreateVnodeMsg(SRpcMsg *pMsg); -static void dnodeProcessDropVnodeMsg(SRpcMsg *pMsg); -static void dnodeProcessAlterVnodeMsg(SRpcMsg *pMsg); -static void (*dnodeProcessMgmtMsgFp[TSDB_MSG_TYPE_MAX])(SRpcMsg *pMsg); +static int32_t dnodeOpenVnodes(); +static void dnodeCleanupVnodes(); +static int32_t dnodeOpenVnode(int32_t vgId); +static void dnodeCleanupVnode(SVnodeObj *pVnode); +static int32_t dnodeCreateVnode(SCreateVnodeMsg *cfg); +static void dnodeDropVnode(SVnodeObj *pVnode); +static void dnodeProcessCreateVnodeMsg(SRpcMsg *pMsg); +static void dnodeProcessDropVnodeMsg(SRpcMsg *pMsg); +static void dnodeProcessAlterVnodeMsg(SRpcMsg *pMsg); +static void (*dnodeProcessMgmtMsgFp[TSDB_MSG_TYPE_MAX])(SRpcMsg *pMsg); + +static void * tsDnodeVnodesHash = NULL; + +int32_t dnodeInitMgmt() { + dnodeProcessMgmtMsgFp[TSDB_MSG_TYPE_CREATE_VNODE] = dnodeProcessCreateVnodeMsg; + dnodeProcessMgmtMsgFp[TSDB_MSG_TYPE_DROP_VNODE] = dnodeProcessDropVnodeMsg; + dnodeProcessMgmtMsgFp[TSDB_MSG_TYPE_ALTER_VNODE] = dnodeProcessDropVnodeMsg; + + tsDnodeVnodesHash = taosInitIntHash(TSDB_MAX_VNODES, sizeof(SVnodeObj), taosHashInt); + if (tsDnodeVnodesHash == NULL) { + return TSDB_CODE_SERV_OUT_OF_MEMORY; + } -int dnodeInitMgmt() { - dnodeProcessMgmtMsgFp[TSDB_MSG_TYPE_FREE_VNODE] = dnodeProcessDropVnodeMsg; + return dnodeOpenVnodes(); } void dnodeCleanupMgmt() { - + dnodeCleanupVnodes(); + taosCleanUpIntHash(tsDnodeVnodesHash); } -void dnodeMgmt(SRpcMsg *pMsg) { - +void dnodeMgmt(void *rpcMsg) { + SRpcMsg *pMsg = rpcMsg; terrno = 0; if (dnodeProcessMgmtMsgFp[pMsg->msgType]) { @@ -68,28 +83,29 @@ void dnodeMgmt(SRpcMsg *pMsg) { SRpcMsg rsp; rsp.handle = pMsg->handle; - rsp.code = terrno; - rsp.pCont = NULL; + rsp.code = terrno; + rsp.pCont = NULL; rpcSendResponse(&rsp); rpcFreeCont(pMsg->pCont); // free the received message } - -void *dnodeGetVnode(int vgId) { - SVnodeObj *pVnode; - // retrieve the pVnode from vgId +void *dnodeGetVnode(int32_t vgId) { + SVnodeObj *pVnode = taosGetIntHashData(tsDnodeVnodesHash, vgId); + if (pVnode == NULL) { + terrno = TSDB_CODE_INVALID_VGROUP_ID; + return NULL; + } - - // if (pVnode->status == ....) { - // terrno = ; - // return NULL; - // } + if (pVnode->status != TSDB_VN_STATUS_MASTER && pVnode->status == TSDB_VN_STATUS_SLAVE) { + terrno = TSDB_CODE_INVALID_VNODE_STATUS; + return NULL; + } atomic_add_fetch_32(&pVnode->refCount, 1); return pVnode; } -int dnodeGetVnodeStatus(void *pVnode) { +int32_t dnodeGetVnodeStatus(void *pVnode) { return ((SVnodeObj *)pVnode)->status; } @@ -109,61 +125,53 @@ void *dnodeGetVnodeTsdb(void *pVnode) { return ((SVnodeObj *)pVnode)->tsdb; } -void dnodeReleaseVnode(void *param) { - SVnodeObj *pVnode = (SVnodeObj *)param; - - int refCount = atomic_sub_fetch_32(&pVnode->refCount, 1); - if (refCount == 0) dnodeRemoveVnode(pVnode); -} - -static int dnodeOpenVnode() { - SVnodeObj *pVnode; - - // create tsdb - - // create wal - - // allocate write worker - pVnode->wworker = dnodeAllocateWriteWorker(); - - // create read queue - pVnode->rworker = dnodeAllocateReadWorker(); - - // create the replica - - // set the status - - pVnode->refCount = 1; - - return 0; -} - -static int dnodeOpenVnodes() { - return 0; +void dnodeReleaseVnode(void *pVnode) { + atomic_sub_fetch_32(&((SVnodeObj *) pVnode)->refCount, 1); } -static void dnodeCleanupVnode() { - +static int32_t dnodeOpenVnodes() { + dPrint("open all vnodes"); + return TSDB_CODE_SUCCESS; } static void dnodeCleanupVnodes() { - + dPrint("clean all vnodes"); } -static int dnodeCreateVnode(int32_t vgId, SCreateVnodeMsg *cfg) { +static int32_t dnodeOpenVnode(int32_t vgId) { + char rootDir[TSDB_FILENAME_LEN] = {0}; + sprintf(rootDir, "%s/vnode%d", tsDirectory, vgId); - SVnodeObj *pVnode = malloc(sizeof(SVnodeObj)); - - // save the vnode info in non-volatile storage - - // add into hash, so it can be retrieved - dnodeOpenVnode(pVnode); + void *pTsdb = tsdbOpenRepo(rootDir); + if (pTsdb != NULL) { + return terrno; + } - return 0; + SVnodeObj vnodeObj; + vnodeObj.vgId = vgId; + vnodeObj.status = TSDB_VN_STATUS_NOT_READY; + vnodeObj.refCount = 1; + vnodeObj.version = 0; + vnodeObj.wworker = dnodeAllocateWriteWorker(); + vnodeObj.rworker = dnodeAllocateReadWorker(); + vnodeObj.wal = NULL; + vnodeObj.tsdb = pTsdb; + vnodeObj.replica = NULL; + vnodeObj.events = NULL; + vnodeObj.cq = NULL; + + taosAddIntHash(tsDnodeVnodesHash, vnodeObj.vgId, &vnodeObj); + + return TSDB_CODE_SUCCESS; } -static void dnodeRemoveVnode(SVnodeObj *pVnode) { - +static void dnodeCleanupVnode(SVnodeObj *pVnode) { + pVnode->status = TSDB_VN_STATUS_NOT_READY; + int32_t count = atomic_sub_fetch_32(&pVnode->refCount, 1); + if (count > 0) { + // wait refcount + } + // remove replica // remove read queue @@ -171,26 +179,79 @@ static void dnodeRemoveVnode(SVnodeObj *pVnode) { // remove write queue dnodeFreeWriteWorker(pVnode->wworker); - + // remove wal // remove tsdb + if (pVnode->tsdb) { + tsdbCloseRepo(pVnode->tsdb); + } + taosDeleteIntHash(tsDnodeVnodesHash, pVnode->vgId); } -static int dnodeDropVnode(SVnodeObj *pVnode) { - - int count = atomic_sub_fetch_32(&pVnode->refCount, 1); +static int32_t dnodeCreateVnode(SCreateVnodeMsg *pVnodeCfg) { + pVnodeCfg->vnode = htonl(pVnodeCfg->vnode); + pVnodeCfg->cfg.vgId = htonl(pVnodeCfg->cfg.vgId); + pVnodeCfg->cfg.maxSessions = htonl(pVnodeCfg->cfg.maxSessions); + pVnodeCfg->cfg.daysPerFile = htonl(pVnodeCfg->cfg.daysPerFile); + + STsdbCfg tsdbCfg; + tsdbCfg.precision = pVnodeCfg->cfg.precision; + tsdbCfg.tsdbId = pVnodeCfg->vnode; + tsdbCfg.maxTables = pVnodeCfg->cfg.maxSessions; + tsdbCfg.daysPerFile = pVnodeCfg->cfg.daysPerFile; + tsdbCfg.minRowsPerFileBlock = -1; + tsdbCfg.maxRowsPerFileBlock = -1; + tsdbCfg.keep = -1; + tsdbCfg.maxCacheSize = -1; + + char rootDir[TSDB_FILENAME_LEN] = {0}; + sprintf(rootDir, "%s/vnode%d", tsDirectory, pVnodeCfg->cfg.vgId); + + void *pTsdb = tsdbCreateRepo(rootDir, &tsdbCfg, NULL); + if (pTsdb != NULL) { + return terrno; + } - if (count<=0) dnodeRemoveVnode(pVnode); + SVnodeObj vnodeObj; + vnodeObj.vgId = pVnodeCfg->cfg.vgId; + vnodeObj.status = TSDB_VN_STATUS_NOT_READY; + vnodeObj.refCount = 1; + vnodeObj.version = 0; + vnodeObj.wworker = dnodeAllocateWriteWorker(); + vnodeObj.rworker = dnodeAllocateReadWorker(); + vnodeObj.wal = NULL; + vnodeObj.tsdb = pTsdb; + vnodeObj.replica = NULL; + vnodeObj.events = NULL; + vnodeObj.cq = NULL; + + taosAddIntHash(tsDnodeVnodesHash, vnodeObj.vgId, &vnodeObj); + + return TSDB_CODE_SUCCESS; +} + +static void dnodeDropVnode(SVnodeObj *pVnode) { + pVnode->status = TSDB_VN_STATUS_NOT_READY; + + int32_t count = atomic_sub_fetch_32(&pVnode->refCount, 1); + if (count > 0) { + // wait refcount + } + + if (pVnode->tsdb) { + tsdbDropRepo(pVnode->tsdb); + pVnode->tsdb = NULL; + } - return 0; + dnodeCleanupVnode(pVnode); } static void dnodeProcessCreateVnodeMsg(SRpcMsg *pMsg) { // SVnodeObj *pVnode; -// int vgId; +// int32_t vgId; // SVPeersMsg *pCfg; // check everything, if not ok, set terrno; @@ -206,7 +267,7 @@ static void dnodeProcessCreateVnodeMsg(SRpcMsg *pMsg) { static void dnodeProcessDropVnodeMsg(SRpcMsg *pMsg) { SVnodeObj *pVnode; - int vgId; + int32_t vgId; // check everything, if not ok, set terrno; @@ -220,7 +281,7 @@ static void dnodeProcessDropVnodeMsg(SRpcMsg *pMsg) { static void dnodeProcessAlterVnodeMsg(SRpcMsg *pMsg) { SVnodeObj *pVnode; - int vgId; + int32_t vgId; // check everything, if not ok, set terrno; diff --git a/src/dnode/src/dnodeMnode.c b/src/dnode/src/dnodeMnode.c index 1b51c70cfc..62814fac17 100644 --- a/src/dnode/src/dnodeMnode.c +++ b/src/dnode/src/dnodeMnode.c @@ -28,7 +28,7 @@ static void *tsDnodeMnodeRpc = NULL; int32_t dnodeInitMnode() { dnodeProcessMgmtMsgFp[TSDB_MSG_TYPE_DNODE_CREATE_TABLE] = dnodeWrite; dnodeProcessMgmtMsgFp[TSDB_MSG_TYPE_DNODE_REMOVE_TABLE] = dnodeWrite; - dnodeProcessMgmtMsgFp[TSDB_MSG_TYPE_FREE_VNODE] = dnodeMgmt; + dnodeProcessMgmtMsgFp[TSDB_MSG_TYPE_DROP_VNODE] = dnodeMgmt; SRpcInit rpcInit; memset(&rpcInit, 0, sizeof(rpcInit)); diff --git a/src/inc/taoserror.h b/src/inc/taoserror.h index d2186dd3a9..c5752c65c8 100644 --- a/src/inc/taoserror.h +++ b/src/inc/taoserror.h @@ -166,6 +166,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_INVALID_MSG, 0, 120, "invalid message TAOS_DEFINE_ERROR(TSDB_CODE_INVALID_TABLE_TYPE, 0, 121, "invalid table typee") TAOS_DEFINE_ERROR(TSDB_CODE_INVALID_MSG_VERSION, 0, 122, "invalid version of message") TAOS_DEFINE_ERROR(TSDB_CODE_DNODE_NOT_EXIST, 0, 123, "dnode not exist") +TAOS_DEFINE_ERROR(TSDB_CODE_INVALID_VGROUP_ID, 0, 124, "invalid vgroup id") #ifdef TAOS_ERROR_C }; diff --git a/src/inc/taosmsg.h b/src/inc/taosmsg.h index bf03187010..b9de3ec0b9 100644 --- a/src/inc/taosmsg.h +++ b/src/inc/taosmsg.h @@ -41,10 +41,17 @@ extern "C" { #define TSDB_MSG_TYPE_DNODE_CREATE_TABLE_RSP 10 #define TSDB_MSG_TYPE_DNODE_REMOVE_TABLE 11 #define TSDB_MSG_TYPE_DNODE_REMOVE_TABLE_RSP 12 + +// dnodeMgmt #define TSDB_MSG_TYPE_CREATE_VNODE 13 #define TSDB_MSG_TYPE_CREATE_VNODE_RSP 14 -#define TSDB_MSG_TYPE_FREE_VNODE 15 -#define TSDB_MSG_TYPE_FREE_VNODE_RSP 16 +#define TSDB_MSG_TYPE_DROP_VNODE 15 +#define TSDB_MSG_TYPE_DROP_VNODE_RSP 16 +#define TSDB_MSG_TYPE_ALTER_VNODE 17 +#define TSDB_MSG_TYPE_ALTER_VNODE_RSP 18 +#define TSDB_MSG_TYPE_CONFIG_VNODE 19 +#define TSDB_MSG_TYPE_CONFIG_VNODE_RSP 20 + #define TSDB_MSG_TYPE_DNODE_CFG 17 #define TSDB_MSG_TYPE_DNODE_CFG_RSP 18 #define TSDB_MSG_TYPE_DNODE_ALTER_STREAM 19 @@ -93,8 +100,7 @@ extern "C" { #define TSDB_MSG_TYPE_DROP_TABLE_RSP 66 #define TSDB_MSG_TYPE_ALTER_TABLE 67 #define TSDB_MSG_TYPE_ALTER_TABLE_RSP 68 -#define TSDB_MSG_TYPE_VNODE_CFG 69 -#define TSDB_MSG_TYPE_VNODE_CFG_RSP 70 + #define TSDB_MSG_TYPE_TABLE_CFG 71 #define TSDB_MSG_TYPE_TABLE_CFG_RSP 72 #define TSDB_MSG_TYPE_TABLE_META 73 diff --git a/src/mnode/src/mgmtDnodeInt.c b/src/mnode/src/mgmtDnodeInt.c index a7555c8b6e..238eca25f8 100644 --- a/src/mnode/src/mgmtDnodeInt.c +++ b/src/mnode/src/mgmtDnodeInt.c @@ -259,7 +259,7 @@ void mgmtProcessMsgFromDnode(char msgType, void *pCont, int32_t contLen, void *p if (msgType == TSDB_MSG_TYPE_TABLE_CFG) { mgmtProcessTableCfgMsg(msgType, pCont, contLen, pConn); - } else if (msgType == TSDB_MSG_TYPE_VNODE_CFG) { + } else if (msgType == TSDB_MSG_TYPE_CONFIG_VNODE) { mgmtProcessVnodeCfgMsg(msgType, pCont, contLen, pConn); } else if (msgType == TSDB_MSG_TYPE_DNODE_CREATE_TABLE_RSP) { mgmtProcessCreateTableRsp(msgType, pCont, contLen, pConn, code); @@ -267,7 +267,7 @@ void mgmtProcessMsgFromDnode(char msgType, void *pCont, int32_t contLen, void *p mgmtProcessRemoveTableRsp(msgType, pCont, contLen, pConn, code); } else if (msgType == TSDB_MSG_TYPE_CREATE_VNODE_RSP) { mgmtProcessCreateVnodeRsp(msgType, pCont, contLen, pConn, code); - } else if (msgType == TSDB_MSG_TYPE_FREE_VNODE_RSP) { + } else if (msgType == TSDB_MSG_TYPE_DROP_VNODE_RSP) { mgmtProcessFreeVnodeRsp(msgType, pCont, contLen, pConn, code); } else if (msgType == TSDB_MSG_TYPE_DROP_STABLE) { mgmtProcessDropStableRsp(msgType, pCont, contLen, pConn, code); @@ -294,7 +294,7 @@ void mgmtSendOneFreeVnodeMsg(int32_t vnode, SRpcIpSet *ipSet, void *ahandle) { SFreeVnodeMsg *pFreeVnode = rpcMallocCont(sizeof(SFreeVnodeMsg)); if (pFreeVnode != NULL) { pFreeVnode->vnode = htonl(vnode); - mgmtSendMsgToDnode(ipSet, TSDB_MSG_TYPE_FREE_VNODE, pFreeVnode, sizeof(SFreeVnodeMsg), ahandle); + mgmtSendMsgToDnode(ipSet, TSDB_MSG_TYPE_DROP_VNODE, pFreeVnode, sizeof(SFreeVnodeMsg), ahandle); } } diff --git a/src/util/inc/tstatus.h b/src/util/inc/tstatus.h index 03a882e283..74685737ce 100644 --- a/src/util/inc/tstatus.h +++ b/src/util/inc/tstatus.h @@ -48,6 +48,7 @@ typedef enum _TSDB_VN_STATUS { TSDB_VN_STATUS_MASTER, TSDB_VN_STATUS_CLOSING, TSDB_VN_STATUS_DELETING, + TSDB_VN_STATUS_NOT_READY } EVnodeStatus; enum _TSDB_VN_SYNC_STATUS { -- GitLab