diff --git a/source/libs/sync/src/syncAppendEntries.c b/source/libs/sync/src/syncAppendEntries.c
index 87d6669f59c33a482af03aee58e793a7c30a4f38..888d7e16d1152c2a7d9de3718277041e46774eaa 100644
--- a/source/libs/sync/src/syncAppendEntries.c
+++ b/source/libs/sync/src/syncAppendEntries.c
@@ -102,7 +102,7 @@ int32_t syncNodeOnAppendEntriesCb(SSyncNode* ths, SyncAppendEntries* pMsg) {
SyncTerm localPreLogTerm = 0;
if (pMsg->prevLogTerm >= SYNC_INDEX_BEGIN && pMsg->prevLogTerm <= ths->pLogStore->getLastIndex(ths->pLogStore)) {
- SSyncRaftEntry* pEntry = logStoreGetEntry(ths->pLogStore, pMsg->prevLogTerm);
+ SSyncRaftEntry* pEntry = logStoreGetEntry(ths->pLogStore, pMsg->prevLogIndex);
assert(pEntry != NULL);
localPreLogTerm = pEntry->term;
syncEntryDestory(pEntry);
@@ -111,9 +111,9 @@ int32_t syncNodeOnAppendEntriesCb(SSyncNode* ths, SyncAppendEntries* pMsg) {
bool logOK =
(pMsg->prevLogIndex == SYNC_INDEX_INVALID) ||
((pMsg->prevLogIndex >= SYNC_INDEX_BEGIN) &&
- (pMsg->prevLogIndex <= ths->pLogStore->getLastIndex(ths->pLogStore)) && (pMsg->prevLogIndex == localPreLogTerm));
+ (pMsg->prevLogIndex <= ths->pLogStore->getLastIndex(ths->pLogStore)) && (pMsg->prevLogTerm == localPreLogTerm));
- // reject
+ // reject request
if ((pMsg->term < ths->pRaftStore->currentTerm) ||
((pMsg->term == ths->pRaftStore->currentTerm) && (ths->state == TAOS_SYNC_STATE_FOLLOWER) && !logOK)) {
SyncAppendEntriesReply* pReply = syncAppendEntriesReplyBuild();
@@ -134,6 +134,9 @@ int32_t syncNodeOnAppendEntriesCb(SSyncNode* ths, SyncAppendEntries* pMsg) {
// return to follower state
if (pMsg->term == ths->pRaftStore->currentTerm && ths->state == TAOS_SYNC_STATE_CANDIDATE) {
syncNodeBecomeFollower(ths);
+
+ // need ret?
+ return ret;
}
// accept request
@@ -144,17 +147,17 @@ int32_t syncNodeOnAppendEntriesCb(SSyncNode* ths, SyncAppendEntries* pMsg) {
matchSuccess = true;
}
if (pMsg->prevLogIndex >= SYNC_INDEX_BEGIN && pMsg->prevLogIndex <= ths->pLogStore->getLastIndex(ths->pLogStore)) {
- SSyncRaftEntry* pEntry = logStoreGetEntry(ths->pLogStore, pMsg->prevLogTerm);
- assert(pEntry != NULL);
- if (pMsg->prevLogTerm == pEntry->term) {
+ SSyncRaftEntry* pPreEntry = logStoreGetEntry(ths->pLogStore, pMsg->prevLogIndex);
+ assert(pPreEntry != NULL);
+ if (pMsg->prevLogTerm == pPreEntry->term) {
matchSuccess = true;
}
- syncEntryDestory(pEntry);
+ syncEntryDestory(pPreEntry);
}
if (matchSuccess) {
// delete conflict entries
- if (ths->pLogStore->getLastIndex(ths->pLogStore) > pMsg->prevLogIndex) {
+ if (pMsg->prevLogIndex < ths->pLogStore->getLastIndex(ths->pLogStore)) {
SyncIndex fromIndex = pMsg->prevLogIndex + 1;
ths->pLogStore->truncate(ths->pLogStore, fromIndex);
}
@@ -178,6 +181,7 @@ int32_t syncNodeOnAppendEntriesCb(SSyncNode* ths, SyncAppendEntries* pMsg) {
syncNodeSendMsgById(&pReply->destId, ths, &rpcMsg);
syncAppendEntriesReplyDestroy(pReply);
+
} else {
SyncAppendEntriesReply* pReply = syncAppendEntriesReplyBuild();
pReply->srcId = ths->myRaftId;
diff --git a/source/libs/sync/src/syncCommit.c b/source/libs/sync/src/syncCommit.c
index ec4272fdd0c956125f5c1114a4ad02414524bc04..850468f393a6b1ba0e2b3aab6a9dd23eaeaeb4f3 100644
--- a/source/libs/sync/src/syncCommit.c
+++ b/source/libs/sync/src/syncCommit.c
@@ -13,6 +13,7 @@
* along with this program. If not, see .
*/
+#include "syncIndexMgr.h"
#include "syncInt.h"
// \* Leader i advances its commitIndex.
@@ -38,4 +39,7 @@
// IN commitIndex' = [commitIndex EXCEPT ![i] = newCommitIndex]
// /\ UNCHANGED <>
//
-void syncNodeMaybeAdvanceCommitIndex(SSyncNode* pSyncNode) {}
\ No newline at end of file
+void syncNodeMaybeAdvanceCommitIndex(SSyncNode* pSyncNode) {
+ syncIndexMgrLog2("==syncNodeMaybeAdvanceCommitIndex== pNextIndex", pSyncNode->pNextIndex);
+ syncIndexMgrLog2("==syncNodeMaybeAdvanceCommitIndex== pMatchIndex", pSyncNode->pMatchIndex);
+}
\ No newline at end of file
diff --git a/source/libs/sync/src/syncIO.c b/source/libs/sync/src/syncIO.c
index e6a43f2bb50bfc898b214ab8be77d4ce71e3a365..c8448c32eb08d3733e9b8e82d6f57ec395d99829 100644
--- a/source/libs/sync/src/syncIO.c
+++ b/source/libs/sync/src/syncIO.c
@@ -29,7 +29,7 @@ static int32_t syncIODestroy(SSyncIO *io);
static int32_t syncIOStartInternal(SSyncIO *io);
static int32_t syncIOStopInternal(SSyncIO *io);
-static void * syncIOConsumerFunc(void *param);
+static void *syncIOConsumerFunc(void *param);
static void syncIOProcessRequest(void *pParent, SRpcMsg *pMsg, SEpSet *pEpSet);
static void syncIOProcessReply(void *pParent, SRpcMsg *pMsg, SEpSet *pEpSet);
static int32_t syncIOAuth(void *parent, char *meterId, char *spi, char *encrypt, char *secret, char *ckey);
@@ -234,9 +234,9 @@ static int32_t syncIOStopInternal(SSyncIO *io) {
}
static void *syncIOConsumerFunc(void *param) {
- SSyncIO * io = param;
+ SSyncIO *io = param;
STaosQall *qall;
- SRpcMsg * pRpcMsg, rpcMsg;
+ SRpcMsg *pRpcMsg, rpcMsg;
qall = taosAllocateQall();
while (1) {
@@ -269,6 +269,7 @@ static void *syncIOConsumerFunc(void *param) {
} else if (pRpcMsg->msgType == SYNC_PING_REPLY) {
if (io->FpOnSyncPingReply != NULL) {
SyncPingReply *pSyncMsg = syncPingReplyFromRpcMsg2(pRpcMsg);
+ assert(pSyncMsg != NULL);
io->FpOnSyncPingReply(io->pSyncNode, pSyncMsg);
syncPingReplyDestroy(pSyncMsg);
}
@@ -276,6 +277,7 @@ static void *syncIOConsumerFunc(void *param) {
} else if (pRpcMsg->msgType == SYNC_CLIENT_REQUEST) {
if (io->FpOnSyncClientRequest != NULL) {
SyncClientRequest *pSyncMsg = syncClientRequestFromRpcMsg2(pRpcMsg);
+ assert(pSyncMsg != NULL);
io->FpOnSyncClientRequest(io->pSyncNode, pSyncMsg);
syncClientRequestDestroy(pSyncMsg);
}
@@ -283,6 +285,7 @@ static void *syncIOConsumerFunc(void *param) {
} else if (pRpcMsg->msgType == SYNC_REQUEST_VOTE) {
if (io->FpOnSyncRequestVote != NULL) {
SyncRequestVote *pSyncMsg = syncRequestVoteFromRpcMsg2(pRpcMsg);
+ assert(pSyncMsg != NULL);
io->FpOnSyncRequestVote(io->pSyncNode, pSyncMsg);
syncRequestVoteDestroy(pSyncMsg);
}
@@ -290,6 +293,7 @@ static void *syncIOConsumerFunc(void *param) {
} else if (pRpcMsg->msgType == SYNC_REQUEST_VOTE_REPLY) {
if (io->FpOnSyncRequestVoteReply != NULL) {
SyncRequestVoteReply *pSyncMsg = syncRequestVoteReplyFromRpcMsg2(pRpcMsg);
+ assert(pSyncMsg != NULL);
io->FpOnSyncRequestVoteReply(io->pSyncNode, pSyncMsg);
syncRequestVoteReplyDestroy(pSyncMsg);
}
@@ -297,6 +301,7 @@ static void *syncIOConsumerFunc(void *param) {
} else if (pRpcMsg->msgType == SYNC_APPEND_ENTRIES) {
if (io->FpOnSyncAppendEntries != NULL) {
SyncAppendEntries *pSyncMsg = syncAppendEntriesFromRpcMsg2(pRpcMsg);
+ assert(pSyncMsg != NULL);
io->FpOnSyncAppendEntries(io->pSyncNode, pSyncMsg);
syncAppendEntriesDestroy(pSyncMsg);
}
@@ -304,6 +309,7 @@ static void *syncIOConsumerFunc(void *param) {
} else if (pRpcMsg->msgType == SYNC_APPEND_ENTRIES_REPLY) {
if (io->FpOnSyncAppendEntriesReply != NULL) {
SyncAppendEntriesReply *pSyncMsg = syncAppendEntriesReplyFromRpcMsg2(pRpcMsg);
+ assert(pSyncMsg != NULL);
io->FpOnSyncAppendEntriesReply(io->pSyncNode, pSyncMsg);
syncAppendEntriesReplyDestroy(pSyncMsg);
}
@@ -311,6 +317,7 @@ static void *syncIOConsumerFunc(void *param) {
} else if (pRpcMsg->msgType == SYNC_TIMEOUT) {
if (io->FpOnSyncTimeout != NULL) {
SyncTimeout *pSyncMsg = syncTimeoutFromRpcMsg2(pRpcMsg);
+ assert(pSyncMsg != NULL);
io->FpOnSyncTimeout(io->pSyncNode, pSyncMsg);
syncTimeoutDestroy(pSyncMsg);
}
diff --git a/source/libs/sync/src/syncMain.c b/source/libs/sync/src/syncMain.c
index 7d2087ededfb2fd4b7827a91821b1e4fe269c81b..ff446dfc272bc2715eb872b302afe462994a5f0b 100644
--- a/source/libs/sync/src/syncMain.c
+++ b/source/libs/sync/src/syncMain.c
@@ -563,11 +563,11 @@ void syncNodeBecomeLeader(SSyncNode* pSyncNode) {
// stop elect timer
syncNodeStopElectTimer(pSyncNode);
- // start heartbeat timer
- syncNodeStartHeartbeatTimer(pSyncNode);
-
// start replicate right now!
syncNodeReplicate(pSyncNode);
+
+ // start heartbeat timer
+ syncNodeStartHeartbeatTimer(pSyncNode);
}
void syncNodeCandidate2Leader(SSyncNode* pSyncNode) {
diff --git a/source/libs/sync/src/syncRaftLog.c b/source/libs/sync/src/syncRaftLog.c
index f3fd7f84a2d475fe11cba41ffbe5a5c2d4ec3bbc..01ec761a9c3930ddaa7c743f073b8e4520b9abc9 100644
--- a/source/libs/sync/src/syncRaftLog.c
+++ b/source/libs/sync/src/syncRaftLog.c
@@ -63,15 +63,18 @@ int32_t logStoreAppendEntry(SSyncLogStore* pLogStore, SSyncRaftEntry* pEntry) {
SSyncRaftEntry* logStoreGetEntry(SSyncLogStore* pLogStore, SyncIndex index) {
SSyncLogStoreData* pData = pLogStore->data;
SWal* pWal = pData->pWal;
- SSyncRaftEntry* pEntry;
+ SSyncRaftEntry* pEntry = NULL;
- SWalReadHandle* pWalHandle = walOpenReadHandle(pWal);
- walReadWithHandle(pWalHandle, index);
- pEntry = syncEntryDeserialize(pWalHandle->pHead->head.body, pWalHandle->pHead->head.len);
- assert(pEntry != NULL);
+ if (index >= SYNC_INDEX_BEGIN && index <= logStoreLastIndex(pLogStore)) {
+ SWalReadHandle* pWalHandle = walOpenReadHandle(pWal);
+ walReadWithHandle(pWalHandle, index);
+ pEntry = syncEntryDeserialize(pWalHandle->pHead->head.body, pWalHandle->pHead->head.len);
+ assert(pEntry != NULL);
+
+ // need to hold, do not new every time!!
+ walCloseReadHandle(pWalHandle);
+ }
- // need to hold, do not new every time!!
- walCloseReadHandle(pWalHandle);
return pEntry;
}
diff --git a/source/libs/sync/test/syncElectTest.cpp b/source/libs/sync/test/syncElectTest.cpp
index e52fe61ef5e68719326400d29311f8c60b12603a..b8a1460f3555460c58f98b126b5672f38d80ff5a 100644
--- a/source/libs/sync/test/syncElectTest.cpp
+++ b/source/libs/sync/test/syncElectTest.cpp
@@ -17,7 +17,7 @@ void logTest() {
}
uint16_t ports[] = {7010, 7110, 7210, 7310, 7410};
-int32_t replicaNum = 1;
+int32_t replicaNum = 3;
int32_t myIndex = 0;
SRaftId ids[TSDB_MAX_REPLICA];
@@ -33,7 +33,7 @@ SSyncNode* syncNodeInit() {
syncInfo.queue = gSyncIO->pMsgQ;
syncInfo.FpEqMsg = syncIOEqMsg;
syncInfo.pFsm = pFsm;
- snprintf(syncInfo.path, sizeof(syncInfo.path), "%s", "./elect_test");
+ snprintf(syncInfo.path, sizeof(syncInfo.path), "./elect_test_%d", myIndex);
int code = walInit();
assert(code == 0);
@@ -46,7 +46,10 @@ SSyncNode* syncNodeInit() {
walCfg.retentionSize = 1000;
walCfg.segSize = 1000;
walCfg.level = TAOS_WAL_FSYNC;
- pWal = walOpen("./elect_test_wal", &walCfg);
+
+ char tmpdir[128];
+ snprintf(tmpdir, sizeof(tmpdir), "./elect_test_wal_%d", myIndex);
+ pWal = walOpen(tmpdir, &walCfg);
assert(pWal != NULL);
syncInfo.pWal = pWal;
diff --git a/source/libs/sync/test/syncLogStoreTest.cpp b/source/libs/sync/test/syncLogStoreTest.cpp
index 1b05f76fa2971357c923c3c43ff66a9f1ebea759..c1cb66f382574f042bf910308a5da5e2d1a1b0b1 100644
--- a/source/libs/sync/test/syncLogStoreTest.cpp
+++ b/source/libs/sync/test/syncLogStoreTest.cpp
@@ -81,7 +81,7 @@ SSyncNode* syncNodeInit() {
SSyncNode* syncInitTest() { return syncNodeInit(); }
void logStoreTest() {
- logStorePrint2((char*)"logStoreTest2", pSyncNode->pLogStore);
+ logStorePrint2((char*)"logStoreTest", pSyncNode->pLogStore);
assert(pSyncNode->pLogStore->getLastIndex(pSyncNode->pLogStore) == SYNC_INDEX_INVALID);
@@ -105,10 +105,10 @@ void logStoreTest() {
assert(pSyncNode->pLogStore->getLastIndex(pSyncNode->pLogStore) == SYNC_INDEX_BEGIN);
}
}
- logStorePrint(pSyncNode->pLogStore);
+ logStorePrint2((char*)"after appendEntry", pSyncNode->pLogStore);
pSyncNode->pLogStore->truncate(pSyncNode->pLogStore, 3);
- logStorePrint(pSyncNode->pLogStore);
+ logStorePrint2((char*)"after truncate 3", pSyncNode->pLogStore);
}
void initRaftId(SSyncNode* pSyncNode) {