diff --git a/source/libs/sync/inc/syncSnapshot.h b/source/libs/sync/inc/syncSnapshot.h index 0dc67cf15077cee2558b582b33b358583b9e8aab..6fb558e45c66e13646c26b20e20bf60a7ded1d26 100644 --- a/source/libs/sync/inc/syncSnapshot.h +++ b/source/libs/sync/inc/syncSnapshot.h @@ -28,10 +28,10 @@ extern "C" { #include "syncMessage.h" #include "taosdef.h" -#define SYNC_SNAPSHOT_SEQ_INVALID -1 +#define SYNC_SNAPSHOT_SEQ_INVALID -1 #define SYNC_SNAPSHOT_SEQ_FORCE_CLOSE -2 -#define SYNC_SNAPSHOT_SEQ_BEGIN 0 -#define SYNC_SNAPSHOT_SEQ_END 0x7FFFFFFF +#define SYNC_SNAPSHOT_SEQ_BEGIN 0 +#define SYNC_SNAPSHOT_SEQ_END 0x7FFFFFFF #define SYNC_SNAPSHOT_RETRY_MS 5000 @@ -40,14 +40,14 @@ typedef struct SSyncSnapshotSender { bool start; int32_t seq; int32_t ack; - void * pReader; - void * pCurrentBlock; + void *pReader; + void *pCurrentBlock; int32_t blockLen; SSnapshotParam snapshotParam; SSnapshot snapshot; SSyncCfg lastConfig; int64_t sendingMS; - SSyncNode * pSyncNode; + SSyncNode *pSyncNode; int32_t replicaIndex; SyncTerm term; SyncTerm privateTerm; @@ -64,20 +64,20 @@ int32_t snapshotSend(SSyncSnapshotSender *pSender); int32_t snapshotReSend(SSyncSnapshotSender *pSender); cJSON *snapshotSender2Json(SSyncSnapshotSender *pSender); -char * snapshotSender2Str(SSyncSnapshotSender *pSender); -char * snapshotSender2SimpleStr(SSyncSnapshotSender *pSender, char *event); +char *snapshotSender2Str(SSyncSnapshotSender *pSender); +char *snapshotSender2SimpleStr(SSyncSnapshotSender *pSender, char *event); //--------------------------------------------------- typedef struct SSyncSnapshotReceiver { bool start; int32_t ack; - void * pWriter; + void *pWriter; SyncTerm term; SyncTerm privateTerm; SSnapshotParam snapshotParam; SSnapshot snapshot; SRaftId fromId; - SSyncNode * pSyncNode; + SSyncNode *pSyncNode; } SSyncSnapshotReceiver; @@ -86,10 +86,11 @@ void snapshotReceiverDestroy(SSyncSnapshotReceiver *pReceiver) int32_t snapshotReceiverStart(SSyncSnapshotReceiver *pReceiver, SyncSnapshotSend *pBeginMsg); int32_t snapshotReceiverStop(SSyncSnapshotReceiver *pReceiver); bool snapshotReceiverIsStart(SSyncSnapshotReceiver *pReceiver); +void snapshotReceiverForceStop(SSyncSnapshotReceiver *pReceiver); cJSON *snapshotReceiver2Json(SSyncSnapshotReceiver *pReceiver); -char * snapshotReceiver2Str(SSyncSnapshotReceiver *pReceiver); -char * snapshotReceiver2SimpleStr(SSyncSnapshotReceiver *pReceiver, char *event); +char *snapshotReceiver2Str(SSyncSnapshotReceiver *pReceiver); +char *snapshotReceiver2SimpleStr(SSyncSnapshotReceiver *pReceiver, char *event); //--------------------------------------------------- // on message diff --git a/source/libs/sync/src/syncMain.c b/source/libs/sync/src/syncMain.c index 3fe600ecbbe1da1d452677475e4e11c5ea3232dd..51098374b03531142c9c12443fa5b02efddc3aca 100644 --- a/source/libs/sync/src/syncMain.c +++ b/source/libs/sync/src/syncMain.c @@ -2181,6 +2181,11 @@ void syncNodeBecomeLeader(SSyncNode* pSyncNode, const char* debugStr) { (pMySender->privateTerm) += 100; } + // close receiver + if (snapshotReceiverIsStart(pSyncNode->pNewNodeReceiver)) { + snapshotReceiverForceStop(pSyncNode->pNewNodeReceiver); + } + // stop elect timer syncNodeStopElectTimer(pSyncNode); diff --git a/source/libs/sync/src/syncSnapshot.c b/source/libs/sync/src/syncSnapshot.c index 702e9f01dcb250b8c07c776865f8f274eff89f20..5489a107e76082106961a0ed107413e5ec9b4a64 100644 --- a/source/libs/sync/src/syncSnapshot.c +++ b/source/libs/sync/src/syncSnapshot.c @@ -24,7 +24,6 @@ //---------------------------------- static void snapshotSenderUpdateProgress(SSyncSnapshotSender *pSender, SyncSnapshotRsp *pMsg); static void snapshotReceiverDoStart(SSyncSnapshotReceiver *pReceiver, SyncSnapshotSend *pBeginMsg); -static void snapshotReceiverForceStop(SSyncSnapshotReceiver *pReceiver); static void snapshotReceiverGotData(SSyncSnapshotReceiver *pReceiver, SyncSnapshotSend *pMsg); static int32_t snapshotReceiverFinish(SSyncSnapshotReceiver *pReceiver, SyncSnapshotSend *pMsg); @@ -374,14 +373,14 @@ cJSON *snapshotSender2Json(SSyncSnapshotSender *pSender) { char *snapshotSender2Str(SSyncSnapshotSender *pSender) { cJSON *pJson = snapshotSender2Json(pSender); - char * serialized = cJSON_Print(pJson); + char *serialized = cJSON_Print(pJson); cJSON_Delete(pJson); return serialized; } char *snapshotSender2SimpleStr(SSyncSnapshotSender *pSender, char *event) { int32_t len = 256; - char * s = taosMemoryMalloc(len); + char *s = taosMemoryMalloc(len); SRaftId destId = pSender->pSyncNode->replicasId[pSender->replicaIndex]; char host[64]; @@ -480,7 +479,7 @@ static void snapshotReceiverDoStart(SSyncSnapshotReceiver *pReceiver, SyncSnapsh } // force stop -static void snapshotReceiverForceStop(SSyncSnapshotReceiver *pReceiver) { +void snapshotReceiverForceStop(SSyncSnapshotReceiver *pReceiver) { // force close, abandon incomplete data if (pReceiver->pWriter != NULL) { int32_t ret = pReceiver->pSyncNode->pFsm->FpSnapshotStopWrite(pReceiver->pSyncNode->pFsm, pReceiver->pWriter, false, @@ -653,7 +652,7 @@ cJSON *snapshotReceiver2Json(SSyncSnapshotReceiver *pReceiver) { cJSON_AddStringToObject(pFromId, "addr", u64buf); { uint64_t u64 = pReceiver->fromId.addr; - cJSON * pTmp = pFromId; + cJSON *pTmp = pFromId; char host[128] = {0}; uint16_t port; syncUtilU642Addr(u64, host, sizeof(host), &port); @@ -686,14 +685,14 @@ cJSON *snapshotReceiver2Json(SSyncSnapshotReceiver *pReceiver) { char *snapshotReceiver2Str(SSyncSnapshotReceiver *pReceiver) { cJSON *pJson = snapshotReceiver2Json(pReceiver); - char * serialized = cJSON_Print(pJson); + char *serialized = cJSON_Print(pJson); cJSON_Delete(pJson); return serialized; } char *snapshotReceiver2SimpleStr(SSyncSnapshotReceiver *pReceiver, char *event) { int32_t len = 256; - char * s = taosMemoryMalloc(len); + char *s = taosMemoryMalloc(len); SRaftId fromId = pReceiver->fromId; char host[128];