diff --git a/src/sync/inc/syncMsg.h b/src/sync/inc/syncMsg.h index 9a7ff04de1ec18abc5017d18c7f09f113aba8c41..73f4223c882ff9ce544984b37843070cf579c44a 100644 --- a/src/sync/inc/syncMsg.h +++ b/src/sync/inc/syncMsg.h @@ -35,7 +35,9 @@ typedef enum { TAOS_SMSG_STATUS_RSP = 10, TAOS_SMSG_SETUP = 11, TAOS_SMSG_SETUP_RSP = 12, - TAOS_SMSG_END = 13, + TAOS_SMSG_SYNC_FILE = 13, + TAOS_SMSG_SYNC_FILE_RSP = 14, + TAOS_SMSG_END = 15, } ESyncMsgType; typedef enum { @@ -116,7 +118,7 @@ typedef struct { #pragma pack(pop) -#define SYNC_PROTOCOL_VERSION 0 +#define SYNC_PROTOCOL_VERSION 1 #define SYNC_SIGNATURE ((uint16_t)(0xCDEF)) extern char *statusType[]; @@ -131,6 +133,9 @@ void syncBuildSyncDataMsg(SSyncMsg *pMsg, int32_t vgId); void syncBuildSyncSetupMsg(SSyncMsg *pMsg, int32_t vgId); void syncBuildPeersStatus(SPeersStatus *pMsg, int32_t vgId); +void syncBuildFileAck(SFileAck *pMsg, int32_t vgId); +void syncBuildFileInfo(SFileInfo *pMsg, int32_t vgId); + #ifdef __cplusplus } #endif diff --git a/src/sync/src/syncMsg.c b/src/sync/src/syncMsg.c index 7ca96e86baabab35e0c0b30a950780962a26ae72..abb2ac896f55f9735a70e59890d40a94cbca3ece 100644 --- a/src/sync/src/syncMsg.c +++ b/src/sync/src/syncMsg.c @@ -16,6 +16,7 @@ #define _DEFAULT_SOURCE #include "os.h" #include "taoserror.h" +#include "tglobal.h" #include "tchecksum.h" #include "syncInt.h" @@ -92,3 +93,19 @@ void syncBuildPeersStatus(SPeersStatus *pMsg, int32_t vgId) { pMsg->head.len = sizeof(SPeersStatus) - sizeof(SSyncHead); syncBuildHead(&pMsg->head); } + +void syncBuildFileAck(SFileAck *pMsg, int32_t vgId) { + memset(pMsg, 0, sizeof(SFileAck)); + pMsg->head.type = TAOS_SMSG_SYNC_FILE_RSP; + pMsg->head.vgId = vgId; + pMsg->head.len = sizeof(SFileAck) - sizeof(SSyncHead); + syncBuildHead(&pMsg->head); +} + +void syncBuildFileInfo(SFileInfo *pMsg, int32_t vgId) { + memset(pMsg, 0, sizeof(SFileInfo)); + pMsg->head.type = TAOS_SMSG_SYNC_FILE; + pMsg->head.vgId = vgId; + pMsg->head.len = sizeof(SFileInfo) - sizeof(SSyncHead); + syncBuildHead(&pMsg->head); +} \ No newline at end of file diff --git a/src/sync/src/syncRestore.c b/src/sync/src/syncRestore.c index 850a9b78b7f5148737450926d976d0dccf10e3b6..088215ecc76164f7315aef7f6c570c0d55f26d9c 100644 --- a/src/sync/src/syncRestore.c +++ b/src/sync/src/syncRestore.c @@ -69,7 +69,13 @@ static int32_t syncRestoreFile(SSyncPeer *pPeer, uint64_t *fversion) { minfo.index = -1; int32_t ret = taosReadMsg(pPeer->syncFd, &minfo, sizeof(SFileInfo)); if (ret != sizeof(SFileInfo) || minfo.index == -1) { - sError("%s, failed to read file info while restore file since %s", pPeer->id, strerror(errno)); + sError("%s, failed to read fileinfo while restore file since %s", pPeer->id, strerror(errno)); + break; + } + + ret = syncCheckHead((SSyncHead*)(&minfo)); + if (ret != 0) { + sError("%s, failed to check fileinfo while restore file since %s", pPeer->id, strerror(ret)); break; } @@ -94,12 +100,12 @@ static int32_t syncRestoreFile(SSyncPeer *pPeer, uint64_t *fversion) { &sinfo.fversion); // if file not there or magic is not the same, file shall be synced - memset(&fileAck, 0, sizeof(fileAck)); + syncBuildFileAck(&fileAck, pNode->vgId); fileAck.sync = (sinfo.magic != minfo.magic || sinfo.name[0] == 0) ? 1 : 0; // send file ack - ret = taosWriteMsg(pPeer->syncFd, &fileAck, sizeof(fileAck)); - if (ret != sizeof(fileAck)) { + ret = taosWriteMsg(pPeer->syncFd, &fileAck, sizeof(SFileAck)); + if (ret != sizeof(SFileAck)) { sError("%s, failed to write file:%s ack while restore file since %s", pPeer->id, minfo.name, strerror(errno)); break; } diff --git a/src/sync/src/syncRetrieve.c b/src/sync/src/syncRetrieve.c index 204f4f90367c2038e40b41d7ab206c579d31b3d6..a1022c6fa006e43d1a206e761cf39557b5ffcecd 100644 --- a/src/sync/src/syncRetrieve.c +++ b/src/sync/src/syncRetrieve.c @@ -99,6 +99,7 @@ static int32_t syncRetrieveFile(SSyncPeer *pPeer) { while (1) { // retrieve file info + syncBuildFileInfo(&fileInfo, pNode->vgId); fileInfo.name[0] = 0; fileInfo.size = 0; fileInfo.magic = (*pNode->getFileInfo)(pNode->vgId, fileInfo.name, &fileInfo.index, TAOS_SYNC_MAX_INDEX, @@ -106,8 +107,8 @@ static int32_t syncRetrieveFile(SSyncPeer *pPeer) { sDebug("%s, file:%s info is sent, size:%" PRId64, pPeer->id, fileInfo.name, fileInfo.size); // send the file info - int32_t ret = taosWriteMsg(pPeer->syncFd, &(fileInfo), sizeof(fileInfo)); - if (ret != sizeof(fileInfo)) { + int32_t ret = taosWriteMsg(pPeer->syncFd, &(fileInfo), sizeof(SFileInfo)); + if (ret != sizeof(SFileInfo)) { code = -1; sError("%s, failed to write file:%s info while retrieve file since %s", pPeer->id, fileInfo.name, strerror(errno)); break; @@ -128,6 +129,13 @@ static int32_t syncRetrieveFile(SSyncPeer *pPeer) { break; } + ret = syncCheckHead((SSyncHead*)(&fileAck)); + if (ret != 0) { + code = -1; + sError("%s, failed to check file:%s ack while retrieve file since %s", pPeer->id, fileInfo.name, strerror(ret)); + break; + } + // set the peer sync version pPeer->sversion = fileInfo.fversion;