提交 8da79502 编写于 作者: M Minghao Li

refactor(sync): close receiver when become leader

上级 dd212b5d
......@@ -28,10 +28,10 @@ extern "C" {
#include "syncMessage.h"
#include "taosdef.h"
#define SYNC_SNAPSHOT_SEQ_INVALID -1
#define SYNC_SNAPSHOT_SEQ_INVALID -1
#define SYNC_SNAPSHOT_SEQ_FORCE_CLOSE -2
#define SYNC_SNAPSHOT_SEQ_BEGIN 0
#define SYNC_SNAPSHOT_SEQ_END 0x7FFFFFFF
#define SYNC_SNAPSHOT_SEQ_BEGIN 0
#define SYNC_SNAPSHOT_SEQ_END 0x7FFFFFFF
#define SYNC_SNAPSHOT_RETRY_MS 5000
......@@ -40,14 +40,14 @@ typedef struct SSyncSnapshotSender {
bool start;
int32_t seq;
int32_t ack;
void * pReader;
void * pCurrentBlock;
void *pReader;
void *pCurrentBlock;
int32_t blockLen;
SSnapshotParam snapshotParam;
SSnapshot snapshot;
SSyncCfg lastConfig;
int64_t sendingMS;
SSyncNode * pSyncNode;
SSyncNode *pSyncNode;
int32_t replicaIndex;
SyncTerm term;
SyncTerm privateTerm;
......@@ -64,20 +64,20 @@ int32_t snapshotSend(SSyncSnapshotSender *pSender);
int32_t snapshotReSend(SSyncSnapshotSender *pSender);
cJSON *snapshotSender2Json(SSyncSnapshotSender *pSender);
char * snapshotSender2Str(SSyncSnapshotSender *pSender);
char * snapshotSender2SimpleStr(SSyncSnapshotSender *pSender, char *event);
char *snapshotSender2Str(SSyncSnapshotSender *pSender);
char *snapshotSender2SimpleStr(SSyncSnapshotSender *pSender, char *event);
//---------------------------------------------------
typedef struct SSyncSnapshotReceiver {
bool start;
int32_t ack;
void * pWriter;
void *pWriter;
SyncTerm term;
SyncTerm privateTerm;
SSnapshotParam snapshotParam;
SSnapshot snapshot;
SRaftId fromId;
SSyncNode * pSyncNode;
SSyncNode *pSyncNode;
} SSyncSnapshotReceiver;
......@@ -86,10 +86,11 @@ void snapshotReceiverDestroy(SSyncSnapshotReceiver *pReceiver)
int32_t snapshotReceiverStart(SSyncSnapshotReceiver *pReceiver, SyncSnapshotSend *pBeginMsg);
int32_t snapshotReceiverStop(SSyncSnapshotReceiver *pReceiver);
bool snapshotReceiverIsStart(SSyncSnapshotReceiver *pReceiver);
void snapshotReceiverForceStop(SSyncSnapshotReceiver *pReceiver);
cJSON *snapshotReceiver2Json(SSyncSnapshotReceiver *pReceiver);
char * snapshotReceiver2Str(SSyncSnapshotReceiver *pReceiver);
char * snapshotReceiver2SimpleStr(SSyncSnapshotReceiver *pReceiver, char *event);
char *snapshotReceiver2Str(SSyncSnapshotReceiver *pReceiver);
char *snapshotReceiver2SimpleStr(SSyncSnapshotReceiver *pReceiver, char *event);
//---------------------------------------------------
// on message
......
......@@ -2181,6 +2181,11 @@ void syncNodeBecomeLeader(SSyncNode* pSyncNode, const char* debugStr) {
(pMySender->privateTerm) += 100;
}
// close receiver
if (snapshotReceiverIsStart(pSyncNode->pNewNodeReceiver)) {
snapshotReceiverForceStop(pSyncNode->pNewNodeReceiver);
}
// stop elect timer
syncNodeStopElectTimer(pSyncNode);
......
......@@ -24,7 +24,6 @@
//----------------------------------
static void snapshotSenderUpdateProgress(SSyncSnapshotSender *pSender, SyncSnapshotRsp *pMsg);
static void snapshotReceiverDoStart(SSyncSnapshotReceiver *pReceiver, SyncSnapshotSend *pBeginMsg);
static void snapshotReceiverForceStop(SSyncSnapshotReceiver *pReceiver);
static void snapshotReceiverGotData(SSyncSnapshotReceiver *pReceiver, SyncSnapshotSend *pMsg);
static int32_t snapshotReceiverFinish(SSyncSnapshotReceiver *pReceiver, SyncSnapshotSend *pMsg);
......@@ -374,14 +373,14 @@ cJSON *snapshotSender2Json(SSyncSnapshotSender *pSender) {
char *snapshotSender2Str(SSyncSnapshotSender *pSender) {
cJSON *pJson = snapshotSender2Json(pSender);
char * serialized = cJSON_Print(pJson);
char *serialized = cJSON_Print(pJson);
cJSON_Delete(pJson);
return serialized;
}
char *snapshotSender2SimpleStr(SSyncSnapshotSender *pSender, char *event) {
int32_t len = 256;
char * s = taosMemoryMalloc(len);
char *s = taosMemoryMalloc(len);
SRaftId destId = pSender->pSyncNode->replicasId[pSender->replicaIndex];
char host[64];
......@@ -480,7 +479,7 @@ static void snapshotReceiverDoStart(SSyncSnapshotReceiver *pReceiver, SyncSnapsh
}
// force stop
static void snapshotReceiverForceStop(SSyncSnapshotReceiver *pReceiver) {
void snapshotReceiverForceStop(SSyncSnapshotReceiver *pReceiver) {
// force close, abandon incomplete data
if (pReceiver->pWriter != NULL) {
int32_t ret = pReceiver->pSyncNode->pFsm->FpSnapshotStopWrite(pReceiver->pSyncNode->pFsm, pReceiver->pWriter, false,
......@@ -653,7 +652,7 @@ cJSON *snapshotReceiver2Json(SSyncSnapshotReceiver *pReceiver) {
cJSON_AddStringToObject(pFromId, "addr", u64buf);
{
uint64_t u64 = pReceiver->fromId.addr;
cJSON * pTmp = pFromId;
cJSON *pTmp = pFromId;
char host[128] = {0};
uint16_t port;
syncUtilU642Addr(u64, host, sizeof(host), &port);
......@@ -686,14 +685,14 @@ cJSON *snapshotReceiver2Json(SSyncSnapshotReceiver *pReceiver) {
char *snapshotReceiver2Str(SSyncSnapshotReceiver *pReceiver) {
cJSON *pJson = snapshotReceiver2Json(pReceiver);
char * serialized = cJSON_Print(pJson);
char *serialized = cJSON_Print(pJson);
cJSON_Delete(pJson);
return serialized;
}
char *snapshotReceiver2SimpleStr(SSyncSnapshotReceiver *pReceiver, char *event) {
int32_t len = 256;
char * s = taosMemoryMalloc(len);
char *s = taosMemoryMalloc(len);
SRaftId fromId = pReceiver->fromId;
char host[128];
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册