提交 2ca5bdc7 编写于 作者: M Minghao Li

refactor(sync): add syncNodeDynamicQuorum

上级 eee4c085
...@@ -26,12 +26,14 @@ extern "C" { ...@@ -26,12 +26,14 @@ extern "C" {
extern bool gRaftDetailLog; extern bool gRaftDetailLog;
#define SYNC_RESP_TTL_MS 10000000 #define SYNC_RESP_TTL_MS 10000000
#define SYNC_SPEED_UP_HB_TIMER 400 #define SYNC_SPEED_UP_HB_TIMER 400
#define SYNC_SPEED_UP_AFTER_MS (1000 * 20) #define SYNC_SPEED_UP_AFTER_MS (1000 * 20)
#define SYNC_SLOW_DOWN_RANGE 100 #define SYNC_SLOW_DOWN_RANGE 100
#define SYNC_MAX_READ_RANGE 10 #define SYNC_MAX_READ_RANGE 2
#define SYNC_MAX_PROGRESS_WAIT_MS 4000 #define SYNC_MAX_PROGRESS_WAIT_MS 4000
#define SYNC_MAX_START_TIME_RANGE_MS (1000 * 20)
#define SYNC_MAX_RECV_TIME_RANGE_MS 1000
#define SYNC_MAX_BATCH_SIZE 1 #define SYNC_MAX_BATCH_SIZE 1
#define SYNC_INDEX_BEGIN 0 #define SYNC_INDEX_BEGIN 0
......
...@@ -423,6 +423,7 @@ typedef struct SyncAppendEntriesReply { ...@@ -423,6 +423,7 @@ typedef struct SyncAppendEntriesReply {
SyncTerm privateTerm; SyncTerm privateTerm;
bool success; bool success;
SyncIndex matchIndex; SyncIndex matchIndex;
int64_t startTime;
} SyncAppendEntriesReply; } SyncAppendEntriesReply;
SyncAppendEntriesReply* syncAppendEntriesReplyBuild(int32_t vgId); SyncAppendEntriesReply* syncAppendEntriesReplyBuild(int32_t vgId);
......
...@@ -29,8 +29,12 @@ extern "C" { ...@@ -29,8 +29,12 @@ extern "C" {
// SIndexMgr ----------------------------- // SIndexMgr -----------------------------
typedef struct SSyncIndexMgr { typedef struct SSyncIndexMgr {
SRaftId (*replicas)[TSDB_MAX_REPLICA]; SRaftId (*replicas)[TSDB_MAX_REPLICA];
SyncIndex index[TSDB_MAX_REPLICA]; SyncIndex index[TSDB_MAX_REPLICA];
SyncTerm privateTerm[TSDB_MAX_REPLICA]; // for advanced function SyncTerm privateTerm[TSDB_MAX_REPLICA]; // for advanced function
int64_t startTimeArr[TSDB_MAX_REPLICA];
int64_t recvTimeArr[TSDB_MAX_REPLICA];
int32_t replicaNum; int32_t replicaNum;
SSyncNode *pSyncNode; SSyncNode *pSyncNode;
} SSyncIndexMgr; } SSyncIndexMgr;
...@@ -41,8 +45,13 @@ void syncIndexMgrDestroy(SSyncIndexMgr *pSyncIndexMgr); ...@@ -41,8 +45,13 @@ void syncIndexMgrDestroy(SSyncIndexMgr *pSyncIndexMgr);
void syncIndexMgrClear(SSyncIndexMgr *pSyncIndexMgr); void syncIndexMgrClear(SSyncIndexMgr *pSyncIndexMgr);
void syncIndexMgrSetIndex(SSyncIndexMgr *pSyncIndexMgr, const SRaftId *pRaftId, SyncIndex index); void syncIndexMgrSetIndex(SSyncIndexMgr *pSyncIndexMgr, const SRaftId *pRaftId, SyncIndex index);
SyncIndex syncIndexMgrGetIndex(SSyncIndexMgr *pSyncIndexMgr, const SRaftId *pRaftId); SyncIndex syncIndexMgrGetIndex(SSyncIndexMgr *pSyncIndexMgr, const SRaftId *pRaftId);
cJSON * syncIndexMgr2Json(SSyncIndexMgr *pSyncIndexMgr); cJSON *syncIndexMgr2Json(SSyncIndexMgr *pSyncIndexMgr);
char * syncIndexMgr2Str(SSyncIndexMgr *pSyncIndexMgr); char *syncIndexMgr2Str(SSyncIndexMgr *pSyncIndexMgr);
void syncIndexMgrSetStartTime(SSyncIndexMgr *pSyncIndexMgr, const SRaftId *pRaftId, int64_t startTime);
int64_t syncIndexMgrGetStartTime(SSyncIndexMgr *pSyncIndexMgr, const SRaftId *pRaftId);
void syncIndexMgrSetRecvTime(SSyncIndexMgr *pSyncIndexMgr, const SRaftId *pRaftId, int64_t recvTime);
int64_t syncIndexMgrGetRecvTime(SSyncIndexMgr *pSyncIndexMgr, const SRaftId *pRaftId);
// void syncIndexMgrSetTerm(SSyncIndexMgr *pSyncIndexMgr, const SRaftId *pRaftId, SyncTerm term); // void syncIndexMgrSetTerm(SSyncIndexMgr *pSyncIndexMgr, const SRaftId *pRaftId, SyncTerm term);
// SyncTerm syncIndexMgrGetTerm(SSyncIndexMgr *pSyncIndexMgr, const SRaftId *pRaftId); // SyncTerm syncIndexMgrGetTerm(SSyncIndexMgr *pSyncIndexMgr, const SRaftId *pRaftId);
......
...@@ -269,6 +269,8 @@ int32_t syncNodeLeaderTransfer(SSyncNode* pSyncNode); ...@@ -269,6 +269,8 @@ int32_t syncNodeLeaderTransfer(SSyncNode* pSyncNode);
int32_t syncNodeLeaderTransferTo(SSyncNode* pSyncNode, SNodeInfo newLeader); int32_t syncNodeLeaderTransferTo(SSyncNode* pSyncNode, SNodeInfo newLeader);
int32_t syncDoLeaderTransfer(SSyncNode* ths, SRpcMsg* pRpcMsg, SSyncRaftEntry* pEntry); int32_t syncDoLeaderTransfer(SSyncNode* ths, SRpcMsg* pRpcMsg, SSyncRaftEntry* pEntry);
int32_t syncNodeDynamicQuorum(const SSyncNode* pSyncNode);
// trace log // trace log
void syncLogSendRequestVote(SSyncNode* pSyncNode, const SyncRequestVote* pMsg, const char* s); void syncLogSendRequestVote(SSyncNode* pSyncNode, const SyncRequestVote* pMsg, const char* s);
void syncLogRecvRequestVote(SSyncNode* pSyncNode, const SyncRequestVote* pMsg, const char* s); void syncLogRecvRequestVote(SSyncNode* pSyncNode, const SyncRequestVote* pMsg, const char* s);
......
...@@ -148,6 +148,7 @@ int32_t syncNodeOnAppendEntriesCb(SSyncNode* ths, SyncAppendEntries* pMsg) { ...@@ -148,6 +148,7 @@ int32_t syncNodeOnAppendEntriesCb(SSyncNode* ths, SyncAppendEntries* pMsg) {
pReply->term = ths->pRaftStore->currentTerm; pReply->term = ths->pRaftStore->currentTerm;
pReply->success = false; pReply->success = false;
pReply->matchIndex = SYNC_INDEX_INVALID; pReply->matchIndex = SYNC_INDEX_INVALID;
pReply->startTime = ths->startTime;
// msg event log // msg event log
syncLogSendAppendEntriesReply(ths, pReply, ""); syncLogSendAppendEntriesReply(ths, pReply, "");
...@@ -290,6 +291,8 @@ int32_t syncNodeOnAppendEntriesCb(SSyncNode* ths, SyncAppendEntries* pMsg) { ...@@ -290,6 +291,8 @@ int32_t syncNodeOnAppendEntriesCb(SSyncNode* ths, SyncAppendEntries* pMsg) {
pReply->matchIndex = pMsg->prevLogIndex; pReply->matchIndex = pMsg->prevLogIndex;
} }
pReply->startTime = ths->startTime;
// msg event log // msg event log
syncLogSendAppendEntriesReply(ths, pReply, ""); syncLogSendAppendEntriesReply(ths, pReply, "");
...@@ -603,6 +606,7 @@ int32_t syncNodeOnAppendEntriesSnapshot2Cb(SSyncNode* ths, SyncAppendEntriesBatc ...@@ -603,6 +606,7 @@ int32_t syncNodeOnAppendEntriesSnapshot2Cb(SSyncNode* ths, SyncAppendEntriesBatc
pReply->privateTerm = ths->pNewNodeReceiver->privateTerm; pReply->privateTerm = ths->pNewNodeReceiver->privateTerm;
pReply->success = true; pReply->success = true;
pReply->matchIndex = matchIndex; pReply->matchIndex = matchIndex;
pReply->startTime = ths->startTime;
// msg event log // msg event log
syncLogSendAppendEntriesReply(ths, pReply, ""); syncLogSendAppendEntriesReply(ths, pReply, "");
...@@ -651,6 +655,7 @@ int32_t syncNodeOnAppendEntriesSnapshot2Cb(SSyncNode* ths, SyncAppendEntriesBatc ...@@ -651,6 +655,7 @@ int32_t syncNodeOnAppendEntriesSnapshot2Cb(SSyncNode* ths, SyncAppendEntriesBatc
pReply->privateTerm = ths->pNewNodeReceiver->privateTerm; pReply->privateTerm = ths->pNewNodeReceiver->privateTerm;
pReply->success = false; pReply->success = false;
pReply->matchIndex = ths->commitIndex; pReply->matchIndex = ths->commitIndex;
pReply->startTime = ths->startTime;
// msg event log // msg event log
syncLogSendAppendEntriesReply(ths, pReply, ""); syncLogSendAppendEntriesReply(ths, pReply, "");
...@@ -729,6 +734,7 @@ int32_t syncNodeOnAppendEntriesSnapshot2Cb(SSyncNode* ths, SyncAppendEntriesBatc ...@@ -729,6 +734,7 @@ int32_t syncNodeOnAppendEntriesSnapshot2Cb(SSyncNode* ths, SyncAppendEntriesBatc
pReply->privateTerm = ths->pNewNodeReceiver->privateTerm; pReply->privateTerm = ths->pNewNodeReceiver->privateTerm;
pReply->success = true; pReply->success = true;
pReply->matchIndex = hasAppendEntries ? pMsg->prevLogIndex + pMsg->dataCount : pMsg->prevLogIndex; pReply->matchIndex = hasAppendEntries ? pMsg->prevLogIndex + pMsg->dataCount : pMsg->prevLogIndex;
pReply->startTime = ths->startTime;
// msg event log // msg event log
syncLogSendAppendEntriesReply(ths, pReply, ""); syncLogSendAppendEntriesReply(ths, pReply, "");
...@@ -874,6 +880,7 @@ int32_t syncNodeOnAppendEntriesSnapshotCb(SSyncNode* ths, SyncAppendEntries* pMs ...@@ -874,6 +880,7 @@ int32_t syncNodeOnAppendEntriesSnapshotCb(SSyncNode* ths, SyncAppendEntries* pMs
pReply->privateTerm = ths->pNewNodeReceiver->privateTerm; pReply->privateTerm = ths->pNewNodeReceiver->privateTerm;
pReply->success = true; pReply->success = true;
pReply->matchIndex = matchIndex; pReply->matchIndex = matchIndex;
pReply->startTime = ths->startTime;
// msg event log // msg event log
syncLogSendAppendEntriesReply(ths, pReply, ""); syncLogSendAppendEntriesReply(ths, pReply, "");
...@@ -919,6 +926,7 @@ int32_t syncNodeOnAppendEntriesSnapshotCb(SSyncNode* ths, SyncAppendEntries* pMs ...@@ -919,6 +926,7 @@ int32_t syncNodeOnAppendEntriesSnapshotCb(SSyncNode* ths, SyncAppendEntries* pMs
pReply->privateTerm = ths->pNewNodeReceiver->privateTerm; pReply->privateTerm = ths->pNewNodeReceiver->privateTerm;
pReply->success = false; pReply->success = false;
pReply->matchIndex = SYNC_INDEX_INVALID; pReply->matchIndex = SYNC_INDEX_INVALID;
pReply->startTime = ths->startTime;
// msg event log // msg event log
syncLogSendAppendEntriesReply(ths, pReply, ""); syncLogSendAppendEntriesReply(ths, pReply, "");
...@@ -984,6 +992,7 @@ int32_t syncNodeOnAppendEntriesSnapshotCb(SSyncNode* ths, SyncAppendEntries* pMs ...@@ -984,6 +992,7 @@ int32_t syncNodeOnAppendEntriesSnapshotCb(SSyncNode* ths, SyncAppendEntries* pMs
pReply->privateTerm = ths->pNewNodeReceiver->privateTerm; pReply->privateTerm = ths->pNewNodeReceiver->privateTerm;
pReply->success = true; pReply->success = true;
pReply->matchIndex = hasAppendEntries ? pMsg->prevLogIndex + 1 : pMsg->prevLogIndex; pReply->matchIndex = hasAppendEntries ? pMsg->prevLogIndex + 1 : pMsg->prevLogIndex;
pReply->startTime = ths->startTime;
// msg event log // msg event log
syncLogSendAppendEntriesReply(ths, pReply, ""); syncLogSendAppendEntriesReply(ths, pReply, "");
......
...@@ -64,6 +64,10 @@ int32_t syncNodeOnAppendEntriesReplyCb(SSyncNode* ths, SyncAppendEntriesReply* p ...@@ -64,6 +64,10 @@ int32_t syncNodeOnAppendEntriesReplyCb(SSyncNode* ths, SyncAppendEntriesReply* p
ASSERT(pMsg->term == ths->pRaftStore->currentTerm); ASSERT(pMsg->term == ths->pRaftStore->currentTerm);
// update time
syncIndexMgrSetStartTime(ths->pNextIndex, &(pMsg->srcId), pMsg->startTime);
syncIndexMgrSetRecvTime(ths->pNextIndex, &(pMsg->srcId), taosGetTimestampMs());
SyncIndex beforeNextIndex = syncIndexMgrGetIndex(ths->pNextIndex, &(pMsg->srcId)); SyncIndex beforeNextIndex = syncIndexMgrGetIndex(ths->pNextIndex, &(pMsg->srcId));
SyncIndex beforeMatchIndex = syncIndexMgrGetIndex(ths->pMatchIndex, &(pMsg->srcId)); SyncIndex beforeMatchIndex = syncIndexMgrGetIndex(ths->pMatchIndex, &(pMsg->srcId));
...@@ -170,6 +174,10 @@ int32_t syncNodeOnAppendEntriesReplySnapshot2Cb(SSyncNode* ths, SyncAppendEntrie ...@@ -170,6 +174,10 @@ int32_t syncNodeOnAppendEntriesReplySnapshot2Cb(SSyncNode* ths, SyncAppendEntrie
ASSERT(pMsg->term == ths->pRaftStore->currentTerm); ASSERT(pMsg->term == ths->pRaftStore->currentTerm);
// update time
syncIndexMgrSetStartTime(ths->pNextIndex, &(pMsg->srcId), pMsg->startTime);
syncIndexMgrSetRecvTime(ths->pNextIndex, &(pMsg->srcId), taosGetTimestampMs());
SyncIndex beforeNextIndex = syncIndexMgrGetIndex(ths->pNextIndex, &(pMsg->srcId)); SyncIndex beforeNextIndex = syncIndexMgrGetIndex(ths->pNextIndex, &(pMsg->srcId));
SyncIndex beforeMatchIndex = syncIndexMgrGetIndex(ths->pMatchIndex, &(pMsg->srcId)); SyncIndex beforeMatchIndex = syncIndexMgrGetIndex(ths->pMatchIndex, &(pMsg->srcId));
...@@ -330,6 +338,10 @@ int32_t syncNodeOnAppendEntriesReplySnapshotCb(SSyncNode* ths, SyncAppendEntries ...@@ -330,6 +338,10 @@ int32_t syncNodeOnAppendEntriesReplySnapshotCb(SSyncNode* ths, SyncAppendEntries
ASSERT(pMsg->term == ths->pRaftStore->currentTerm); ASSERT(pMsg->term == ths->pRaftStore->currentTerm);
// update time
syncIndexMgrSetStartTime(ths->pNextIndex, &(pMsg->srcId), pMsg->startTime);
syncIndexMgrSetRecvTime(ths->pNextIndex, &(pMsg->srcId), taosGetTimestampMs());
SyncIndex beforeNextIndex = syncIndexMgrGetIndex(ths->pNextIndex, &(pMsg->srcId)); SyncIndex beforeNextIndex = syncIndexMgrGetIndex(ths->pNextIndex, &(pMsg->srcId));
SyncIndex beforeMatchIndex = syncIndexMgrGetIndex(ths->pMatchIndex, &(pMsg->srcId)); SyncIndex beforeMatchIndex = syncIndexMgrGetIndex(ths->pMatchIndex, &(pMsg->srcId));
......
...@@ -133,6 +133,63 @@ bool syncAgreeIndex(SSyncNode* pSyncNode, SRaftId* pRaftId, SyncIndex index) { ...@@ -133,6 +133,63 @@ bool syncAgreeIndex(SSyncNode* pSyncNode, SRaftId* pRaftId, SyncIndex index) {
return false; return false;
} }
static inline int64_t syncNodeAbs64(int64_t a, int64_t b) {
ASSERT(a >= 0);
ASSERT(b >= 0);
int64_t c = a > b ? a - b : b - a;
return c;
}
int32_t syncNodeDynamicQuorum(const SSyncNode* pSyncNode) {
int32_t quorum = 1; // self
int64_t timeNow = taosGetTimestampMs();
for (int i = 0; i < pSyncNode->peersNum; ++i) {
int64_t peerStartTime = syncIndexMgrGetStartTime(pSyncNode->pNextIndex, &(pSyncNode->peersId)[i]);
int64_t peerRecvTime = syncIndexMgrGetRecvTime(pSyncNode->pNextIndex, &(pSyncNode->peersId)[i]);
int64_t recvTimeDiff = syncNodeAbs64(peerRecvTime, timeNow);
int64_t startTimeDiff = syncNodeAbs64(peerStartTime, pSyncNode->startTime);
int32_t addQuorum = 0;
if (recvTimeDiff < SYNC_MAX_RECV_TIME_RANGE_MS) {
addQuorum = 1;
} else {
addQuorum = 0;
}
if (startTimeDiff > SYNC_MAX_START_TIME_RANGE_MS) {
addQuorum = 0;
}
quorum += addQuorum;
}
ASSERT(quorum <= pSyncNode->replicaNum);
if (quorum < pSyncNode->quorum) {
quorum = pSyncNode->quorum;
}
return quorum;
}
bool syncAgree(SSyncNode* pSyncNode, SyncIndex index) {
int agreeCount = 0;
for (int i = 0; i < pSyncNode->replicaNum; ++i) {
if (syncAgreeIndex(pSyncNode, &(pSyncNode->replicasId[i]), index)) {
++agreeCount;
}
if (agreeCount >= syncNodeDynamicQuorum(pSyncNode)) {
return true;
}
}
return false;
}
/*
bool syncAgree(SSyncNode* pSyncNode, SyncIndex index) { bool syncAgree(SSyncNode* pSyncNode, SyncIndex index) {
int agreeCount = 0; int agreeCount = 0;
for (int i = 0; i < pSyncNode->replicaNum; ++i) { for (int i = 0; i < pSyncNode->replicaNum; ++i) {
...@@ -145,3 +202,4 @@ bool syncAgree(SSyncNode* pSyncNode, SyncIndex index) { ...@@ -145,3 +202,4 @@ bool syncAgree(SSyncNode* pSyncNode, SyncIndex index) {
} }
return false; return false;
} }
*/
...@@ -47,6 +47,13 @@ void syncIndexMgrDestroy(SSyncIndexMgr *pSyncIndexMgr) { ...@@ -47,6 +47,13 @@ void syncIndexMgrDestroy(SSyncIndexMgr *pSyncIndexMgr) {
void syncIndexMgrClear(SSyncIndexMgr *pSyncIndexMgr) { void syncIndexMgrClear(SSyncIndexMgr *pSyncIndexMgr) {
memset(pSyncIndexMgr->index, 0, sizeof(pSyncIndexMgr->index)); memset(pSyncIndexMgr->index, 0, sizeof(pSyncIndexMgr->index));
memset(pSyncIndexMgr->privateTerm, 0, sizeof(pSyncIndexMgr->privateTerm)); memset(pSyncIndexMgr->privateTerm, 0, sizeof(pSyncIndexMgr->privateTerm));
// int64_t timeNow = taosGetMonotonicMs();
for (int i = 0; i < pSyncIndexMgr->replicaNum; ++i) {
pSyncIndexMgr->startTimeArr[i] = 0;
pSyncIndexMgr->recvTimeArr[i] = 0;
}
/* /*
for (int i = 0; i < pSyncIndexMgr->replicaNum; ++i) { for (int i = 0; i < pSyncIndexMgr->replicaNum; ++i) {
pSyncIndexMgr->index[i] = 0; pSyncIndexMgr->index[i] = 0;
...@@ -68,7 +75,8 @@ void syncIndexMgrSetIndex(SSyncIndexMgr *pSyncIndexMgr, const SRaftId *pRaftId, ...@@ -68,7 +75,8 @@ void syncIndexMgrSetIndex(SSyncIndexMgr *pSyncIndexMgr, const SRaftId *pRaftId,
char host[128]; char host[128];
uint16_t port; uint16_t port;
syncUtilU642Addr(pRaftId->addr, host, sizeof(host), &port); syncUtilU642Addr(pRaftId->addr, host, sizeof(host), &port);
sError("vgId:%d, index mgr set for %s:%d, index:%" PRId64 " error", pSyncIndexMgr->pSyncNode->vgId, host, port, index); sError("vgId:%d, index mgr set for %s:%d, index:%" PRId64 " error", pSyncIndexMgr->pSyncNode->vgId, host, port,
index);
} }
SyncIndex syncIndexMgrGetIndex(SSyncIndexMgr *pSyncIndexMgr, const SRaftId *pRaftId) { SyncIndex syncIndexMgrGetIndex(SSyncIndexMgr *pSyncIndexMgr, const SRaftId *pRaftId) {
...@@ -125,11 +133,65 @@ cJSON *syncIndexMgr2Json(SSyncIndexMgr *pSyncIndexMgr) { ...@@ -125,11 +133,65 @@ cJSON *syncIndexMgr2Json(SSyncIndexMgr *pSyncIndexMgr) {
char *syncIndexMgr2Str(SSyncIndexMgr *pSyncIndexMgr) { char *syncIndexMgr2Str(SSyncIndexMgr *pSyncIndexMgr) {
cJSON *pJson = syncIndexMgr2Json(pSyncIndexMgr); cJSON *pJson = syncIndexMgr2Json(pSyncIndexMgr);
char * serialized = cJSON_Print(pJson); char *serialized = cJSON_Print(pJson);
cJSON_Delete(pJson); cJSON_Delete(pJson);
return serialized; return serialized;
} }
void syncIndexMgrSetStartTime(SSyncIndexMgr *pSyncIndexMgr, const SRaftId *pRaftId, int64_t startTime) {
for (int i = 0; i < pSyncIndexMgr->replicaNum; ++i) {
if (syncUtilSameId(&((*(pSyncIndexMgr->replicas))[i]), pRaftId)) {
(pSyncIndexMgr->startTimeArr)[i] = startTime;
return;
}
}
// maybe config change
// ASSERT(0);
char host[128];
uint16_t port;
syncUtilU642Addr(pRaftId->addr, host, sizeof(host), &port);
sError("vgId:%d, index mgr set for %s:%d, start-time:%" PRId64 " error", pSyncIndexMgr->pSyncNode->vgId, host, port,
startTime);
}
int64_t syncIndexMgrGetStartTime(SSyncIndexMgr *pSyncIndexMgr, const SRaftId *pRaftId) {
for (int i = 0; i < pSyncIndexMgr->replicaNum; ++i) {
if (syncUtilSameId(&((*(pSyncIndexMgr->replicas))[i]), pRaftId)) {
int64_t startTime = (pSyncIndexMgr->startTimeArr)[i];
return startTime;
}
}
ASSERT(0);
}
void syncIndexMgrSetRecvTime(SSyncIndexMgr *pSyncIndexMgr, const SRaftId *pRaftId, int64_t recvTime) {
for (int i = 0; i < pSyncIndexMgr->replicaNum; ++i) {
if (syncUtilSameId(&((*(pSyncIndexMgr->replicas))[i]), pRaftId)) {
(pSyncIndexMgr->recvTimeArr)[i] = recvTime;
return;
}
}
// maybe config change
// ASSERT(0);
char host[128];
uint16_t port;
syncUtilU642Addr(pRaftId->addr, host, sizeof(host), &port);
sError("vgId:%d, index mgr set for %s:%d, recv-time:%" PRId64 " error", pSyncIndexMgr->pSyncNode->vgId, host, port,
recvTime);
}
int64_t syncIndexMgrGetRecvTime(SSyncIndexMgr *pSyncIndexMgr, const SRaftId *pRaftId) {
for (int i = 0; i < pSyncIndexMgr->replicaNum; ++i) {
if (syncUtilSameId(&((*(pSyncIndexMgr->replicas))[i]), pRaftId)) {
int64_t recvTime = (pSyncIndexMgr->recvTimeArr)[i];
return recvTime;
}
}
ASSERT(0);
}
// for debug ------------------- // for debug -------------------
void syncIndexMgrPrint(SSyncIndexMgr *pObj) { void syncIndexMgrPrint(SSyncIndexMgr *pObj) {
char *serialized = syncIndexMgr2Str(pObj); char *serialized = syncIndexMgr2Str(pObj);
......
...@@ -1682,13 +1682,13 @@ inline void syncNodeEventLog(const SSyncNode* pSyncNode, char* str) { ...@@ -1682,13 +1682,13 @@ inline void syncNodeEventLog(const SSyncNode* pSyncNode, char* str) {
", sby:%d, " ", sby:%d, "
"stgy:%d, bch:%d, " "stgy:%d, bch:%d, "
"r-num:%d, " "r-num:%d, "
"lcfg:%" PRId64 ", chging:%d, rsto:%d, elt:%" PRId64 ", hb:%" PRId64 ", %s", "lcfg:%" PRId64 ", chging:%d, rsto:%d, dquorum:%d, elt:%" PRId64 ", hb:%" PRId64 ", %s",
pSyncNode->vgId, syncUtilState2String(pSyncNode->state), str, pSyncNode->pRaftStore->currentTerm, pSyncNode->vgId, syncUtilState2String(pSyncNode->state), str, pSyncNode->pRaftStore->currentTerm,
pSyncNode->commitIndex, logBeginIndex, logLastIndex, snapshot.lastApplyIndex, snapshot.lastApplyTerm, pSyncNode->commitIndex, logBeginIndex, logLastIndex, snapshot.lastApplyIndex, snapshot.lastApplyTerm,
pSyncNode->pRaftCfg->isStandBy, pSyncNode->pRaftCfg->snapshotStrategy, pSyncNode->pRaftCfg->batchSize, pSyncNode->pRaftCfg->isStandBy, pSyncNode->pRaftCfg->snapshotStrategy, pSyncNode->pRaftCfg->batchSize,
pSyncNode->replicaNum, pSyncNode->pRaftCfg->lastConfigIndex, pSyncNode->changing, pSyncNode->replicaNum, pSyncNode->pRaftCfg->lastConfigIndex, pSyncNode->changing,
pSyncNode->restoreFinish, pSyncNode->electTimerLogicClockUser, pSyncNode->heartbeatTimerLogicClockUser, pSyncNode->restoreFinish, syncNodeDynamicQuorum(pSyncNode), pSyncNode->electTimerLogicClockUser,
printStr); pSyncNode->heartbeatTimerLogicClockUser, printStr);
} else { } else {
snprintf(logBuf, sizeof(logBuf), "%s", str); snprintf(logBuf, sizeof(logBuf), "%s", str);
} }
...@@ -1706,12 +1706,13 @@ inline void syncNodeEventLog(const SSyncNode* pSyncNode, char* str) { ...@@ -1706,12 +1706,13 @@ inline void syncNodeEventLog(const SSyncNode* pSyncNode, char* str) {
", sby:%d, " ", sby:%d, "
"stgy:%d, bch:%d, " "stgy:%d, bch:%d, "
"r-num:%d, " "r-num:%d, "
"lcfg:%" PRId64 ", chging:%d, rsto:%d, %s", "lcfg:%" PRId64 ", chging:%d, rsto:%d, dquorum:%d, elt:%" PRId64 ", hb:%" PRId64 ", %s",
pSyncNode->vgId, syncUtilState2String(pSyncNode->state), str, pSyncNode->pRaftStore->currentTerm, pSyncNode->vgId, syncUtilState2String(pSyncNode->state), str, pSyncNode->pRaftStore->currentTerm,
pSyncNode->commitIndex, logBeginIndex, logLastIndex, snapshot.lastApplyIndex, snapshot.lastApplyTerm, pSyncNode->commitIndex, logBeginIndex, logLastIndex, snapshot.lastApplyIndex, snapshot.lastApplyTerm,
pSyncNode->pRaftCfg->isStandBy, pSyncNode->pRaftCfg->snapshotStrategy, pSyncNode->pRaftCfg->batchSize, pSyncNode->pRaftCfg->isStandBy, pSyncNode->pRaftCfg->snapshotStrategy, pSyncNode->pRaftCfg->batchSize,
pSyncNode->replicaNum, pSyncNode->pRaftCfg->lastConfigIndex, pSyncNode->changing, pSyncNode->replicaNum, pSyncNode->pRaftCfg->lastConfigIndex, pSyncNode->changing,
pSyncNode->restoreFinish, printStr); pSyncNode->restoreFinish, syncNodeDynamicQuorum(pSyncNode), pSyncNode->electTimerLogicClockUser,
pSyncNode->heartbeatTimerLogicClockUser, printStr);
} else { } else {
snprintf(s, len, "%s", str); snprintf(s, len, "%s", str);
} }
......
...@@ -1947,6 +1947,8 @@ cJSON* syncAppendEntriesReply2Json(const SyncAppendEntriesReply* pMsg) { ...@@ -1947,6 +1947,8 @@ cJSON* syncAppendEntriesReply2Json(const SyncAppendEntriesReply* pMsg) {
cJSON_AddNumberToObject(pRoot, "success", pMsg->success); cJSON_AddNumberToObject(pRoot, "success", pMsg->success);
snprintf(u64buf, sizeof(u64buf), "%" PRId64, pMsg->matchIndex); snprintf(u64buf, sizeof(u64buf), "%" PRId64, pMsg->matchIndex);
cJSON_AddStringToObject(pRoot, "matchIndex", u64buf); cJSON_AddStringToObject(pRoot, "matchIndex", u64buf);
snprintf(u64buf, sizeof(u64buf), "%" PRId64, pMsg->startTime);
cJSON_AddStringToObject(pRoot, "startTime", u64buf);
} }
cJSON* pJson = cJSON_CreateObject(); cJSON* pJson = cJSON_CreateObject();
......
...@@ -24,6 +24,7 @@ SyncAppendEntriesReply *createMsg() { ...@@ -24,6 +24,7 @@ SyncAppendEntriesReply *createMsg() {
pMsg->matchIndex = 77; pMsg->matchIndex = 77;
pMsg->term = 33; pMsg->term = 33;
pMsg->privateTerm = 44; pMsg->privateTerm = 44;
pMsg->startTime = taosGetTimestampMs();
return pMsg; return pMsg;
} }
...@@ -89,6 +90,8 @@ void test5() { ...@@ -89,6 +90,8 @@ void test5() {
} }
int main() { int main() {
gRaftDetailLog = true;
tsAsyncLog = 0; tsAsyncLog = 0;
sDebugFlag = DEBUG_TRACE + DEBUG_SCREEN + DEBUG_FILE; sDebugFlag = DEBUG_TRACE + DEBUG_SCREEN + DEBUG_FILE;
logTest(); logTest();
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册