From bb887daf8142274e56eeef68489acb694b72c832 Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Tue, 30 Nov 2021 19:42:51 +0800 Subject: [PATCH] TD-10431 handle status msg --- include/common/taosmsg.h | 11 +- include/dnode/mgmt/dnode.h | 106 ++---------- include/dnode/mnode/mnode.h | 127 ++------------ source/dnode/mgmt/daemon/src/daemon.c | 6 +- source/dnode/mgmt/impl/src/dndDnode.c | 16 +- source/dnode/mgmt/impl/src/dndMnode.c | 6 + source/dnode/mnode/impl/CMakeLists.txt | 1 + source/dnode/mnode/impl/inc/mndDef.h | 17 +- source/dnode/mnode/impl/inc/mndDnode.h | 6 +- source/dnode/mnode/impl/inc/mndInt.h | 6 + source/dnode/mnode/impl/inc/mndMnode.h | 3 +- source/dnode/mnode/impl/src/mndDnode.c | 229 ++++++++++++++++++++++++- source/dnode/mnode/impl/src/mndMnode.c | 121 ++++++++++++- source/dnode/mnode/impl/src/mnode.c | 16 +- 14 files changed, 429 insertions(+), 242 deletions(-) diff --git a/include/common/taosmsg.h b/include/common/taosmsg.h index e70a8539c3..b655016f3c 100644 --- a/include/common/taosmsg.h +++ b/include/common/taosmsg.h @@ -63,6 +63,8 @@ TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DROP_USER, "drop-user" ) TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CREATE_DNODE, "create-dnode" ) TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CONFIG_DNODE, "config-dnode" ) TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DROP_DNODE, "drop-dnode" ) +TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CREATE_MNODE, "create-mnode" ) +TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DROP_MNODE, "drop-mnode" ) TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CREATE_DB, "create-db" ) TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DROP_DB, "drop-db" ) TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_USE_DB, "use-db" ) @@ -631,7 +633,7 @@ typedef struct { typedef struct { int32_t statusInterval; - int8_t reserved[4]; + int32_t mnodeEqualVnodeNum; int64_t checkTime; // 1970-01-01 00:00:00.000 char timezone[TSDB_TIMEZONE_LEN]; // tsTimezone char locale[TSDB_LOCALE_LEN]; // tsLocale @@ -654,11 +656,14 @@ typedef struct { } SVnodeLoads; typedef struct SStatusMsg { - uint32_t sversion; + int32_t sver; int32_t dnodeId; int64_t clusterId; uint32_t rebootTime; // time stamp for last reboot - int32_t numOfCores; + int16_t numOfCores; + int16_t numOfSupportMnodes; + int16_t numOfSupportVnodes; + int16_t numOfSupportQnodes; char dnodeEp[TSDB_EP_LEN]; SClusterCfg clusterCfg; SVnodeLoads vnodeLoads; diff --git a/include/dnode/mgmt/dnode.h b/include/dnode/mgmt/dnode.h index fe9560d427..50886932ce 100644 --- a/include/dnode/mgmt/dnode.h +++ b/include/dnode/mgmt/dnode.h @@ -26,95 +26,25 @@ extern "C" { typedef struct SDnode SDnode; typedef struct { - /** - * @brief software version of the program. - * - */ - int32_t sver; - - /** - * @brief num of CPU cores. - * - */ - int32_t numOfCores; - - /** - * @brief number of threads per CPU core. - * - */ - float numOfThreadsPerCore; - - /** - * @brief the proportion of total CPU cores available for query processing. - * - */ - float ratioOfQueryCores; - - /** - * @brief max number of connections allowed in dnode. - * - */ - int32_t maxShellConns; - - /** - * @brief time interval of heart beat from shell to dnode, seconds. - * - */ - int32_t shellActivityTimer; - - /** - * @brief time interval of dnode status reporting to mnode, seconds, for cluster only. - * - */ - int32_t statusInterval; - - /** - * @brief first port number for the connection (12 continuous UDP/TCP port number are used). - * - */ + int32_t sver; + int16_t numOfCores; + int16_t numOfSupportMnodes; + int16_t numOfSupportVnodes; + int16_t numOfSupportQnodes; + int32_t statusInterval; + int32_t mnodeEqualVnodeNum; + float numOfThreadsPerCore; + float ratioOfQueryCores; + int32_t maxShellConns; + int32_t shellActivityTimer; uint16_t serverPort; - - /** - * @brief data file's directory. - * - */ - char dataDir[TSDB_FILENAME_LEN]; - - /** - * @brief local endpoint. - * - */ - char localEp[TSDB_EP_LEN]; - - /** - * @brieflocal fully qualified domain name (FQDN). - * - */ - char localFqdn[TSDB_FQDN_LEN]; - - /** - * @brief first fully qualified domain name (FQDN) for TDengine system. - * - */ - char firstEp[TSDB_EP_LEN]; - - /** - * @brief system time zone. - * - */ - char timezone[TSDB_TIMEZONE_LEN]; - - /** - * @brief system locale. - * - */ - char locale[TSDB_LOCALE_LEN]; - - /** - * @briefdefault system charset. - * - */ - char charset[TSDB_LOCALE_LEN]; + char dataDir[TSDB_FILENAME_LEN]; + char localEp[TSDB_EP_LEN]; + char localFqdn[TSDB_FQDN_LEN]; + char firstEp[TSDB_EP_LEN]; + char timezone[TSDB_TIMEZONE_LEN]; + char locale[TSDB_LOCALE_LEN]; + char charset[TSDB_LOCALE_LEN]; } SDnodeOpt; /* ------------------------ SDnode ------------------------ */ diff --git a/include/dnode/mnode/mnode.h b/include/dnode/mnode/mnode.h index 5066b881b5..a8a8117886 100644 --- a/include/dnode/mnode/mnode.h +++ b/include/dnode/mnode/mnode.h @@ -30,133 +30,36 @@ typedef void (*SendRedirectMsgFp)(SDnode *pDnode, struct SRpcMsg *rpcMsg); typedef int32_t (*PutMsgToMnodeQFp)(SDnode *pDnode, SMnodeMsg *pMsg); typedef struct SMnodeLoad { - /** - * @brief the number of dnodes in cluster. - * - */ int64_t numOfDnode; - - /** - * @brief the number of mnodes in cluster. - * - */ int64_t numOfMnode; - - /** - * @brief the number of vgroups in cluster. - * - */ int64_t numOfVgroup; - - /** - * @brief the number of databases in cluster. - * - */ int64_t numOfDatabase; - - /** - * @brief the number of super tables in cluster. - * - */ int64_t numOfSuperTable; - - /** - * @brief the number of child tables in cluster. - * - */ int64_t numOfChildTable; - - /** - * @brief the number of normal tables in cluster. - * - */ int64_t numOfNormalTable; - - /** - * @brief the number of numOfTimeseries in cluster. - * - */ int64_t numOfColumn; - - /** - * @brief total points written in cluster. - * - */ int64_t totalPoints; - - /** - * @brief total storage in cluster. - * - */ int64_t totalStorage; - - /** - * @brief total compressed storage in cluster. - * - */ int64_t compStorage; } SMnodeLoad; typedef struct { - /** - * @brief dnodeId of this mnode. - * - */ - int32_t dnodeId; - - /** - * @brief clusterId of this mnode. - * - */ - int64_t clusterId; - - /** - * @brief replica num of this mnode. - * - */ - int8_t replica; - - /** - * @brief self index in the array of replicas. - * - */ - int8_t selfIndex; - - /** - * @brief detail replica information of this mnode. - * - */ - SReplica replicas[TSDB_MAX_REPLICA]; - - /** - * @brief the parent dnode of this mnode. - * - */ - SDnode *pDnode; - - /** - * @brief put apply msg to the write queue in dnode. - * - */ - PutMsgToMnodeQFp putMsgToApplyMsgFp; - - /** - * @brief the callback function while send msg to dnode. - * - */ - SendMsgToDnodeFp sendMsgToDnodeFp; - - /** - * @brief the callback function while send msg to mnode. - * - */ - SendMsgToMnodeFp sendMsgToMnodeFp; - - /** - * @brief the callback function while send redirect msg to clients or peers. - * - */ + int32_t dnodeId; + int64_t clusterId; + int8_t replica; + int8_t selfIndex; + SReplica replicas[TSDB_MAX_REPLICA]; + SDnode *pDnode; + PutMsgToMnodeQFp putMsgToApplyMsgFp; + SendMsgToDnodeFp sendMsgToDnodeFp; + SendMsgToMnodeFp sendMsgToMnodeFp; SendRedirectMsgFp sendRedirectMsgFp; + int32_t sver; + int32_t statusInterval; + int32_t mnodeEqualVnodeNum; + char *timezone; + char *locale; + char *charset; } SMnodeOpt; /* ------------------------ SMnode ------------------------ */ diff --git a/source/dnode/mgmt/daemon/src/daemon.c b/source/dnode/mgmt/daemon/src/daemon.c index a0ca0dd390..08624f20f4 100644 --- a/source/dnode/mgmt/daemon/src/daemon.c +++ b/source/dnode/mgmt/daemon/src/daemon.c @@ -138,11 +138,15 @@ void dmnWaitSignal() { void dmnInitOption(SDnodeOpt *pOption) { pOption->sver = tsVersion; pOption->numOfCores = tsNumOfCores; + pOption->numOfSupportMnodes = 1; + pOption->numOfSupportVnodes = 1; + pOption->numOfSupportQnodes = 1; + pOption->statusInterval = tsStatusInterval; + pOption->mnodeEqualVnodeNum = tsMnodeEqualVnodeNum; pOption->numOfThreadsPerCore = tsNumOfThreadsPerCore; pOption->ratioOfQueryCores = tsRatioOfQueryCores; pOption->maxShellConns = tsMaxShellConns; pOption->shellActivityTimer = tsShellActivityTimer; - pOption->statusInterval = tsStatusInterval; pOption->serverPort = tsServerPort; tstrncpy(pOption->dataDir, tsDataDir, TSDB_FILENAME_LEN); tstrncpy(pOption->localEp, tsLocalEp, TSDB_EP_LEN); diff --git a/source/dnode/mgmt/impl/src/dndDnode.c b/source/dnode/mgmt/impl/src/dndDnode.c index 4265c8a3cd..7b8afa96bb 100644 --- a/source/dnode/mgmt/impl/src/dndDnode.c +++ b/source/dnode/mgmt/impl/src/dndDnode.c @@ -348,19 +348,25 @@ static void dndSendStatusMsg(SDnode *pDnode) { SDnodeMgmt *pMgmt = &pDnode->dmgmt; taosRLockLatch(&pMgmt->latch); - pStatus->sversion = htonl(pDnode->opt.sver); + pStatus->sver = htonl(pDnode->opt.sver); pStatus->dnodeId = htonl(pMgmt->dnodeId); pStatus->clusterId = htobe64(pMgmt->clusterId); pStatus->rebootTime = htonl(pMgmt->rebootTime); - pStatus->numOfCores = htonl(pDnode->opt.numOfCores); + pStatus->numOfCores = htons(pDnode->opt.numOfCores); + pStatus->numOfSupportMnodes = htons(pDnode->opt.numOfCores); + pStatus->numOfSupportVnodes = htons(pDnode->opt.numOfCores); + pStatus->numOfSupportQnodes = htons(pDnode->opt.numOfCores); tstrncpy(pStatus->dnodeEp, pDnode->opt.localEp, TSDB_EP_LEN); + pStatus->clusterCfg.statusInterval = htonl(pDnode->opt.statusInterval); - tstrncpy(pStatus->clusterCfg.timezone, pDnode->opt.timezone, TSDB_TIMEZONE_LEN); - tstrncpy(pStatus->clusterCfg.locale, pDnode->opt.locale, TSDB_LOCALE_LEN); - tstrncpy(pStatus->clusterCfg.charset, pDnode->opt.charset, TSDB_LOCALE_LEN); + pStatus->clusterCfg.mnodeEqualVnodeNum = htonl(pDnode->opt.mnodeEqualVnodeNum); pStatus->clusterCfg.checkTime = 0; char timestr[32] = "1970-01-01 00:00:00.00"; (void)taosParseTime(timestr, &pStatus->clusterCfg.checkTime, (int32_t)strlen(timestr), TSDB_TIME_PRECISION_MILLI, 0); + pStatus->clusterCfg.checkTime = htonl(pStatus->clusterCfg.checkTime); + tstrncpy(pStatus->clusterCfg.timezone, pDnode->opt.timezone, TSDB_TIMEZONE_LEN); + tstrncpy(pStatus->clusterCfg.locale, pDnode->opt.locale, TSDB_LOCALE_LEN); + tstrncpy(pStatus->clusterCfg.charset, pDnode->opt.charset, TSDB_LOCALE_LEN); taosRUnLockLatch(&pMgmt->latch); dndGetVnodeLoads(pDnode, &pStatus->vnodeLoads); diff --git a/source/dnode/mgmt/impl/src/dndMnode.c b/source/dnode/mgmt/impl/src/dndMnode.c index ca48642899..342d14463a 100644 --- a/source/dnode/mgmt/impl/src/dndMnode.c +++ b/source/dnode/mgmt/impl/src/dndMnode.c @@ -332,6 +332,12 @@ static void dndInitMnodeOption(SDnode *pDnode, SMnodeOpt *pOption) { pOption->putMsgToApplyMsgFp = dndPutMsgIntoMnodeApplyQueue; pOption->dnodeId = dndGetDnodeId(pDnode); pOption->clusterId = dndGetClusterId(pDnode); + pOption->sver = pDnode->opt.sver; + pOption->statusInterval = pDnode->opt.statusInterval; + pOption->mnodeEqualVnodeNum = pDnode->opt.mnodeEqualVnodeNum; + pOption->timezone = pDnode->opt.timezone; + pOption->charset = pDnode->opt.charset; + pOption->locale = pDnode->opt.locale; } static void dndBuildMnodeDeployOption(SDnode *pDnode, SMnodeOpt *pOption) { diff --git a/source/dnode/mnode/impl/CMakeLists.txt b/source/dnode/mnode/impl/CMakeLists.txt index 6fecb457c0..98c604e520 100644 --- a/source/dnode/mnode/impl/CMakeLists.txt +++ b/source/dnode/mnode/impl/CMakeLists.txt @@ -10,4 +10,5 @@ target_link_libraries( PRIVATE sdb PRIVATE transport PRIVATE cjson + PRIVATE sync ) \ No newline at end of file diff --git a/source/dnode/mnode/impl/inc/mndDef.h b/source/dnode/mnode/impl/inc/mndDef.h index 8bd3bd3d34..ded66edaa7 100644 --- a/source/dnode/mnode/impl/inc/mndDef.h +++ b/source/dnode/mnode/impl/inc/mndDef.h @@ -24,6 +24,7 @@ #include "thash.h" #include "cJSON.h" #include "mnode.h" +#include "sync.h" #ifdef __cplusplus extern "C" { @@ -89,24 +90,14 @@ typedef enum { DND_REASON_ONLINE = 0, DND_REASON_STATUS_MSG_TIMEOUT, DND_REASON_STATUS_NOT_RECEIVED, - DND_REASON_RESET_BY_MNODE, DND_REASON_VERSION_NOT_MATCH, DND_REASON_DNODE_ID_NOT_MATCH, DND_REASON_CLUSTER_ID_NOT_MATCH, - DND_REASON_NUM_OF_MNODES_NOT_MATCH, - DND_REASON_ENABLE_BALANCE_NOT_MATCH, DND_REASON_MN_EQUAL_VN_NOT_MATCH, - DND_REASON_OFFLINE_THRESHOLD_NOT_MATCH, DND_REASON_STATUS_INTERVAL_NOT_MATCH, - DND_REASON_MAX_TAB_PER_VN_NOT_MATCH, - DND_REASON_MAX_VG_PER_DB_NOT_MATCH, - DND_REASON_ARBITRATOR_NOT_MATCH, DND_REASON_TIME_ZONE_NOT_MATCH, DND_REASON_LOCALE_NOT_MATCH, DND_REASON_CHARSET_NOT_MATCH, - DND_REASON_FLOW_CTRL_NOT_MATCH, - DND_REASON_SLAVE_QUERY_NOT_MATCH, - DND_REASON_ADJUST_MASTER_NOT_MATCH, DND_REASON_OTHERS } EDndReason; @@ -135,13 +126,14 @@ typedef struct SDnodeObj { int64_t createdTime; int64_t updateTime; int64_t rebootTime; - int64_t lastAccessTime; + int32_t accessTimes; int16_t numOfMnodes; int16_t numOfVnodes; int16_t numOfQnodes; int16_t numOfSupportMnodes; int16_t numOfSupportVnodes; int16_t numOfSupportQnodes; + int16_t numOfCores; EDndStatus status; EDndReason offlineReason; uint16_t port; @@ -153,8 +145,7 @@ typedef struct SMnodeObj { int32_t id; int64_t createdTime; int64_t updateTime; - int8_t status; - int8_t role; + ESyncState role; int32_t roleTerm; int64_t roleTime; SDnodeObj *pDnode; diff --git a/source/dnode/mnode/impl/inc/mndDnode.h b/source/dnode/mnode/impl/inc/mndDnode.h index d7bfdba122..9bb1ab7acd 100644 --- a/source/dnode/mnode/impl/inc/mndDnode.h +++ b/source/dnode/mnode/impl/inc/mndDnode.h @@ -22,8 +22,10 @@ extern "C" { #endif -int32_t mndInitDnode(SMnode *pMnode); -void mndCleanupDnode(SMnode *pMnode); +int32_t mndInitDnode(SMnode *pMnode); +void mndCleanupDnode(SMnode *pMnode); +SDnodeObj *mndAcquireDnode(SMnode *pMnode, int32_t dnodeId); +void mndReleaseDnode(SMnode *pMnode, SDnodeObj *pDnode); #ifdef __cplusplus } diff --git a/source/dnode/mnode/impl/inc/mndInt.h b/source/dnode/mnode/impl/inc/mndInt.h index 2c7e597774..bb3f0ca263 100644 --- a/source/dnode/mnode/impl/inc/mndInt.h +++ b/source/dnode/mnode/impl/inc/mndInt.h @@ -50,6 +50,12 @@ typedef struct SMnode { SendMsgToMnodeFp sendMsgToMnodeFp; SendRedirectMsgFp sendRedirectMsgFp; PutMsgToMnodeQFp putMsgToApplyMsgFp; + int32_t sver; + int32_t statusInterval; + int32_t mnodeEqualVnodeNum; + char *timezone; + char *locale; + char *charset; } SMnode; tmr_h mndGetTimer(SMnode *pMnode); diff --git a/source/dnode/mnode/impl/inc/mndMnode.h b/source/dnode/mnode/impl/inc/mndMnode.h index c57e1d42a5..53f7b733f2 100644 --- a/source/dnode/mnode/impl/inc/mndMnode.h +++ b/source/dnode/mnode/impl/inc/mndMnode.h @@ -24,8 +24,7 @@ extern "C" { int32_t mndInitMnode(SMnode *pMnode); void mndCleanupMnode(SMnode *pMnode); -void mndGetMnodeEpSetForPeer(SEpSet *epSet, bool redirect); -void mndGetMnodeEpSetForShell(SEpSet *epSet, bool redirect); +bool mndIsMnode(SMnode *pMnode, int32_t dnodeId); #ifdef __cplusplus } diff --git a/source/dnode/mnode/impl/src/mndDnode.c b/source/dnode/mnode/impl/src/mndDnode.c index d9e13178cc..067faba558 100644 --- a/source/dnode/mnode/impl/src/mndDnode.c +++ b/source/dnode/mnode/impl/src/mndDnode.c @@ -14,12 +14,30 @@ */ #define _DEFAULT_SOURCE +#include "mndDnode.h" +#include "mndMnode.h" #include "mndTrans.h" +#include "ttime.h" #define SDB_DNODE_VER 1 +static char *offlineReason[] = { + "", + "status msg timeout", + "status not received", + "version not match", + "dnodeId not match", + "clusterId not match", + "mnEqualVn not match", + "interval not match", + "timezone not match", + "locale not match", + "charset not match", + "unknown", +}; + static SSdbRaw *mndDnodeActionEncode(SDnodeObj *pDnode) { - SSdbRaw *pRaw = sdbAllocRaw(SDB_USER, SDB_DNODE_VER, sizeof(SDnodeObj)); + SSdbRaw *pRaw = sdbAllocRaw(SDB_DNODE, SDB_DNODE_VER, sizeof(SDnodeObj)); if (pRaw == NULL) return NULL; int32_t dataPos = 0; @@ -59,7 +77,8 @@ static SSdbRow *mndDnodeActionDecode(SSdbRaw *pRaw) { static void mnodeResetDnode(SDnodeObj *pDnode) { pDnode->rebootTime = 0; - pDnode->lastAccessTime = 0; + pDnode->accessTimes = 0; + pDnode->numOfCores = 0; pDnode->numOfMnodes = 0; pDnode->numOfVnodes = 0; pDnode->numOfQnodes = 0; @@ -67,7 +86,7 @@ static void mnodeResetDnode(SDnodeObj *pDnode) { pDnode->numOfSupportVnodes = 0; pDnode->numOfSupportQnodes = 0; pDnode->status = DND_STATUS_OFFLINE; - pDnode->offlineReason = DND_REASON_RESET_BY_MNODE; + pDnode->offlineReason = DND_REASON_STATUS_NOT_RECEIVED; snprintf(pDnode->ep, TSDB_EP_LEN, "%s:%u", pDnode->fqdn, pDnode->port); } @@ -89,7 +108,7 @@ static int32_t mndDnodeActionUpdate(SSdb *pSdb, SDnodeObj *pSrcDnode, SDnodeObj static int32_t mndCreateDefaultDnode(SMnode *pMnode) { SDnodeObj dnodeObj = {0}; - dnodeObj.id = 0; + dnodeObj.id = 1; dnodeObj.createdTime = taosGetTimestampMs(); dnodeObj.updateTime = dnodeObj.createdTime; dnodeObj.port = pMnode->replicas[0].port; @@ -102,6 +121,191 @@ static int32_t mndCreateDefaultDnode(SMnode *pMnode) { return sdbWrite(pMnode->pSdb, pRaw); } +static SDnodeObj *mndAcquireDnodeByEp(SMnode *pMnode, char *pEpStr) { + SSdb *pSdb = pMnode->pSdb; + + void *pIter = NULL; + while (1) { + SDnodeObj *pDnode = NULL; + pIter = sdbFetch(pSdb, SDB_DNODE, pIter, (void **)&pDnode); + if (pIter == NULL) break; + + if (strncasecmp(pEpStr, pDnode->ep, TSDB_EP_LEN) == 0) { + sdbCancelFetch(pSdb, pIter); + return pDnode; + } + } + + return NULL; +} + +static int32_t mndGetDnodeSize(SMnode *pMnode) { + SSdb *pSdb = pMnode->pSdb; + return sdbGetSize(pSdb, SDB_DNODE); +} + +static void mndGetDnodeData(SMnode *pMnode, SDnodeEps *pEps, int32_t numOfEps) { + SSdb *pSdb = pMnode->pSdb; + + int32_t i = 0; + void *pIter = NULL; + while (1) { + SDnodeObj *pDnode = NULL; + pIter = sdbFetch(pSdb, SDB_DNODE, pIter, (void **)&pDnode); + if (pIter == NULL) break; + if (i >= numOfEps) { + sdbCancelFetch(pSdb, pIter); + break; + } + + SDnodeEp *pEp = &pEps->eps[i]; + pEp->id = htonl(pDnode->id); + pEp->port = htons(pDnode->port); + memcpy(pEp->fqdn, pDnode->fqdn, TSDB_FQDN_LEN); + pEp->isMnode = 0; + if (mndIsMnode(pMnode, pDnode->id)) { + pEp->isMnode = 1; + } + i++; + } + + pEps->num = i; +} + +static int32_t mndCheckClusterCfgPara(SMnode *pMnode, const SClusterCfg *pCfg) { + if (pCfg->mnodeEqualVnodeNum != pMnode->mnodeEqualVnodeNum) { + mError("\"mnodeEqualVnodeNum\"[%d - %d] cfg inconsistent", pCfg->mnodeEqualVnodeNum, pMnode->mnodeEqualVnodeNum); + return DND_REASON_MN_EQUAL_VN_NOT_MATCH; + } + + if (pCfg->statusInterval != pMnode->statusInterval) { + mError("\"statusInterval\"[%d - %d] cfg inconsistent", pCfg->statusInterval, pMnode->statusInterval); + return DND_REASON_STATUS_INTERVAL_NOT_MATCH; + } + + int64_t checkTime = 0; + char timestr[32] = "1970-01-01 00:00:00.00"; + (void)taosParseTime(timestr, &checkTime, (int32_t)strlen(timestr), TSDB_TIME_PRECISION_MILLI, 0); + if ((0 != strcasecmp(pCfg->timezone, pMnode->timezone)) && (checkTime != pCfg->checkTime)) { + mError("\"timezone\"[%s - %s] [%" PRId64 " - %" PRId64 "] cfg inconsistent", pCfg->timezone, tsTimezone, + pCfg->checkTime, checkTime); + return DND_REASON_TIME_ZONE_NOT_MATCH; + } + + if (0 != strcasecmp(pCfg->locale, pMnode->locale)) { + mError("\"locale\"[%s - %s] cfg parameters inconsistent", pCfg->locale, pMnode->locale); + return DND_REASON_LOCALE_NOT_MATCH; + } + + if (0 != strcasecmp(pCfg->charset, pMnode->charset)) { + mError("\"charset\"[%s - %s] cfg parameters inconsistent.", pCfg->charset, pMnode->charset); + return DND_REASON_CHARSET_NOT_MATCH; + } + + return 0; +} + +static int32_t mndProcessStatusMsg(SMnode *pMnode, SMnodeMsg *pMsg) { + SStatusMsg *pStatus = pMsg->rpcMsg.pCont; + pStatus->sver = htonl(pStatus->sver); + pStatus->dnodeId = htonl(pStatus->dnodeId); + pStatus->clusterId = htobe64(pStatus->clusterId); + pStatus->rebootTime = htonl(pStatus->rebootTime); + pStatus->numOfCores = htons(pStatus->numOfCores); + pStatus->numOfSupportMnodes = htons(pStatus->numOfSupportMnodes); + pStatus->numOfSupportVnodes = htons(pStatus->numOfSupportVnodes); + pStatus->numOfSupportQnodes = htons(pStatus->numOfSupportQnodes); + + pStatus->clusterCfg.statusInterval = htonl(pStatus->clusterCfg.statusInterval); + pStatus->clusterCfg.mnodeEqualVnodeNum = htonl(pStatus->clusterCfg.mnodeEqualVnodeNum); + pStatus->clusterCfg.checkTime = htobe64(pStatus->clusterCfg.checkTime); + + SDnodeObj *pDnode = NULL; + if (pStatus->dnodeId == 0) { + pDnode = mndAcquireDnodeByEp(pMnode, pStatus->dnodeEp); + if (pDnode == NULL) { + mDebug("dnode:%s, not created yet", pStatus->dnodeEp); + return TSDB_CODE_MND_DNODE_NOT_EXIST; + } + } else { + pDnode = mndAcquireDnode(pMnode, pStatus->dnodeId); + if (pDnode == NULL) { + pDnode = mndAcquireDnodeByEp(pMnode, pStatus->dnodeEp); + if (pDnode != NULL && pDnode->status != DND_STATUS_READY) { + pDnode->offlineReason = DND_REASON_DNODE_ID_NOT_MATCH; + } + mError("dnode:%d, %s not exist", pStatus->dnodeId, pStatus->dnodeEp); + mndReleaseDnode(pMnode, pDnode); + return TSDB_CODE_MND_DNODE_NOT_EXIST; + } + } + + if (pStatus->sver != pMnode->sver) { + if (pDnode != NULL && pDnode->status != DND_STATUS_READY) { + pDnode->offlineReason = DND_REASON_VERSION_NOT_MATCH; + } + mndReleaseDnode(pMnode, pDnode); + mError("dnode:%d, status msg version:%d not match cluster:%d", pStatus->dnodeId, pStatus->sver, pMnode->sver); + return TSDB_CODE_MND_INVALID_MSG_VERSION; + } + + int64_t clusterId = mndGetClusterId(pMnode); + if (pStatus->dnodeId == 0) { + mDebug("dnode:%d %s, first access, set clusterId %" PRId64, pDnode->id, pDnode->ep, clusterId); + } else { + if (pStatus->clusterId != clusterId) { + if (pDnode != NULL && pDnode->status != DND_STATUS_READY) { + pDnode->offlineReason = DND_REASON_CLUSTER_ID_NOT_MATCH; + } + mError("dnode:%d, clusterId %" PRId64 " not match exist %" PRId64, pDnode->id, pStatus->clusterId, clusterId); + mndReleaseDnode(pMnode, pDnode); + return TSDB_CODE_MND_INVALID_CLUSTER_ID; + } else { + pDnode->accessTimes++; + mTrace("dnode:%d, status received, access times %d", pDnode->id, pDnode->accessTimes); + } + } + + if (pDnode->status == DND_STATUS_OFFLINE) { + // Verify whether the cluster parameters are consistent when status change from offline to ready + int32_t ret = mndCheckClusterCfgPara(pMnode, &pStatus->clusterCfg); + if (0 != ret) { + pDnode->offlineReason = ret; + mError("dnode:%d, cluster cfg inconsistent since:%s", pDnode->id, offlineReason[ret]); + mndReleaseDnode(pMnode, pDnode); + return TSDB_CODE_MND_CLUSTER_CFG_INCONSISTENT; + } + + mInfo("dnode:%d, from offline to online", pDnode->id); + } + + pDnode->rebootTime = pStatus->rebootTime; + pDnode->numOfCores = pStatus->numOfCores; + pDnode->numOfSupportMnodes = pStatus->numOfSupportMnodes; + pDnode->numOfSupportVnodes = pStatus->numOfSupportVnodes; + pDnode->numOfSupportQnodes = pStatus->numOfSupportQnodes; + pDnode->status = DND_STATUS_READY; + + int32_t numOfEps = mndGetDnodeSize(pMnode); + int32_t contLen = sizeof(SStatusRsp) + numOfEps * sizeof(SDnodeEp); + SStatusRsp *pRsp = rpcMallocCont(contLen); + if (pRsp == NULL) { + mndReleaseDnode(pMnode, pDnode); + return TSDB_CODE_OUT_OF_MEMORY; + } + + pRsp->dnodeCfg.dnodeId = htonl(pDnode->id); + pRsp->dnodeCfg.dropped = 0; + pRsp->dnodeCfg.clusterId = htobe64(clusterId); + mndGetDnodeData(pMnode, &pRsp->dnodeEps, numOfEps); + + pMsg->rpcRsp.len = contLen; + pMsg->rpcRsp.rsp = pRsp; + mndReleaseDnode(pMnode, pDnode); + + return 0; +} + static int32_t mndProcessCreateDnodeMsg(SMnode *pMnode, SMnodeMsg *pMsg) { return 0; } static int32_t mndProcessDropDnodeMsg(SMnode *pMnode, SMnodeMsg *pMsg) { return 0; } @@ -109,8 +313,8 @@ static int32_t mndProcessDropDnodeMsg(SMnode *pMnode, SMnodeMsg *pMsg) { return static int32_t mndProcessConfigDnodeMsg(SMnode *pMnode, SMnodeMsg *pMsg) { return 0; } int32_t mndInitDnode(SMnode *pMnode) { - SSdbTable table = {.sdbType = SDB_USER, - .keyType = SDB_KEY_BINARY, + SSdbTable table = {.sdbType = SDB_DNODE, + .keyType = SDB_KEY_INT32, .deployFp = (SdbDeployFp)mndCreateDefaultDnode, .encodeFp = (SdbEncodeFp)mndDnodeActionEncode, .decodeFp = (SdbDecodeFp)mndDnodeActionDecode, @@ -121,8 +325,19 @@ int32_t mndInitDnode(SMnode *pMnode) { mndSetMsgHandle(pMnode, TSDB_MSG_TYPE_CREATE_DNODE, mndProcessCreateDnodeMsg); mndSetMsgHandle(pMnode, TSDB_MSG_TYPE_DROP_DNODE, mndProcessDropDnodeMsg); mndSetMsgHandle(pMnode, TSDB_MSG_TYPE_CONFIG_DNODE, mndProcessConfigDnodeMsg); + mndSetMsgHandle(pMnode, TSDB_MSG_TYPE_STATUS_RSP, mndProcessStatusMsg); return sdbSetTable(pMnode->pSdb, table); } -void mndCleanupDnode(SMnode *pMnode) {} \ No newline at end of file +void mndCleanupDnode(SMnode *pMnode) {} + +SDnodeObj *mndAcquireDnode(SMnode *pMnode, int32_t dnodeId) { + SSdb *pSdb = pMnode->pSdb; + return sdbAcquire(pSdb, SDB_DNODE, &dnodeId); +} + +void mndReleaseDnode(SMnode *pMnode, SDnodeObj *pDnode) { + SSdb *pSdb = pMnode->pSdb; + sdbRelease(pSdb, pDnode); +} diff --git a/source/dnode/mnode/impl/src/mndMnode.c b/source/dnode/mnode/impl/src/mndMnode.c index c2e05687c7..bbf39dcff9 100644 --- a/source/dnode/mnode/impl/src/mndMnode.c +++ b/source/dnode/mnode/impl/src/mndMnode.c @@ -14,11 +14,120 @@ */ #define _DEFAULT_SOURCE -#include "os.h" -#include "mndInt.h" +#include "mndTrans.h" -int32_t mndInitMnode(SMnode *pMnode) { return 0; } -void mndCleanupMnode(SMnode *pMnode) {} +#define SDB_MNODE_VER 1 -void mndGetMnodeEpSetForPeer(SEpSet *epSet, bool redirect) {} -void mndGetMnodeEpSetForShell(SEpSet *epSet, bool redirect) {} \ No newline at end of file +static SSdbRaw *mndMnodeActionEncode(SMnodeObj *pMnodeObj) { + SSdbRaw *pRaw = sdbAllocRaw(SDB_MNODE, SDB_MNODE_VER, sizeof(SMnodeObj)); + if (pRaw == NULL) return NULL; + + int32_t dataPos = 0; + SDB_SET_INT32(pRaw, dataPos, pMnodeObj->id); + SDB_SET_INT64(pRaw, dataPos, pMnodeObj->createdTime) + SDB_SET_INT64(pRaw, dataPos, pMnodeObj->updateTime) + + return pRaw; +} + +static SSdbRow *mndMnodeActionDecode(SSdbRaw *pRaw) { + int8_t sver = 0; + if (sdbGetRawSoftVer(pRaw, &sver) != 0) return NULL; + + if (sver != SDB_MNODE_VER) { + terrno = TSDB_CODE_SDB_INVALID_DATA_VER; + mError("failed to decode mnode since %s", terrstr()); + return NULL; + } + + SSdbRow *pRow = sdbAllocRow(sizeof(SMnodeObj)); + SMnodeObj *pMnodeObj = sdbGetRowObj(pRow); + if (pMnodeObj == NULL) return NULL; + + int32_t dataPos = 0; + SDB_GET_INT32(pRaw, pRow, dataPos, &pMnodeObj->id) + SDB_GET_INT64(pRaw, pRow, dataPos, &pMnodeObj->createdTime) + SDB_GET_INT64(pRaw, pRow, dataPos, &pMnodeObj->updateTime) + + return pRow; +} + +static void mnodeResetMnode(SMnodeObj *pMnodeObj) { + pMnodeObj->role = TAOS_SYNC_STATE_FOLLOWER; + pMnodeObj->roleTerm = 0; + pMnodeObj->roleTime = 0; +} + +static int32_t mndMnodeActionInsert(SSdb *pSdb, SMnodeObj *pMnodeObj) { + pMnodeObj->pDnode = sdbAcquire(pSdb, SDB_DNODE, &pMnodeObj->id); + if (pMnodeObj->pDnode == NULL) { + terrno = TSDB_CODE_MND_DNODE_NOT_EXIST; + return -1; + } + + mnodeResetMnode(pMnodeObj); + return 0; +} + +static int32_t mndMnodeActionDelete(SSdb *pSdb, SMnodeObj *pMnodeObj) { + if (pMnodeObj->pDnode != NULL) { + sdbRelease(pSdb, pMnodeObj->pDnode); + pMnodeObj->pDnode = NULL; + } + + return 0; +} + +static int32_t mndMnodeActionUpdate(SSdb *pSdb, SMnodeObj *pSrcMnode, SMnodeObj *pDstMnode) { + pSrcMnode->id = pDstMnode->id; + pSrcMnode->createdTime = pDstMnode->createdTime; + pSrcMnode->updateTime = pDstMnode->updateTime; + mnodeResetMnode(pSrcMnode); +} + +static int32_t mndCreateDefaultMnode(SMnode *pMnode) { + SMnodeObj mnodeObj = {0}; + mnodeObj.id = 0; + mnodeObj.createdTime = taosGetTimestampMs(); + mnodeObj.updateTime = mnodeObj.createdTime; + + SSdbRaw *pRaw = mndMnodeActionEncode(&mnodeObj); + if (pRaw == NULL) return -1; + sdbSetRawStatus(pRaw, SDB_STATUS_READY); + + return sdbWrite(pMnode->pSdb, pRaw); +} + +static int32_t mndProcessCreateMnodeMsg(SMnode *pMnode, SMnodeMsg *pMsg) { return 0; } + +static int32_t mndProcessDropMnodeMsg(SMnode *pMnode, SMnodeMsg *pMsg) { return 0; } + +int32_t mndInitMnode(SMnode *pMnode) { + SSdbTable table = {.sdbType = SDB_MNODE, + .keyType = SDB_KEY_INT32, + .deployFp = (SdbDeployFp)mndCreateDefaultMnode, + .encodeFp = (SdbEncodeFp)mndMnodeActionEncode, + .decodeFp = (SdbDecodeFp)mndMnodeActionDecode, + .insertFp = (SdbInsertFp)mndMnodeActionInsert, + .updateFp = (SdbUpdateFp)mndMnodeActionUpdate, + .deleteFp = (SdbDeleteFp)mndMnodeActionDelete}; + + mndSetMsgHandle(pMnode, TSDB_MSG_TYPE_CREATE_MNODE, mndProcessCreateMnodeMsg); + mndSetMsgHandle(pMnode, TSDB_MSG_TYPE_DROP_MNODE, mndProcessDropMnodeMsg); + + return sdbSetTable(pMnode->pSdb, table); +} + +void mndCleanupMnode(SMnode *pMnode) {} + +bool mndIsMnode(SMnode *pMnode, int32_t dnodeId) { + SSdb *pSdb = pMnode->pSdb; + + SMnodeObj *pMnodeObj = sdbAcquire(pSdb, SDB_MNODE, &dnodeId); + if (pMnodeObj == NULL) { + return false; + } + + sdbRelease(pSdb, pMnodeObj); + return true; +} \ No newline at end of file diff --git a/source/dnode/mnode/impl/src/mnode.c b/source/dnode/mnode/impl/src/mnode.c index bc2ed92197..e1c7b66c36 100644 --- a/source/dnode/mnode/impl/src/mnode.c +++ b/source/dnode/mnode/impl/src/mnode.c @@ -225,15 +225,22 @@ static int32_t mndSetOptions(SMnode *pMnode, const SMnodeOpt *pOption) { pMnode->sendMsgToDnodeFp = pOption->sendMsgToDnodeFp; pMnode->sendMsgToMnodeFp = pOption->sendMsgToMnodeFp; pMnode->sendRedirectMsgFp = pOption->sendRedirectMsgFp; + pMnode->sver = pOption->sver; + pMnode->statusInterval = pOption->statusInterval; + pMnode->mnodeEqualVnodeNum = pOption->mnodeEqualVnodeNum; + pMnode->timezone = strdup(pOption->timezone); + pMnode->locale = strdup(pOption->locale); + pMnode->charset = strdup(pOption->charset); if (pMnode->sendMsgToDnodeFp == NULL || pMnode->sendMsgToMnodeFp == NULL || pMnode->sendRedirectMsgFp == NULL || - pMnode->putMsgToApplyMsgFp == NULL) { + pMnode->putMsgToApplyMsgFp == NULL || pMnode->dnodeId < 0 || pMnode->clusterId < 0 || + pMnode->statusInterval < 1 || pOption->mnodeEqualVnodeNum < 0) { terrno = TSDB_CODE_MND_APP_ERROR; return -1; } - if (pMnode->dnodeId < 0 || pMnode->clusterId < 0) { - terrno = TSDB_CODE_MND_APP_ERROR; + if (pMnode->timezone == NULL || pMnode->locale == NULL || pMnode->charset == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; return -1; } @@ -299,6 +306,9 @@ void mndClose(SMnode *pMnode) { mDebug("start to close mnode"); mndCleanupSteps(pMnode, -1); tfree(pMnode->path); + tfree(pMnode->charset); + tfree(pMnode->locale); + tfree(pMnode->timezone); tfree(pMnode); mDebug("mnode is closed"); } -- GitLab