提交 fda9803c 编写于 作者: M Minghao Li

sync refactor

上级 f22226a4
......@@ -102,7 +102,7 @@ int32_t syncNodeOnAppendEntriesCb(SSyncNode* ths, SyncAppendEntries* pMsg) {
SyncTerm localPreLogTerm = 0;
if (pMsg->prevLogTerm >= SYNC_INDEX_BEGIN && pMsg->prevLogTerm <= ths->pLogStore->getLastIndex(ths->pLogStore)) {
SSyncRaftEntry* pEntry = logStoreGetEntry(ths->pLogStore, pMsg->prevLogTerm);
SSyncRaftEntry* pEntry = logStoreGetEntry(ths->pLogStore, pMsg->prevLogIndex);
assert(pEntry != NULL);
localPreLogTerm = pEntry->term;
syncEntryDestory(pEntry);
......@@ -111,9 +111,9 @@ int32_t syncNodeOnAppendEntriesCb(SSyncNode* ths, SyncAppendEntries* pMsg) {
bool logOK =
(pMsg->prevLogIndex == SYNC_INDEX_INVALID) ||
((pMsg->prevLogIndex >= SYNC_INDEX_BEGIN) &&
(pMsg->prevLogIndex <= ths->pLogStore->getLastIndex(ths->pLogStore)) && (pMsg->prevLogIndex == localPreLogTerm));
(pMsg->prevLogIndex <= ths->pLogStore->getLastIndex(ths->pLogStore)) && (pMsg->prevLogTerm == localPreLogTerm));
// reject
// reject request
if ((pMsg->term < ths->pRaftStore->currentTerm) ||
((pMsg->term == ths->pRaftStore->currentTerm) && (ths->state == TAOS_SYNC_STATE_FOLLOWER) && !logOK)) {
SyncAppendEntriesReply* pReply = syncAppendEntriesReplyBuild();
......@@ -134,6 +134,9 @@ int32_t syncNodeOnAppendEntriesCb(SSyncNode* ths, SyncAppendEntries* pMsg) {
// return to follower state
if (pMsg->term == ths->pRaftStore->currentTerm && ths->state == TAOS_SYNC_STATE_CANDIDATE) {
syncNodeBecomeFollower(ths);
// need ret?
return ret;
}
// accept request
......@@ -144,17 +147,17 @@ int32_t syncNodeOnAppendEntriesCb(SSyncNode* ths, SyncAppendEntries* pMsg) {
matchSuccess = true;
}
if (pMsg->prevLogIndex >= SYNC_INDEX_BEGIN && pMsg->prevLogIndex <= ths->pLogStore->getLastIndex(ths->pLogStore)) {
SSyncRaftEntry* pEntry = logStoreGetEntry(ths->pLogStore, pMsg->prevLogTerm);
assert(pEntry != NULL);
if (pMsg->prevLogTerm == pEntry->term) {
SSyncRaftEntry* pPreEntry = logStoreGetEntry(ths->pLogStore, pMsg->prevLogIndex);
assert(pPreEntry != NULL);
if (pMsg->prevLogTerm == pPreEntry->term) {
matchSuccess = true;
}
syncEntryDestory(pEntry);
syncEntryDestory(pPreEntry);
}
if (matchSuccess) {
// delete conflict entries
if (ths->pLogStore->getLastIndex(ths->pLogStore) > pMsg->prevLogIndex) {
if (pMsg->prevLogIndex < ths->pLogStore->getLastIndex(ths->pLogStore)) {
SyncIndex fromIndex = pMsg->prevLogIndex + 1;
ths->pLogStore->truncate(ths->pLogStore, fromIndex);
}
......@@ -178,6 +181,7 @@ int32_t syncNodeOnAppendEntriesCb(SSyncNode* ths, SyncAppendEntries* pMsg) {
syncNodeSendMsgById(&pReply->destId, ths, &rpcMsg);
syncAppendEntriesReplyDestroy(pReply);
} else {
SyncAppendEntriesReply* pReply = syncAppendEntriesReplyBuild();
pReply->srcId = ths->myRaftId;
......
......@@ -13,6 +13,7 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "syncIndexMgr.h"
#include "syncInt.h"
// \* Leader i advances its commitIndex.
......@@ -38,4 +39,7 @@
// IN commitIndex' = [commitIndex EXCEPT ![i] = newCommitIndex]
// /\ UNCHANGED <<messages, serverVars, candidateVars, leaderVars, log>>
//
void syncNodeMaybeAdvanceCommitIndex(SSyncNode* pSyncNode) {}
\ No newline at end of file
void syncNodeMaybeAdvanceCommitIndex(SSyncNode* pSyncNode) {
syncIndexMgrLog2("==syncNodeMaybeAdvanceCommitIndex== pNextIndex", pSyncNode->pNextIndex);
syncIndexMgrLog2("==syncNodeMaybeAdvanceCommitIndex== pMatchIndex", pSyncNode->pMatchIndex);
}
\ No newline at end of file
......@@ -29,7 +29,7 @@ static int32_t syncIODestroy(SSyncIO *io);
static int32_t syncIOStartInternal(SSyncIO *io);
static int32_t syncIOStopInternal(SSyncIO *io);
static void * syncIOConsumerFunc(void *param);
static void *syncIOConsumerFunc(void *param);
static void syncIOProcessRequest(void *pParent, SRpcMsg *pMsg, SEpSet *pEpSet);
static void syncIOProcessReply(void *pParent, SRpcMsg *pMsg, SEpSet *pEpSet);
static int32_t syncIOAuth(void *parent, char *meterId, char *spi, char *encrypt, char *secret, char *ckey);
......@@ -234,9 +234,9 @@ static int32_t syncIOStopInternal(SSyncIO *io) {
}
static void *syncIOConsumerFunc(void *param) {
SSyncIO * io = param;
SSyncIO *io = param;
STaosQall *qall;
SRpcMsg * pRpcMsg, rpcMsg;
SRpcMsg *pRpcMsg, rpcMsg;
qall = taosAllocateQall();
while (1) {
......@@ -269,6 +269,7 @@ static void *syncIOConsumerFunc(void *param) {
} else if (pRpcMsg->msgType == SYNC_PING_REPLY) {
if (io->FpOnSyncPingReply != NULL) {
SyncPingReply *pSyncMsg = syncPingReplyFromRpcMsg2(pRpcMsg);
assert(pSyncMsg != NULL);
io->FpOnSyncPingReply(io->pSyncNode, pSyncMsg);
syncPingReplyDestroy(pSyncMsg);
}
......@@ -276,6 +277,7 @@ static void *syncIOConsumerFunc(void *param) {
} else if (pRpcMsg->msgType == SYNC_CLIENT_REQUEST) {
if (io->FpOnSyncClientRequest != NULL) {
SyncClientRequest *pSyncMsg = syncClientRequestFromRpcMsg2(pRpcMsg);
assert(pSyncMsg != NULL);
io->FpOnSyncClientRequest(io->pSyncNode, pSyncMsg);
syncClientRequestDestroy(pSyncMsg);
}
......@@ -283,6 +285,7 @@ static void *syncIOConsumerFunc(void *param) {
} else if (pRpcMsg->msgType == SYNC_REQUEST_VOTE) {
if (io->FpOnSyncRequestVote != NULL) {
SyncRequestVote *pSyncMsg = syncRequestVoteFromRpcMsg2(pRpcMsg);
assert(pSyncMsg != NULL);
io->FpOnSyncRequestVote(io->pSyncNode, pSyncMsg);
syncRequestVoteDestroy(pSyncMsg);
}
......@@ -290,6 +293,7 @@ static void *syncIOConsumerFunc(void *param) {
} else if (pRpcMsg->msgType == SYNC_REQUEST_VOTE_REPLY) {
if (io->FpOnSyncRequestVoteReply != NULL) {
SyncRequestVoteReply *pSyncMsg = syncRequestVoteReplyFromRpcMsg2(pRpcMsg);
assert(pSyncMsg != NULL);
io->FpOnSyncRequestVoteReply(io->pSyncNode, pSyncMsg);
syncRequestVoteReplyDestroy(pSyncMsg);
}
......@@ -297,6 +301,7 @@ static void *syncIOConsumerFunc(void *param) {
} else if (pRpcMsg->msgType == SYNC_APPEND_ENTRIES) {
if (io->FpOnSyncAppendEntries != NULL) {
SyncAppendEntries *pSyncMsg = syncAppendEntriesFromRpcMsg2(pRpcMsg);
assert(pSyncMsg != NULL);
io->FpOnSyncAppendEntries(io->pSyncNode, pSyncMsg);
syncAppendEntriesDestroy(pSyncMsg);
}
......@@ -304,6 +309,7 @@ static void *syncIOConsumerFunc(void *param) {
} else if (pRpcMsg->msgType == SYNC_APPEND_ENTRIES_REPLY) {
if (io->FpOnSyncAppendEntriesReply != NULL) {
SyncAppendEntriesReply *pSyncMsg = syncAppendEntriesReplyFromRpcMsg2(pRpcMsg);
assert(pSyncMsg != NULL);
io->FpOnSyncAppendEntriesReply(io->pSyncNode, pSyncMsg);
syncAppendEntriesReplyDestroy(pSyncMsg);
}
......@@ -311,6 +317,7 @@ static void *syncIOConsumerFunc(void *param) {
} else if (pRpcMsg->msgType == SYNC_TIMEOUT) {
if (io->FpOnSyncTimeout != NULL) {
SyncTimeout *pSyncMsg = syncTimeoutFromRpcMsg2(pRpcMsg);
assert(pSyncMsg != NULL);
io->FpOnSyncTimeout(io->pSyncNode, pSyncMsg);
syncTimeoutDestroy(pSyncMsg);
}
......
......@@ -563,11 +563,11 @@ void syncNodeBecomeLeader(SSyncNode* pSyncNode) {
// stop elect timer
syncNodeStopElectTimer(pSyncNode);
// start heartbeat timer
syncNodeStartHeartbeatTimer(pSyncNode);
// start replicate right now!
syncNodeReplicate(pSyncNode);
// start heartbeat timer
syncNodeStartHeartbeatTimer(pSyncNode);
}
void syncNodeCandidate2Leader(SSyncNode* pSyncNode) {
......
......@@ -63,15 +63,18 @@ int32_t logStoreAppendEntry(SSyncLogStore* pLogStore, SSyncRaftEntry* pEntry) {
SSyncRaftEntry* logStoreGetEntry(SSyncLogStore* pLogStore, SyncIndex index) {
SSyncLogStoreData* pData = pLogStore->data;
SWal* pWal = pData->pWal;
SSyncRaftEntry* pEntry;
SSyncRaftEntry* pEntry = NULL;
SWalReadHandle* pWalHandle = walOpenReadHandle(pWal);
walReadWithHandle(pWalHandle, index);
pEntry = syncEntryDeserialize(pWalHandle->pHead->head.body, pWalHandle->pHead->head.len);
assert(pEntry != NULL);
if (index >= SYNC_INDEX_BEGIN && index <= logStoreLastIndex(pLogStore)) {
SWalReadHandle* pWalHandle = walOpenReadHandle(pWal);
walReadWithHandle(pWalHandle, index);
pEntry = syncEntryDeserialize(pWalHandle->pHead->head.body, pWalHandle->pHead->head.len);
assert(pEntry != NULL);
// need to hold, do not new every time!!
walCloseReadHandle(pWalHandle);
}
// need to hold, do not new every time!!
walCloseReadHandle(pWalHandle);
return pEntry;
}
......
......@@ -17,7 +17,7 @@ void logTest() {
}
uint16_t ports[] = {7010, 7110, 7210, 7310, 7410};
int32_t replicaNum = 1;
int32_t replicaNum = 3;
int32_t myIndex = 0;
SRaftId ids[TSDB_MAX_REPLICA];
......@@ -33,7 +33,7 @@ SSyncNode* syncNodeInit() {
syncInfo.queue = gSyncIO->pMsgQ;
syncInfo.FpEqMsg = syncIOEqMsg;
syncInfo.pFsm = pFsm;
snprintf(syncInfo.path, sizeof(syncInfo.path), "%s", "./elect_test");
snprintf(syncInfo.path, sizeof(syncInfo.path), "./elect_test_%d", myIndex);
int code = walInit();
assert(code == 0);
......@@ -46,7 +46,10 @@ SSyncNode* syncNodeInit() {
walCfg.retentionSize = 1000;
walCfg.segSize = 1000;
walCfg.level = TAOS_WAL_FSYNC;
pWal = walOpen("./elect_test_wal", &walCfg);
char tmpdir[128];
snprintf(tmpdir, sizeof(tmpdir), "./elect_test_wal_%d", myIndex);
pWal = walOpen(tmpdir, &walCfg);
assert(pWal != NULL);
syncInfo.pWal = pWal;
......
......@@ -81,7 +81,7 @@ SSyncNode* syncNodeInit() {
SSyncNode* syncInitTest() { return syncNodeInit(); }
void logStoreTest() {
logStorePrint2((char*)"logStoreTest2", pSyncNode->pLogStore);
logStorePrint2((char*)"logStoreTest", pSyncNode->pLogStore);
assert(pSyncNode->pLogStore->getLastIndex(pSyncNode->pLogStore) == SYNC_INDEX_INVALID);
......@@ -105,10 +105,10 @@ void logStoreTest() {
assert(pSyncNode->pLogStore->getLastIndex(pSyncNode->pLogStore) == SYNC_INDEX_BEGIN);
}
}
logStorePrint(pSyncNode->pLogStore);
logStorePrint2((char*)"after appendEntry", pSyncNode->pLogStore);
pSyncNode->pLogStore->truncate(pSyncNode->pLogStore, 3);
logStorePrint(pSyncNode->pLogStore);
logStorePrint2((char*)"after truncate 3", pSyncNode->pLogStore);
}
void initRaftId(SSyncNode* pSyncNode) {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册