diff --git a/source/libs/sync/inc/syncSnapshot.h b/source/libs/sync/inc/syncSnapshot.h index f00dfe6f50ea72dbb68eebc902d17b4bfb1924a4..55e0755a8913b79e474149ac13bb85226d007022 100644 --- a/source/libs/sync/inc/syncSnapshot.h +++ b/source/libs/sync/inc/syncSnapshot.h @@ -35,12 +35,13 @@ extern "C" { #define SYNC_SNAPSHOT_RETRY_MS 5000 +//--------------------------------------------------- typedef struct SSyncSnapshotSender { bool start; int32_t seq; int32_t ack; - void * pReader; - void * pCurrentBlock; + void *pReader; + void *pCurrentBlock; int32_t blockLen; SSnapshot snapshot; SSyncCfg lastConfig; @@ -59,33 +60,36 @@ void snapshotSenderStart(SSyncSnapshotSender *pSender, SSnapshot void snapshotSenderStop(SSyncSnapshotSender *pSender); int32_t snapshotSend(SSyncSnapshotSender *pSender); int32_t snapshotReSend(SSyncSnapshotSender *pSender); -cJSON * snapshotSender2Json(SSyncSnapshotSender *pSender); -char * snapshotSender2Str(SSyncSnapshotSender *pSender); -char * snapshotSender2SimpleStr(SSyncSnapshotSender *pSender, char *event); -typedef struct SSyncSnapshotReceiver { - bool start; - - int32_t ack; - void * pWriter; - SyncTerm term; - SyncTerm privateTerm; - SSnapshot snapshot; +cJSON *snapshotSender2Json(SSyncSnapshotSender *pSender); +char *snapshotSender2Str(SSyncSnapshotSender *pSender); +char *snapshotSender2SimpleStr(SSyncSnapshotSender *pSender, char *event); - SSyncNode *pSyncNode; +//--------------------------------------------------- +typedef struct SSyncSnapshotReceiver { + bool start; + int32_t ack; + void *pWriter; + SyncTerm term; + SyncTerm privateTerm; + SSnapshot snapshot; SRaftId fromId; + SSyncNode *pSyncNode; } SSyncSnapshotReceiver; SSyncSnapshotReceiver *snapshotReceiverCreate(SSyncNode *pSyncNode, SRaftId fromId); void snapshotReceiverDestroy(SSyncSnapshotReceiver *pReceiver); -void snapshotReceiverStart(SSyncSnapshotReceiver *pReceiver, SyncTerm privateTerm, SyncSnapshotSend *pBeginMsg); -bool snapshotReceiverIsStart(SSyncSnapshotReceiver *pReceiver); -void snapshotReceiverStop(SSyncSnapshotReceiver *pReceiver, bool apply); +void snapshotReceiverStart(SSyncSnapshotReceiver *pReceiver, SyncTerm privateTerm, SyncSnapshotSend *pBeginMsg); +bool snapshotReceiverIsStart(SSyncSnapshotReceiver *pReceiver); +void snapshotReceiverStop(SSyncSnapshotReceiver *pReceiver, bool apply); + cJSON *snapshotReceiver2Json(SSyncSnapshotReceiver *pReceiver); -char * snapshotReceiver2Str(SSyncSnapshotReceiver *pReceiver); -char * snapshotReceiver2SimpleStr(SSyncSnapshotReceiver *pReceiver, char *event); +char *snapshotReceiver2Str(SSyncSnapshotReceiver *pReceiver); +char *snapshotReceiver2SimpleStr(SSyncSnapshotReceiver *pReceiver, char *event); +//--------------------------------------------------- +// on message int32_t syncNodeOnSnapshotSendCb(SSyncNode *ths, SyncSnapshotSend *pMsg); int32_t syncNodeOnSnapshotRspCb(SSyncNode *ths, SyncSnapshotRsp *pMsg); diff --git a/source/libs/sync/src/syncSnapshot.c b/source/libs/sync/src/syncSnapshot.c index 0feaf532bb4798f39b2ea03d96cc725fb2d4a34a..39e972c4c46546407da15c7d777be810e0609805 100644 --- a/source/libs/sync/src/syncSnapshot.c +++ b/source/libs/sync/src/syncSnapshot.c @@ -21,6 +21,7 @@ #include "syncUtil.h" #include "wal.h" +//---------------------------------- static void snapshotReceiverDoStart(SSyncSnapshotReceiver *pReceiver, SyncTerm privateTerm, SyncSnapshotSend *pBeginMsg); @@ -49,7 +50,7 @@ SSyncSnapshotSender *snapshotSenderCreate(SSyncNode *pSyncNode, int32_t replicaI pSender->pSyncNode->pFsm->FpGetSnapshotInfo(pSender->pSyncNode->pFsm, &(pSender->snapshot)); pSender->finish = false; } else { - sError("snapshotSenderCreate cannot create sender"); + sError("vgId:%d cannot create snapshot sender", pSyncNode->vgId); } return pSender; @@ -57,39 +58,59 @@ SSyncSnapshotSender *snapshotSenderCreate(SSyncNode *pSyncNode, int32_t replicaI void snapshotSenderDestroy(SSyncSnapshotSender *pSender) { if (pSender != NULL) { + // free current block if (pSender->pCurrentBlock != NULL) { taosMemoryFree(pSender->pCurrentBlock); + pSender->pCurrentBlock = NULL; } + + // close reader + if (pSender->pReader != NULL) { + int32_t ret = pSender->pSyncNode->pFsm->FpSnapshotStopRead(pSender->pSyncNode->pFsm, pSender->pReader); + ASSERT(ret == 0); + pSender->pReader = NULL; + } + + // free sender taosMemoryFree(pSender); } } bool snapshotSenderIsStart(SSyncSnapshotSender *pSender) { return pSender->start; } -// begin send snapshot (current term, seq begin) +// begin send snapshot by snapshot, pReader void snapshotSenderStart(SSyncSnapshotSender *pSender, SSnapshot snapshot, void *pReader) { ASSERT(!snapshotSenderIsStart(pSender)); - pSender->seq = SYNC_SNAPSHOT_SEQ_BEGIN; - pSender->ack = SYNC_SNAPSHOT_SEQ_INVALID; - // init snapshot and reader ASSERT(pSender->pReader == NULL); pSender->pReader = pReader; pSender->snapshot = snapshot; + // init current block if (pSender->pCurrentBlock != NULL) { taosMemoryFree(pSender->pCurrentBlock); } pSender->blockLen = 0; + // update term + pSender->term = pSender->pSyncNode->pRaftStore->currentTerm; + ++(pSender->privateTerm); + + // update state + pSender->finish = false; + pSender->start = true; + pSender->seq = SYNC_SNAPSHOT_SEQ_BEGIN; + pSender->ack = SYNC_SNAPSHOT_SEQ_INVALID; + + // init last config if (pSender->snapshot.lastConfigIndex != SYNC_INDEX_INVALID) { int32_t code = 0; SSyncRaftEntry *pEntry = NULL; + bool getLastConfig = false; + code = pSender->pSyncNode->pLogStore->syncLogGetEntry(pSender->pSyncNode->pLogStore, pSender->snapshot.lastConfigIndex, &pEntry); - - bool getLastConfig = false; if (code == 0) { ASSERT(pEntry != NULL); @@ -112,29 +133,29 @@ void snapshotSenderStart(SSyncSnapshotSender *pSender, SSnapshot snapshot, void } } + // last config not found in wal, update to -1 if (!getLastConfig) { - char logBuf[128]; - snprintf(logBuf, sizeof(logBuf), "snapshot sender update lcindex from %ld to -1", - pSender->snapshot.lastConfigIndex); - pSender->snapshot.lastConfigIndex = -1; - - char *eventLog = snapshotSender2SimpleStr(pSender, logBuf); - syncNodeEventLog(pSender->pSyncNode, eventLog); - taosMemoryFree(eventLog); - + SyncIndex oldLastConfigIndex = pSender->snapshot.lastConfigIndex; + SyncIndex newLastConfigIndex = SYNC_INDEX_INVALID; + pSender->snapshot.lastConfigIndex = SYNC_INDEX_INVALID; memset(&(pSender->lastConfig), 0, sizeof(SSyncCfg)); + + // event log + do { + char logBuf[128]; + snprintf(logBuf, sizeof(logBuf), "snapshot sender update lcindex from %ld to %ld", oldLastConfigIndex, + newLastConfigIndex); + char *eventLog = snapshotSender2SimpleStr(pSender, logBuf); + syncNodeEventLog(pSender->pSyncNode, eventLog); + taosMemoryFree(eventLog); + } while (0); } } else { + // no last config memset(&(pSender->lastConfig), 0, sizeof(SSyncCfg)); } - pSender->sendingMS = SYNC_SNAPSHOT_RETRY_MS; - pSender->term = pSender->pSyncNode->pRaftStore->currentTerm; - ++(pSender->privateTerm); - pSender->finish = false; - pSender->start = true; - // build begin msg SyncSnapshotSend *pMsg = syncSnapshotSendBuild(0, pSender->pSyncNode->vgId); pMsg->srcId = pSender->pSyncNode->myRaftId; @@ -151,40 +172,39 @@ void snapshotSenderStart(SSyncSnapshotSender *pSender, SSnapshot snapshot, void SRpcMsg rpcMsg; syncSnapshotSend2RpcMsg(pMsg, &rpcMsg); syncNodeSendMsgById(&(pMsg->destId), pSender->pSyncNode, &rpcMsg); - - char *eventLog = snapshotSender2SimpleStr(pSender, "snapshot sender send"); - syncNodeEventLog(pSender->pSyncNode, eventLog); - taosMemoryFree(eventLog); - syncSnapshotSendDestroy(pMsg); + + // event log + do { + char *eventLog = snapshotSender2SimpleStr(pSender, "snapshot sender send"); + syncNodeEventLog(pSender->pSyncNode, eventLog); + taosMemoryFree(eventLog); + } while (0); } void snapshotSenderStop(SSyncSnapshotSender *pSender) { + // close reader if (pSender->pReader != NULL) { int32_t ret = pSender->pSyncNode->pFsm->FpSnapshotStopRead(pSender->pSyncNode->pFsm, pSender->pReader); ASSERT(ret == 0); pSender->pReader = NULL; } + // free current block if (pSender->pCurrentBlock != NULL) { taosMemoryFree(pSender->pCurrentBlock); pSender->pCurrentBlock = NULL; pSender->blockLen = 0; } + // update flag pSender->start = false; - - if (gRaftDetailLog) { - char *s = snapshotSender2Str(pSender); - sInfo("snapshotSenderStop %s", s); - taosMemoryFree(s); - } } -// when sender receiver ack, call this function to send msg from seq +// when sender receive ack, call this function to send msg from seq // seq = ack + 1, already updated int32_t snapshotSend(SSyncSnapshotSender *pSender) { - // free memory last time (seq - 1) + // free memory last time (current seq - 1) if (pSender->pCurrentBlock != NULL) { taosMemoryFree(pSender->pCurrentBlock); pSender->pCurrentBlock = NULL; @@ -198,7 +218,7 @@ int32_t snapshotSend(SSyncSnapshotSender *pSender) { if (pSender->blockLen > 0) { // has read data } else { - // read finish + // read finish, update seq to end pSender->seq = SYNC_SNAPSHOT_SEQ_END; } @@ -219,25 +239,28 @@ int32_t snapshotSend(SSyncSnapshotSender *pSender) { SRpcMsg rpcMsg; syncSnapshotSend2RpcMsg(pMsg, &rpcMsg); syncNodeSendMsgById(&(pMsg->destId), pSender->pSyncNode, &rpcMsg); + syncSnapshotSendDestroy(pMsg); - if (pSender->seq == SYNC_SNAPSHOT_SEQ_END) { - char *eventLog = snapshotSender2SimpleStr(pSender, "snapshot sender finish"); - syncNodeEventLog(pSender->pSyncNode, eventLog); - taosMemoryFree(eventLog); - - } else { - char *eventLog = snapshotSender2SimpleStr(pSender, "snapshot sender sending"); + // event log + do { + char *eventLog = NULL; + if (pSender->seq == SYNC_SNAPSHOT_SEQ_END) { + eventLog = snapshotSender2SimpleStr(pSender, "snapshot sender finish"); + } else { + eventLog = snapshotSender2SimpleStr(pSender, "snapshot sender sending"); + } syncNodeEventLog(pSender->pSyncNode, eventLog); taosMemoryFree(eventLog); - } + } while (0); - syncSnapshotSendDestroy(pMsg); return 0; } // send snapshot data from cache int32_t snapshotReSend(SSyncSnapshotSender *pSender) { - if (pSender->pCurrentBlock != NULL) { + // send current block data + if (pSender->pCurrentBlock != NULL && pSender->blockLen > 0) { + // build msg SyncSnapshotSend *pMsg = syncSnapshotSendBuild(pSender->blockLen, pSender->pSyncNode->vgId); pMsg->srcId = pSender->pSyncNode->myRaftId; pMsg->destId = (pSender->pSyncNode->replicasId)[pSender->replicaIndex]; @@ -249,16 +272,20 @@ int32_t snapshotReSend(SSyncSnapshotSender *pSender) { pMsg->seq = pSender->seq; memcpy(pMsg->data, pSender->pCurrentBlock, pSender->blockLen); + // send msg SRpcMsg rpcMsg; syncSnapshotSend2RpcMsg(pMsg, &rpcMsg); syncNodeSendMsgById(&(pMsg->destId), pSender->pSyncNode, &rpcMsg); - - char *eventLog = snapshotSender2SimpleStr(pSender, "snapshot sender resend"); - syncNodeEventLog(pSender->pSyncNode, eventLog); - taosMemoryFree(eventLog); - syncSnapshotSendDestroy(pMsg); + + // event log + do { + char *eventLog = snapshotSender2SimpleStr(pSender, "snapshot sender resend"); + syncNodeEventLog(pSender->pSyncNode, eventLog); + taosMemoryFree(eventLog); + } while (0); } + return 0; } @@ -294,7 +321,6 @@ cJSON *snapshotSender2Json(SSyncSnapshotSender *pSender) { snprintf(u64buf, sizeof(u64buf), "%lu", pSender->snapshot.lastApplyTerm); cJSON_AddStringToObject(pSnapshot, "lastApplyTerm", u64buf); cJSON_AddItemToObject(pRoot, "snapshot", pSnapshot); - snprintf(u64buf, sizeof(u64buf), "%lu", pSender->sendingMS); cJSON_AddStringToObject(pRoot, "sendingMS", u64buf); snprintf(u64buf, sizeof(u64buf), "%p", pSender->pSyncNode); @@ -314,17 +340,17 @@ cJSON *snapshotSender2Json(SSyncSnapshotSender *pSender) { char *snapshotSender2Str(SSyncSnapshotSender *pSender) { cJSON *pJson = snapshotSender2Json(pSender); - char * serialized = cJSON_Print(pJson); + char *serialized = cJSON_Print(pJson); cJSON_Delete(pJson); return serialized; } char *snapshotSender2SimpleStr(SSyncSnapshotSender *pSender, char *event) { int32_t len = 256; - char * s = taosMemoryMalloc(len); + char *s = taosMemoryMalloc(len); SRaftId destId = pSender->pSyncNode->replicasId[pSender->replicaIndex]; - char host[128]; + char host[64]; uint16_t port; syncUtilU642Addr(destId.addr, host, sizeof(host), &port); @@ -355,12 +381,12 @@ SSyncSnapshotReceiver *snapshotReceiverCreate(SSyncNode *pSyncNode, SRaftId from pReceiver->term = pSyncNode->pRaftStore->currentTerm; pReceiver->privateTerm = 0; pReceiver->snapshot.data = NULL; - pReceiver->snapshot.lastApplyIndex = -1; + pReceiver->snapshot.lastApplyIndex = SYNC_INDEX_INVALID; pReceiver->snapshot.lastApplyTerm = 0; - pReceiver->snapshot.lastConfigIndex = -1; + pReceiver->snapshot.lastConfigIndex = SYNC_INDEX_INVALID; } else { - sInfo("snapshotReceiverCreate cannot create receiver"); + sError("vgId:%d cannot create snapshot receiver", pSyncNode->vgId); } return pReceiver; @@ -368,29 +394,55 @@ SSyncSnapshotReceiver *snapshotReceiverCreate(SSyncNode *pSyncNode, SRaftId from void snapshotReceiverDestroy(SSyncSnapshotReceiver *pReceiver) { if (pReceiver != NULL) { + // close writer + if (pReceiver->pWriter != NULL) { + int32_t ret = + pReceiver->pSyncNode->pFsm->FpSnapshotStopWrite(pReceiver->pSyncNode->pFsm, pReceiver->pWriter, false); + ASSERT(ret == 0); + pReceiver->pWriter = NULL; + } + + // free receiver taosMemoryFree(pReceiver); } } bool snapshotReceiverIsStart(SSyncSnapshotReceiver *pReceiver) { return pReceiver->start; } -// begin receive snapshot msg (current term, seq begin) +// static do start +// receive first snapshot data +// privateTerm, pBeginMsg static void snapshotReceiverDoStart(SSyncSnapshotReceiver *pReceiver, SyncTerm privateTerm, SyncSnapshotSend *pBeginMsg) { + // update state pReceiver->term = pReceiver->pSyncNode->pRaftStore->currentTerm; pReceiver->privateTerm = privateTerm; pReceiver->ack = SYNC_SNAPSHOT_SEQ_BEGIN; pReceiver->fromId = pBeginMsg->srcId; + // update snapshot pReceiver->snapshot.lastApplyIndex = pBeginMsg->lastIndex; pReceiver->snapshot.lastApplyTerm = pBeginMsg->lastTerm; pReceiver->snapshot.lastConfigIndex = pBeginMsg->lastConfigIndex; + // write data ASSERT(pReceiver->pWriter == NULL); int32_t ret = pReceiver->pSyncNode->pFsm->FpSnapshotStartWrite(pReceiver->pSyncNode->pFsm, &(pReceiver->pWriter)); ASSERT(ret == 0); } +static void snapshotReceiverForceStop(SSyncSnapshotReceiver *pReceiver) { + // force close, abandon incomplete data + if (pReceiver->pWriter != NULL) { + int32_t ret = + pReceiver->pSyncNode->pFsm->FpSnapshotStopWrite(pReceiver->pSyncNode->pFsm, pReceiver->pWriter, false); + ASSERT(ret == 0); + pReceiver->pWriter = NULL; + } + + pReceiver->start = false; +} + // if receiver receive msg from seq = SYNC_SNAPSHOT_SEQ_BEGIN, start receiver // if already start, force close, start again void snapshotReceiverStart(SSyncSnapshotReceiver *pReceiver, SyncTerm privateTerm, SyncSnapshotSend *pBeginMsg) { @@ -413,12 +465,6 @@ void snapshotReceiverStart(SSyncSnapshotReceiver *pReceiver, SyncTerm privateTer snapshotReceiverDoStart(pReceiver, privateTerm, pBeginMsg); pReceiver->start = true; } - - if (gRaftDetailLog) { - char *s = snapshotReceiver2Str(pReceiver); - sInfo("snapshotReceiverStart %s", s); - taosMemoryFree(s); - } } void snapshotReceiverStop(SSyncSnapshotReceiver *pReceiver, bool apply) { @@ -434,12 +480,6 @@ void snapshotReceiverStop(SSyncSnapshotReceiver *pReceiver, bool apply) { if (apply) { // ++(pReceiver->privateTerm); } - - if (gRaftDetailLog) { - char *s = snapshotReceiver2Str(pReceiver); - sInfo("snapshotReceiverStop %s", s); - taosMemoryFree(s); - } } cJSON *snapshotReceiver2Json(SSyncSnapshotReceiver *pReceiver) { @@ -461,7 +501,7 @@ cJSON *snapshotReceiver2Json(SSyncSnapshotReceiver *pReceiver) { cJSON_AddStringToObject(pFromId, "addr", u64buf); { uint64_t u64 = pReceiver->fromId.addr; - cJSON * pTmp = pFromId; + cJSON *pTmp = pFromId; char host[128] = {0}; uint16_t port; syncUtilU642Addr(u64, host, sizeof(host), &port); @@ -494,14 +534,14 @@ cJSON *snapshotReceiver2Json(SSyncSnapshotReceiver *pReceiver) { char *snapshotReceiver2Str(SSyncSnapshotReceiver *pReceiver) { cJSON *pJson = snapshotReceiver2Json(pReceiver); - char * serialized = cJSON_Print(pJson); + char *serialized = cJSON_Print(pJson); cJSON_Delete(pJson); return serialized; } char *snapshotReceiver2SimpleStr(SSyncSnapshotReceiver *pReceiver, char *event) { int32_t len = 256; - char * s = taosMemoryMalloc(len); + char *s = taosMemoryMalloc(len); SRaftId fromId = pReceiver->fromId; char host[128];