From 94756ccd947e62aa26166d899d0ac4620f517f11 Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Fri, 4 Dec 2020 13:07:23 +0800 Subject: [PATCH] TD-2331 --- src/dnode/inc/dnodeMInfos.h | 6 +- src/dnode/src/dnodeMInfos.c | 58 +++++++------- src/dnode/src/dnodeMgmt.c | 4 +- src/dnode/src/dnodeModule.c | 6 +- src/inc/dnode.h | 2 +- src/inc/taosmsg.h | 14 ++-- src/mnode/inc/mnodeMnode.h | 2 +- src/mnode/inc/mnodeSdb.h | 1 + src/mnode/src/mnodeMnode.c | 148 +++++++++++++++++++++--------------- src/mnode/src/mnodeSdb.c | 25 ++++-- src/sync/src/syncMain.c | 2 +- 11 files changed, 152 insertions(+), 116 deletions(-) diff --git a/src/dnode/inc/dnodeMInfos.h b/src/dnode/inc/dnodeMInfos.h index 9c3c85c47e..2c3eef5d5d 100644 --- a/src/dnode/inc/dnodeMInfos.h +++ b/src/dnode/inc/dnodeMInfos.h @@ -24,9 +24,9 @@ extern "C" { int32_t dnodeInitMInfos(); void dnodeCleanupMInfos(); -void dnodeUpdateMInfos(SMnodeInfos *minfos); -void dnodeUpdateEpSetForPeer(SRpcEpSet *epSet); -void dnodeGetMInfos(SMnodeInfos *minfos); +void dnodeUpdateMInfos(SMInfos *pMinfos); +void dnodeUpdateEpSetForPeer(SRpcEpSet *pEpSet); +void dnodeGetMInfos(SMInfos *pMinfos); bool dnodeIsMasterEp(char *ep); #ifdef __cplusplus diff --git a/src/dnode/src/dnodeMInfos.c b/src/dnode/src/dnodeMInfos.c index cefe44aebe..162de2243e 100644 --- a/src/dnode/src/dnodeMInfos.c +++ b/src/dnode/src/dnodeMInfos.c @@ -22,12 +22,12 @@ #include "dnodeInt.h" #include "dnodeMInfos.h" -static SMnodeInfos tsMInfos; -static SRpcEpSet tsMEpSet; +static SMInfos tsMInfos; +static SRpcEpSet tsMEpSet; static pthread_mutex_t tsMInfosMutex; -static void dnodeResetMInfos(SMnodeInfos *minfos); -static void dnodePrintMInfos(SMnodeInfos *minfos); +static void dnodeResetMInfos(SMInfos *minfos); +static void dnodePrintMInfos(SMInfos *minfos); static int32_t dnodeReadMInfos(); static int32_t dnodeWriteMInfos(); @@ -44,14 +44,14 @@ int32_t dnodeInitMInfos() { void dnodeCleanupMInfos() { pthread_mutex_destroy(&tsMInfosMutex); } -void dnodeUpdateMInfos(SMnodeInfos *minfos) { - if (minfos->mnodeNum <= 0 || minfos->mnodeNum > 3) { - dError("invalid mnode infos, mnodeNum:%d", minfos->mnodeNum); +void dnodeUpdateMInfos(SMInfos *pMinfos) { + if (pMinfos->mnodeNum <= 0 || pMinfos->mnodeNum > 3) { + dError("invalid mnode infos, mnodeNum:%d", pMinfos->mnodeNum); return; } - for (int32_t i = 0; i < minfos->mnodeNum; ++i) { - SMnodeInfo *minfo = &minfos->mnodeInfos[i]; + for (int32_t i = 0; i < pMinfos->mnodeNum; ++i) { + SMInfo *minfo = &pMinfos->mnodeInfos[i]; minfo->mnodeId = htonl(minfo->mnodeId); if (minfo->mnodeId <= 0 || strlen(minfo->mnodeEp) <= 5) { dError("invalid mnode info:%d, mnodeId:%d mnodeEp:%s", i, minfo->mnodeId, minfo->mnodeEp); @@ -60,14 +60,14 @@ void dnodeUpdateMInfos(SMnodeInfos *minfos) { } pthread_mutex_lock(&tsMInfosMutex); - if (minfos->mnodeNum != tsMInfos.mnodeNum) { - dnodeResetMInfos(minfos); + if (pMinfos->mnodeNum != tsMInfos.mnodeNum) { + dnodeResetMInfos(pMinfos); dnodeWriteMInfos(); sdbUpdateAsync(); } else { - int32_t size = sizeof(SMnodeInfos); - if (memcmp(minfos, &tsMInfos, size) != 0) { - dnodeResetMInfos(minfos); + int32_t size = sizeof(SMInfos); + if (memcmp(pMinfos, &tsMInfos, size) != 0) { + dnodeResetMInfos(pMinfos); dnodeWriteMInfos(); sdbUpdateAsync(); } @@ -99,11 +99,11 @@ bool dnodeIsMasterEp(char *ep) { return isMaster; } -void dnodeGetMInfos(SMnodeInfos *minfos) { +void dnodeGetMInfos(SMInfos *pMinfos) { pthread_mutex_lock(&tsMInfosMutex); - memcpy(minfos, &tsMInfos, sizeof(SMnodeInfos)); + memcpy(pMinfos, &tsMInfos, sizeof(SMInfos)); for (int32_t i = 0; i < tsMInfos.mnodeNum; ++i) { - minfos->mnodeInfos[i].mnodeId = htonl(tsMInfos.mnodeInfos[i].mnodeId); + pMinfos->mnodeInfos[i].mnodeId = htonl(tsMInfos.mnodeInfos[i].mnodeId); } pthread_mutex_unlock(&tsMInfosMutex); } @@ -123,15 +123,15 @@ void dnodeGetEpSetForShell(SRpcEpSet *epSet) { pthread_mutex_unlock(&tsMInfosMutex); } -static void dnodePrintMInfos(SMnodeInfos *minfos) { - dInfo("print mnode infos, mnodeNum:%d inUse:%d", minfos->mnodeNum, minfos->inUse); - for (int32_t i = 0; i < minfos->mnodeNum; i++) { - dInfo("mnode index:%d, %s", minfos->mnodeInfos[i].mnodeId, minfos->mnodeInfos[i].mnodeEp); +static void dnodePrintMInfos(SMInfos *pMinfos) { + dInfo("print minfos, mnodeNum:%d inUse:%d", pMinfos->mnodeNum, pMinfos->inUse); + for (int32_t i = 0; i < pMinfos->mnodeNum; i++) { + dInfo("mnode index:%d, %s", pMinfos->mnodeInfos[i].mnodeId, pMinfos->mnodeInfos[i].mnodeEp); } } -static void dnodeResetMInfos(SMnodeInfos *minfos) { - if (minfos == NULL) { +static void dnodeResetMInfos(SMInfos *pMinfos) { + if (pMinfos == NULL) { tsMEpSet.numOfEps = 1; taosGetFqdnPortFromEp(tsFirst, tsMEpSet.fqdn[0], &tsMEpSet.port[0]); @@ -142,10 +142,10 @@ static void dnodeResetMInfos(SMnodeInfos *minfos) { return; } - if (minfos->mnodeNum == 0) return; + if (pMinfos->mnodeNum == 0) return; - int32_t size = sizeof(SMnodeInfos); - memcpy(&tsMInfos, minfos, size); + int32_t size = sizeof(SMInfos); + memcpy(&tsMInfos, pMinfos, size); tsMEpSet.inUse = tsMInfos.inUse; tsMEpSet.numOfEps = tsMInfos.mnodeNum; @@ -153,7 +153,7 @@ static void dnodeResetMInfos(SMnodeInfos *minfos) { taosGetFqdnPortFromEp(tsMInfos.mnodeInfos[i].mnodeEp, tsMEpSet.fqdn[i], &tsMEpSet.port[i]); } - dnodePrintMInfos(minfos); + dnodePrintMInfos(pMinfos); } static int32_t dnodeReadMInfos() { @@ -162,7 +162,7 @@ static int32_t dnodeReadMInfos() { char * content = calloc(1, maxLen + 1); cJSON * root = NULL; FILE * fp = NULL; - SMnodeInfos minfos = {0}; + SMInfos minfos = {0}; char file[TSDB_FILENAME_LEN + 20] = {0}; sprintf(file, "%s/mnodeEpSet.json", tsDnodeDir); @@ -241,7 +241,7 @@ PARSE_MINFOS_OVER: terrno = 0; for (int32_t i = 0; i < minfos.mnodeNum; ++i) { - SMnodeInfo *mInfo = &minfos.mnodeInfos[i]; + SMInfo *mInfo = &minfos.mnodeInfos[i]; dnodeUpdateEp(mInfo->mnodeId, mInfo->mnodeEp, NULL, NULL); } dnodeResetMInfos(&minfos); diff --git a/src/dnode/src/dnodeMgmt.c b/src/dnode/src/dnodeMgmt.c index 5c01f64716..15378c77c1 100644 --- a/src/dnode/src/dnodeMgmt.c +++ b/src/dnode/src/dnodeMgmt.c @@ -472,8 +472,8 @@ static void dnodeProcessStatusRsp(SRpcMsg *pMsg) { } SStatusRsp *pStatusRsp = pMsg->pCont; - SMnodeInfos *minfos = &pStatusRsp->mnodes; - dnodeUpdateMInfos(minfos); + SMInfos *pMinfos = &pStatusRsp->mnodes; + dnodeUpdateMInfos(pMinfos); SDnodeCfg *pCfg = &pStatusRsp->dnodeCfg; pCfg->numOfVnodes = htonl(pCfg->numOfVnodes); diff --git a/src/dnode/src/dnodeModule.c b/src/dnode/src/dnodeModule.c index 7faa3c8913..f664618f51 100644 --- a/src/dnode/src/dnodeModule.c +++ b/src/dnode/src/dnodeModule.c @@ -147,8 +147,8 @@ void dnodeProcessModuleStatus(uint32_t moduleStatus) { } } -bool dnodeStartMnode(SMnodeInfos *minfos) { - SMnodeInfos *mnodes = minfos; +bool dnodeStartMnode(SMInfos *pMinfos) { + SMInfos *pMnodes = pMinfos; if (tsModuleStatus & (1 << TSDB_MOD_MNODE)) { dDebug("mnode module is already started, module status:%d", tsModuleStatus); @@ -159,7 +159,7 @@ bool dnodeStartMnode(SMnodeInfos *minfos) { dInfo("start mnode module, module status:%d, new status:%d", tsModuleStatus, moduleStatus); dnodeProcessModuleStatus(moduleStatus); - sdbUpdateSync(mnodes); + sdbUpdateSync(pMnodes); return true; } diff --git a/src/inc/dnode.h b/src/inc/dnode.h index eef4490800..1efaa4a24b 100644 --- a/src/inc/dnode.h +++ b/src/inc/dnode.h @@ -45,7 +45,7 @@ void dnodeGetEpSetForShell(SRpcEpSet *epSet); int32_t dnodeGetDnodeId(); void dnodeUpdateEp(int32_t dnodeId, char *ep, char *fqdn, uint16_t *port); bool dnodeCheckEpChanged(int32_t dnodeId, char *epstr); -bool dnodeStartMnode(SMnodeInfos *minfos); +bool dnodeStartMnode(SMInfos *pMinfos); void dnodeAddClientRspHandle(uint8_t msgType, void (*fp)(SRpcMsg *rpcMsg)); void dnodeSendMsgToDnode(SRpcEpSet *epSet, SRpcMsg *rpcMsg); diff --git a/src/inc/taosmsg.h b/src/inc/taosmsg.h index b4d3bec958..e8e3029244 100644 --- a/src/inc/taosmsg.h +++ b/src/inc/taosmsg.h @@ -591,13 +591,13 @@ typedef struct { typedef struct { int32_t mnodeId; char mnodeEp[TSDB_EP_LEN]; -} SMnodeInfo; +} SMInfo; typedef struct { - int8_t inUse; - int8_t mnodeNum; - SMnodeInfo mnodeInfos[TSDB_MAX_REPLICA]; -} SMnodeInfos; + int8_t inUse; + int8_t mnodeNum; + SMInfo mnodeInfos[TSDB_MAX_REPLICA]; +} SMInfos; typedef struct { int32_t numOfMnodes; // tsNumOfMnodes @@ -632,7 +632,7 @@ typedef struct { } SStatusMsg; typedef struct { - SMnodeInfos mnodes; + SMInfos mnodes; SDnodeCfg dnodeCfg; SVgroupAccess vgAccess[]; } SStatusRsp; @@ -761,7 +761,7 @@ typedef struct { typedef struct { int32_t dnodeId; char dnodeEp[TSDB_EP_LEN]; // end point, hostname:port - SMnodeInfos mnodes; + SMInfos mnodes; } SCreateMnodeMsg; typedef struct { diff --git a/src/mnode/inc/mnodeMnode.h b/src/mnode/inc/mnodeMnode.h index 10cbcebe22..93f2fa11ea 100644 --- a/src/mnode/inc/mnodeMnode.h +++ b/src/mnode/inc/mnodeMnode.h @@ -48,7 +48,7 @@ void mnodeGetMnodeEpSetForShell(SRpcEpSet *epSet); char* mnodeGetMnodeMasterEp(); void mnodeGetMnodeInfos(void *mnodes); -void mnodeUpdateMnodeEpSet(); +void mnodeUpdateMnodeEpSet(SMInfos *pMnodes); #ifdef __cplusplus } diff --git a/src/mnode/inc/mnodeSdb.h b/src/mnode/inc/mnodeSdb.h index 90c4eac40a..31ea2da640 100644 --- a/src/mnode/inc/mnodeSdb.h +++ b/src/mnode/inc/mnodeSdb.h @@ -89,6 +89,7 @@ void* sdbGetTableByRid(int64_t rid); bool sdbIsMaster(); bool sdbIsServing(); void sdbUpdateMnodeRoles(); +int32_t sdbGetReplicaNum(); int32_t sdbInsertRow(SSdbRow *pRow); int32_t sdbDeleteRow(SSdbRow *pRow); diff --git a/src/mnode/src/mnodeMnode.c b/src/mnode/src/mnodeMnode.c index d20d51f82b..ba0b5a1865 100644 --- a/src/mnode/src/mnodeMnode.c +++ b/src/mnode/src/mnodeMnode.c @@ -34,14 +34,14 @@ #include "mnodeUser.h" #include "mnodeVgroup.h" -int64_t tsMnodeRid = -1; -static void * tsMnodeSdb = NULL; -static int32_t tsMnodeUpdateSize = 0; -static SRpcEpSet tsMnodeEpSetForShell; -static SRpcEpSet tsMnodeEpSetForPeer; -static SMnodeInfos tsMnodeInfos; -static int32_t mnodeGetMnodeMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *pConn); -static int32_t mnodeRetrieveMnodes(SShowObj *pShow, char *data, int32_t rows, void *pConn); +int64_t tsMnodeRid = -1; +static void * tsMnodeSdb = NULL; +static int32_t tsMnodeUpdateSize = 0; +static SRpcEpSet tsMEpForShell; +static SRpcEpSet tsMEpForPeer; +static SMInfos tsMInfos; +static int32_t mnodeGetMnodeMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *pConn); +static int32_t mnodeRetrieveMnodes(SShowObj *pShow, char *data, int32_t rows, void *pConn); #if defined(LINUX) static pthread_rwlock_t tsMnodeLock; @@ -127,7 +127,7 @@ static int32_t mnodeMnodeActionRestored() { mnodeCancelGetNextMnode(pIter); } - mnodeUpdateMnodeEpSet(); + mnodeUpdateMnodeEpSet(NULL); return TSDB_CODE_SUCCESS; } @@ -199,93 +199,119 @@ void mnodeCancelGetNextMnode(void *pIter) { sdbFreeIter(tsMnodeSdb, pIter); } -void mnodeUpdateMnodeEpSet() { - mInfo("update mnodes epSet, numOfEps:%d ", mnodeGetMnodesNum()); +void mnodeUpdateMnodeEpSet(SMInfos *pMinfos) { + bool set = false; + SMInfos mInfos = {0}; + mInfo("vgId:1, update mnodes epSet, numOfEps:%d pMinfos:%p", mnodeGetMnodesNum(), pMinfos); - mnodeMnodeWrLock(); - - memset(&tsMnodeEpSetForShell, 0, sizeof(SRpcEpSet)); - memset(&tsMnodeEpSetForPeer, 0, sizeof(SRpcEpSet)); - memset(&tsMnodeInfos, 0, sizeof(SMnodeInfos)); - - int32_t index = 0; - void * pIter = NULL; - while (1) { - SMnodeObj *pMnode = NULL; - pIter = mnodeGetNextMnode(pIter, &pMnode); - if (pMnode == NULL) break; - - SDnodeObj *pDnode = mnodeGetDnode(pMnode->mnodeId); - if (pDnode != NULL) { - strcpy(tsMnodeEpSetForShell.fqdn[index], pDnode->dnodeFqdn); - tsMnodeEpSetForShell.port[index] = htons(pDnode->dnodePort); - mDebug("mnode:%d, for shell fqdn:%s %d", pDnode->dnodeId, tsMnodeEpSetForShell.fqdn[index], htons(tsMnodeEpSetForShell.port[index])); - - strcpy(tsMnodeEpSetForPeer.fqdn[index], pDnode->dnodeFqdn); - tsMnodeEpSetForPeer.port[index] = htons(pDnode->dnodePort + TSDB_PORT_DNODEDNODE); - mDebug("mnode:%d, for peer fqdn:%s %d", pDnode->dnodeId, tsMnodeEpSetForPeer.fqdn[index], htons(tsMnodeEpSetForPeer.port[index])); - - tsMnodeInfos.mnodeInfos[index].mnodeId = htonl(pMnode->mnodeId); - strcpy(tsMnodeInfos.mnodeInfos[index].mnodeEp, pDnode->dnodeEp); - - if (pMnode->role == TAOS_SYNC_ROLE_MASTER) { - tsMnodeEpSetForShell.inUse = index; - tsMnodeEpSetForPeer.inUse = index; - tsMnodeInfos.inUse = index; + if (pMinfos != NULL) { + set = true; + mInfos = *pMinfos; + } + else { + int32_t index = 0; + void * pIter = NULL; + while (1) { + SMnodeObj *pMnode = NULL; + pIter = mnodeGetNextMnode(pIter, &pMnode); + if (pMnode == NULL) break; + + SDnodeObj *pDnode = mnodeGetDnode(pMnode->mnodeId); + if (pDnode != NULL) { + set = true; + mInfos.mnodeInfos[index].mnodeId = pMnode->mnodeId; + strcpy(mInfos.mnodeInfos[index].mnodeEp, pDnode->dnodeEp); + if (pMnode->role == TAOS_SYNC_ROLE_MASTER) mInfos.inUse = index; + index++; + } else { + set = false; } - mInfo("mnode:%d, ep:%s %s", pDnode->dnodeId, pDnode->dnodeEp, pMnode->role == TAOS_SYNC_ROLE_MASTER ? "master" : ""); - index++; + mnodeDecDnodeRef(pDnode); + mnodeDecMnodeRef(pMnode); } - mnodeDecDnodeRef(pDnode); - mnodeDecMnodeRef(pMnode); + mInfos.mnodeNum = index; + if (sdbGetReplicaNum() != mInfos.mnodeNum) { + set = false; + mDebug("vgId:1, mnodes info not synced, cfg:%d current:%d", sdbGetReplicaNum(), mInfos.mnodeNum); + } } - tsMnodeInfos.mnodeNum = index; - tsMnodeEpSetForShell.numOfEps = index; - tsMnodeEpSetForPeer.numOfEps = index; + mnodeMnodeWrLock(); + + if (set) { + memset(&tsMEpForShell, 0, sizeof(SRpcEpSet)); + memset(&tsMEpForPeer, 0, sizeof(SRpcEpSet)); + memcpy(&tsMInfos, &mInfos, sizeof(SMInfos)); + tsMEpForShell.inUse = tsMInfos.inUse; + tsMEpForPeer.inUse = tsMInfos.inUse; + tsMEpForShell.numOfEps = tsMInfos.mnodeNum; + tsMEpForPeer.numOfEps = tsMInfos.mnodeNum; + + mInfo("vgId:1, mnodes epSet is set, num:%d inUse:%d", tsMInfos.mnodeNum, tsMInfos.inUse); + for (int index = 0; index < mInfos.mnodeNum; ++index) { + SMInfo *pInfo = &tsMInfos.mnodeInfos[index]; + taosGetFqdnPortFromEp(pInfo->mnodeEp, tsMEpForShell.fqdn[index], &tsMEpForShell.port[index]); + taosGetFqdnPortFromEp(pInfo->mnodeEp, tsMEpForPeer.fqdn[index], &tsMEpForPeer.port[index]); + tsMEpForPeer.port[index] = tsMEpForPeer.port[index] + TSDB_PORT_DNODEDNODE; + + mInfo("vgId:1, mnode:%d, fqdn:%s shell:%u peer:%u", pInfo->mnodeId, tsMEpForShell.fqdn[index], + tsMEpForShell.port[index], tsMEpForPeer.port[index]); + + tsMEpForShell.port[index] = htons(tsMEpForShell.port[index]); + tsMEpForPeer.port[index] = htons(tsMEpForPeer.port[index]); + pInfo->mnodeId = htonl(pInfo->mnodeId); + } + } else { + mInfo("vgId:1, mnodes epSet not set, num:%d inUse:%d", tsMInfos.mnodeNum, tsMInfos.inUse); + for (int index = 0; index < tsMInfos.mnodeNum; ++index) { + mInfo("vgId:1, index:%d, ep:%s:%u", index, tsMEpForShell.fqdn[index], htons(tsMEpForShell.port[index])); + } + } mnodeMnodeUnLock(); } void mnodeGetMnodeEpSetForPeer(SRpcEpSet *epSet) { mnodeMnodeRdLock(); - *epSet = tsMnodeEpSetForPeer; + *epSet = tsMEpForPeer; mnodeMnodeUnLock(); + mTrace("vgId:1, mnodes epSet for peer is returned, num:%d inUse:%d", tsMEpForPeer.numOfEps, tsMEpForPeer.inUse); for (int32_t i = 0; i < epSet->numOfEps; ++i) { if (strcmp(epSet->fqdn[i], tsLocalFqdn) == 0 && htons(epSet->port[i]) == tsServerPort + TSDB_PORT_DNODEDNODE) { epSet->inUse = (i + 1) % epSet->numOfEps; - mTrace("mnode:%d, for peer ep:%s:%u, set inUse to %d", i, epSet->fqdn[i], htons(epSet->port[i]), epSet->inUse); + mTrace("vgId:1, mnode:%d, for peer ep:%s:%u, set inUse to %d", i, epSet->fqdn[i], htons(epSet->port[i]), epSet->inUse); } else { - mTrace("mpeer:%d, for peer ep:%s:%u", i, epSet->fqdn[i], htons(epSet->port[i])); + mTrace("vgId:1, mpeer:%d, for peer ep:%s:%u", i, epSet->fqdn[i], htons(epSet->port[i])); } } } void mnodeGetMnodeEpSetForShell(SRpcEpSet *epSet) { mnodeMnodeRdLock(); - *epSet = tsMnodeEpSetForShell; + *epSet = tsMEpForShell; mnodeMnodeUnLock(); + mTrace("vgId:1, mnodes epSet for shell is returned, num:%d inUse:%d", tsMEpForShell.numOfEps, tsMEpForShell.inUse); for (int32_t i = 0; i < epSet->numOfEps; ++i) { if (strcmp(epSet->fqdn[i], tsLocalFqdn) == 0 && htons(epSet->port[i]) == tsServerPort) { epSet->inUse = (i + 1) % epSet->numOfEps; - mTrace("mnode:%d, for shell ep:%s:%u, set inUse to %d", i, epSet->fqdn[i], htons(epSet->port[i]), epSet->inUse); + mTrace("vgId:1, mnode:%d, for shell ep:%s:%u, set inUse to %d", i, epSet->fqdn[i], htons(epSet->port[i]), epSet->inUse); } else { - mTrace("mnode:%d, for shell ep:%s:%u", i, epSet->fqdn[i], htons(epSet->port[i])); + mTrace("vgId:1, mnode:%d, for shell ep:%s:%u", i, epSet->fqdn[i], htons(epSet->port[i])); } } } char* mnodeGetMnodeMasterEp() { - return tsMnodeInfos.mnodeInfos[tsMnodeInfos.inUse].mnodeEp; + return tsMInfos.mnodeInfos[tsMInfos.inUse].mnodeEp; } -void mnodeGetMnodeInfos(void *mnodeInfos) { +void mnodeGetMnodeInfos(void *pMinfos) { mnodeMnodeRdLock(); - *(SMnodeInfos *)mnodeInfos = tsMnodeInfos; + *(SMInfos *)pMinfos = tsMInfos; mnodeMnodeUnLock(); } @@ -298,7 +324,7 @@ static int32_t mnodeSendCreateMnodeMsg(int32_t dnodeId, char *dnodeEp) { } else { pCreate->dnodeId = htonl(dnodeId); tstrncpy(pCreate->dnodeEp, dnodeEp, sizeof(pCreate->dnodeEp)); - pCreate->mnodes = tsMnodeInfos; + pCreate->mnodes = tsMInfos; bool found = false; for (int i = 0; i < pCreate->mnodes.mnodeNum; ++i) { if (pCreate->mnodes.mnodeInfos[i].mnodeId == htonl(dnodeId)) { @@ -336,7 +362,7 @@ static int32_t mnodeCreateMnodeCb(SMnodeMsg *pMsg, int32_t code) { mError("failed to create mnode, reason:%s", tstrerror(code)); } else { mDebug("mnode is created successfully"); - mnodeUpdateMnodeEpSet(); + mnodeUpdateMnodeEpSet(NULL); sdbUpdateAsync(); } @@ -380,7 +406,7 @@ void mnodeDropMnodeLocal(int32_t dnodeId) { mnodeDecMnodeRef(pMnode); } - mnodeUpdateMnodeEpSet(); + mnodeUpdateMnodeEpSet(NULL); sdbUpdateAsync(); } @@ -400,7 +426,7 @@ int32_t mnodeDropMnode(int32_t dnodeId) { sdbDecRef(tsMnodeSdb, pMnode); - mnodeUpdateMnodeEpSet(); + mnodeUpdateMnodeEpSet(NULL); sdbUpdateAsync(); return code; diff --git a/src/mnode/src/mnodeSdb.c b/src/mnode/src/mnodeSdb.c index 2ef758baf1..a8cd595fc8 100644 --- a/src/mnode/src/mnodeSdb.c +++ b/src/mnode/src/mnodeSdb.c @@ -224,11 +224,13 @@ void sdbUpdateMnodeRoles() { sdbInfo("vgId:1, mnode:%d, role:%s", pMnode->mnodeId, syncRole[pMnode->role]); if (pMnode->mnodeId == dnodeGetDnodeId()) tsSdbMgmt.role = pMnode->role; mnodeDecMnodeRef(pMnode); + } else { + sdbDebug("vgId:1, mnode:%d not found", roles.nodeId[i]); } } mnodeUpdateClusterId(); - mnodeUpdateMnodeEpSet(); + mnodeUpdateMnodeEpSet(NULL); } static uint32_t sdbGetFileInfo(int32_t vgId, char *name, uint32_t *index, uint32_t eindex, int64_t *size, uint64_t *fversion) { @@ -308,18 +310,20 @@ void sdbUpdateAsync() { } void sdbUpdateSync(void *pMnodes) { - SMnodeInfos *mnodes = pMnodes; + SMInfos *pMinfos = pMnodes; if (!mnodeIsRunning()) { mDebug("vgId:1, mnode not start yet, update sync config later"); return; } - mDebug("vgId:1, update sync config in sync module, mnodes:%p", pMnodes); + mDebug("vgId:1, update sync config, pMnodes:%p", pMnodes); SSyncCfg syncCfg = {0}; int32_t index = 0; - if (mnodes == NULL) { + if (pMinfos == NULL) { + mDebug("vgId:1, mInfos not input, use mInfos in sdb, numOfMnodes:%d", syncCfg.replica); + void *pIter = NULL; while (1) { SMnodeObj *pMnode = NULL; @@ -339,16 +343,17 @@ void sdbUpdateSync(void *pMnodes) { mnodeDecMnodeRef(pMnode); } syncCfg.replica = index; - mDebug("vgId:1, mnodes info not input, use infos in sdb, numOfMnodes:%d", syncCfg.replica); } else { - for (index = 0; index < mnodes->mnodeNum; ++index) { - SMnodeInfo *node = &mnodes->mnodeInfos[index]; + mDebug("vgId:1, mInfos input, numOfMnodes:%d", syncCfg.replica); + + for (index = 0; index < pMinfos->mnodeNum; ++index) { + SMInfo *node = &pMinfos->mnodeInfos[index]; syncCfg.nodeInfo[index].nodeId = node->mnodeId; taosGetFqdnPortFromEp(node->mnodeEp, syncCfg.nodeInfo[index].nodeFqdn, &syncCfg.nodeInfo[index].nodePort); syncCfg.nodeInfo[index].nodePort += TSDB_PORT_SYNC; } syncCfg.replica = index; - mDebug("vgId:1, mnodes info input, numOfMnodes:%d", syncCfg.replica); + mnodeUpdateMnodeEpSet(pMnodes); } syncCfg.quorum = (syncCfg.replica == 1) ? 1 : 2; @@ -1103,3 +1108,7 @@ static void *sdbWorkerFp(void *pWorker) { return NULL; } + +int32_t sdbGetReplicaNum() { + return tsSdbMgmt.cfg.replica; +} \ No newline at end of file diff --git a/src/sync/src/syncMain.c b/src/sync/src/syncMain.c index adac532f2d..7e9e8cc2d0 100644 --- a/src/sync/src/syncMain.c +++ b/src/sync/src/syncMain.c @@ -548,7 +548,7 @@ static SSyncPeer *syncAddPeer(SSyncNode *pNode, const SNodeInfo *pInfo) { pPeer->pSyncNode = pNode; pPeer->refCount = 1; - sInfo("%s, it is configured", pPeer->id); + sInfo("%s, it is configured, ep:%s:%u", pPeer->id, pPeer->fqdn, pPeer->port); int32_t ret = strcmp(pPeer->fqdn, tsNodeFqdn); if (pPeer->nodeId == 0 || (ret > 0) || (ret == 0 && pPeer->port > tsSyncPort)) { int32_t checkMs = 100 + (pNode->vgId * 10) % 100; -- GitLab