提交 fdf79307 编写于 作者: M Minghao Li

refactor(sync): add index/term in snapshot write

上级 e4152b43
......@@ -134,7 +134,7 @@ typedef struct SSyncFSM {
int32_t (*FpSnapshotDoRead)(struct SSyncFSM* pFsm, void* pReader, void** ppBuf, int32_t* len);
int32_t (*FpSnapshotStartWrite)(struct SSyncFSM* pFsm, void* pWriterParam, void** ppWriter);
int32_t (*FpSnapshotStopWrite)(struct SSyncFSM* pFsm, void* pWriter, bool isApply);
int32_t (*FpSnapshotStopWrite)(struct SSyncFSM* pFsm, void* pWriter, bool isApply, SSnapshot* pSnapshot);
int32_t (*FpSnapshotDoWrite)(struct SSyncFSM* pFsm, void* pWriter, void* pBuf, int32_t len);
} SSyncFSM;
......
......@@ -143,7 +143,7 @@ int32_t mndSnapshotStartWrite(struct SSyncFSM *pFsm, void *pParam, void **ppWrit
return sdbStartWrite(pMnode->pSdb, (SSdbIter **)ppWriter);
}
int32_t mndSnapshotStopWrite(struct SSyncFSM *pFsm, void *pWriter, bool isApply) {
int32_t mndSnapshotStopWrite(struct SSyncFSM *pFsm, void *pWriter, bool isApply, SSnapshot *pSnapshot) {
mInfo("stop to apply snapshot to sdb, apply:%d", isApply);
SMnode *pMnode = pFsm->data;
return sdbStopWrite(pMnode->pSdb, pWriter, isApply);
......
......@@ -515,7 +515,7 @@ static int32_t vnodeSnapshotStartWrite(struct SSyncFSM *pFsm, void *pParam, void
#endif
}
static int32_t vnodeSnapshotStopWrite(struct SSyncFSM *pFsm, void *pWriter, bool isApply) {
static int32_t vnodeSnapshotStopWrite(struct SSyncFSM *pFsm, void *pWriter, bool isApply, SSnapshot *pSnapshot) {
#ifdef USE_TSDB_SNAPSHOT
SVnode *pVnode = pFsm->data;
int32_t code = vnodeSnapWriterClose(pWriter, !isApply);
......
......@@ -374,14 +374,14 @@ cJSON *snapshotSender2Json(SSyncSnapshotSender *pSender) {
char *snapshotSender2Str(SSyncSnapshotSender *pSender) {
cJSON *pJson = snapshotSender2Json(pSender);
char * serialized = cJSON_Print(pJson);
char *serialized = cJSON_Print(pJson);
cJSON_Delete(pJson);
return serialized;
}
char *snapshotSender2SimpleStr(SSyncSnapshotSender *pSender, char *event) {
int32_t len = 256;
char * s = taosMemoryMalloc(len);
char *s = taosMemoryMalloc(len);
SRaftId destId = pSender->pSyncNode->replicasId[pSender->replicaIndex];
char host[64];
......@@ -434,8 +434,8 @@ void snapshotReceiverDestroy(SSyncSnapshotReceiver *pReceiver) {
if (pReceiver != NULL) {
// close writer
if (pReceiver->pWriter != NULL) {
int32_t ret =
pReceiver->pSyncNode->pFsm->FpSnapshotStopWrite(pReceiver->pSyncNode->pFsm, pReceiver->pWriter, false);
int32_t ret = pReceiver->pSyncNode->pFsm->FpSnapshotStopWrite(pReceiver->pSyncNode->pFsm, pReceiver->pWriter,
false, &(pReceiver->snapshot));
ASSERT(ret == 0);
pReceiver->pWriter = NULL;
}
......@@ -483,8 +483,8 @@ static void snapshotReceiverDoStart(SSyncSnapshotReceiver *pReceiver, SyncSnapsh
static void snapshotReceiverForceStop(SSyncSnapshotReceiver *pReceiver) {
// force close, abandon incomplete data
if (pReceiver->pWriter != NULL) {
int32_t ret =
pReceiver->pSyncNode->pFsm->FpSnapshotStopWrite(pReceiver->pSyncNode->pFsm, pReceiver->pWriter, false);
int32_t ret = pReceiver->pSyncNode->pFsm->FpSnapshotStopWrite(pReceiver->pSyncNode->pFsm, pReceiver->pWriter, false,
&(pReceiver->snapshot));
ASSERT(ret == 0);
pReceiver->pWriter = NULL;
}
......@@ -524,8 +524,8 @@ int32_t snapshotReceiverStart(SSyncSnapshotReceiver *pReceiver, SyncSnapshotSend
// FpSnapshotStopWrite should not be called, assert writer == NULL
int32_t snapshotReceiverStop(SSyncSnapshotReceiver *pReceiver) {
if (pReceiver->pWriter != NULL) {
int32_t ret =
pReceiver->pSyncNode->pFsm->FpSnapshotStopWrite(pReceiver->pSyncNode->pFsm, pReceiver->pWriter, false);
int32_t ret = pReceiver->pSyncNode->pFsm->FpSnapshotStopWrite(pReceiver->pSyncNode->pFsm, pReceiver->pWriter, false,
&(pReceiver->snapshot));
ASSERT(ret == 0);
pReceiver->pWriter = NULL;
}
......@@ -574,7 +574,8 @@ static int32_t snapshotReceiverFinish(SSyncSnapshotReceiver *pReceiver, SyncSnap
}
// stop writer, apply data
code = pReceiver->pSyncNode->pFsm->FpSnapshotStopWrite(pReceiver->pSyncNode->pFsm, pReceiver->pWriter, true);
code = pReceiver->pSyncNode->pFsm->FpSnapshotStopWrite(pReceiver->pSyncNode->pFsm, pReceiver->pWriter, true,
&(pReceiver->snapshot));
if (code != 0) {
syncNodeErrorLog(pReceiver->pSyncNode, "snapshot stop writer true error");
ASSERT(0);
......@@ -646,7 +647,7 @@ cJSON *snapshotReceiver2Json(SSyncSnapshotReceiver *pReceiver) {
cJSON_AddStringToObject(pFromId, "addr", u64buf);
{
uint64_t u64 = pReceiver->fromId.addr;
cJSON * pTmp = pFromId;
cJSON *pTmp = pFromId;
char host[128] = {0};
uint16_t port;
syncUtilU642Addr(u64, host, sizeof(host), &port);
......@@ -679,14 +680,14 @@ cJSON *snapshotReceiver2Json(SSyncSnapshotReceiver *pReceiver) {
char *snapshotReceiver2Str(SSyncSnapshotReceiver *pReceiver) {
cJSON *pJson = snapshotReceiver2Json(pReceiver);
char * serialized = cJSON_Print(pJson);
char *serialized = cJSON_Print(pJson);
cJSON_Delete(pJson);
return serialized;
}
char *snapshotReceiver2SimpleStr(SSyncSnapshotReceiver *pReceiver, char *event) {
int32_t len = 256;
char * s = taosMemoryMalloc(len);
char *s = taosMemoryMalloc(len);
SRaftId fromId = pReceiver->fromId;
char host[128];
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册