diff --git a/source/libs/sync/inc/syncInt.h b/source/libs/sync/inc/syncInt.h index 2e71745f61eff205143581b322b3ee1032524361..4100aa021672602bd55738067febe80db7790e11 100644 --- a/source/libs/sync/inc/syncInt.h +++ b/source/libs/sync/inc/syncInt.h @@ -55,6 +55,8 @@ typedef struct SVotesRespond SVotesRespond; typedef struct SSyncIndexMgr SSyncIndexMgr; typedef struct SRaftCfg SRaftCfg; typedef struct SSyncRespMgr SSyncRespMgr; +typedef struct SSyncSnapshotSender SSyncSnapshotSender; +typedef struct SSyncSnapshotReceiver SSyncSnapshotReceiver; typedef struct SSyncNode { // init by SSyncInfo @@ -148,9 +150,11 @@ typedef struct SSyncNode { SSyncRespMgr* pSyncRespMgr; // restore state - bool restoreFinish; // sem_t restoreSem; - SSnapshot* pSnapshot; + bool restoreFinish; + SSnapshot* pSnapshot; + SSyncSnapshotSender* pSender; + SSyncSnapshotReceiver* pReceiver; } SSyncNode; diff --git a/source/libs/sync/inc/syncRaftLog.h b/source/libs/sync/inc/syncRaftLog.h index 7db62e14d597608f04fd313e597251ec2503f933..df5cd3f36c4138e608e70bd22972d54baff48a50 100644 --- a/source/libs/sync/inc/syncRaftLog.h +++ b/source/libs/sync/inc/syncRaftLog.h @@ -32,20 +32,21 @@ typedef struct SSyncLogStoreData { SWal* pWal; } SSyncLogStoreData; -SSyncLogStore* logStoreCreate(SSyncNode* pSyncNode); -void logStoreDestory(SSyncLogStore* pLogStore); -int32_t logStoreAppendEntry(SSyncLogStore* pLogStore, SSyncRaftEntry* pEntry); -SSyncRaftEntry* logStoreGetEntry(SSyncLogStore* pLogStore, SyncIndex index); -int32_t logStoreTruncate(SSyncLogStore* pLogStore, SyncIndex fromIndex); -SyncIndex logStoreLastIndex(SSyncLogStore* pLogStore); -SyncTerm logStoreLastTerm(SSyncLogStore* pLogStore); -int32_t logStoreUpdateCommitIndex(SSyncLogStore* pLogStore, SyncIndex index); -SyncIndex logStoreGetCommitIndex(SSyncLogStore* pLogStore); -SSyncRaftEntry* logStoreGetLastEntry(SSyncLogStore* pLogStore); -cJSON* logStore2Json(SSyncLogStore* pLogStore); -char* logStore2Str(SSyncLogStore* pLogStore); -cJSON* logStoreSimple2Json(SSyncLogStore* pLogStore); -char* logStoreSimple2Str(SSyncLogStore* pLogStore); +SSyncLogStore* logStoreCreate(SSyncNode* pSyncNode); +void logStoreDestory(SSyncLogStore* pLogStore); +cJSON* logStore2Json(SSyncLogStore* pLogStore); +char* logStore2Str(SSyncLogStore* pLogStore); +cJSON* logStoreSimple2Json(SSyncLogStore* pLogStore); +char* logStoreSimple2Str(SSyncLogStore* pLogStore); + +// SSyncRaftEntry* logStoreGetLastEntry(SSyncLogStore* pLogStore); +// SyncIndex logStoreLastIndex(SSyncLogStore* pLogStore); +// SyncTerm logStoreLastTerm(SSyncLogStore* pLogStore); +// SSyncRaftEntry* logStoreGetEntry(SSyncLogStore* pLogStore, SyncIndex index); +// int32_t logStoreAppendEntry(SSyncLogStore* pLogStore, SSyncRaftEntry* pEntry); +// int32_t logStoreTruncate(SSyncLogStore* pLogStore, SyncIndex fromIndex); +// int32_t logStoreUpdateCommitIndex(SSyncLogStore* pLogStore, SyncIndex index); +// SyncIndex logStoreGetCommitIndex(SSyncLogStore* pLogStore); // for debug void logStorePrint(SSyncLogStore* pLogStore); diff --git a/source/libs/sync/inc/syncSnapshot.h b/source/libs/sync/inc/syncSnapshot.h index fd2119ce659b553124aa9a310c3790b29363628c..b3174a4b36855b417ababbb0c4614867433a2e83 100644 --- a/source/libs/sync/inc/syncSnapshot.h +++ b/source/libs/sync/inc/syncSnapshot.h @@ -23,11 +23,39 @@ extern "C" { #include #include #include +#include "cJSON.h" #include "syncInt.h" #include "taosdef.h" -int32_t takeSnapshot(SSyncFSM *pFsm, SSnapshot *pSnapshot); -int32_t restoreSnapshot(SSyncFSM *pFsm, SSnapshot *pSnapshot); +typedef struct SSyncSnapshotSender { + bool isStart; + int32_t progressIndex; + void * pCurrentBlock; + int32_t len; + SSnapshot *pSnapshot; + SSyncNode *pSyncNode; +} SSyncSnapshotSender; + +SSyncSnapshotSender *snapshotSenderCreate(SSyncNode *pSyncNode); +void snapshotSenderDestroy(SSyncSnapshotSender *pSender); +int32_t snapshotSend(SSyncSnapshotSender *pSender); +cJSON * snapshotSender2Json(SSyncSnapshotSender *pSender); +char * snapshotSender2Str(SSyncSnapshotSender *pSender); + +typedef struct SSyncSnapshotReceiver { + bool isStart; + int32_t progressIndex; + void * pCurrentBlock; + int32_t len; + SSnapshot *pSnapshot; + SSyncNode *pSyncNode; +} SSyncSnapshotReceiver; + +SSyncSnapshotReceiver *snapshotReceiverCreate(SSyncNode *pSyncNode); +void snapshotReceiverDestroy(SSyncSnapshotReceiver *pReceiver); +int32_t snapshotReceive(SSyncSnapshotReceiver *pReceiver); +cJSON * snapshotReceiver2Json(SSyncSnapshotReceiver *pReceiver); +char * snapshotReceiver2Str(SSyncSnapshotReceiver *pReceiver); #ifdef __cplusplus } diff --git a/source/libs/sync/src/syncAppendEntries.c b/source/libs/sync/src/syncAppendEntries.c index 008bc00dbc0590a98c6b5a77c0cb6977bbdcba8d..46be7597c7f82afefbd83f6f6258c4ba811ae046 100644 --- a/source/libs/sync/src/syncAppendEntries.c +++ b/source/libs/sync/src/syncAppendEntries.c @@ -107,7 +107,7 @@ int32_t syncNodeOnAppendEntriesCb(SSyncNode* ths, SyncAppendEntries* pMsg) { SyncTerm localPreLogTerm = 0; if (pMsg->prevLogIndex >= SYNC_INDEX_BEGIN && pMsg->prevLogIndex <= ths->pLogStore->getLastIndex(ths->pLogStore)) { - SSyncRaftEntry* pEntry = logStoreGetEntry(ths->pLogStore, pMsg->prevLogIndex); + SSyncRaftEntry* pEntry = ths->pLogStore->getEntry(ths->pLogStore, pMsg->prevLogIndex); assert(pEntry != NULL); localPreLogTerm = pEntry->term; syncEntryDestory(pEntry); @@ -175,7 +175,7 @@ int32_t syncNodeOnAppendEntriesCb(SSyncNode* ths, SyncAppendEntries* pMsg) { bool conflict = false; SyncIndex extraIndex = pMsg->prevLogIndex + 1; - SSyncRaftEntry* pExtraEntry = logStoreGetEntry(ths->pLogStore, extraIndex); + SSyncRaftEntry* pExtraEntry = ths->pLogStore->getEntry(ths->pLogStore, extraIndex); assert(pExtraEntry != NULL); SSyncRaftEntry* pAppendEntry = syncEntryDeserialize(pMsg->data, pMsg->dataLen); @@ -197,7 +197,7 @@ int32_t syncNodeOnAppendEntriesCb(SSyncNode* ths, SyncAppendEntries* pMsg) { // notice! reverse roll back! for (SyncIndex index = delEnd; index >= delBegin; --index) { if (ths->pFsm->FpRollBackCb != NULL) { - SSyncRaftEntry* pRollBackEntry = logStoreGetEntry(ths->pLogStore, index); + SSyncRaftEntry* pRollBackEntry = ths->pLogStore->getEntry(ths->pLogStore, index); assert(pRollBackEntry != NULL); // if (pRollBackEntry->msgType != TDMT_VND_SYNC_NOOP) { @@ -365,7 +365,7 @@ int32_t syncNodeOnAppendEntriesCb(SSyncNode* ths, SyncAppendEntries* pMsg) { } SReConfigCbMeta cbMeta = {0}; - bool isDrop; + bool isDrop; // I am in newConfig if (hit) { @@ -388,7 +388,7 @@ int32_t syncNodeOnAppendEntriesCb(SSyncNode* ths, SyncAppendEntries* pMsg) { } // always call FpReConfigCb - if (ths->pFsm->FpReConfigCb != NULL) { + if (ths->pFsm->FpReConfigCb != NULL) { cbMeta.code = 0; cbMeta.currentTerm = ths->pRaftStore->currentTerm; cbMeta.index = pEntry->index; diff --git a/source/libs/sync/src/syncRaftLog.c b/source/libs/sync/src/syncRaftLog.c index 07a9397a580332f427ab3b206359de3ec0accf40..58fa8c2e8f1e0b4d89f003653efef4f0a3dd4b4b 100644 --- a/source/libs/sync/src/syncRaftLog.c +++ b/source/libs/sync/src/syncRaftLog.c @@ -16,6 +16,15 @@ #include "syncRaftLog.h" #include "wal.h" +static SSyncRaftEntry* logStoreGetLastEntry(SSyncLogStore* pLogStore); +static SyncIndex logStoreLastIndex(SSyncLogStore* pLogStore); +static SyncTerm logStoreLastTerm(SSyncLogStore* pLogStore); +static SSyncRaftEntry* logStoreGetEntry(SSyncLogStore* pLogStore, SyncIndex index); +static int32_t logStoreAppendEntry(SSyncLogStore* pLogStore, SSyncRaftEntry* pEntry); +static int32_t logStoreTruncate(SSyncLogStore* pLogStore, SyncIndex fromIndex); +static int32_t logStoreUpdateCommitIndex(SSyncLogStore* pLogStore, SyncIndex index); +static SyncIndex logStoreGetCommitIndex(SSyncLogStore* pLogStore); + SSyncLogStore* logStoreCreate(SSyncNode* pSyncNode) { SSyncLogStore* pLogStore = taosMemoryMalloc(sizeof(SSyncLogStore)); assert(pLogStore != NULL); @@ -78,7 +87,9 @@ SSyncRaftEntry* logStoreGetEntry(SSyncLogStore* pLogStore, SyncIndex index) { if (index >= SYNC_INDEX_BEGIN && index <= logStoreLastIndex(pLogStore)) { SWalReadHandle* pWalHandle = walOpenReadHandle(pWal); - int32_t code = walReadWithHandle(pWalHandle, index); + ASSERT(pWalHandle != NULL); + + int32_t code = walReadWithHandle(pWalHandle, index); if (code != 0) { int32_t err = terrno; const char* errStr = tstrerror(err); diff --git a/source/libs/sync/src/syncReplication.c b/source/libs/sync/src/syncReplication.c index 2fdb8a0e177f0f985c40b136ea29ce9f968c0fad..d17e64d936737ba7ea0dc5f33db407cfdf4bf205 100644 --- a/source/libs/sync/src/syncReplication.c +++ b/source/libs/sync/src/syncReplication.c @@ -75,7 +75,7 @@ int32_t syncNodeAppendEntriesPeers(SSyncNode* pSyncNode) { // SyncIndex lastIndex = syncUtilMinIndex(pSyncNode->pLogStore->getLastIndex(pSyncNode->pLogStore), nextIndex); SyncAppendEntries* pMsg = NULL; - SSyncRaftEntry* pEntry = logStoreGetEntry(pSyncNode->pLogStore, nextIndex); + SSyncRaftEntry* pEntry = pSyncNode->pLogStore->getEntry(pSyncNode->pLogStore, nextIndex); if (pEntry != NULL) { pMsg = syncAppendEntriesBuild(pEntry->bytes, pSyncNode->vgId); assert(pMsg != NULL); diff --git a/source/libs/sync/src/syncSnapshot.c b/source/libs/sync/src/syncSnapshot.c index 42b2bd993b515789934268f4400fece4f040f7c5..ccb0e6071b82e43bd23a9334e294a421a336e57b 100644 --- a/source/libs/sync/src/syncSnapshot.c +++ b/source/libs/sync/src/syncSnapshot.c @@ -15,6 +15,22 @@ #include "syncSnapshot.h" -int32_t takeSnapshot(SSyncFSM *pFsm, SSnapshot *pSnapshot) { return 0; } +SSyncSnapshotSender *snapshotSenderCreate(SSyncNode *pSyncNode) { return NULL; } -int32_t restoreSnapshot(SSyncFSM *pFsm, SSnapshot *pSnapshot) { return 0; } \ No newline at end of file +void snapshotSenderDestroy(SSyncSnapshotSender *pSender) {} + +int32_t snapshotSend(SSyncSnapshotSender *pSender) { return 0; } + +cJSON *snapshotSender2Json(SSyncSnapshotSender *pSender) { return NULL; } + +char *snapshotSender2Str(SSyncSnapshotSender *pSender) { return NULL; } + +SSyncSnapshotReceiver *snapshotReceiverCreate(SSyncNode *pSyncNode) { return NULL; } + +void snapshotReceiverDestroy(SSyncSnapshotReceiver *pReceiver) {} + +int32_t snapshotReceive(SSyncSnapshotReceiver *pReceiver) { return 0; } + +cJSON *snapshotReceiver2Json(SSyncSnapshotReceiver *pReceiver) { return NULL; } + +char *snapshotReceiver2Str(SSyncSnapshotReceiver *pReceiver) { return NULL; } diff --git a/source/libs/sync/test/syncConfigChangeTest.cpp b/source/libs/sync/test/syncConfigChangeTest.cpp index 7efc3f50c0154c5c971808c7bd1e7d4b6d8f84c5..1755b7a8fd967fa5db22b4fdc523cc3f771d3c4b 100644 --- a/source/libs/sync/test/syncConfigChangeTest.cpp +++ b/source/libs/sync/test/syncConfigChangeTest.cpp @@ -78,7 +78,8 @@ int32_t GetSnapshotCb(struct SSyncFSM* pFsm, SSnapshot* pSnapshot) { void RestoreFinishCb(struct SSyncFSM* pFsm) { sTrace("==callback== ==RestoreFinishCb=="); } void ReConfigCb(struct SSyncFSM* pFsm, SSyncCfg newCfg, SReConfigCbMeta cbMeta) { - sTrace("==callback== ==ReConfigCb== flag:0x%lX, isDrop:%d, index:%ld, code:%d, currentTerm:%lu, term:%lu", cbMeta.flag, cbMeta.isDrop, cbMeta.index, cbMeta.code, cbMeta.currentTerm, cbMeta.term); + sTrace("==callback== ==ReConfigCb== flag:0x%lX, isDrop:%d, index:%ld, code:%d, currentTerm:%lu, term:%lu", + cbMeta.flag, cbMeta.isDrop, cbMeta.index, cbMeta.code, cbMeta.currentTerm, cbMeta.term); } SSyncFSM* createFsm() {