From 456ef978cc7e5cf0a4dc2f2fc6f822886e90805f Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Wed, 16 Dec 2020 23:28:12 +0800 Subject: [PATCH] TD-2428 --- src/sync/inc/syncMsg.h | 9 +++++++-- src/sync/src/syncMsg.c | 17 +++++++++++++++++ src/sync/src/syncRestore.c | 14 ++++++++++---- src/sync/src/syncRetrieve.c | 12 ++++++++++-- 4 files changed, 44 insertions(+), 8 deletions(-) diff --git a/src/sync/inc/syncMsg.h b/src/sync/inc/syncMsg.h index 9a7ff04de1..73f4223c88 100644 --- a/src/sync/inc/syncMsg.h +++ b/src/sync/inc/syncMsg.h @@ -35,7 +35,9 @@ typedef enum { TAOS_SMSG_STATUS_RSP = 10, TAOS_SMSG_SETUP = 11, TAOS_SMSG_SETUP_RSP = 12, - TAOS_SMSG_END = 13, + TAOS_SMSG_SYNC_FILE = 13, + TAOS_SMSG_SYNC_FILE_RSP = 14, + TAOS_SMSG_END = 15, } ESyncMsgType; typedef enum { @@ -116,7 +118,7 @@ typedef struct { #pragma pack(pop) -#define SYNC_PROTOCOL_VERSION 0 +#define SYNC_PROTOCOL_VERSION 1 #define SYNC_SIGNATURE ((uint16_t)(0xCDEF)) extern char *statusType[]; @@ -131,6 +133,9 @@ void syncBuildSyncDataMsg(SSyncMsg *pMsg, int32_t vgId); void syncBuildSyncSetupMsg(SSyncMsg *pMsg, int32_t vgId); void syncBuildPeersStatus(SPeersStatus *pMsg, int32_t vgId); +void syncBuildFileAck(SFileAck *pMsg, int32_t vgId); +void syncBuildFileInfo(SFileInfo *pMsg, int32_t vgId); + #ifdef __cplusplus } #endif diff --git a/src/sync/src/syncMsg.c b/src/sync/src/syncMsg.c index 7ca96e86ba..abb2ac896f 100644 --- a/src/sync/src/syncMsg.c +++ b/src/sync/src/syncMsg.c @@ -16,6 +16,7 @@ #define _DEFAULT_SOURCE #include "os.h" #include "taoserror.h" +#include "tglobal.h" #include "tchecksum.h" #include "syncInt.h" @@ -92,3 +93,19 @@ void syncBuildPeersStatus(SPeersStatus *pMsg, int32_t vgId) { pMsg->head.len = sizeof(SPeersStatus) - sizeof(SSyncHead); syncBuildHead(&pMsg->head); } + +void syncBuildFileAck(SFileAck *pMsg, int32_t vgId) { + memset(pMsg, 0, sizeof(SFileAck)); + pMsg->head.type = TAOS_SMSG_SYNC_FILE_RSP; + pMsg->head.vgId = vgId; + pMsg->head.len = sizeof(SFileAck) - sizeof(SSyncHead); + syncBuildHead(&pMsg->head); +} + +void syncBuildFileInfo(SFileInfo *pMsg, int32_t vgId) { + memset(pMsg, 0, sizeof(SFileInfo)); + pMsg->head.type = TAOS_SMSG_SYNC_FILE; + pMsg->head.vgId = vgId; + pMsg->head.len = sizeof(SFileInfo) - sizeof(SSyncHead); + syncBuildHead(&pMsg->head); +} \ No newline at end of file diff --git a/src/sync/src/syncRestore.c b/src/sync/src/syncRestore.c index 850a9b78b7..088215ecc7 100644 --- a/src/sync/src/syncRestore.c +++ b/src/sync/src/syncRestore.c @@ -69,7 +69,13 @@ static int32_t syncRestoreFile(SSyncPeer *pPeer, uint64_t *fversion) { minfo.index = -1; int32_t ret = taosReadMsg(pPeer->syncFd, &minfo, sizeof(SFileInfo)); if (ret != sizeof(SFileInfo) || minfo.index == -1) { - sError("%s, failed to read file info while restore file since %s", pPeer->id, strerror(errno)); + sError("%s, failed to read fileinfo while restore file since %s", pPeer->id, strerror(errno)); + break; + } + + ret = syncCheckHead((SSyncHead*)(&minfo)); + if (ret != 0) { + sError("%s, failed to check fileinfo while restore file since %s", pPeer->id, strerror(ret)); break; } @@ -94,12 +100,12 @@ static int32_t syncRestoreFile(SSyncPeer *pPeer, uint64_t *fversion) { &sinfo.fversion); // if file not there or magic is not the same, file shall be synced - memset(&fileAck, 0, sizeof(fileAck)); + syncBuildFileAck(&fileAck, pNode->vgId); fileAck.sync = (sinfo.magic != minfo.magic || sinfo.name[0] == 0) ? 1 : 0; // send file ack - ret = taosWriteMsg(pPeer->syncFd, &fileAck, sizeof(fileAck)); - if (ret != sizeof(fileAck)) { + ret = taosWriteMsg(pPeer->syncFd, &fileAck, sizeof(SFileAck)); + if (ret != sizeof(SFileAck)) { sError("%s, failed to write file:%s ack while restore file since %s", pPeer->id, minfo.name, strerror(errno)); break; } diff --git a/src/sync/src/syncRetrieve.c b/src/sync/src/syncRetrieve.c index 204f4f9036..a1022c6fa0 100644 --- a/src/sync/src/syncRetrieve.c +++ b/src/sync/src/syncRetrieve.c @@ -99,6 +99,7 @@ static int32_t syncRetrieveFile(SSyncPeer *pPeer) { while (1) { // retrieve file info + syncBuildFileInfo(&fileInfo, pNode->vgId); fileInfo.name[0] = 0; fileInfo.size = 0; fileInfo.magic = (*pNode->getFileInfo)(pNode->vgId, fileInfo.name, &fileInfo.index, TAOS_SYNC_MAX_INDEX, @@ -106,8 +107,8 @@ static int32_t syncRetrieveFile(SSyncPeer *pPeer) { sDebug("%s, file:%s info is sent, size:%" PRId64, pPeer->id, fileInfo.name, fileInfo.size); // send the file info - int32_t ret = taosWriteMsg(pPeer->syncFd, &(fileInfo), sizeof(fileInfo)); - if (ret != sizeof(fileInfo)) { + int32_t ret = taosWriteMsg(pPeer->syncFd, &(fileInfo), sizeof(SFileInfo)); + if (ret != sizeof(SFileInfo)) { code = -1; sError("%s, failed to write file:%s info while retrieve file since %s", pPeer->id, fileInfo.name, strerror(errno)); break; @@ -128,6 +129,13 @@ static int32_t syncRetrieveFile(SSyncPeer *pPeer) { break; } + ret = syncCheckHead((SSyncHead*)(&fileAck)); + if (ret != 0) { + code = -1; + sError("%s, failed to check file:%s ack while retrieve file since %s", pPeer->id, fileInfo.name, strerror(ret)); + break; + } + // set the peer sync version pPeer->sversion = fileInfo.fversion; -- GitLab