提交 d5188f14 编写于 作者: M Minghao Li

sync refactor

上级 996886e7
...@@ -158,17 +158,17 @@ typedef struct SSyncNode SSyncNode; ...@@ -158,17 +158,17 @@ typedef struct SSyncNode SSyncNode;
int32_t syncInit(); int32_t syncInit();
void syncCleanUp(); void syncCleanUp();
int64_t syncStart(const SSyncInfo* pSyncInfo); int64_t syncStart(const SSyncInfo* pSyncInfo);
void syncStop(int64_t rid); void syncStop(int64_t rid);
int32_t syncReconfig(int64_t rid, const SSyncCfg* pSyncCfg); int32_t syncReconfig(int64_t rid, const SSyncCfg* pSyncCfg);
int32_t syncPropose(int64_t rid, const SRpcMsg* pMsg, bool isWeak); int32_t syncPropose(int64_t rid, const SRpcMsg* pMsg, bool isWeak);
ESyncState syncGetMyRole(int64_t rid); ESyncState syncGetMyRole(int64_t rid);
// propose with sequence number, to implement linearizable semantics // propose with sequence number, to implement linearizable semantics
int32_t syncPropose2(int64_t rid, const SRpcMsg* pMsg, bool isWeak, uint64_t seqNum); int32_t syncPropose2(int64_t rid, const SRpcMsg* pMsg, bool isWeak, uint64_t seqNum);
// for compatibility, the same as syncPropose // for compatibility, the same as syncPropose
int32_t syncForwardToPeer(int64_t rid, const SRpcMsg* pMsg, bool isWeak); int32_t syncForwardToPeer(int64_t rid, const SRpcMsg* pMsg, bool isWeak);
extern int32_t sDebugFlag; extern int32_t sDebugFlag;
......
...@@ -155,102 +155,60 @@ int32_t syncNodeOnAppendEntriesCb(SSyncNode* ths, SyncAppendEntries* pMsg) { ...@@ -155,102 +155,60 @@ int32_t syncNodeOnAppendEntriesCb(SSyncNode* ths, SyncAppendEntries* pMsg) {
// accept request // accept request
if (pMsg->term == ths->pRaftStore->currentTerm && ths->state == TAOS_SYNC_STATE_FOLLOWER && logOK) { if (pMsg->term == ths->pRaftStore->currentTerm && ths->state == TAOS_SYNC_STATE_FOLLOWER && logOK) {
/* // preIndex = -1, or has preIndex entry in local log
bool preMatch = false; assert(pMsg->prevLogIndex <= ths->pLogStore->getLastIndex(ths->pLogStore));
if (pMsg->prevLogIndex == SYNC_INDEX_INVALID &&
ths->pLogStore->getLastIndex(ths->pLogStore) == SYNC_INDEX_INVALID) {
preMatch = true;
}
if (pMsg->prevLogIndex >= SYNC_INDEX_BEGIN && pMsg->prevLogIndex <=
ths->pLogStore->getLastIndex(ths->pLogStore)) { SSyncRaftEntry* pPreEntry = logStoreGetEntry(ths->pLogStore,
pMsg->prevLogIndex); assert(pPreEntry != NULL); if (pMsg->prevLogTerm == pPreEntry->term) { preMatch = true;
}
syncEntryDestory(pPreEntry);
}
sTrace(
"syncNodeOnAppendEntriesCb --> accept, pMsg->term:%lu, ths->pRaftStore->currentTerm:%lu, "
"ths->state:%d, logOK:%d, preMatch:%d",
pMsg->term, ths->pRaftStore->currentTerm, ths->state, logOK, preMatch);
if (preMatch) {
*/
{ // has extra entries (> preIndex) in local log
// preIndex = -1, or has preIndex entry in local log bool hasExtraEntries = pMsg->prevLogIndex < ths->pLogStore->getLastIndex(ths->pLogStore);
assert(pMsg->prevLogIndex <= ths->pLogStore->getLastIndex(ths->pLogStore));
// has extra entries (> preIndex) in local log // has entries in SyncAppendEntries msg
bool hasExtraEntries = pMsg->prevLogIndex < ths->pLogStore->getLastIndex(ths->pLogStore); bool hasAppendEntries = pMsg->dataLen > 0;
// has entries in SyncAppendEntries msg sTrace(
bool hasAppendEntries = pMsg->dataLen > 0; "syncNodeOnAppendEntriesCb --> accept, pMsg->term:%lu, ths->pRaftStore->currentTerm:%lu, ths->state:%d, "
"logOK:%d, hasExtraEntries:%d, hasAppendEntries:%d",
if (hasExtraEntries && hasAppendEntries) { pMsg->term, ths->pRaftStore->currentTerm, ths->state, logOK, hasExtraEntries, hasAppendEntries);
// not conflict by default
bool conflict = false;
SyncIndex extraIndex = pMsg->prevLogIndex + 1;
SSyncRaftEntry* pExtraEntry = logStoreGetEntry(ths->pLogStore, extraIndex);
assert(pExtraEntry != NULL);
SSyncRaftEntry* pAppendEntry = syncEntryDeserialize(pMsg->data, pMsg->dataLen);
assert(pAppendEntry != NULL);
// log not match, conflict if (hasExtraEntries && hasAppendEntries) {
assert(extraIndex == pAppendEntry->index); // not conflict by default
if (pExtraEntry->term != pAppendEntry->term) { bool conflict = false;
conflict = true;
}
if (conflict) { SyncIndex extraIndex = pMsg->prevLogIndex + 1;
// roll back SSyncRaftEntry* pExtraEntry = logStoreGetEntry(ths->pLogStore, extraIndex);
SyncIndex delBegin = ths->pLogStore->getLastIndex(ths->pLogStore); assert(pExtraEntry != NULL);
SyncIndex delEnd = extraIndex;
sTrace("syncNodeOnAppendEntriesCb --> conflict:%d, delBegin:%ld, delEnd:%ld", conflict, delBegin, delEnd); SSyncRaftEntry* pAppendEntry = syncEntryDeserialize(pMsg->data, pMsg->dataLen);
assert(pAppendEntry != NULL);
// notice! reverse roll back! // log not match, conflict
for (SyncIndex index = delEnd; index >= delBegin; --index) { assert(extraIndex == pAppendEntry->index);
if (ths->pFsm->FpRollBackCb != NULL) { if (pExtraEntry->term != pAppendEntry->term) {
SSyncRaftEntry* pRollBackEntry = logStoreGetEntry(ths->pLogStore, index); conflict = true;
assert(pRollBackEntry != NULL); }
SRpcMsg rpcMsg; if (conflict) {
syncEntry2OriginalRpc(pRollBackEntry, &rpcMsg); // roll back
ths->pFsm->FpRollBackCb(ths->pFsm, &rpcMsg, pRollBackEntry->index, pRollBackEntry->isWeak, 0, ths->state); SyncIndex delBegin = ths->pLogStore->getLastIndex(ths->pLogStore);
rpcFreeCont(rpcMsg.pCont); SyncIndex delEnd = extraIndex;
syncEntryDestory(pRollBackEntry);
}
}
// delete conflict entries sTrace("syncNodeOnAppendEntriesCb --> conflict:%d, delBegin:%ld, delEnd:%ld", conflict, delBegin, delEnd);
ths->pLogStore->truncate(ths->pLogStore, extraIndex);
// append new entries // notice! reverse roll back!
ths->pLogStore->appendEntry(ths->pLogStore, pAppendEntry); for (SyncIndex index = delEnd; index >= delBegin; --index) {
if (ths->pFsm->FpRollBackCb != NULL) {
SSyncRaftEntry* pRollBackEntry = logStoreGetEntry(ths->pLogStore, index);
assert(pRollBackEntry != NULL);
// pre commit SRpcMsg rpcMsg;
SRpcMsg rpcMsg; syncEntry2OriginalRpc(pRollBackEntry, &rpcMsg);
syncEntry2OriginalRpc(pAppendEntry, &rpcMsg); ths->pFsm->FpRollBackCb(ths->pFsm, &rpcMsg, pRollBackEntry->index, pRollBackEntry->isWeak, 0, ths->state);
if (ths->pFsm != NULL) { rpcFreeCont(rpcMsg.pCont);
if (ths->pFsm->FpPreCommitCb != NULL) { syncEntryDestory(pRollBackEntry);
ths->pFsm->FpPreCommitCb(ths->pFsm, &rpcMsg, pAppendEntry->index, pAppendEntry->isWeak, 2, ths->state);
}
} }
rpcFreeCont(rpcMsg.pCont);
} }
// free memory // delete conflict entries
syncEntryDestory(pExtraEntry); ths->pLogStore->truncate(ths->pLogStore, extraIndex);
syncEntryDestory(pAppendEntry);
} else if (hasExtraEntries && !hasAppendEntries) {
// do nothing
} else if (!hasExtraEntries && hasAppendEntries) {
SSyncRaftEntry* pAppendEntry = syncEntryDeserialize(pMsg->data, pMsg->dataLen);
assert(pAppendEntry != NULL);
// append new entries // append new entries
ths->pLogStore->appendEntry(ths->pLogStore, pAppendEntry); ths->pLogStore->appendEntry(ths->pLogStore, pAppendEntry);
...@@ -260,55 +218,62 @@ int32_t syncNodeOnAppendEntriesCb(SSyncNode* ths, SyncAppendEntries* pMsg) { ...@@ -260,55 +218,62 @@ int32_t syncNodeOnAppendEntriesCb(SSyncNode* ths, SyncAppendEntries* pMsg) {
syncEntry2OriginalRpc(pAppendEntry, &rpcMsg); syncEntry2OriginalRpc(pAppendEntry, &rpcMsg);
if (ths->pFsm != NULL) { if (ths->pFsm != NULL) {
if (ths->pFsm->FpPreCommitCb != NULL) { if (ths->pFsm->FpPreCommitCb != NULL) {
ths->pFsm->FpPreCommitCb(ths->pFsm, &rpcMsg, pAppendEntry->index, pAppendEntry->isWeak, 3, ths->state); ths->pFsm->FpPreCommitCb(ths->pFsm, &rpcMsg, pAppendEntry->index, pAppendEntry->isWeak, 2, ths->state);
} }
} }
rpcFreeCont(rpcMsg.pCont); rpcFreeCont(rpcMsg.pCont);
}
// free memory // free memory
syncEntryDestory(pAppendEntry); syncEntryDestory(pExtraEntry);
syncEntryDestory(pAppendEntry);
} else if (!hasExtraEntries && !hasAppendEntries) { } else if (hasExtraEntries && !hasAppendEntries) {
// do nothing // do nothing
} else { } else if (!hasExtraEntries && hasAppendEntries) {
assert(0); SSyncRaftEntry* pAppendEntry = syncEntryDeserialize(pMsg->data, pMsg->dataLen);
} assert(pAppendEntry != NULL);
SyncAppendEntriesReply* pReply = syncAppendEntriesReplyBuild(); // append new entries
pReply->srcId = ths->myRaftId; ths->pLogStore->appendEntry(ths->pLogStore, pAppendEntry);
pReply->destId = pMsg->srcId;
pReply->term = ths->pRaftStore->currentTerm;
pReply->success = true;
if (hasAppendEntries) { // pre commit
pReply->matchIndex = pMsg->prevLogIndex + 1; SRpcMsg rpcMsg;
} else { syncEntry2OriginalRpc(pAppendEntry, &rpcMsg);
pReply->matchIndex = pMsg->prevLogIndex; if (ths->pFsm != NULL) {
if (ths->pFsm->FpPreCommitCb != NULL) {
ths->pFsm->FpPreCommitCb(ths->pFsm, &rpcMsg, pAppendEntry->index, pAppendEntry->isWeak, 3, ths->state);
}
} }
rpcFreeCont(rpcMsg.pCont);
SRpcMsg rpcMsg; // free memory
syncAppendEntriesReply2RpcMsg(pReply, &rpcMsg); syncEntryDestory(pAppendEntry);
syncNodeSendMsgById(&pReply->destId, ths, &rpcMsg);
syncAppendEntriesReplyDestroy(pReply); } else if (!hasExtraEntries && !hasAppendEntries) {
// do nothing
} else {
assert(0);
} }
/* SyncAppendEntriesReply* pReply = syncAppendEntriesReplyBuild();
else { pReply->srcId = ths->myRaftId;
SyncAppendEntriesReply* pReply = syncAppendEntriesReplyBuild(); pReply->destId = pMsg->srcId;
pReply->srcId = ths->myRaftId; pReply->term = ths->pRaftStore->currentTerm;
pReply->destId = pMsg->srcId; pReply->success = true;
pReply->term = ths->pRaftStore->currentTerm;
pReply->success = false; if (hasAppendEntries) {
pReply->matchIndex = SYNC_INDEX_INVALID; pReply->matchIndex = pMsg->prevLogIndex + 1;
} else {
SRpcMsg rpcMsg; pReply->matchIndex = pMsg->prevLogIndex;
syncAppendEntriesReply2RpcMsg(pReply, &rpcMsg); }
syncNodeSendMsgById(&pReply->destId, ths, &rpcMsg);
syncAppendEntriesReplyDestroy(pReply); SRpcMsg rpcMsg;
} syncAppendEntriesReply2RpcMsg(pReply, &rpcMsg);
*/ syncNodeSendMsgById(&pReply->destId, ths, &rpcMsg);
syncAppendEntriesReplyDestroy(pReply);
// maybe update commit index from leader // maybe update commit index from leader
if (pMsg->commitIndex > ths->commitIndex) { if (pMsg->commitIndex > ths->commitIndex) {
......
...@@ -63,7 +63,14 @@ void syncMaybeAdvanceCommitIndex(SSyncNode* pSyncNode) { ...@@ -63,7 +63,14 @@ void syncMaybeAdvanceCommitIndex(SSyncNode* pSyncNode) {
if (pEntry->term == pSyncNode->pRaftStore->currentTerm) { if (pEntry->term == pSyncNode->pRaftStore->currentTerm) {
// update commit index // update commit index
newCommitIndex = index; newCommitIndex = index;
sTrace("syncMaybeAdvanceCommitIndex maybe to update, newCommitIndex:%ld commit, pSyncNode->commitIndex:%ld",
newCommitIndex, pSyncNode->commitIndex);
break; break;
} else {
sTrace(
"syncMaybeAdvanceCommitIndex can not commit due to term not equal, pEntry->term:%lu, "
"pSyncNode->pRaftStore->currentTerm:%lu",
pEntry->term, pSyncNode->pRaftStore->currentTerm);
} }
} }
} }
......
...@@ -26,7 +26,7 @@ SSyncFSM* pFsm; ...@@ -26,7 +26,7 @@ SSyncFSM* pFsm;
SWal* pWal; SWal* pWal;
SSyncNode* pSyncNode; SSyncNode* pSyncNode;
SSyncNode* syncNodeInit(const char *path) { SSyncNode* syncNodeInit(const char* path) {
syncInfo.vgId = 1234; syncInfo.vgId = 1234;
syncInfo.rpcClient = gSyncIO->clientRpc; syncInfo.rpcClient = gSyncIO->clientRpc;
syncInfo.FpSendMsg = syncIOSendMsg; syncInfo.FpSendMsg = syncIOSendMsg;
...@@ -78,9 +78,7 @@ SSyncNode* syncNodeInit(const char *path) { ...@@ -78,9 +78,7 @@ SSyncNode* syncNodeInit(const char *path) {
return pSyncNode; return pSyncNode;
} }
SSyncNode* logStoreCheck(const char *path) { return syncNodeInit(path); } SSyncNode* logStoreCheck(const char* path) { return syncNodeInit(path); }
int main(int argc, char** argv) { int main(int argc, char** argv) {
// taosInitLog((char *)"syncTest.log", 100000, 10); // taosInitLog((char *)"syncTest.log", 100000, 10);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册