提交 94756ccd 编写于 作者: S Shengliang Guan

TD-2331

上级 573fcb4e
...@@ -24,9 +24,9 @@ extern "C" { ...@@ -24,9 +24,9 @@ extern "C" {
int32_t dnodeInitMInfos(); int32_t dnodeInitMInfos();
void dnodeCleanupMInfos(); void dnodeCleanupMInfos();
void dnodeUpdateMInfos(SMnodeInfos *minfos); void dnodeUpdateMInfos(SMInfos *pMinfos);
void dnodeUpdateEpSetForPeer(SRpcEpSet *epSet); void dnodeUpdateEpSetForPeer(SRpcEpSet *pEpSet);
void dnodeGetMInfos(SMnodeInfos *minfos); void dnodeGetMInfos(SMInfos *pMinfos);
bool dnodeIsMasterEp(char *ep); bool dnodeIsMasterEp(char *ep);
#ifdef __cplusplus #ifdef __cplusplus
......
...@@ -22,12 +22,12 @@ ...@@ -22,12 +22,12 @@
#include "dnodeInt.h" #include "dnodeInt.h"
#include "dnodeMInfos.h" #include "dnodeMInfos.h"
static SMnodeInfos tsMInfos; static SMInfos tsMInfos;
static SRpcEpSet tsMEpSet; static SRpcEpSet tsMEpSet;
static pthread_mutex_t tsMInfosMutex; static pthread_mutex_t tsMInfosMutex;
static void dnodeResetMInfos(SMnodeInfos *minfos); static void dnodeResetMInfos(SMInfos *minfos);
static void dnodePrintMInfos(SMnodeInfos *minfos); static void dnodePrintMInfos(SMInfos *minfos);
static int32_t dnodeReadMInfos(); static int32_t dnodeReadMInfos();
static int32_t dnodeWriteMInfos(); static int32_t dnodeWriteMInfos();
...@@ -44,14 +44,14 @@ int32_t dnodeInitMInfos() { ...@@ -44,14 +44,14 @@ int32_t dnodeInitMInfos() {
void dnodeCleanupMInfos() { pthread_mutex_destroy(&tsMInfosMutex); } void dnodeCleanupMInfos() { pthread_mutex_destroy(&tsMInfosMutex); }
void dnodeUpdateMInfos(SMnodeInfos *minfos) { void dnodeUpdateMInfos(SMInfos *pMinfos) {
if (minfos->mnodeNum <= 0 || minfos->mnodeNum > 3) { if (pMinfos->mnodeNum <= 0 || pMinfos->mnodeNum > 3) {
dError("invalid mnode infos, mnodeNum:%d", minfos->mnodeNum); dError("invalid mnode infos, mnodeNum:%d", pMinfos->mnodeNum);
return; return;
} }
for (int32_t i = 0; i < minfos->mnodeNum; ++i) { for (int32_t i = 0; i < pMinfos->mnodeNum; ++i) {
SMnodeInfo *minfo = &minfos->mnodeInfos[i]; SMInfo *minfo = &pMinfos->mnodeInfos[i];
minfo->mnodeId = htonl(minfo->mnodeId); minfo->mnodeId = htonl(minfo->mnodeId);
if (minfo->mnodeId <= 0 || strlen(minfo->mnodeEp) <= 5) { if (minfo->mnodeId <= 0 || strlen(minfo->mnodeEp) <= 5) {
dError("invalid mnode info:%d, mnodeId:%d mnodeEp:%s", i, minfo->mnodeId, minfo->mnodeEp); dError("invalid mnode info:%d, mnodeId:%d mnodeEp:%s", i, minfo->mnodeId, minfo->mnodeEp);
...@@ -60,14 +60,14 @@ void dnodeUpdateMInfos(SMnodeInfos *minfos) { ...@@ -60,14 +60,14 @@ void dnodeUpdateMInfos(SMnodeInfos *minfos) {
} }
pthread_mutex_lock(&tsMInfosMutex); pthread_mutex_lock(&tsMInfosMutex);
if (minfos->mnodeNum != tsMInfos.mnodeNum) { if (pMinfos->mnodeNum != tsMInfos.mnodeNum) {
dnodeResetMInfos(minfos); dnodeResetMInfos(pMinfos);
dnodeWriteMInfos(); dnodeWriteMInfos();
sdbUpdateAsync(); sdbUpdateAsync();
} else { } else {
int32_t size = sizeof(SMnodeInfos); int32_t size = sizeof(SMInfos);
if (memcmp(minfos, &tsMInfos, size) != 0) { if (memcmp(pMinfos, &tsMInfos, size) != 0) {
dnodeResetMInfos(minfos); dnodeResetMInfos(pMinfos);
dnodeWriteMInfos(); dnodeWriteMInfos();
sdbUpdateAsync(); sdbUpdateAsync();
} }
...@@ -99,11 +99,11 @@ bool dnodeIsMasterEp(char *ep) { ...@@ -99,11 +99,11 @@ bool dnodeIsMasterEp(char *ep) {
return isMaster; return isMaster;
} }
void dnodeGetMInfos(SMnodeInfos *minfos) { void dnodeGetMInfos(SMInfos *pMinfos) {
pthread_mutex_lock(&tsMInfosMutex); pthread_mutex_lock(&tsMInfosMutex);
memcpy(minfos, &tsMInfos, sizeof(SMnodeInfos)); memcpy(pMinfos, &tsMInfos, sizeof(SMInfos));
for (int32_t i = 0; i < tsMInfos.mnodeNum; ++i) { for (int32_t i = 0; i < tsMInfos.mnodeNum; ++i) {
minfos->mnodeInfos[i].mnodeId = htonl(tsMInfos.mnodeInfos[i].mnodeId); pMinfos->mnodeInfos[i].mnodeId = htonl(tsMInfos.mnodeInfos[i].mnodeId);
} }
pthread_mutex_unlock(&tsMInfosMutex); pthread_mutex_unlock(&tsMInfosMutex);
} }
...@@ -123,15 +123,15 @@ void dnodeGetEpSetForShell(SRpcEpSet *epSet) { ...@@ -123,15 +123,15 @@ void dnodeGetEpSetForShell(SRpcEpSet *epSet) {
pthread_mutex_unlock(&tsMInfosMutex); pthread_mutex_unlock(&tsMInfosMutex);
} }
static void dnodePrintMInfos(SMnodeInfos *minfos) { static void dnodePrintMInfos(SMInfos *pMinfos) {
dInfo("print mnode infos, mnodeNum:%d inUse:%d", minfos->mnodeNum, minfos->inUse); dInfo("print minfos, mnodeNum:%d inUse:%d", pMinfos->mnodeNum, pMinfos->inUse);
for (int32_t i = 0; i < minfos->mnodeNum; i++) { for (int32_t i = 0; i < pMinfos->mnodeNum; i++) {
dInfo("mnode index:%d, %s", minfos->mnodeInfos[i].mnodeId, minfos->mnodeInfos[i].mnodeEp); dInfo("mnode index:%d, %s", pMinfos->mnodeInfos[i].mnodeId, pMinfos->mnodeInfos[i].mnodeEp);
} }
} }
static void dnodeResetMInfos(SMnodeInfos *minfos) { static void dnodeResetMInfos(SMInfos *pMinfos) {
if (minfos == NULL) { if (pMinfos == NULL) {
tsMEpSet.numOfEps = 1; tsMEpSet.numOfEps = 1;
taosGetFqdnPortFromEp(tsFirst, tsMEpSet.fqdn[0], &tsMEpSet.port[0]); taosGetFqdnPortFromEp(tsFirst, tsMEpSet.fqdn[0], &tsMEpSet.port[0]);
...@@ -142,10 +142,10 @@ static void dnodeResetMInfos(SMnodeInfos *minfos) { ...@@ -142,10 +142,10 @@ static void dnodeResetMInfos(SMnodeInfos *minfos) {
return; return;
} }
if (minfos->mnodeNum == 0) return; if (pMinfos->mnodeNum == 0) return;
int32_t size = sizeof(SMnodeInfos); int32_t size = sizeof(SMInfos);
memcpy(&tsMInfos, minfos, size); memcpy(&tsMInfos, pMinfos, size);
tsMEpSet.inUse = tsMInfos.inUse; tsMEpSet.inUse = tsMInfos.inUse;
tsMEpSet.numOfEps = tsMInfos.mnodeNum; tsMEpSet.numOfEps = tsMInfos.mnodeNum;
...@@ -153,7 +153,7 @@ static void dnodeResetMInfos(SMnodeInfos *minfos) { ...@@ -153,7 +153,7 @@ static void dnodeResetMInfos(SMnodeInfos *minfos) {
taosGetFqdnPortFromEp(tsMInfos.mnodeInfos[i].mnodeEp, tsMEpSet.fqdn[i], &tsMEpSet.port[i]); taosGetFqdnPortFromEp(tsMInfos.mnodeInfos[i].mnodeEp, tsMEpSet.fqdn[i], &tsMEpSet.port[i]);
} }
dnodePrintMInfos(minfos); dnodePrintMInfos(pMinfos);
} }
static int32_t dnodeReadMInfos() { static int32_t dnodeReadMInfos() {
...@@ -162,7 +162,7 @@ static int32_t dnodeReadMInfos() { ...@@ -162,7 +162,7 @@ static int32_t dnodeReadMInfos() {
char * content = calloc(1, maxLen + 1); char * content = calloc(1, maxLen + 1);
cJSON * root = NULL; cJSON * root = NULL;
FILE * fp = NULL; FILE * fp = NULL;
SMnodeInfos minfos = {0}; SMInfos minfos = {0};
char file[TSDB_FILENAME_LEN + 20] = {0}; char file[TSDB_FILENAME_LEN + 20] = {0};
sprintf(file, "%s/mnodeEpSet.json", tsDnodeDir); sprintf(file, "%s/mnodeEpSet.json", tsDnodeDir);
...@@ -241,7 +241,7 @@ PARSE_MINFOS_OVER: ...@@ -241,7 +241,7 @@ PARSE_MINFOS_OVER:
terrno = 0; terrno = 0;
for (int32_t i = 0; i < minfos.mnodeNum; ++i) { for (int32_t i = 0; i < minfos.mnodeNum; ++i) {
SMnodeInfo *mInfo = &minfos.mnodeInfos[i]; SMInfo *mInfo = &minfos.mnodeInfos[i];
dnodeUpdateEp(mInfo->mnodeId, mInfo->mnodeEp, NULL, NULL); dnodeUpdateEp(mInfo->mnodeId, mInfo->mnodeEp, NULL, NULL);
} }
dnodeResetMInfos(&minfos); dnodeResetMInfos(&minfos);
......
...@@ -472,8 +472,8 @@ static void dnodeProcessStatusRsp(SRpcMsg *pMsg) { ...@@ -472,8 +472,8 @@ static void dnodeProcessStatusRsp(SRpcMsg *pMsg) {
} }
SStatusRsp *pStatusRsp = pMsg->pCont; SStatusRsp *pStatusRsp = pMsg->pCont;
SMnodeInfos *minfos = &pStatusRsp->mnodes; SMInfos *pMinfos = &pStatusRsp->mnodes;
dnodeUpdateMInfos(minfos); dnodeUpdateMInfos(pMinfos);
SDnodeCfg *pCfg = &pStatusRsp->dnodeCfg; SDnodeCfg *pCfg = &pStatusRsp->dnodeCfg;
pCfg->numOfVnodes = htonl(pCfg->numOfVnodes); pCfg->numOfVnodes = htonl(pCfg->numOfVnodes);
......
...@@ -147,8 +147,8 @@ void dnodeProcessModuleStatus(uint32_t moduleStatus) { ...@@ -147,8 +147,8 @@ void dnodeProcessModuleStatus(uint32_t moduleStatus) {
} }
} }
bool dnodeStartMnode(SMnodeInfos *minfos) { bool dnodeStartMnode(SMInfos *pMinfos) {
SMnodeInfos *mnodes = minfos; SMInfos *pMnodes = pMinfos;
if (tsModuleStatus & (1 << TSDB_MOD_MNODE)) { if (tsModuleStatus & (1 << TSDB_MOD_MNODE)) {
dDebug("mnode module is already started, module status:%d", tsModuleStatus); dDebug("mnode module is already started, module status:%d", tsModuleStatus);
...@@ -159,7 +159,7 @@ bool dnodeStartMnode(SMnodeInfos *minfos) { ...@@ -159,7 +159,7 @@ bool dnodeStartMnode(SMnodeInfos *minfos) {
dInfo("start mnode module, module status:%d, new status:%d", tsModuleStatus, moduleStatus); dInfo("start mnode module, module status:%d, new status:%d", tsModuleStatus, moduleStatus);
dnodeProcessModuleStatus(moduleStatus); dnodeProcessModuleStatus(moduleStatus);
sdbUpdateSync(mnodes); sdbUpdateSync(pMnodes);
return true; return true;
} }
...@@ -45,7 +45,7 @@ void dnodeGetEpSetForShell(SRpcEpSet *epSet); ...@@ -45,7 +45,7 @@ void dnodeGetEpSetForShell(SRpcEpSet *epSet);
int32_t dnodeGetDnodeId(); int32_t dnodeGetDnodeId();
void dnodeUpdateEp(int32_t dnodeId, char *ep, char *fqdn, uint16_t *port); void dnodeUpdateEp(int32_t dnodeId, char *ep, char *fqdn, uint16_t *port);
bool dnodeCheckEpChanged(int32_t dnodeId, char *epstr); bool dnodeCheckEpChanged(int32_t dnodeId, char *epstr);
bool dnodeStartMnode(SMnodeInfos *minfos); bool dnodeStartMnode(SMInfos *pMinfos);
void dnodeAddClientRspHandle(uint8_t msgType, void (*fp)(SRpcMsg *rpcMsg)); void dnodeAddClientRspHandle(uint8_t msgType, void (*fp)(SRpcMsg *rpcMsg));
void dnodeSendMsgToDnode(SRpcEpSet *epSet, SRpcMsg *rpcMsg); void dnodeSendMsgToDnode(SRpcEpSet *epSet, SRpcMsg *rpcMsg);
......
...@@ -591,13 +591,13 @@ typedef struct { ...@@ -591,13 +591,13 @@ typedef struct {
typedef struct { typedef struct {
int32_t mnodeId; int32_t mnodeId;
char mnodeEp[TSDB_EP_LEN]; char mnodeEp[TSDB_EP_LEN];
} SMnodeInfo; } SMInfo;
typedef struct { typedef struct {
int8_t inUse; int8_t inUse;
int8_t mnodeNum; int8_t mnodeNum;
SMnodeInfo mnodeInfos[TSDB_MAX_REPLICA]; SMInfo mnodeInfos[TSDB_MAX_REPLICA];
} SMnodeInfos; } SMInfos;
typedef struct { typedef struct {
int32_t numOfMnodes; // tsNumOfMnodes int32_t numOfMnodes; // tsNumOfMnodes
...@@ -632,7 +632,7 @@ typedef struct { ...@@ -632,7 +632,7 @@ typedef struct {
} SStatusMsg; } SStatusMsg;
typedef struct { typedef struct {
SMnodeInfos mnodes; SMInfos mnodes;
SDnodeCfg dnodeCfg; SDnodeCfg dnodeCfg;
SVgroupAccess vgAccess[]; SVgroupAccess vgAccess[];
} SStatusRsp; } SStatusRsp;
...@@ -761,7 +761,7 @@ typedef struct { ...@@ -761,7 +761,7 @@ typedef struct {
typedef struct { typedef struct {
int32_t dnodeId; int32_t dnodeId;
char dnodeEp[TSDB_EP_LEN]; // end point, hostname:port char dnodeEp[TSDB_EP_LEN]; // end point, hostname:port
SMnodeInfos mnodes; SMInfos mnodes;
} SCreateMnodeMsg; } SCreateMnodeMsg;
typedef struct { typedef struct {
......
...@@ -48,7 +48,7 @@ void mnodeGetMnodeEpSetForShell(SRpcEpSet *epSet); ...@@ -48,7 +48,7 @@ void mnodeGetMnodeEpSetForShell(SRpcEpSet *epSet);
char* mnodeGetMnodeMasterEp(); char* mnodeGetMnodeMasterEp();
void mnodeGetMnodeInfos(void *mnodes); void mnodeGetMnodeInfos(void *mnodes);
void mnodeUpdateMnodeEpSet(); void mnodeUpdateMnodeEpSet(SMInfos *pMnodes);
#ifdef __cplusplus #ifdef __cplusplus
} }
......
...@@ -89,6 +89,7 @@ void* sdbGetTableByRid(int64_t rid); ...@@ -89,6 +89,7 @@ void* sdbGetTableByRid(int64_t rid);
bool sdbIsMaster(); bool sdbIsMaster();
bool sdbIsServing(); bool sdbIsServing();
void sdbUpdateMnodeRoles(); void sdbUpdateMnodeRoles();
int32_t sdbGetReplicaNum();
int32_t sdbInsertRow(SSdbRow *pRow); int32_t sdbInsertRow(SSdbRow *pRow);
int32_t sdbDeleteRow(SSdbRow *pRow); int32_t sdbDeleteRow(SSdbRow *pRow);
......
...@@ -34,14 +34,14 @@ ...@@ -34,14 +34,14 @@
#include "mnodeUser.h" #include "mnodeUser.h"
#include "mnodeVgroup.h" #include "mnodeVgroup.h"
int64_t tsMnodeRid = -1; int64_t tsMnodeRid = -1;
static void * tsMnodeSdb = NULL; static void * tsMnodeSdb = NULL;
static int32_t tsMnodeUpdateSize = 0; static int32_t tsMnodeUpdateSize = 0;
static SRpcEpSet tsMnodeEpSetForShell; static SRpcEpSet tsMEpForShell;
static SRpcEpSet tsMnodeEpSetForPeer; static SRpcEpSet tsMEpForPeer;
static SMnodeInfos tsMnodeInfos; static SMInfos tsMInfos;
static int32_t mnodeGetMnodeMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *pConn); static int32_t mnodeGetMnodeMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *pConn);
static int32_t mnodeRetrieveMnodes(SShowObj *pShow, char *data, int32_t rows, void *pConn); static int32_t mnodeRetrieveMnodes(SShowObj *pShow, char *data, int32_t rows, void *pConn);
#if defined(LINUX) #if defined(LINUX)
static pthread_rwlock_t tsMnodeLock; static pthread_rwlock_t tsMnodeLock;
...@@ -127,7 +127,7 @@ static int32_t mnodeMnodeActionRestored() { ...@@ -127,7 +127,7 @@ static int32_t mnodeMnodeActionRestored() {
mnodeCancelGetNextMnode(pIter); mnodeCancelGetNextMnode(pIter);
} }
mnodeUpdateMnodeEpSet(); mnodeUpdateMnodeEpSet(NULL);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
...@@ -199,93 +199,119 @@ void mnodeCancelGetNextMnode(void *pIter) { ...@@ -199,93 +199,119 @@ void mnodeCancelGetNextMnode(void *pIter) {
sdbFreeIter(tsMnodeSdb, pIter); sdbFreeIter(tsMnodeSdb, pIter);
} }
void mnodeUpdateMnodeEpSet() { void mnodeUpdateMnodeEpSet(SMInfos *pMinfos) {
mInfo("update mnodes epSet, numOfEps:%d ", mnodeGetMnodesNum()); bool set = false;
SMInfos mInfos = {0};
mInfo("vgId:1, update mnodes epSet, numOfEps:%d pMinfos:%p", mnodeGetMnodesNum(), pMinfos);
mnodeMnodeWrLock(); if (pMinfos != NULL) {
set = true;
memset(&tsMnodeEpSetForShell, 0, sizeof(SRpcEpSet)); mInfos = *pMinfos;
memset(&tsMnodeEpSetForPeer, 0, sizeof(SRpcEpSet)); }
memset(&tsMnodeInfos, 0, sizeof(SMnodeInfos)); else {
int32_t index = 0;
int32_t index = 0; void * pIter = NULL;
void * pIter = NULL; while (1) {
while (1) { SMnodeObj *pMnode = NULL;
SMnodeObj *pMnode = NULL; pIter = mnodeGetNextMnode(pIter, &pMnode);
pIter = mnodeGetNextMnode(pIter, &pMnode); if (pMnode == NULL) break;
if (pMnode == NULL) break;
SDnodeObj *pDnode = mnodeGetDnode(pMnode->mnodeId);
SDnodeObj *pDnode = mnodeGetDnode(pMnode->mnodeId); if (pDnode != NULL) {
if (pDnode != NULL) { set = true;
strcpy(tsMnodeEpSetForShell.fqdn[index], pDnode->dnodeFqdn); mInfos.mnodeInfos[index].mnodeId = pMnode->mnodeId;
tsMnodeEpSetForShell.port[index] = htons(pDnode->dnodePort); strcpy(mInfos.mnodeInfos[index].mnodeEp, pDnode->dnodeEp);
mDebug("mnode:%d, for shell fqdn:%s %d", pDnode->dnodeId, tsMnodeEpSetForShell.fqdn[index], htons(tsMnodeEpSetForShell.port[index])); if (pMnode->role == TAOS_SYNC_ROLE_MASTER) mInfos.inUse = index;
index++;
strcpy(tsMnodeEpSetForPeer.fqdn[index], pDnode->dnodeFqdn); } else {
tsMnodeEpSetForPeer.port[index] = htons(pDnode->dnodePort + TSDB_PORT_DNODEDNODE); set = false;
mDebug("mnode:%d, for peer fqdn:%s %d", pDnode->dnodeId, tsMnodeEpSetForPeer.fqdn[index], htons(tsMnodeEpSetForPeer.port[index]));
tsMnodeInfos.mnodeInfos[index].mnodeId = htonl(pMnode->mnodeId);
strcpy(tsMnodeInfos.mnodeInfos[index].mnodeEp, pDnode->dnodeEp);
if (pMnode->role == TAOS_SYNC_ROLE_MASTER) {
tsMnodeEpSetForShell.inUse = index;
tsMnodeEpSetForPeer.inUse = index;
tsMnodeInfos.inUse = index;
} }
mInfo("mnode:%d, ep:%s %s", pDnode->dnodeId, pDnode->dnodeEp, pMnode->role == TAOS_SYNC_ROLE_MASTER ? "master" : ""); mnodeDecDnodeRef(pDnode);
index++; mnodeDecMnodeRef(pMnode);
} }
mnodeDecDnodeRef(pDnode); mInfos.mnodeNum = index;
mnodeDecMnodeRef(pMnode); if (sdbGetReplicaNum() != mInfos.mnodeNum) {
set = false;
mDebug("vgId:1, mnodes info not synced, cfg:%d current:%d", sdbGetReplicaNum(), mInfos.mnodeNum);
}
} }
tsMnodeInfos.mnodeNum = index; mnodeMnodeWrLock();
tsMnodeEpSetForShell.numOfEps = index;
tsMnodeEpSetForPeer.numOfEps = index; if (set) {
memset(&tsMEpForShell, 0, sizeof(SRpcEpSet));
memset(&tsMEpForPeer, 0, sizeof(SRpcEpSet));
memcpy(&tsMInfos, &mInfos, sizeof(SMInfos));
tsMEpForShell.inUse = tsMInfos.inUse;
tsMEpForPeer.inUse = tsMInfos.inUse;
tsMEpForShell.numOfEps = tsMInfos.mnodeNum;
tsMEpForPeer.numOfEps = tsMInfos.mnodeNum;
mInfo("vgId:1, mnodes epSet is set, num:%d inUse:%d", tsMInfos.mnodeNum, tsMInfos.inUse);
for (int index = 0; index < mInfos.mnodeNum; ++index) {
SMInfo *pInfo = &tsMInfos.mnodeInfos[index];
taosGetFqdnPortFromEp(pInfo->mnodeEp, tsMEpForShell.fqdn[index], &tsMEpForShell.port[index]);
taosGetFqdnPortFromEp(pInfo->mnodeEp, tsMEpForPeer.fqdn[index], &tsMEpForPeer.port[index]);
tsMEpForPeer.port[index] = tsMEpForPeer.port[index] + TSDB_PORT_DNODEDNODE;
mInfo("vgId:1, mnode:%d, fqdn:%s shell:%u peer:%u", pInfo->mnodeId, tsMEpForShell.fqdn[index],
tsMEpForShell.port[index], tsMEpForPeer.port[index]);
tsMEpForShell.port[index] = htons(tsMEpForShell.port[index]);
tsMEpForPeer.port[index] = htons(tsMEpForPeer.port[index]);
pInfo->mnodeId = htonl(pInfo->mnodeId);
}
} else {
mInfo("vgId:1, mnodes epSet not set, num:%d inUse:%d", tsMInfos.mnodeNum, tsMInfos.inUse);
for (int index = 0; index < tsMInfos.mnodeNum; ++index) {
mInfo("vgId:1, index:%d, ep:%s:%u", index, tsMEpForShell.fqdn[index], htons(tsMEpForShell.port[index]));
}
}
mnodeMnodeUnLock(); mnodeMnodeUnLock();
} }
void mnodeGetMnodeEpSetForPeer(SRpcEpSet *epSet) { void mnodeGetMnodeEpSetForPeer(SRpcEpSet *epSet) {
mnodeMnodeRdLock(); mnodeMnodeRdLock();
*epSet = tsMnodeEpSetForPeer; *epSet = tsMEpForPeer;
mnodeMnodeUnLock(); mnodeMnodeUnLock();
mTrace("vgId:1, mnodes epSet for peer is returned, num:%d inUse:%d", tsMEpForPeer.numOfEps, tsMEpForPeer.inUse);
for (int32_t i = 0; i < epSet->numOfEps; ++i) { for (int32_t i = 0; i < epSet->numOfEps; ++i) {
if (strcmp(epSet->fqdn[i], tsLocalFqdn) == 0 && htons(epSet->port[i]) == tsServerPort + TSDB_PORT_DNODEDNODE) { if (strcmp(epSet->fqdn[i], tsLocalFqdn) == 0 && htons(epSet->port[i]) == tsServerPort + TSDB_PORT_DNODEDNODE) {
epSet->inUse = (i + 1) % epSet->numOfEps; epSet->inUse = (i + 1) % epSet->numOfEps;
mTrace("mnode:%d, for peer ep:%s:%u, set inUse to %d", i, epSet->fqdn[i], htons(epSet->port[i]), epSet->inUse); mTrace("vgId:1, mnode:%d, for peer ep:%s:%u, set inUse to %d", i, epSet->fqdn[i], htons(epSet->port[i]), epSet->inUse);
} else { } else {
mTrace("mpeer:%d, for peer ep:%s:%u", i, epSet->fqdn[i], htons(epSet->port[i])); mTrace("vgId:1, mpeer:%d, for peer ep:%s:%u", i, epSet->fqdn[i], htons(epSet->port[i]));
} }
} }
} }
void mnodeGetMnodeEpSetForShell(SRpcEpSet *epSet) { void mnodeGetMnodeEpSetForShell(SRpcEpSet *epSet) {
mnodeMnodeRdLock(); mnodeMnodeRdLock();
*epSet = tsMnodeEpSetForShell; *epSet = tsMEpForShell;
mnodeMnodeUnLock(); mnodeMnodeUnLock();
mTrace("vgId:1, mnodes epSet for shell is returned, num:%d inUse:%d", tsMEpForShell.numOfEps, tsMEpForShell.inUse);
for (int32_t i = 0; i < epSet->numOfEps; ++i) { for (int32_t i = 0; i < epSet->numOfEps; ++i) {
if (strcmp(epSet->fqdn[i], tsLocalFqdn) == 0 && htons(epSet->port[i]) == tsServerPort) { if (strcmp(epSet->fqdn[i], tsLocalFqdn) == 0 && htons(epSet->port[i]) == tsServerPort) {
epSet->inUse = (i + 1) % epSet->numOfEps; epSet->inUse = (i + 1) % epSet->numOfEps;
mTrace("mnode:%d, for shell ep:%s:%u, set inUse to %d", i, epSet->fqdn[i], htons(epSet->port[i]), epSet->inUse); mTrace("vgId:1, mnode:%d, for shell ep:%s:%u, set inUse to %d", i, epSet->fqdn[i], htons(epSet->port[i]), epSet->inUse);
} else { } else {
mTrace("mnode:%d, for shell ep:%s:%u", i, epSet->fqdn[i], htons(epSet->port[i])); mTrace("vgId:1, mnode:%d, for shell ep:%s:%u", i, epSet->fqdn[i], htons(epSet->port[i]));
} }
} }
} }
char* mnodeGetMnodeMasterEp() { char* mnodeGetMnodeMasterEp() {
return tsMnodeInfos.mnodeInfos[tsMnodeInfos.inUse].mnodeEp; return tsMInfos.mnodeInfos[tsMInfos.inUse].mnodeEp;
} }
void mnodeGetMnodeInfos(void *mnodeInfos) { void mnodeGetMnodeInfos(void *pMinfos) {
mnodeMnodeRdLock(); mnodeMnodeRdLock();
*(SMnodeInfos *)mnodeInfos = tsMnodeInfos; *(SMInfos *)pMinfos = tsMInfos;
mnodeMnodeUnLock(); mnodeMnodeUnLock();
} }
...@@ -298,7 +324,7 @@ static int32_t mnodeSendCreateMnodeMsg(int32_t dnodeId, char *dnodeEp) { ...@@ -298,7 +324,7 @@ static int32_t mnodeSendCreateMnodeMsg(int32_t dnodeId, char *dnodeEp) {
} else { } else {
pCreate->dnodeId = htonl(dnodeId); pCreate->dnodeId = htonl(dnodeId);
tstrncpy(pCreate->dnodeEp, dnodeEp, sizeof(pCreate->dnodeEp)); tstrncpy(pCreate->dnodeEp, dnodeEp, sizeof(pCreate->dnodeEp));
pCreate->mnodes = tsMnodeInfos; pCreate->mnodes = tsMInfos;
bool found = false; bool found = false;
for (int i = 0; i < pCreate->mnodes.mnodeNum; ++i) { for (int i = 0; i < pCreate->mnodes.mnodeNum; ++i) {
if (pCreate->mnodes.mnodeInfos[i].mnodeId == htonl(dnodeId)) { if (pCreate->mnodes.mnodeInfos[i].mnodeId == htonl(dnodeId)) {
...@@ -336,7 +362,7 @@ static int32_t mnodeCreateMnodeCb(SMnodeMsg *pMsg, int32_t code) { ...@@ -336,7 +362,7 @@ static int32_t mnodeCreateMnodeCb(SMnodeMsg *pMsg, int32_t code) {
mError("failed to create mnode, reason:%s", tstrerror(code)); mError("failed to create mnode, reason:%s", tstrerror(code));
} else { } else {
mDebug("mnode is created successfully"); mDebug("mnode is created successfully");
mnodeUpdateMnodeEpSet(); mnodeUpdateMnodeEpSet(NULL);
sdbUpdateAsync(); sdbUpdateAsync();
} }
...@@ -380,7 +406,7 @@ void mnodeDropMnodeLocal(int32_t dnodeId) { ...@@ -380,7 +406,7 @@ void mnodeDropMnodeLocal(int32_t dnodeId) {
mnodeDecMnodeRef(pMnode); mnodeDecMnodeRef(pMnode);
} }
mnodeUpdateMnodeEpSet(); mnodeUpdateMnodeEpSet(NULL);
sdbUpdateAsync(); sdbUpdateAsync();
} }
...@@ -400,7 +426,7 @@ int32_t mnodeDropMnode(int32_t dnodeId) { ...@@ -400,7 +426,7 @@ int32_t mnodeDropMnode(int32_t dnodeId) {
sdbDecRef(tsMnodeSdb, pMnode); sdbDecRef(tsMnodeSdb, pMnode);
mnodeUpdateMnodeEpSet(); mnodeUpdateMnodeEpSet(NULL);
sdbUpdateAsync(); sdbUpdateAsync();
return code; return code;
......
...@@ -224,11 +224,13 @@ void sdbUpdateMnodeRoles() { ...@@ -224,11 +224,13 @@ void sdbUpdateMnodeRoles() {
sdbInfo("vgId:1, mnode:%d, role:%s", pMnode->mnodeId, syncRole[pMnode->role]); sdbInfo("vgId:1, mnode:%d, role:%s", pMnode->mnodeId, syncRole[pMnode->role]);
if (pMnode->mnodeId == dnodeGetDnodeId()) tsSdbMgmt.role = pMnode->role; if (pMnode->mnodeId == dnodeGetDnodeId()) tsSdbMgmt.role = pMnode->role;
mnodeDecMnodeRef(pMnode); mnodeDecMnodeRef(pMnode);
} else {
sdbDebug("vgId:1, mnode:%d not found", roles.nodeId[i]);
} }
} }
mnodeUpdateClusterId(); mnodeUpdateClusterId();
mnodeUpdateMnodeEpSet(); mnodeUpdateMnodeEpSet(NULL);
} }
static uint32_t sdbGetFileInfo(int32_t vgId, char *name, uint32_t *index, uint32_t eindex, int64_t *size, uint64_t *fversion) { static uint32_t sdbGetFileInfo(int32_t vgId, char *name, uint32_t *index, uint32_t eindex, int64_t *size, uint64_t *fversion) {
...@@ -308,18 +310,20 @@ void sdbUpdateAsync() { ...@@ -308,18 +310,20 @@ void sdbUpdateAsync() {
} }
void sdbUpdateSync(void *pMnodes) { void sdbUpdateSync(void *pMnodes) {
SMnodeInfos *mnodes = pMnodes; SMInfos *pMinfos = pMnodes;
if (!mnodeIsRunning()) { if (!mnodeIsRunning()) {
mDebug("vgId:1, mnode not start yet, update sync config later"); mDebug("vgId:1, mnode not start yet, update sync config later");
return; return;
} }
mDebug("vgId:1, update sync config in sync module, mnodes:%p", pMnodes); mDebug("vgId:1, update sync config, pMnodes:%p", pMnodes);
SSyncCfg syncCfg = {0}; SSyncCfg syncCfg = {0};
int32_t index = 0; int32_t index = 0;
if (mnodes == NULL) { if (pMinfos == NULL) {
mDebug("vgId:1, mInfos not input, use mInfos in sdb, numOfMnodes:%d", syncCfg.replica);
void *pIter = NULL; void *pIter = NULL;
while (1) { while (1) {
SMnodeObj *pMnode = NULL; SMnodeObj *pMnode = NULL;
...@@ -339,16 +343,17 @@ void sdbUpdateSync(void *pMnodes) { ...@@ -339,16 +343,17 @@ void sdbUpdateSync(void *pMnodes) {
mnodeDecMnodeRef(pMnode); mnodeDecMnodeRef(pMnode);
} }
syncCfg.replica = index; syncCfg.replica = index;
mDebug("vgId:1, mnodes info not input, use infos in sdb, numOfMnodes:%d", syncCfg.replica);
} else { } else {
for (index = 0; index < mnodes->mnodeNum; ++index) { mDebug("vgId:1, mInfos input, numOfMnodes:%d", syncCfg.replica);
SMnodeInfo *node = &mnodes->mnodeInfos[index];
for (index = 0; index < pMinfos->mnodeNum; ++index) {
SMInfo *node = &pMinfos->mnodeInfos[index];
syncCfg.nodeInfo[index].nodeId = node->mnodeId; syncCfg.nodeInfo[index].nodeId = node->mnodeId;
taosGetFqdnPortFromEp(node->mnodeEp, syncCfg.nodeInfo[index].nodeFqdn, &syncCfg.nodeInfo[index].nodePort); taosGetFqdnPortFromEp(node->mnodeEp, syncCfg.nodeInfo[index].nodeFqdn, &syncCfg.nodeInfo[index].nodePort);
syncCfg.nodeInfo[index].nodePort += TSDB_PORT_SYNC; syncCfg.nodeInfo[index].nodePort += TSDB_PORT_SYNC;
} }
syncCfg.replica = index; syncCfg.replica = index;
mDebug("vgId:1, mnodes info input, numOfMnodes:%d", syncCfg.replica); mnodeUpdateMnodeEpSet(pMnodes);
} }
syncCfg.quorum = (syncCfg.replica == 1) ? 1 : 2; syncCfg.quorum = (syncCfg.replica == 1) ? 1 : 2;
...@@ -1103,3 +1108,7 @@ static void *sdbWorkerFp(void *pWorker) { ...@@ -1103,3 +1108,7 @@ static void *sdbWorkerFp(void *pWorker) {
return NULL; return NULL;
} }
int32_t sdbGetReplicaNum() {
return tsSdbMgmt.cfg.replica;
}
\ No newline at end of file
...@@ -548,7 +548,7 @@ static SSyncPeer *syncAddPeer(SSyncNode *pNode, const SNodeInfo *pInfo) { ...@@ -548,7 +548,7 @@ static SSyncPeer *syncAddPeer(SSyncNode *pNode, const SNodeInfo *pInfo) {
pPeer->pSyncNode = pNode; pPeer->pSyncNode = pNode;
pPeer->refCount = 1; pPeer->refCount = 1;
sInfo("%s, it is configured", pPeer->id); sInfo("%s, it is configured, ep:%s:%u", pPeer->id, pPeer->fqdn, pPeer->port);
int32_t ret = strcmp(pPeer->fqdn, tsNodeFqdn); int32_t ret = strcmp(pPeer->fqdn, tsNodeFqdn);
if (pPeer->nodeId == 0 || (ret > 0) || (ret == 0 && pPeer->port > tsSyncPort)) { if (pPeer->nodeId == 0 || (ret > 0) || (ret == 0 && pPeer->port > tsSyncPort)) {
int32_t checkMs = 100 + (pNode->vgId * 10) % 100; int32_t checkMs = 100 + (pNode->vgId * 10) % 100;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册