提交 fa546638 编写于 作者: M Minghao Li

refactor(sync): add last config index

上级 4af9d144
......@@ -398,6 +398,8 @@ typedef struct SyncSnapshotSend {
SyncTerm term;
SyncIndex lastIndex; // lastIndex of snapshot
SyncTerm lastTerm; // lastTerm of snapshot
SyncIndex lastConfigIndex;
SSyncCfg lastConfig;
SyncTerm privateTerm;
int32_t seq;
uint32_t dataLen;
......
......@@ -43,6 +43,7 @@ typedef struct SSyncSnapshotSender {
void * pCurrentBlock;
int32_t blockLen;
SSnapshot snapshot;
SSyncCfg lastConfig;
int64_t sendingMS;
SSyncNode *pSyncNode;
int32_t replicaIndex;
......
......@@ -326,7 +326,6 @@ int32_t syncNodeOnAppendEntriesCb(SSyncNode* ths, SyncAppendEntries* pMsg) {
return ret;
}
#if 0
int32_t syncNodeOnAppendEntriesCb(SSyncNode* ths, SyncAppendEntries* pMsg) {
int32_t ret = 0;
......
......@@ -190,15 +190,15 @@ int32_t syncNodeOnAppendEntriesReplySnapshotCb(SSyncNode* ths, SyncAppendEntries
if (gRaftDetailLog) {
char* s = snapshotSender2Str(pSender);
sInfo(
"sync event vgId:%d snapshot send to %s:%d start sender first time, lastApplyIndex:%ld lastApplyTerm:%lu "
"sync event vgId:%d snapshot send to %s:%d start sender first time, lastApplyIndex:%ld lastApplyTerm:%lu lastConfigIndex:%ld"
"sender:%s",
ths->vgId, host, port, pSender->snapshot.lastApplyIndex, pSender->snapshot.lastApplyTerm, s);
ths->vgId, host, port, pSender->snapshot.lastApplyIndex, pSender->snapshot.lastApplyTerm, pSender->snapshot.lastConfigIndex, s);
taosMemoryFree(s);
} else {
sInfo(
"sync event vgId:%d snapshot send to %s:%d start sender first time, lastApplyIndex:%ld "
"lastApplyTerm:%lu",
ths->vgId, host, port, pSender->snapshot.lastApplyIndex, pSender->snapshot.lastApplyTerm);
"lastApplyTerm:%lu lastConfigIndex:%ld",
ths->vgId, host, port, pSender->snapshot.lastApplyIndex, pSender->snapshot.lastApplyTerm, pSender->snapshot.lastConfigIndex);
}
}
......
......@@ -1201,13 +1201,13 @@ void syncNodeUpdateConfig(SSyncNode* pSyncNode, SSyncCfg* newConfig, SyncIndex l
*isDrop = true;
if (IamInOld && !IamInNew) {
*isDrop = true;
*isDrop = true;
} else {
*isDrop = false;
}
if (IamInNew) {
pSyncNode->pRaftCfg->isStandBy = 0; // change isStandBy to normal
pSyncNode->pRaftCfg->isStandBy = 0; // change isStandBy to normal
}
raftCfgPersist(pSyncNode->pRaftCfg);
......
......@@ -16,6 +16,7 @@
#include "syncMessage.h"
#include "syncUtil.h"
#include "tcoding.h"
#include "syncRaftCfg.h"
// ---------------------------------------------
cJSON* syncRpcMsg2Json(SRpcMsg* pRpcMsg) {
......@@ -1846,6 +1847,10 @@ cJSON* syncSnapshotSend2Json(const SyncSnapshotSend* pMsg) {
snprintf(u64buf, sizeof(u64buf), "%ld", pMsg->lastIndex);
cJSON_AddStringToObject(pRoot, "lastIndex", u64buf);
snprintf(u64buf, sizeof(u64buf), "%ld", pMsg->lastConfigIndex);
cJSON_AddStringToObject(pRoot, "lastConfigIndex", u64buf);
cJSON_AddItemToObject(pRoot, "lastConfig", syncCfg2Json((SSyncCfg*)&(pMsg->lastConfig)));
snprintf(u64buf, sizeof(u64buf), "%lu", pMsg->lastTerm);
cJSON_AddStringToObject(pRoot, "lastTerm", u64buf);
......
......@@ -19,6 +19,7 @@
#include "syncRaftStore.h"
#include "syncUtil.h"
#include "wal.h"
#include "syncRaftCfg.h"
static void snapshotReceiverDoStart(SSyncSnapshotReceiver *pReceiver, SyncTerm privateTerm, SRaftId fromId);
......@@ -83,6 +84,26 @@ void snapshotSenderStart(SSyncSnapshotSender *pSender) {
// get current snapshot info
pSender->pSyncNode->pFsm->FpGetSnapshot(pSender->pSyncNode->pFsm, &(pSender->snapshot));
if (pSender->snapshot.lastConfigIndex != SYNC_INDEX_INVALID) {
SSyncRaftEntry *pEntry = NULL;
int32_t code = pSender->pSyncNode->pLogStore->syncLogGetEntry(pSender->pSyncNode->pLogStore, pSender->snapshot.lastConfigIndex, &pEntry);
ASSERT(code == 0);
ASSERT(pEntry == NULL);
SRpcMsg rpcMsg;
syncEntry2OriginalRpc(pEntry, &rpcMsg);
SSyncCfg lastConfig;
int32_t ret = syncCfgFromStr(rpcMsg.pCont, &lastConfig);
ASSERT(ret == 0);
pSender->lastConfig = lastConfig;
rpcFreeCont(rpcMsg.pCont);
syncEntryDestory(pEntry);
} else {
memset(&(pSender->lastConfig), 0, sizeof(SSyncCfg));
}
pSender->sendingMS = SYNC_SNAPSHOT_RETRY_MS;
pSender->term = pSender->pSyncNode->pRaftStore->currentTerm;
......@@ -97,6 +118,8 @@ void snapshotSenderStart(SSyncSnapshotSender *pSender) {
pMsg->term = pSender->pSyncNode->pRaftStore->currentTerm;
pMsg->lastIndex = pSender->snapshot.lastApplyIndex;
pMsg->lastTerm = pSender->snapshot.lastApplyTerm;
pMsg->lastConfigIndex = pSender->snapshot.lastConfigIndex;
pMsg->lastConfig = pSender->lastConfig;
pMsg->seq = pSender->seq; // SYNC_SNAPSHOT_SEQ_BEGIN
pMsg->privateTerm = pSender->privateTerm;
......@@ -112,15 +135,15 @@ void snapshotSenderStart(SSyncSnapshotSender *pSender) {
if (gRaftDetailLog) {
char *msgStr = syncSnapshotSend2Str(pMsg);
sTrace(
"sync event vgId:%d snapshot send to %s:%d begin seq:%d ack:%d lastApplyIndex:%ld lastApplyTerm:%lu send "
"sync event vgId:%d snapshot send to %s:%d begin seq:%d ack:%d lastApplyIndex:%ld lastApplyTerm:%lu lastConfigIndex:%ld send "
"msg:%s",
pSender->pSyncNode->vgId, host, port, pSender->seq, pSender->ack, pSender->snapshot.lastApplyIndex,
pSender->snapshot.lastApplyTerm, msgStr);
pSender->snapshot.lastApplyTerm, pSender->snapshot.lastConfigIndex, msgStr);
taosMemoryFree(msgStr);
} else {
sTrace("sync event vgId:%d snapshot send to %s:%d begin seq:%d ack:%d lastApplyIndex:%ld lastApplyTerm:%lu",
sTrace("sync event vgId:%d snapshot send to %s:%d begin seq:%d ack:%d lastApplyIndex:%ld lastApplyTerm:%lu lastConfigIndex:%ld",
pSender->pSyncNode->vgId, host, port, pSender->seq, pSender->ack, pSender->snapshot.lastApplyIndex,
pSender->snapshot.lastApplyTerm);
pSender->snapshot.lastApplyTerm, pSender->snapshot.lastConfigIndex);
}
syncSnapshotSendDestroy(pMsg);
......@@ -228,6 +251,8 @@ int32_t snapshotSend(SSyncSnapshotSender *pSender) {
pMsg->term = pSender->pSyncNode->pRaftStore->currentTerm;
pMsg->lastIndex = pSender->snapshot.lastApplyIndex;
pMsg->lastTerm = pSender->snapshot.lastApplyTerm;
pMsg->lastConfigIndex = pSender->snapshot.lastConfigIndex;
pMsg->lastConfig = pSender->lastConfig;
pMsg->seq = pSender->seq;
pMsg->privateTerm = pSender->privateTerm;
memcpy(pMsg->data, pSender->pCurrentBlock, pSender->blockLen);
......@@ -245,20 +270,20 @@ int32_t snapshotSend(SSyncSnapshotSender *pSender) {
if (gRaftDetailLog) {
char *msgStr = syncSnapshotSend2Str(pMsg);
sTrace(
"sync event vgId:%d snapshot send to %s:%d finish seq:%d ack:%d lastApplyIndex:%ld lastApplyTerm:%lu send "
"sync event vgId:%d snapshot send to %s:%d finish seq:%d ack:%d lastApplyIndex:%ld lastApplyTerm:%lu lastConfigIndex:%ld send "
"msg:%s",
pSender->pSyncNode->vgId, host, port, pSender->seq, pSender->ack, pSender->snapshot.lastApplyIndex,
pSender->snapshot.lastApplyTerm, msgStr);
pSender->snapshot.lastApplyTerm, pSender->snapshot.lastConfigIndex, msgStr);
taosMemoryFree(msgStr);
} else {
sTrace("sync event vgId:%d snapshot send to %s:%d finish seq:%d ack:%d lastApplyIndex:%ld lastApplyTerm:%lu",
sTrace("sync event vgId:%d snapshot send to %s:%d finish seq:%d ack:%d lastApplyIndex:%ld lastApplyTerm:%lu lastConfigIndex:%ld",
pSender->pSyncNode->vgId, host, port, pSender->seq, pSender->ack, pSender->snapshot.lastApplyIndex,
pSender->snapshot.lastApplyTerm);
pSender->snapshot.lastApplyTerm, pSender->snapshot.lastConfigIndex);
}
} else {
sTrace("sync event vgId:%d snapshot send to %s:%d sending seq:%d ack:%d lastApplyIndex:%ld lastApplyTerm:%lu",
sTrace("sync event vgId:%d snapshot send to %s:%d sending seq:%d ack:%d lastApplyIndex:%ld lastApplyTerm:%lu lastConfigIndex:%ld",
pSender->pSyncNode->vgId, host, port, pSender->seq, pSender->ack, pSender->snapshot.lastApplyIndex,
pSender->snapshot.lastApplyTerm);
pSender->snapshot.lastApplyTerm, pSender->snapshot.lastConfigIndex);
}
syncSnapshotSendDestroy(pMsg);
......@@ -274,6 +299,8 @@ int32_t snapshotReSend(SSyncSnapshotSender *pSender) {
pMsg->term = pSender->pSyncNode->pRaftStore->currentTerm;
pMsg->lastIndex = pSender->snapshot.lastApplyIndex;
pMsg->lastTerm = pSender->snapshot.lastApplyTerm;
pMsg->lastConfigIndex = pSender->snapshot.lastConfigIndex;
pMsg->lastConfig = pSender->lastConfig;
pMsg->seq = pSender->seq;
memcpy(pMsg->data, pSender->pCurrentBlock, pSender->blockLen);
......@@ -540,6 +567,12 @@ int32_t syncNodeOnSnapshotSendCb(SSyncNode *pSyncNode, SyncSnapshotSend *pMsg) {
pSyncNode->pFsm->FpSnapshotStopWrite(pSyncNode->pFsm, pReceiver->pWriter, true);
pSyncNode->pLogStore->syncLogSetBeginIndex(pSyncNode->pLogStore, pMsg->lastIndex + 1);
// maybe update lastconfig
if (pMsg->lastConfigIndex >= SYNC_INDEX_BEGIN) {
bool isDrop;
syncNodeUpdateConfig(pSyncNode, &(pMsg->lastConfig), pMsg->lastConfigIndex, &isDrop);
}
SSnapshot snapshot;
pSyncNode->pFsm->FpGetSnapshot(pSyncNode->pFsm, &snapshot);
......
......@@ -24,6 +24,15 @@ SyncSnapshotSend *createMsg() {
pMsg->privateTerm = 99;
pMsg->lastIndex = 22;
pMsg->lastTerm = 33;
pMsg->lastConfigIndex = 99;
pMsg->lastConfig.replicaNum = 3;
pMsg->lastConfig.myIndex = 1;
for (int i = 0; i < pMsg->lastConfig.replicaNum; ++i) {
((pMsg->lastConfig.nodeInfo)[i]).nodePort = i * 100;
snprintf(((pMsg->lastConfig.nodeInfo)[i]).nodeFqdn, sizeof(((pMsg->lastConfig.nodeInfo)[i]).nodeFqdn), "100.200.300.%d", i);
}
pMsg->seq = 44;
strcpy(pMsg->data, "hello world");
return pMsg;
......@@ -87,6 +96,9 @@ void test5() {
}
int main() {
gRaftDetailLog = true;
tsAsyncLog = 0;
sDebugFlag = DEBUG_TRACE + DEBUG_SCREEN + DEBUG_FILE;
logTest();
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册