提交 b638d0ef 编写于 作者: S Shengliang Guan

refactor: let mnode report sync state

上级 4bb91775
...@@ -950,6 +950,7 @@ typedef struct { ...@@ -950,6 +950,7 @@ typedef struct {
int32_t numOfCores; int32_t numOfCores;
int32_t numOfSupportVnodes; int32_t numOfSupportVnodes;
char dnodeEp[TSDB_EP_LEN]; char dnodeEp[TSDB_EP_LEN];
SMnodeLoad mload;
SClusterCfg clusterCfg; SClusterCfg clusterCfg;
SArray* pVloads; // array of SVnodeLoad SArray* pVloads; // array of SVnodeLoad
} SStatusReq; } SStatusReq;
......
...@@ -29,6 +29,7 @@ extern "C" { ...@@ -29,6 +29,7 @@ extern "C" {
typedef struct SMnode SMnode; typedef struct SMnode SMnode;
typedef struct { typedef struct {
int32_t dnodeId;
bool standby; bool standby;
bool deploy; bool deploy;
int8_t replica; int8_t replica;
......
...@@ -891,6 +891,9 @@ int32_t tSerializeSStatusReq(void *buf, int32_t bufLen, SStatusReq *pReq) { ...@@ -891,6 +891,9 @@ int32_t tSerializeSStatusReq(void *buf, int32_t bufLen, SStatusReq *pReq) {
if (tEncodeI64(&encoder, pload->pointsWritten) < 0) return -1; if (tEncodeI64(&encoder, pload->pointsWritten) < 0) return -1;
} }
// mnode loads
if (tEncodeI32(&encoder, pReq->mload.syncState) < 0) return -1;
tEndEncode(&encoder); tEndEncode(&encoder);
int32_t tlen = encoder.pos; int32_t tlen = encoder.pos;
...@@ -946,6 +949,8 @@ int32_t tDeserializeSStatusReq(void *buf, int32_t bufLen, SStatusReq *pReq) { ...@@ -946,6 +949,8 @@ int32_t tDeserializeSStatusReq(void *buf, int32_t bufLen, SStatusReq *pReq) {
} }
} }
if (tDecodeI32(&decoder, &pReq->mload.syncState) < 0) return -1;
tEndDecode(&decoder); tEndDecode(&decoder);
tDecoderClear(&decoder); tDecoderClear(&decoder);
return 0; return 0;
......
...@@ -75,8 +75,9 @@ void dmSendStatusReq(SDnodeMgmt *pMgmt) { ...@@ -75,8 +75,9 @@ void dmSendStatusReq(SDnodeMgmt *pMgmt) {
(*pMgmt->getVnodeLoadsFp)(&vinfo); (*pMgmt->getVnodeLoadsFp)(&vinfo);
req.pVloads = vinfo.pVloads; req.pVloads = vinfo.pVloads;
SMonMloadInfo minfo = {0}; SMonMloadInfo minfo = {0};
(*pMgmt->getMnodeLoadsFp)(&minfo); (*pMgmt->getMnodeLoadsFp)(&minfo);
req.mload = minfo.load;
int32_t contLen = tSerializeSStatusReq(NULL, 0, &req); int32_t contLen = tSerializeSStatusReq(NULL, 0, &req);
void *pHead = rpcMallocCont(contLen); void *pHead = rpcMallocCont(contLen);
......
...@@ -42,6 +42,8 @@ static void mmBuildOptionForDeploy(SMnodeMgmt *pMgmt, const SMgmtInputOpt *pInpu ...@@ -42,6 +42,8 @@ static void mmBuildOptionForDeploy(SMnodeMgmt *pMgmt, const SMgmtInputOpt *pInpu
pOption->standby = false; pOption->standby = false;
pOption->deploy = true; pOption->deploy = true;
pOption->msgCb = pMgmt->msgCb; pOption->msgCb = pMgmt->msgCb;
pOption->dnodeId = pMgmt->pData->dnodeId;
pOption->replica = 1; pOption->replica = 1;
pOption->selfIndex = 0; pOption->selfIndex = 0;
...@@ -52,9 +54,10 @@ static void mmBuildOptionForDeploy(SMnodeMgmt *pMgmt, const SMgmtInputOpt *pInpu ...@@ -52,9 +54,10 @@ static void mmBuildOptionForDeploy(SMnodeMgmt *pMgmt, const SMgmtInputOpt *pInpu
} }
static void mmBuildOptionForOpen(SMnodeMgmt *pMgmt, SMnodeOpt *pOption) { static void mmBuildOptionForOpen(SMnodeMgmt *pMgmt, SMnodeOpt *pOption) {
pOption->msgCb = pMgmt->msgCb;
pOption->deploy = false; pOption->deploy = false;
pOption->standby = false; pOption->standby = false;
pOption->msgCb = pMgmt->msgCb;
pOption->dnodeId = pMgmt->pData->dnodeId;
if (pMgmt->replica > 0) { if (pMgmt->replica > 0) {
pOption->standby = true; pOption->standby = true;
...@@ -71,9 +74,11 @@ static void mmBuildOptionForOpen(SMnodeMgmt *pMgmt, SMnodeOpt *pOption) { ...@@ -71,9 +74,11 @@ static void mmBuildOptionForOpen(SMnodeMgmt *pMgmt, SMnodeOpt *pOption) {
} }
static int32_t mmBuildOptionForAlter(SMnodeMgmt *pMgmt, SMnodeOpt *pOption, SDCreateMnodeReq *pCreate) { static int32_t mmBuildOptionForAlter(SMnodeMgmt *pMgmt, SMnodeOpt *pOption, SDCreateMnodeReq *pCreate) {
pOption->msgCb = pMgmt->msgCb;
pOption->standby = false; pOption->standby = false;
pOption->deploy = false; pOption->deploy = false;
pOption->msgCb = pMgmt->msgCb;
pOption->dnodeId = pMgmt->pData->dnodeId;
pOption->replica = pCreate->replica; pOption->replica = pCreate->replica;
pOption->selfIndex = -1; pOption->selfIndex = -1;
......
...@@ -90,8 +90,8 @@ typedef enum { ...@@ -90,8 +90,8 @@ typedef enum {
typedef int32_t (*ProcessCreateNodeFp)(EDndNodeType ntype, SRpcMsg *pMsg); typedef int32_t (*ProcessCreateNodeFp)(EDndNodeType ntype, SRpcMsg *pMsg);
typedef int32_t (*ProcessDropNodeFp)(EDndNodeType ntype, SRpcMsg *pMsg); typedef int32_t (*ProcessDropNodeFp)(EDndNodeType ntype, SRpcMsg *pMsg);
typedef void (*SendMonitorReportFp)(); typedef void (*SendMonitorReportFp)();
typedef void (*GetVnodeLoadsFp)(); typedef void (*GetVnodeLoadsFp)(SMonVloadInfo *pInfo);
typedef void (*GetMnodeLoadsFp)(); typedef void (*GetMnodeLoadsFp)(SMonMloadInfo *pInfo);
typedef struct { typedef struct {
int32_t dnodeId; int32_t dnodeId;
......
...@@ -199,9 +199,8 @@ typedef struct { ...@@ -199,9 +199,8 @@ typedef struct {
int32_t id; int32_t id;
int64_t createdTime; int64_t createdTime;
int64_t updateTime; int64_t updateTime;
ESyncState role; ESyncState state;
int32_t roleTerm; int64_t stateStartTime;
int64_t roleTime;
SDnodeObj* pDnode; SDnodeObj* pDnode;
} SMnodeObj; } SMnodeObj;
......
...@@ -78,7 +78,6 @@ typedef struct { ...@@ -78,7 +78,6 @@ typedef struct {
SWal *pWal; SWal *pWal;
sem_t syncSem; sem_t syncSem;
int64_t sync; int64_t sync;
ESyncState state;
bool standby; bool standby;
bool restored; bool restored;
int32_t errCode; int32_t errCode;
...@@ -90,7 +89,7 @@ typedef struct { ...@@ -90,7 +89,7 @@ typedef struct {
} SGrantInfo; } SGrantInfo;
typedef struct SMnode { typedef struct SMnode {
int32_t selfId; int32_t selfDnodeId;
int64_t clusterId; int64_t clusterId;
TdThread thread; TdThread thread;
bool deploy; bool deploy;
......
...@@ -28,7 +28,6 @@ SMnodeObj *mndAcquireMnode(SMnode *pMnode, int32_t mnodeId); ...@@ -28,7 +28,6 @@ SMnodeObj *mndAcquireMnode(SMnode *pMnode, int32_t mnodeId);
void mndReleaseMnode(SMnode *pMnode, SMnodeObj *pObj); void mndReleaseMnode(SMnode *pMnode, SMnodeObj *pObj);
bool mndIsMnode(SMnode *pMnode, int32_t dnodeId); bool mndIsMnode(SMnode *pMnode, int32_t dnodeId);
void mndGetMnodeEpSet(SMnode *pMnode, SEpSet *pEpSet); void mndGetMnodeEpSet(SMnode *pMnode, SEpSet *pEpSet);
void mndUpdateMnodeRole(SMnode *pMnode);
#ifdef __cplusplus #ifdef __cplusplus
} }
......
...@@ -58,14 +58,16 @@ static int32_t mndRetrieveDnodes(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pB ...@@ -58,14 +58,16 @@ static int32_t mndRetrieveDnodes(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pB
static void mndCancelGetNextDnode(SMnode *pMnode, void *pIter); static void mndCancelGetNextDnode(SMnode *pMnode, void *pIter);
int32_t mndInitDnode(SMnode *pMnode) { int32_t mndInitDnode(SMnode *pMnode) {
SSdbTable table = {.sdbType = SDB_DNODE, SSdbTable table = {
.keyType = SDB_KEY_INT32, .sdbType = SDB_DNODE,
.deployFp = (SdbDeployFp)mndCreateDefaultDnode, .keyType = SDB_KEY_INT32,
.encodeFp = (SdbEncodeFp)mndDnodeActionEncode, .deployFp = (SdbDeployFp)mndCreateDefaultDnode,
.decodeFp = (SdbDecodeFp)mndDnodeActionDecode, .encodeFp = (SdbEncodeFp)mndDnodeActionEncode,
.insertFp = (SdbInsertFp)mndDnodeActionInsert, .decodeFp = (SdbDecodeFp)mndDnodeActionDecode,
.updateFp = (SdbUpdateFp)mndDnodeActionUpdate, .insertFp = (SdbInsertFp)mndDnodeActionInsert,
.deleteFp = (SdbDeleteFp)mndDnodeActionDelete}; .updateFp = (SdbUpdateFp)mndDnodeActionUpdate,
.deleteFp = (SdbDeleteFp)mndDnodeActionDelete,
};
mndSetMsgHandle(pMnode, TDMT_MND_CREATE_DNODE, mndProcessCreateDnodeReq); mndSetMsgHandle(pMnode, TDMT_MND_CREATE_DNODE, mndProcessCreateDnodeReq);
mndSetMsgHandle(pMnode, TDMT_MND_DROP_DNODE, mndProcessDropDnodeReq); mndSetMsgHandle(pMnode, TDMT_MND_DROP_DNODE, mndProcessDropDnodeReq);
...@@ -377,6 +379,15 @@ static int32_t mndProcessStatusReq(SRpcMsg *pReq) { ...@@ -377,6 +379,15 @@ static int32_t mndProcessStatusReq(SRpcMsg *pReq) {
mndReleaseVgroup(pMnode, pVgroup); mndReleaseVgroup(pMnode, pVgroup);
} }
SMnodeObj *pObj = mndAcquireMnode(pMnode, pDnode->id);
if (pObj != NULL) {
if (pObj->state != statusReq.mload.syncState) {
pObj->state = statusReq.mload.syncState;
pObj->stateStartTime = taosGetTimestampMs();
}
mndReleaseMnode(pMnode, pObj);
}
int64_t curMs = taosGetTimestampMs(); int64_t curMs = taosGetTimestampMs();
bool online = mndIsDnodeOnline(pMnode, pDnode, curMs); bool online = mndIsDnodeOnline(pMnode, pDnode, curMs);
bool dnodeChanged = (statusReq.dnodeVer != sdbGetTableVer(pMnode->pSdb, SDB_DNODE)); bool dnodeChanged = (statusReq.dnodeVer != sdbGetTableVer(pMnode->pSdb, SDB_DNODE));
......
...@@ -77,28 +77,6 @@ void mndReleaseMnode(SMnode *pMnode, SMnodeObj *pObj) { ...@@ -77,28 +77,6 @@ void mndReleaseMnode(SMnode *pMnode, SMnodeObj *pObj) {
sdbRelease(pMnode->pSdb, pObj); sdbRelease(pMnode->pSdb, pObj);
} }
void mndUpdateMnodeRole(SMnode *pMnode) {
SSdb *pSdb = pMnode->pSdb;
void *pIter = NULL;
while (1) {
SMnodeObj *pObj = NULL;
pIter = sdbFetch(pSdb, SDB_MNODE, pIter, (void **)&pObj);
if (pIter == NULL) break;
ESyncState lastRole = pObj->role;
if (pObj->id == 1) {
pObj->role = TAOS_SYNC_STATE_LEADER;
} else {
pObj->role = TAOS_SYNC_STATE_CANDIDATE;
}
if (pObj->role != lastRole) {
pObj->roleTime = taosGetTimestampMs();
}
sdbRelease(pSdb, pObj);
}
}
static int32_t mndCreateDefaultMnode(SMnode *pMnode) { static int32_t mndCreateDefaultMnode(SMnode *pMnode) {
SMnodeObj mnodeObj = {0}; SMnodeObj mnodeObj = {0};
mnodeObj.id = 1; mnodeObj.id = 1;
...@@ -209,7 +187,7 @@ static int32_t mndMnodeActionInsert(SSdb *pSdb, SMnodeObj *pObj) { ...@@ -209,7 +187,7 @@ static int32_t mndMnodeActionInsert(SSdb *pSdb, SMnodeObj *pObj) {
return -1; return -1;
} }
pObj->role = TAOS_SYNC_STATE_FOLLOWER; pObj->state = TAOS_SYNC_STATE_ERROR;
return 0; return 0;
} }
...@@ -253,7 +231,7 @@ void mndGetMnodeEpSet(SMnode *pMnode, SEpSet *pEpSet) { ...@@ -253,7 +231,7 @@ void mndGetMnodeEpSet(SMnode *pMnode, SEpSet *pEpSet) {
if (pObj->pDnode == NULL) { if (pObj->pDnode == NULL) {
mError("mnode:%d, no corresponding dnode exists", pObj->id); mError("mnode:%d, no corresponding dnode exists", pObj->id);
} else { } else {
if (pObj->role == TAOS_SYNC_STATE_LEADER) { if (pObj->state == TAOS_SYNC_STATE_LEADER) {
pEpSet->inUse = pEpSet->numOfEps; pEpSet->inUse = pEpSet->numOfEps;
} }
addEpIntoEpSet(pEpSet, pObj->pDnode->fqdn, pObj->pDnode->port); addEpIntoEpSet(pEpSet, pObj->pDnode->fqdn, pObj->pDnode->port);
...@@ -581,7 +559,7 @@ static int32_t mndProcessDropMnodeReq(SRpcMsg *pReq) { ...@@ -581,7 +559,7 @@ static int32_t mndProcessDropMnodeReq(SRpcMsg *pReq) {
goto _OVER; goto _OVER;
} }
if (pMnode->selfId == dropReq.dnodeId) { if (pMnode->selfDnodeId == dropReq.dnodeId) {
terrno = TSDB_CODE_MND_CANT_DROP_MASTER; terrno = TSDB_CODE_MND_CANT_DROP_MASTER;
goto _OVER; goto _OVER;
} }
...@@ -652,10 +630,11 @@ static int32_t mndRetrieveMnodes(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pB ...@@ -652,10 +630,11 @@ static int32_t mndRetrieveMnodes(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pB
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataAppend(pColInfo, numOfRows, b1, false); colDataAppend(pColInfo, numOfRows, b1, false);
// const char *roles = syncStr(syncGetMyRole(pMnode->syncMgmt.sync)); const char *roles = NULL;
const char *roles = syncStr(TAOS_SYNC_STATE_FOLLOWER); if (pObj->id == pMnode->selfDnodeId) {
if (pObj->id == pMnode->selfId) {
roles = syncStr(TAOS_SYNC_STATE_LEADER); roles = syncStr(TAOS_SYNC_STATE_LEADER);
} else {
roles = syncStr(pObj->state);
} }
char *b2 = taosMemoryCalloc(1, 12 + VARSTR_HEADER_SIZE); char *b2 = taosMemoryCalloc(1, 12 + VARSTR_HEADER_SIZE);
STR_WITH_MAXSIZE_TO_VARSTR(b2, roles, pShow->pMeta->pSchemas[cols].bytes); STR_WITH_MAXSIZE_TO_VARSTR(b2, roles, pShow->pMeta->pSchemas[cols].bytes);
......
...@@ -196,9 +196,8 @@ void mndSyncStop(SMnode *pMnode) {} ...@@ -196,9 +196,8 @@ void mndSyncStop(SMnode *pMnode) {}
bool mndIsMaster(SMnode *pMnode) { bool mndIsMaster(SMnode *pMnode) {
SSyncMgmt *pMgmt = &pMnode->syncMgmt; SSyncMgmt *pMgmt = &pMnode->syncMgmt;
pMgmt->state = syncGetMyRole(pMgmt->sync); ESyncState state = syncGetMyRole(pMgmt->sync);
return (state == TAOS_SYNC_STATE_LEADER) && (pMnode->syncMgmt.restored);
return (pMgmt->state == TAOS_SYNC_STATE_LEADER) && (pMnode->syncMgmt.restored);
} }
int32_t mndAlter(SMnode *pMnode, const SMnodeOpt *pOption) { int32_t mndAlter(SMnode *pMnode, const SMnodeOpt *pOption) {
......
...@@ -264,7 +264,7 @@ static void mndSetOptions(SMnode *pMnode, const SMnodeOpt *pOption) { ...@@ -264,7 +264,7 @@ static void mndSetOptions(SMnode *pMnode, const SMnodeOpt *pOption) {
pMnode->selfIndex = pOption->selfIndex; pMnode->selfIndex = pOption->selfIndex;
memcpy(&pMnode->replicas, pOption->replicas, sizeof(SReplica) * TSDB_MAX_REPLICA); memcpy(&pMnode->replicas, pOption->replicas, sizeof(SReplica) * TSDB_MAX_REPLICA);
pMnode->msgCb = pOption->msgCb; pMnode->msgCb = pOption->msgCb;
pMnode->selfId = pOption->replicas[pOption->selfIndex].id; pMnode->selfDnodeId = pOption->dnodeId;
pMnode->syncMgmt.standby = pOption->standby; pMnode->syncMgmt.standby = pOption->standby;
} }
...@@ -318,7 +318,6 @@ SMnode *mndOpen(const char *path, const SMnodeOpt *pOption) { ...@@ -318,7 +318,6 @@ SMnode *mndOpen(const char *path, const SMnodeOpt *pOption) {
return NULL; return NULL;
} }
mndUpdateMnodeRole(pMnode);
mDebug("mnode open successfully "); mDebug("mnode open successfully ");
return pMnode; return pMnode;
} }
...@@ -519,16 +518,17 @@ int32_t mndGetMonitorInfo(SMnode *pMnode, SMonClusterInfo *pClusterInfo, SMonVgr ...@@ -519,16 +518,17 @@ int32_t mndGetMonitorInfo(SMnode *pMnode, SMonClusterInfo *pClusterInfo, SMonVgr
SMonMnodeDesc desc = {0}; SMonMnodeDesc desc = {0};
desc.mnode_id = pObj->id; desc.mnode_id = pObj->id;
tstrncpy(desc.mnode_ep, pObj->pDnode->ep, sizeof(desc.mnode_ep)); tstrncpy(desc.mnode_ep, pObj->pDnode->ep, sizeof(desc.mnode_ep));
tstrncpy(desc.role, syncStr(TAOS_SYNC_STATE_LEADER), sizeof(desc.role));
// tstrncpy(desc.role, syncStr(pObj->role), sizeof(desc.role));
taosArrayPush(pClusterInfo->mnodes, &desc);
sdbRelease(pSdb, pObj);
if (pObj->id == pMnode->selfId) { if (pObj->id == pMnode->selfDnodeId) {
pClusterInfo->first_ep_dnode_id = pObj->id; pClusterInfo->first_ep_dnode_id = pObj->id;
tstrncpy(pClusterInfo->first_ep, pObj->pDnode->ep, sizeof(pClusterInfo->first_ep)); tstrncpy(pClusterInfo->first_ep, pObj->pDnode->ep, sizeof(pClusterInfo->first_ep));
pClusterInfo->master_uptime = (ms - pObj->roleTime) / (86400000.0f); pClusterInfo->master_uptime = (ms - pObj->stateStartTime) / (86400000.0f);
tstrncpy(desc.role, syncStr(TAOS_SYNC_STATE_LEADER), sizeof(desc.role));
} else {
tstrncpy(desc.role, syncStr(pObj->state), sizeof(desc.role));
} }
taosArrayPush(pClusterInfo->mnodes, &desc);
sdbRelease(pSdb, pObj);
} }
// vgroup info // vgroup info
...@@ -581,6 +581,6 @@ int32_t mndGetMonitorInfo(SMnode *pMnode, SMonClusterInfo *pClusterInfo, SMonVgr ...@@ -581,6 +581,6 @@ int32_t mndGetMonitorInfo(SMnode *pMnode, SMonClusterInfo *pClusterInfo, SMonVgr
} }
int32_t mndGetLoad(SMnode *pMnode, SMnodeLoad *pLoad) { int32_t mndGetLoad(SMnode *pMnode, SMnodeLoad *pLoad) {
pLoad->syncState = pMnode->syncMgmt.state; pLoad->syncState = syncGetMyRole(pMnode->syncMgmt.sync);
return 0; return 0;
} }
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册