提交 0c8f62f7 编写于 作者: M Minghao Li

sync refactor

上级 57b4e7ce
......@@ -116,7 +116,7 @@ typedef struct SSyncNode {
SyncGroupId vgId;
SSyncCfg syncCfg;
char walPath[TSDB_FILENAME_LEN];
SWal* pWal;
void* rpcClient;
int32_t (*FpSendMsg)(void* rpcClient, const SEpSet* pEpSet, SRpcMsg* pMsg);
void* queue;
......@@ -27,10 +27,14 @@ extern "C" {
#include "taosdef.h"
typedef struct SSyncRaftEntry {
SyncTerm term;
SyncIndex index;
SSyncBuffer data;
int8_t flag;
uint32_t bytes;
uint32_t msgType;
SyncTerm term;
SyncIndex index;
int8_t flag;
uint32_t dataLen;
char data[];
} SSyncRaftEntry;
#ifdef __cplusplus
......@@ -24,27 +24,42 @@ extern "C" {
#include <stdio.h>
#include <stdlib.h>
#include "syncInt.h"
#include "syncRaftEntry.h"
#include "taosdef.h"
int32_t raftLogAppendEntry(struct SSyncLogStore* pLogStore, SSyncBuffer* pBuf);
typedef struct SSyncLogStoreData {
SSyncNode* pSyncNode;
SWal* pWal;
} SSyncLogStoreData;
// get one log entry, user need to free pBuf->data
int32_t raftLogGetEntry(struct SSyncLogStore* pLogStore, SyncIndex index, SSyncBuffer* pBuf);
SSyncLogStore* logStoreCreate(SSyncNode* pSyncNode);
// update log store commit index with "index"
int32_t raftLogUpdateCommitIndex(struct SSyncLogStore* pLogStore, SyncIndex index);
void logStoreDestory(SSyncLogStore* pLogStore);
// truncate log with index, entries after the given index (>index) will be deleted
int32_t raftLogTruncate(struct SSyncLogStore* pLogStore, SyncIndex index);
// append one log entry
int32_t logStoreAppendEntry(SSyncLogStore* pLogStore, SRpcMsg* pEntry);
// return commit index of log
SyncIndex raftLogGetCommitIndex(struct SSyncLogStore* pLogStore);
// get one log entry, user need to free pEntry->pCont
int32_t logStoreGetEntry(SSyncLogStore* pLogStore, SyncIndex index, SRpcMsg* pEntry);
// truncate log with index, entries after the given index (>=index) will be deleted
int32_t logStoreTruncate(SSyncLogStore* pLogStore, SyncIndex fromIndex);
// return index of last entry
SyncIndex raftLogGetLastIndex(struct SSyncLogStore* pLogStore);
SyncIndex logStoreLastIndex(SSyncLogStore* pLogStore);
// return term of last entry
SyncTerm raftLogGetLastTerm(struct SSyncLogStore* pLogStore);
SyncTerm logStoreLastTerm(SSyncLogStore* pLogStore);
// update log store commit index with "index"
int32_t logStoreUpdateCommitIndex(SSyncLogStore* pLogStore, SyncIndex index);
// return commit index of log
SyncIndex logStoreGetCommitIndex(SSyncLogStore* pLogStore);
cJSON* logStore2Json(SSyncLogStore* pLogStore);
char* logStore2Str(SSyncLogStore* pLogStore);
#ifdef __cplusplus
......@@ -45,8 +45,8 @@ void voteGrantedDestroy(SVotesGranted *pVotesGranted);
bool voteGrantedMajority(SVotesGranted *pVotesGranted);
void voteGrantedVote(SVotesGranted *pVotesGranted, SyncRequestVoteReply *pMsg);
void voteGrantedReset(SVotesGranted *pVotesGranted, SyncTerm term);
cJSON * voteGranted2Json(SVotesGranted *pVotesGranted);
char * voteGranted2Str(SVotesGranted *pVotesGranted);
cJSON *voteGranted2Json(SVotesGranted *pVotesGranted);
char *voteGranted2Str(SVotesGranted *pVotesGranted);
// SVotesRespond -----------------------------
typedef struct SVotesRespond {
......@@ -62,8 +62,8 @@ void votesRespondDestory(SVotesRespond *pVotesRespond);
bool votesResponded(SVotesRespond *pVotesRespond, const SRaftId *pRaftId);
void votesRespondAdd(SVotesRespond *pVotesRespond, const SyncRequestVoteReply *pMsg);
void votesRespondReset(SVotesRespond *pVotesRespond, SyncTerm term);
cJSON * votesRespond2Json(SVotesRespond *pVotesRespond);
char * votesRespond2Str(SVotesRespond *pVotesRespond);
cJSON *votesRespond2Json(SVotesRespond *pVotesRespond);
char *votesRespond2Str(SVotesRespond *pVotesRespond);
#ifdef __cplusplus
......@@ -18,8 +18,10 @@
#include "syncAppendEntries.h"
#include "syncAppendEntriesReply.h"
#include "syncEnv.h"
#include "syncIndexMgr.h"
#include "syncInt.h"
#include "syncRaft.h"
#include "syncRaftLog.h"
#include "syncRaftStore.h"
#include "syncRequestVote.h"
#include "syncRequestVoteReply.h"
......@@ -78,7 +80,7 @@ SSyncNode* syncNodeOpen(const SSyncInfo* pSyncInfo) {
pSyncNode->vgId = pSyncInfo->vgId;
pSyncNode->syncCfg = pSyncInfo->syncCfg;
memcpy(pSyncNode->path, pSyncInfo->path, sizeof(pSyncNode->path));
memcpy(pSyncNode->walPath, pSyncInfo->walPath, sizeof(pSyncNode->walPath));
pSyncNode->pWal = pSyncInfo->pWal;
pSyncNode->rpcClient = pSyncInfo->rpcClient;
pSyncNode->FpSendMsg = pSyncInfo->FpSendMsg;
pSyncNode->queue = pSyncInfo->queue;
......@@ -114,20 +116,26 @@ SSyncNode* syncNodeOpen(const SSyncInfo* pSyncInfo) {
// init life cycle
// init server vars
// init TLA+ server vars
pSyncNode->pRaftStore = raftStoreOpen(pSyncInfo->walPath);
assert(pSyncNode->pRaftStore != NULL);
// init candidate vars
// init TLA+ candidate vars
pSyncNode->pVotesGranted = voteGrantedCreate(pSyncNode);
assert(pSyncNode->pVotesGranted != NULL);
pSyncNode->pVotesRespond = votesRespondCreate(pSyncNode);
assert(pSyncNode->pVotesRespond != NULL);
// init leader vars
pSyncNode->pNextIndex = NULL;
pSyncNode->pMatchIndex = NULL;
// init TLA+ leader vars
pSyncNode->pNextIndex = syncIndexMgrCreate(pSyncNode);
assert(pSyncNode->pNextIndex != NULL);
pSyncNode->pMatchIndex = syncIndexMgrCreate(pSyncNode);
assert(pSyncNode->pMatchIndex != NULL);
// init TLA+ log vars
pSyncNode->pLogStore = logStoreCreate(pSyncNode);
assert(pSyncNode->pLogStore != NULL);
pSyncNode->commitIndex = 0;
// init ping timer
pSyncNode->pPingTimer = NULL;
......@@ -177,7 +185,8 @@ cJSON* syncNode2Json(const SSyncNode* pSyncNode) {
// init by SSyncInfo
cJSON_AddNumberToObject(pRoot, "vgId", pSyncNode->vgId);
cJSON_AddStringToObject(pRoot, "path", pSyncNode->path);
cJSON_AddStringToObject(pRoot, "walPath", pSyncNode->walPath);
snprintf(u64buf, sizeof(u64buf), "%p", pSyncNode->pWal);
cJSON_AddStringToObject(pRoot, "pWal", u64buf);
snprintf(u64buf, sizeof(u64buf), "%p", pSyncNode->rpcClient);
cJSON_AddStringToObject(pRoot, "rpcClient", u64buf);
......@@ -14,46 +14,61 @@
#include "syncRaftLog.h"
#include "wal.h"
int32_t raftLogAppendEntry(struct SSyncLogStore* pLogStore, SSyncBuffer* pBuf) { return 0; }
// get one log entry, user need to free pBuf->data
int32_t raftLogGetEntry(struct SSyncLogStore* pLogStore, SyncIndex index, SSyncBuffer* pBuf) { return 0; }
// TLA+ Spec
// \* Leader i advances its commitIndex.
// \* This is done as a separate step from handling AppendEntries responses,
// \* in part to minimize atomic regions, and in part so that leaders of
// \* single-server clusters are able to mark entries committed.
// AdvanceCommitIndex(i) ==
// /\ state[i] = Leader
// /\ LET \* The set of servers that agree up through index.
// Agree(index) == {i} \cup {k \in Server :
// matchIndex[i][k] >= index}
// \* The maximum indexes for which a quorum agrees
// agreeIndexes == {index \in 1..Len(log[i]) :
// Agree(index) \in Quorum}
// \* New value for commitIndex'[i]
// newCommitIndex ==
// IF /\ agreeIndexes /= {}
// /\ log[i][Max(agreeIndexes)].term = currentTerm[i]
// Max(agreeIndexes)
// commitIndex[i]
// IN commitIndex' = [commitIndex EXCEPT ![i] = newCommitIndex]
// /\ UNCHANGED <<messages, serverVars, candidateVars, leaderVars, log>>
int32_t raftLogupdateCommitIndex(struct SSyncLogStore* pLogStore, SyncIndex index) { return 0; }
// truncate log with index, entries after the given index (>index) will be deleted
int32_t raftLogTruncate(struct SSyncLogStore* pLogStore, SyncIndex index) { return 0; }
SSyncLogStore* logStoreCreate(SSyncNode* pSyncNode) {
SSyncLogStore* pLogStore = malloc(sizeof(SSyncLogStore));
assert(pLogStore != NULL);
// return commit index of log
SyncIndex raftLogGetCommitIndex(struct SSyncLogStore* pLogStore) { return 0; }
pLogStore->data = malloc(sizeof(SSyncLogStoreData));
assert(pLogStore->data != NULL);
SSyncLogStoreData* pData = pLogStore->data;
pData->pSyncNode = pSyncNode;
pData->pWal = pSyncNode->pWal;
pLogStore->appendEntry = logStoreAppendEntry;
pLogStore->getEntry = logStoreGetEntry;
pLogStore->truncate = logStoreTruncate;
pLogStore->getLastIndex = logStoreLastIndex;
pLogStore->getLastTerm = logStoreLastTerm;
pLogStore->updateCommitIndex = logStoreUpdateCommitIndex;
pLogStore->getCommitIndex = logStoreGetCommitIndex;
void logStoreDestory(SSyncLogStore* pLogStore) {
if (pLogStore != NULL) {
// append one log entry
int32_t logStoreAppendEntry(SSyncLogStore* pLogStore, SRpcMsg* pEntry) {}
// get one log entry, user need to free pEntry->pCont
int32_t logStoreGetEntry(SSyncLogStore* pLogStore, SyncIndex index, SRpcMsg* pEntry) {}
// truncate log with index, entries after the given index (>=index) will be deleted
int32_t logStoreTruncate(SSyncLogStore* pLogStore, SyncIndex fromIndex) {}
// return index of last entry
SyncIndex raftLogGetLastIndex(struct SSyncLogStore* pLogStore) { return 0; }
SyncIndex logStoreLastIndex(SSyncLogStore* pLogStore) {}
// return term of last entry
SyncTerm raftLogGetLastTerm(struct SSyncLogStore* pLogStore) { return 0; }
\ No newline at end of file
SyncTerm logStoreLastTerm(SSyncLogStore* pLogStore) {}
// update log store commit index with "index"
int32_t logStoreUpdateCommitIndex(SSyncLogStore* pLogStore, SyncIndex index) {}
// return commit index of log
SyncIndex logStoreGetCommitIndex(SSyncLogStore* pLogStore) {}
cJSON* logStore2Json(SSyncLogStore* pLogStore) {}
char* logStore2Str(SSyncLogStore* pLogStore) {
cJSON* pJson = logStore2Json(pLogStore);
char* serialized = cJSON_Print(pJson);
return serialized;
\ No newline at end of file
......@@ -34,7 +34,6 @@ SSyncNode* syncNodeInit() {
syncInfo.FpEqMsg = syncIOEqMsg;
syncInfo.pFsm = pFsm;
snprintf(syncInfo.path, sizeof(syncInfo.path), "%s", "./test_path");
snprintf(syncInfo.walPath, sizeof(syncInfo.walPath), "%s", "./test_wal_path");
SSyncCfg* pCfg = &syncInfo.syncCfg;
pCfg->myIndex = myIndex;
......@@ -31,7 +31,6 @@ SSyncNode* syncNodeInit() {
syncInfo.FpEqMsg = syncIOEqMsg;
syncInfo.pFsm = pFsm;
snprintf(syncInfo.path, sizeof(syncInfo.path), "%s", "./test_path");
snprintf(syncInfo.walPath, sizeof(syncInfo.walPath), "%s", "./test_wal_path");
SSyncCfg* pCfg = &syncInfo.syncCfg;
pCfg->myIndex = myIndex;
......@@ -26,7 +26,6 @@ SSyncNode* doSync(int myIndex) {
syncInfo.FpEqMsg = syncIOEqMsg;
syncInfo.pFsm = pFsm;
snprintf(syncInfo.path, sizeof(syncInfo.path), "%s", "./path");
snprintf(syncInfo.walPath, sizeof(syncInfo.walPath), "%s", "./wal_path");
SSyncCfg* pCfg = &syncInfo.syncCfg;
pCfg->myIndex = myIndex;
......@@ -33,7 +33,6 @@ SSyncNode* syncNodeInit() {
syncInfo.FpEqMsg = syncIOEqMsg;
syncInfo.pFsm = pFsm;
snprintf(syncInfo.path, sizeof(syncInfo.path), "%s", "./test_path");
snprintf(syncInfo.walPath, sizeof(syncInfo.walPath), "%s", "./test_wal_path");
SSyncCfg* pCfg = &syncInfo.syncCfg;
pCfg->myIndex = myIndex;
......@@ -33,7 +33,6 @@ SSyncNode* syncNodeInit() {
syncInfo.FpEqMsg = syncIOEqMsg;
syncInfo.pFsm = pFsm;
snprintf(syncInfo.path, sizeof(syncInfo.path), "%s", "./test_path");
snprintf(syncInfo.walPath, sizeof(syncInfo.walPath), "%s", "./test_wal_path");
SSyncCfg* pCfg = &syncInfo.syncCfg;
pCfg->myIndex = myIndex;
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
想要评论请 注册