未验证 提交 aa691d13 编写于 作者: L Li Minghao 提交者: GitHub

Merge pull request #13096 from taosdata/feature/3.0_mhli

enh(sync) snapshot sender, receiver
...@@ -55,6 +55,8 @@ typedef struct SVotesRespond SVotesRespond; ...@@ -55,6 +55,8 @@ typedef struct SVotesRespond SVotesRespond;
typedef struct SSyncIndexMgr SSyncIndexMgr; typedef struct SSyncIndexMgr SSyncIndexMgr;
typedef struct SRaftCfg SRaftCfg; typedef struct SRaftCfg SRaftCfg;
typedef struct SSyncRespMgr SSyncRespMgr; typedef struct SSyncRespMgr SSyncRespMgr;
typedef struct SSyncSnapshotSender SSyncSnapshotSender;
typedef struct SSyncSnapshotReceiver SSyncSnapshotReceiver;
typedef struct SSyncNode { typedef struct SSyncNode {
// init by SSyncInfo // init by SSyncInfo
...@@ -148,9 +150,11 @@ typedef struct SSyncNode { ...@@ -148,9 +150,11 @@ typedef struct SSyncNode {
SSyncRespMgr* pSyncRespMgr; SSyncRespMgr* pSyncRespMgr;
// restore state // restore state
bool restoreFinish;
// sem_t restoreSem; // sem_t restoreSem;
SSnapshot* pSnapshot; bool restoreFinish;
SSnapshot* pSnapshot;
SSyncSnapshotSender* pSender;
SSyncSnapshotReceiver* pReceiver;
} SSyncNode; } SSyncNode;
......
...@@ -32,20 +32,21 @@ typedef struct SSyncLogStoreData { ...@@ -32,20 +32,21 @@ typedef struct SSyncLogStoreData {
SWal* pWal; SWal* pWal;
} SSyncLogStoreData; } SSyncLogStoreData;
SSyncLogStore* logStoreCreate(SSyncNode* pSyncNode); SSyncLogStore* logStoreCreate(SSyncNode* pSyncNode);
void logStoreDestory(SSyncLogStore* pLogStore); void logStoreDestory(SSyncLogStore* pLogStore);
int32_t logStoreAppendEntry(SSyncLogStore* pLogStore, SSyncRaftEntry* pEntry); cJSON* logStore2Json(SSyncLogStore* pLogStore);
SSyncRaftEntry* logStoreGetEntry(SSyncLogStore* pLogStore, SyncIndex index); char* logStore2Str(SSyncLogStore* pLogStore);
int32_t logStoreTruncate(SSyncLogStore* pLogStore, SyncIndex fromIndex); cJSON* logStoreSimple2Json(SSyncLogStore* pLogStore);
SyncIndex logStoreLastIndex(SSyncLogStore* pLogStore); char* logStoreSimple2Str(SSyncLogStore* pLogStore);
SyncTerm logStoreLastTerm(SSyncLogStore* pLogStore);
int32_t logStoreUpdateCommitIndex(SSyncLogStore* pLogStore, SyncIndex index); // SSyncRaftEntry* logStoreGetLastEntry(SSyncLogStore* pLogStore);
SyncIndex logStoreGetCommitIndex(SSyncLogStore* pLogStore); // SyncIndex logStoreLastIndex(SSyncLogStore* pLogStore);
SSyncRaftEntry* logStoreGetLastEntry(SSyncLogStore* pLogStore); // SyncTerm logStoreLastTerm(SSyncLogStore* pLogStore);
cJSON* logStore2Json(SSyncLogStore* pLogStore); // SSyncRaftEntry* logStoreGetEntry(SSyncLogStore* pLogStore, SyncIndex index);
char* logStore2Str(SSyncLogStore* pLogStore); // int32_t logStoreAppendEntry(SSyncLogStore* pLogStore, SSyncRaftEntry* pEntry);
cJSON* logStoreSimple2Json(SSyncLogStore* pLogStore); // int32_t logStoreTruncate(SSyncLogStore* pLogStore, SyncIndex fromIndex);
char* logStoreSimple2Str(SSyncLogStore* pLogStore); // int32_t logStoreUpdateCommitIndex(SSyncLogStore* pLogStore, SyncIndex index);
// SyncIndex logStoreGetCommitIndex(SSyncLogStore* pLogStore);
// for debug // for debug
void logStorePrint(SSyncLogStore* pLogStore); void logStorePrint(SSyncLogStore* pLogStore);
......
...@@ -23,11 +23,39 @@ extern "C" { ...@@ -23,11 +23,39 @@ extern "C" {
#include <stdint.h> #include <stdint.h>
#include <stdio.h> #include <stdio.h>
#include <stdlib.h> #include <stdlib.h>
#include "cJSON.h"
#include "syncInt.h" #include "syncInt.h"
#include "taosdef.h" #include "taosdef.h"
int32_t takeSnapshot(SSyncFSM *pFsm, SSnapshot *pSnapshot); typedef struct SSyncSnapshotSender {
int32_t restoreSnapshot(SSyncFSM *pFsm, SSnapshot *pSnapshot); bool isStart;
int32_t progressIndex;
void * pCurrentBlock;
int32_t len;
SSnapshot *pSnapshot;
SSyncNode *pSyncNode;
} SSyncSnapshotSender;
SSyncSnapshotSender *snapshotSenderCreate(SSyncNode *pSyncNode);
void snapshotSenderDestroy(SSyncSnapshotSender *pSender);
int32_t snapshotSend(SSyncSnapshotSender *pSender);
cJSON * snapshotSender2Json(SSyncSnapshotSender *pSender);
char * snapshotSender2Str(SSyncSnapshotSender *pSender);
typedef struct SSyncSnapshotReceiver {
bool isStart;
int32_t progressIndex;
void * pCurrentBlock;
int32_t len;
SSnapshot *pSnapshot;
SSyncNode *pSyncNode;
} SSyncSnapshotReceiver;
SSyncSnapshotReceiver *snapshotReceiverCreate(SSyncNode *pSyncNode);
void snapshotReceiverDestroy(SSyncSnapshotReceiver *pReceiver);
int32_t snapshotReceive(SSyncSnapshotReceiver *pReceiver);
cJSON * snapshotReceiver2Json(SSyncSnapshotReceiver *pReceiver);
char * snapshotReceiver2Str(SSyncSnapshotReceiver *pReceiver);
#ifdef __cplusplus #ifdef __cplusplus
} }
......
...@@ -107,7 +107,7 @@ int32_t syncNodeOnAppendEntriesCb(SSyncNode* ths, SyncAppendEntries* pMsg) { ...@@ -107,7 +107,7 @@ int32_t syncNodeOnAppendEntriesCb(SSyncNode* ths, SyncAppendEntries* pMsg) {
SyncTerm localPreLogTerm = 0; SyncTerm localPreLogTerm = 0;
if (pMsg->prevLogIndex >= SYNC_INDEX_BEGIN && pMsg->prevLogIndex <= ths->pLogStore->getLastIndex(ths->pLogStore)) { if (pMsg->prevLogIndex >= SYNC_INDEX_BEGIN && pMsg->prevLogIndex <= ths->pLogStore->getLastIndex(ths->pLogStore)) {
SSyncRaftEntry* pEntry = logStoreGetEntry(ths->pLogStore, pMsg->prevLogIndex); SSyncRaftEntry* pEntry = ths->pLogStore->getEntry(ths->pLogStore, pMsg->prevLogIndex);
assert(pEntry != NULL); assert(pEntry != NULL);
localPreLogTerm = pEntry->term; localPreLogTerm = pEntry->term;
syncEntryDestory(pEntry); syncEntryDestory(pEntry);
...@@ -175,7 +175,7 @@ int32_t syncNodeOnAppendEntriesCb(SSyncNode* ths, SyncAppendEntries* pMsg) { ...@@ -175,7 +175,7 @@ int32_t syncNodeOnAppendEntriesCb(SSyncNode* ths, SyncAppendEntries* pMsg) {
bool conflict = false; bool conflict = false;
SyncIndex extraIndex = pMsg->prevLogIndex + 1; SyncIndex extraIndex = pMsg->prevLogIndex + 1;
SSyncRaftEntry* pExtraEntry = logStoreGetEntry(ths->pLogStore, extraIndex); SSyncRaftEntry* pExtraEntry = ths->pLogStore->getEntry(ths->pLogStore, extraIndex);
assert(pExtraEntry != NULL); assert(pExtraEntry != NULL);
SSyncRaftEntry* pAppendEntry = syncEntryDeserialize(pMsg->data, pMsg->dataLen); SSyncRaftEntry* pAppendEntry = syncEntryDeserialize(pMsg->data, pMsg->dataLen);
...@@ -197,7 +197,7 @@ int32_t syncNodeOnAppendEntriesCb(SSyncNode* ths, SyncAppendEntries* pMsg) { ...@@ -197,7 +197,7 @@ int32_t syncNodeOnAppendEntriesCb(SSyncNode* ths, SyncAppendEntries* pMsg) {
// notice! reverse roll back! // notice! reverse roll back!
for (SyncIndex index = delEnd; index >= delBegin; --index) { for (SyncIndex index = delEnd; index >= delBegin; --index) {
if (ths->pFsm->FpRollBackCb != NULL) { if (ths->pFsm->FpRollBackCb != NULL) {
SSyncRaftEntry* pRollBackEntry = logStoreGetEntry(ths->pLogStore, index); SSyncRaftEntry* pRollBackEntry = ths->pLogStore->getEntry(ths->pLogStore, index);
assert(pRollBackEntry != NULL); assert(pRollBackEntry != NULL);
// if (pRollBackEntry->msgType != TDMT_VND_SYNC_NOOP) { // if (pRollBackEntry->msgType != TDMT_VND_SYNC_NOOP) {
...@@ -365,7 +365,7 @@ int32_t syncNodeOnAppendEntriesCb(SSyncNode* ths, SyncAppendEntries* pMsg) { ...@@ -365,7 +365,7 @@ int32_t syncNodeOnAppendEntriesCb(SSyncNode* ths, SyncAppendEntries* pMsg) {
} }
SReConfigCbMeta cbMeta = {0}; SReConfigCbMeta cbMeta = {0};
bool isDrop; bool isDrop;
// I am in newConfig // I am in newConfig
if (hit) { if (hit) {
...@@ -388,7 +388,7 @@ int32_t syncNodeOnAppendEntriesCb(SSyncNode* ths, SyncAppendEntries* pMsg) { ...@@ -388,7 +388,7 @@ int32_t syncNodeOnAppendEntriesCb(SSyncNode* ths, SyncAppendEntries* pMsg) {
} }
// always call FpReConfigCb // always call FpReConfigCb
if (ths->pFsm->FpReConfigCb != NULL) { if (ths->pFsm->FpReConfigCb != NULL) {
cbMeta.code = 0; cbMeta.code = 0;
cbMeta.currentTerm = ths->pRaftStore->currentTerm; cbMeta.currentTerm = ths->pRaftStore->currentTerm;
cbMeta.index = pEntry->index; cbMeta.index = pEntry->index;
......
...@@ -16,6 +16,15 @@ ...@@ -16,6 +16,15 @@
#include "syncRaftLog.h" #include "syncRaftLog.h"
#include "wal.h" #include "wal.h"
static SSyncRaftEntry* logStoreGetLastEntry(SSyncLogStore* pLogStore);
static SyncIndex logStoreLastIndex(SSyncLogStore* pLogStore);
static SyncTerm logStoreLastTerm(SSyncLogStore* pLogStore);
static SSyncRaftEntry* logStoreGetEntry(SSyncLogStore* pLogStore, SyncIndex index);
static int32_t logStoreAppendEntry(SSyncLogStore* pLogStore, SSyncRaftEntry* pEntry);
static int32_t logStoreTruncate(SSyncLogStore* pLogStore, SyncIndex fromIndex);
static int32_t logStoreUpdateCommitIndex(SSyncLogStore* pLogStore, SyncIndex index);
static SyncIndex logStoreGetCommitIndex(SSyncLogStore* pLogStore);
SSyncLogStore* logStoreCreate(SSyncNode* pSyncNode) { SSyncLogStore* logStoreCreate(SSyncNode* pSyncNode) {
SSyncLogStore* pLogStore = taosMemoryMalloc(sizeof(SSyncLogStore)); SSyncLogStore* pLogStore = taosMemoryMalloc(sizeof(SSyncLogStore));
assert(pLogStore != NULL); assert(pLogStore != NULL);
...@@ -78,7 +87,9 @@ SSyncRaftEntry* logStoreGetEntry(SSyncLogStore* pLogStore, SyncIndex index) { ...@@ -78,7 +87,9 @@ SSyncRaftEntry* logStoreGetEntry(SSyncLogStore* pLogStore, SyncIndex index) {
if (index >= SYNC_INDEX_BEGIN && index <= logStoreLastIndex(pLogStore)) { if (index >= SYNC_INDEX_BEGIN && index <= logStoreLastIndex(pLogStore)) {
SWalReadHandle* pWalHandle = walOpenReadHandle(pWal); SWalReadHandle* pWalHandle = walOpenReadHandle(pWal);
int32_t code = walReadWithHandle(pWalHandle, index); ASSERT(pWalHandle != NULL);
int32_t code = walReadWithHandle(pWalHandle, index);
if (code != 0) { if (code != 0) {
int32_t err = terrno; int32_t err = terrno;
const char* errStr = tstrerror(err); const char* errStr = tstrerror(err);
......
...@@ -75,7 +75,7 @@ int32_t syncNodeAppendEntriesPeers(SSyncNode* pSyncNode) { ...@@ -75,7 +75,7 @@ int32_t syncNodeAppendEntriesPeers(SSyncNode* pSyncNode) {
// SyncIndex lastIndex = syncUtilMinIndex(pSyncNode->pLogStore->getLastIndex(pSyncNode->pLogStore), nextIndex); // SyncIndex lastIndex = syncUtilMinIndex(pSyncNode->pLogStore->getLastIndex(pSyncNode->pLogStore), nextIndex);
SyncAppendEntries* pMsg = NULL; SyncAppendEntries* pMsg = NULL;
SSyncRaftEntry* pEntry = logStoreGetEntry(pSyncNode->pLogStore, nextIndex); SSyncRaftEntry* pEntry = pSyncNode->pLogStore->getEntry(pSyncNode->pLogStore, nextIndex);
if (pEntry != NULL) { if (pEntry != NULL) {
pMsg = syncAppendEntriesBuild(pEntry->bytes, pSyncNode->vgId); pMsg = syncAppendEntriesBuild(pEntry->bytes, pSyncNode->vgId);
assert(pMsg != NULL); assert(pMsg != NULL);
......
...@@ -15,6 +15,22 @@ ...@@ -15,6 +15,22 @@
#include "syncSnapshot.h" #include "syncSnapshot.h"
int32_t takeSnapshot(SSyncFSM *pFsm, SSnapshot *pSnapshot) { return 0; } SSyncSnapshotSender *snapshotSenderCreate(SSyncNode *pSyncNode) { return NULL; }
int32_t restoreSnapshot(SSyncFSM *pFsm, SSnapshot *pSnapshot) { return 0; } void snapshotSenderDestroy(SSyncSnapshotSender *pSender) {}
\ No newline at end of file
int32_t snapshotSend(SSyncSnapshotSender *pSender) { return 0; }
cJSON *snapshotSender2Json(SSyncSnapshotSender *pSender) { return NULL; }
char *snapshotSender2Str(SSyncSnapshotSender *pSender) { return NULL; }
SSyncSnapshotReceiver *snapshotReceiverCreate(SSyncNode *pSyncNode) { return NULL; }
void snapshotReceiverDestroy(SSyncSnapshotReceiver *pReceiver) {}
int32_t snapshotReceive(SSyncSnapshotReceiver *pReceiver) { return 0; }
cJSON *snapshotReceiver2Json(SSyncSnapshotReceiver *pReceiver) { return NULL; }
char *snapshotReceiver2Str(SSyncSnapshotReceiver *pReceiver) { return NULL; }
...@@ -78,7 +78,8 @@ int32_t GetSnapshotCb(struct SSyncFSM* pFsm, SSnapshot* pSnapshot) { ...@@ -78,7 +78,8 @@ int32_t GetSnapshotCb(struct SSyncFSM* pFsm, SSnapshot* pSnapshot) {
void RestoreFinishCb(struct SSyncFSM* pFsm) { sTrace("==callback== ==RestoreFinishCb=="); } void RestoreFinishCb(struct SSyncFSM* pFsm) { sTrace("==callback== ==RestoreFinishCb=="); }
void ReConfigCb(struct SSyncFSM* pFsm, SSyncCfg newCfg, SReConfigCbMeta cbMeta) { void ReConfigCb(struct SSyncFSM* pFsm, SSyncCfg newCfg, SReConfigCbMeta cbMeta) {
sTrace("==callback== ==ReConfigCb== flag:0x%lX, isDrop:%d, index:%ld, code:%d, currentTerm:%lu, term:%lu", cbMeta.flag, cbMeta.isDrop, cbMeta.index, cbMeta.code, cbMeta.currentTerm, cbMeta.term); sTrace("==callback== ==ReConfigCb== flag:0x%lX, isDrop:%d, index:%ld, code:%d, currentTerm:%lu, term:%lu",
cbMeta.flag, cbMeta.isDrop, cbMeta.index, cbMeta.code, cbMeta.currentTerm, cbMeta.term);
} }
SSyncFSM* createFsm() { SSyncFSM* createFsm() {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册