提交 2071c5a7 编写于 作者: M Minghao Li

fix(sync): sending snapshot

上级 0d7272a3
......@@ -451,9 +451,11 @@ static bool syncNodeOnAppendEntriesLogOK(SSyncNode* pSyncNode, SyncAppendEntries
return false;
}
static int32_t syncNodeMakeLogSame(SSyncNode* ths, SyncAppendEntries* pMsg, SSyncRaftEntry** ppAppendEntry) {
static int32_t syncNodeMakeLogSame(SSyncNode* ths, SyncAppendEntries* pMsg, SSyncRaftEntry** ppAppendEntry,
bool* pEntryAlreadyWritten) {
int32_t code;
*ppAppendEntry = NULL;
*pEntryAlreadyWritten = false;
// not conflict by default
bool conflict = false;
......@@ -466,10 +468,15 @@ static int32_t syncNodeMakeLogSame(SSyncNode* ths, SyncAppendEntries* pMsg, SSyn
*ppAppendEntry = syncEntryDeserialize(pMsg->data, pMsg->dataLen);
ASSERT(*ppAppendEntry != NULL);
// log not match, conflict, need delete
ASSERT(extraIndex == (*ppAppendEntry)->index);
if (pExtraEntry->term != (*ppAppendEntry)->term) {
// log not match, conflict, need delete
conflict = true;
} else {
// log match, already written
ASSERT(extraIndex == (*ppAppendEntry)->index && pExtraEntry->term == (*ppAppendEntry)->term);
*pEntryAlreadyWritten = true;
sInfo("entry already written, term:%lu, index:%ld", pExtraEntry->term, pExtraEntry->index);
}
syncEntryDestory(pExtraEntry);
......@@ -606,17 +613,20 @@ int32_t syncNodeOnAppendEntriesSnapshotCb(SSyncNode* ths, SyncAppendEntries* pMs
if (hasExtraEntries && hasAppendEntries) {
// make log same
SSyncRaftEntry* pAppendEntry;
code = syncNodeMakeLogSame(ths, pMsg, &pAppendEntry);
bool entryAlreadyWritten;
code = syncNodeMakeLogSame(ths, pMsg, &pAppendEntry, &entryAlreadyWritten);
ASSERT(code == 0);
ASSERT(pAppendEntry != NULL);
// append new entries
code = ths->pLogStore->syncLogAppendEntry(ths->pLogStore, pAppendEntry);
ASSERT(code == 0);
if (!entryAlreadyWritten) {
// append new entries
code = ths->pLogStore->syncLogAppendEntry(ths->pLogStore, pAppendEntry);
ASSERT(code == 0);
// pre commit
code = syncNodePreCommit(ths, pAppendEntry);
ASSERT(code == 0);
// pre commit
code = syncNodePreCommit(ths, pAppendEntry);
ASSERT(code == 0);
}
syncEntryDestory(pAppendEntry);
......
......@@ -1612,7 +1612,7 @@ int32_t syncNodeOnClientRequestCb(SSyncNode* ths, SyncClientRequest* pMsg) {
int32_t ret = 0;
syncClientRequestLog2("==syncNodeOnClientRequestCb==", pMsg);
SyncIndex index = ths->pLogStore->getLastIndex(ths->pLogStore) + 1;
SyncIndex index = ths->pLogStore->syncLogWriteIndex(ths->pLogStore);
SyncTerm term = ths->pRaftStore->currentTerm;
SSyncRaftEntry* pEntry = syncEntryBuild2((SyncClientRequest*)pMsg, term, index);
assert(pEntry != NULL);
......
......@@ -477,13 +477,20 @@ cJSON* logStoreSimple2Json(SSyncLogStore* pLogStore) {
cJSON_AddStringToObject(pRoot, "pSyncNode", u64buf);
snprintf(u64buf, sizeof(u64buf), "%p", pData->pWal);
cJSON_AddStringToObject(pRoot, "pWal", u64buf);
snprintf(u64buf, sizeof(u64buf), "%ld", logStoreLastIndex(pLogStore));
snprintf(u64buf, sizeof(u64buf), "%ld", raftLogLastIndex(pLogStore));
cJSON_AddStringToObject(pRoot, "LastIndex", u64buf);
snprintf(u64buf, sizeof(u64buf), "%lu", logStoreLastTerm(pLogStore));
snprintf(u64buf, sizeof(u64buf), "%lu", raftLogLastTerm(pLogStore));
cJSON_AddStringToObject(pRoot, "LastTerm", u64buf);
snprintf(u64buf, sizeof(u64buf), "%ld", pData->beginIndex);
cJSON_AddStringToObject(pRoot, "beginIndex", u64buf);
SyncIndex endIndex = raftLogEndIndex(pLogStore);
snprintf(u64buf, sizeof(u64buf), "%ld", endIndex);
cJSON_AddStringToObject(pRoot, "endIndex", u64buf);
int32_t count = raftLogEntryCount(pLogStore);
cJSON_AddNumberToObject(pRoot, "entryCount", count);
}
cJSON* pJson = cJSON_CreateObject();
......
......@@ -14,6 +14,7 @@
*/
#include "syncSnapshot.h"
#include "syncRaftLog.h"
#include "syncRaftStore.h"
#include "syncUtil.h"
#include "wal.h"
......@@ -284,7 +285,7 @@ cJSON *snapshotSender2Json(SSyncSnapshotSender *pSender) {
char *snapshotSender2Str(SSyncSnapshotSender *pSender) {
cJSON *pJson = snapshotSender2Json(pSender);
char * serialized = cJSON_Print(pJson);
char *serialized = cJSON_Print(pJson);
cJSON_Delete(pJson);
return serialized;
}
......@@ -398,7 +399,7 @@ cJSON *snapshotReceiver2Json(SSyncSnapshotReceiver *pReceiver) {
char *snapshotReceiver2Str(SSyncSnapshotReceiver *pReceiver) {
cJSON *pJson = snapshotReceiver2Json(pReceiver);
char * serialized = cJSON_Print(pJson);
char *serialized = cJSON_Print(pJson);
cJSON_Delete(pJson);
return serialized;
}
......@@ -427,8 +428,13 @@ int32_t syncNodeOnSnapshotSendCb(SSyncNode *pSyncNode, SyncSnapshotSend *pMsg) {
pSyncNode->pFsm->FpSnapshotDoWrite(pSyncNode->pFsm, pReceiver->pWriter, pMsg->data, pMsg->dataLen);
pSyncNode->pFsm->FpSnapshotStopWrite(pSyncNode->pFsm, pReceiver->pWriter, true);
walRestoreFromSnapshot(pSyncNode->pWal, pMsg->lastIndex);
sInfo("walRestoreFromSnapshot lastIndex:%ld", pMsg->lastIndex);
pSyncNode->pLogStore->syncLogSetBeginIndex(pSyncNode->pLogStore, pMsg->lastIndex + 1);
char *logSimpleStr = logStoreSimple2Str(pSyncNode->pLogStore);
sInfo("snapshot receive finish, update log begin index:%ld, raft log:%s", pMsg->lastIndex + 1, logSimpleStr);
taosMemoryFree(logSimpleStr);
// walRestoreFromSnapshot(pSyncNode->pWal, pMsg->lastIndex);
// sInfo("walRestoreFromSnapshot lastIndex:%ld", pMsg->lastIndex);
pReceiver->pWriter = NULL;
snapshotReceiverStop(pReceiver);
......
......@@ -295,7 +295,7 @@ int main(int argc, char** argv) {
int32_t lastApplyTerm = atoi(argv[5]);
int32_t writeRecordNum = atoi(argv[6]);
bool isStandBy = atoi(argv[7]);
bool isConfigChange = atoi(argv[8]);
int32_t isConfigChange = atoi(argv[8]);
int32_t iterTimes = atoi(argv[9]);
int32_t finishLastApplyIndex = atoi(argv[10]);
int32_t finishLastApplyTerm = atoi(argv[11]);
......@@ -308,7 +308,7 @@ int main(int argc, char** argv) {
// check parameter
assert(replicaNum >= 1 && replicaNum <= 5);
assert(myIndex >= 0 && myIndex < replicaNum);
//assert(myIndex >= 0 && myIndex < replicaNum);
assert(lastApplyIndex >= -1);
assert(lastApplyTerm >= 0);
assert(writeRecordNum >= 0);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册