diff --git a/src/tsdb/src/tsdbSync.c b/src/tsdb/src/tsdbSync.c index 84c0e5b841694dd03eb310629c7f46d770f3732e..97e0cf3ffd3a3bcfe7ac928f491b9a659e4ba233 100644 --- a/src/tsdb/src/tsdbSync.c +++ b/src/tsdb/src/tsdbSync.c @@ -13,13 +13,16 @@ * along with this program. If not, see . */ +#define _DEFAULT_SOURCE +#include "os.h" +#include "taoserror.h" #include "tsdbint.h" // Sync handle typedef struct { STsdbRepo *pRepo; SRtn rtn; - int socketFd; + int32_t socketFd; void * pBuf; SMFile * pmf; SMFile mf; @@ -29,22 +32,22 @@ typedef struct { #define SYNC_BUFFER(sh) ((sh)->pBuf) -static void tsdbInitSyncH(SSyncH *pSyncH, STsdbRepo *pRepo, int socketFd); -static void tsdbDestroySyncH(SSyncH *pSyncH); -static int tsdbSyncSendMeta(SSyncH *pSynch); -static int tsdbSyncRecvMeta(SSyncH *pSynch); -static int tsdbSendMetaInfo(SSyncH *pSynch); -static int tsdbRecvMetaInfo(SSyncH *pSynch); -static int tsdbSendDecision(SSyncH *pSynch, bool toSend); -static int tsdbRecvDecision(SSyncH *pSynch, bool *toSend); -static int tsdbSyncSendDFileSetArray(SSyncH *pSynch); -static int tsdbSyncRecvDFileSetArray(SSyncH *pSynch); -static bool tsdbIsTowFSetSame(SDFileSet *pSet1, SDFileSet *pSet2); -static int tsdbSyncSendDFileSet(SSyncH *pSynch, SDFileSet *pSet); -static int tsdbSendDFileSetInfo(SSyncH *pSynch, SDFileSet *pSet); -static int tsdbRecvDFileSetInfo(SSyncH *pSynch); - -int tsdbSyncSend(STsdbRepo *pRepo, int socketFd) { +static void tsdbInitSyncH(SSyncH *pSyncH, STsdbRepo *pRepo, int32_t socketFd); +static void tsdbDestroySyncH(SSyncH *pSyncH); +static int32_t tsdbSyncSendMeta(SSyncH *pSynch); +static int32_t tsdbSyncRecvMeta(SSyncH *pSynch); +static int32_t tsdbSendMetaInfo(SSyncH *pSynch); +static int32_t tsdbRecvMetaInfo(SSyncH *pSynch); +static int32_t tsdbSendDecision(SSyncH *pSynch, bool toSend); +static int32_t tsdbRecvDecision(SSyncH *pSynch, bool *toSend); +static int32_t tsdbSyncSendDFileSetArray(SSyncH *pSynch); +static int32_t tsdbSyncRecvDFileSetArray(SSyncH *pSynch); +static bool tsdbIsTowFSetSame(SDFileSet *pSet1, SDFileSet *pSet2); +static int32_t tsdbSyncSendDFileSet(SSyncH *pSynch, SDFileSet *pSet); +static int32_t tsdbSendDFileSetInfo(SSyncH *pSynch, SDFileSet *pSet); +static int32_t tsdbRecvDFileSetInfo(SSyncH *pSynch); + +int32_t tsdbSyncSend(STsdbRepo *pRepo, int32_t socketFd) { SSyncH synch = {0}; tsdbInitSyncH(&synch, pRepo, socketFd); @@ -52,12 +55,12 @@ int tsdbSyncSend(STsdbRepo *pRepo, int socketFd) { sem_wait(&(pRepo->readyToCommit)); if (tsdbSyncSendMeta(&synch) < 0) { - tsdbError("vgId:%d failed to send meta file since %s", REPO_ID(pRepo), tstrerror(terrno)); + tsdbError("vgId:%d, failed to send metafile since %s", REPO_ID(pRepo), tstrerror(terrno)); goto _err; } if (tsdbSyncSendDFileSetArray(&synch) < 0) { - tsdbError("vgId:%d failed to send data file set array since %s", REPO_ID(pRepo), tstrerror(terrno)); + tsdbError("vgId:%d, failed to send filesets since %s", REPO_ID(pRepo), tstrerror(terrno)); goto _err; } @@ -72,19 +75,19 @@ _err: return -1; } -int tsdbSyncRecv(STsdbRepo *pRepo, int socketFd) { +int32_t tsdbSyncRecv(STsdbRepo *pRepo, int32_t socketFd) { SSyncH synch; tsdbInitSyncH(&synch, pRepo, socketFd); tsdbStartFSTxn(pRepo, 0, 0); if (tsdbSyncRecvMeta(&synch) < 0) { - tsdbError("vgId:%d failed to sync recv meta file from remote since %s", REPO_ID(pRepo), tstrerror(terrno)); + tsdbError("vgId:%d, failed to recv metafile since %s", REPO_ID(pRepo), tstrerror(terrno)); goto _err; } if (tsdbSyncRecvDFileSetArray(&synch) < 0) { - tsdbError("vgId:%d failed to sync recv data file set from remote since %s", REPO_ID(pRepo), tstrerror(terrno)); + tsdbError("vgId:%d, failed to recv filesets since %s", REPO_ID(pRepo), tstrerror(terrno)); goto _err; } @@ -100,7 +103,7 @@ _err: return -1; } -static void tsdbInitSyncH(SSyncH *pSyncH, STsdbRepo *pRepo, int socketFd) { +static void tsdbInitSyncH(SSyncH *pSyncH, STsdbRepo *pRepo, int32_t socketFd) { pSyncH->pRepo = pRepo; pSyncH->socketFd = socketFd; tsdbGetRtnSnap(pRepo, &(pSyncH->rtn)); @@ -108,90 +111,109 @@ static void tsdbInitSyncH(SSyncH *pSyncH, STsdbRepo *pRepo, int socketFd) { static void tsdbDestroySyncH(SSyncH *pSyncH) { taosTZfree(pSyncH->pBuf); } -// ============ SYNC META API -static int tsdbSyncSendMeta(SSyncH *pSynch) { +static int32_t tsdbSyncSendMeta(SSyncH *pSynch) { STsdbRepo *pRepo = pSynch->pRepo; bool toSendMeta = false; SMFile mf; // Send meta info to remote if (tsdbSendMetaInfo(pSynch) < 0) { - tsdbError("vgId:%d failed to send meta file info since %s", REPO_ID(pRepo), tstrerror(terrno)); + tsdbError("vgId:%d, failed to send metainfo since %s", REPO_ID(pRepo), tstrerror(terrno)); return -1; } if (pRepo->fs->cstatus->pmf == NULL) { // No meta file, not need to wait to retrieve meta file + tsdbInfo("vgId:%d, metafile not exist, no need to send", REPO_ID(pRepo)); return 0; } if (tsdbRecvDecision(pSynch, &toSendMeta) < 0) { - tsdbError("vgId:%d failed to recv send file decision since %s", REPO_ID(pRepo), tstrerror(terrno)); + tsdbError("vgId:%d, failed to recv decision while send meta since %s", REPO_ID(pRepo), tstrerror(terrno)); return -1; } if (toSendMeta) { + tsdbInfo("vgId:%d, metafile will be sent", REPO_ID(pRepo)); + tsdbInitMFileEx(&mf, pRepo->fs->cstatus->pmf); if (tsdbOpenMFile(&mf, O_RDONLY) < 0) { - tsdbError("vgId:%d failed to open meta file while sync send meta since %s", REPO_ID(pRepo), tstrerror(terrno)); + tsdbError("vgId:%d, failed to open file while send metafile since %s", REPO_ID(pRepo), tstrerror(terrno)); return -1; } - if (taosSendFile(pSynch->socketFd, TSDB_FILE_FD(&mf), 0, mf.info.size) < mf.info.size) { - tsdbError("vgId:%d failed to copy meta file to remote since %s", REPO_ID(pRepo), tstrerror(terrno)); + int32_t writeLen = mf.info.size; + int32_t ret = taosSendFile(pSynch->socketFd, TSDB_FILE_FD(&mf), 0, writeLen); + if (ret != writeLen) { + terrno = TAOS_SYSTEM_ERROR(errno); + tsdbError("vgId:%d, failed to send metafile since %s, ret:%d writeLen:%d", REPO_ID(pRepo), tstrerror(terrno), ret, + writeLen); tsdbCloseMFile(&mf); return -1; } tsdbCloseMFile(&mf); + tsdbInfo("vgId:%d, metafile is sent, size:%d", REPO_ID(pRepo), writeLen); + } else { + tsdbInfo("vgId:%d, metafile is same, no need to send", REPO_ID(pRepo)); } return 0; } -static int tsdbSyncRecvMeta(SSyncH *pSynch) { +static int32_t tsdbSyncRecvMeta(SSyncH *pSynch) { STsdbRepo *pRepo = pSynch->pRepo; SMFile * pLMFile = pRepo->fs->cstatus->pmf; // Recv meta info from remote if (tsdbRecvMetaInfo(pSynch) < 0) { - tsdbError("vgId:%d failed to recv meta info from remote since %s", REPO_ID(pRepo), tstrerror(terrno)); + tsdbError("vgId:%d, failed to recv metainfo since %s", REPO_ID(pRepo), tstrerror(terrno)); return -1; } // No meta file, do nothing (rm local meta file) if (pSynch->pmf == NULL) { + tsdbInfo("vgId:%d, metafile not exist in remote, no need to recv", REPO_ID(pRepo)); return 0; } if (pLMFile == NULL || memcmp(&(pSynch->pmf->info), &(pLMFile->info), sizeof(SMFInfo)) != 0) { // Local has no meta file or has a different meta file, need to copy from remote if (tsdbSendDecision(pSynch, true) < 0) { - // TODO + tsdbError("vgId:%d, failed to send decision while recv metafile since %s", REPO_ID(pRepo), tstrerror(terrno)); return -1; } + tsdbInfo("vgId:%d, metafile will be received", REPO_ID(pRepo)); + // Recv from remote SMFile mf; SDiskID did = {.level = TFS_PRIMARY_LEVEL, .id = TFS_PRIMARY_ID}; tsdbInitMFile(&mf, did, REPO_ID(pRepo), FS_TXN_VERSION(REPO_FS(pRepo))); if (tsdbCreateMFile(&mf, false) < 0) { - // TODO + tsdbError("vgId:%d, failed to create file while recv metafile since %s", REPO_ID(pRepo), tstrerror(terrno)); return -1; } - if (taosCopyFds(pSynch->socketFd, TSDB_FILE_FD(&mf), pSynch->pmf->info.size) < pSynch->pmf->info.size) { - // TODO + int32_t readLen = pSynch->pmf->info.size; + int32_t ret = taosCopyFds(pSynch->socketFd, TSDB_FILE_FD(&mf), readLen); + if (ret != readLen) { + terrno = TAOS_SYSTEM_ERROR(errno); + tsdbError("vgId:%d, failed to recv metafile since %s, ret:%d readLen:%d", REPO_ID(pRepo), tstrerror(terrno), ret, + readLen); tsdbCloseMFile(&mf); tsdbRemoveMFile(&mf); return -1; } + tsdbInfo("vgId:%d, metafile is received, size:%d", REPO_ID(pRepo), readLen); + tsdbCloseMFile(&mf); tsdbUpdateMFile(REPO_FS(pRepo), &mf); } else { + tsdbInfo("vgId:%d, metafile is same, no need to recv", REPO_ID(pRepo)); if (tsdbSendDecision(pSynch, false) < 0) { - // TODO + tsdbError("vgId:%d, failed to send decision while recv metafile since %s", REPO_ID(pRepo), tstrerror(terrno)); return -1; } } @@ -199,7 +221,7 @@ static int tsdbSyncRecvMeta(SSyncH *pSynch) { return 0; } -static int tsdbSendMetaInfo(SSyncH *pSynch) { +static int32_t tsdbSendMetaInfo(SSyncH *pSynch) { STsdbRepo *pRepo = pSynch->pRepo; uint32_t tlen = 0; SMFile * pMFile = pRepo->fs->cstatus->pmf; @@ -209,6 +231,7 @@ static int tsdbSendMetaInfo(SSyncH *pSynch) { } if (tsdbMakeRoom((void **)(&SYNC_BUFFER(pSynch)), tlen) < 0) { + tsdbError("vgId:%d, failed to makeroom while send metainfo since %s", REPO_ID(pRepo), tstrerror(terrno)); return -1; } @@ -220,45 +243,55 @@ static int tsdbSendMetaInfo(SSyncH *pSynch) { taosCalcChecksumAppend(0, (uint8_t *)tptr, tlen); } - if (taosWriteMsg(pSynch->socketFd, SYNC_BUFFER(pSynch), tlen + sizeof(uint32_t)) < tlen + sizeof(uint32_t)) { - tsdbError("vgId:%d failed to send sync meta file info to remote since %s", REPO_ID(pRepo), strerror(errno)); + int32_t writeLen = tlen + sizeof(uint32_t); + int32_t ret = taosWriteMsg(pSynch->socketFd, SYNC_BUFFER(pSynch), writeLen); + if (ret != writeLen) { terrno = TAOS_SYSTEM_ERROR(errno); + tsdbError("vgId:%d, failed to send metainfo since %s, ret:%d writeLen:%d", REPO_ID(pRepo), tstrerror(terrno), ret, + writeLen); return -1; } return 0; } -static int tsdbRecvMetaInfo(SSyncH *pSynch) { - uint32_t tlen; - char buf[64] = "\0"; +static int32_t tsdbRecvMetaInfo(SSyncH *pSynch) { + STsdbRepo *pRepo = pSynch->pRepo; + uint32_t tlen = 0; + char buf[64] = {0}; - if (taosReadMsg(pSynch->socketFd, buf, sizeof(tlen)) < sizeof(tlen)) { - // TODO + int32_t readLen = sizeof(uint32_t); + int32_t ret = taosReadMsg(pSynch->socketFd, buf, readLen); + if (ret != readLen) { terrno = TAOS_SYSTEM_ERROR(errno); + tsdbError("vgId:%d, failed to recv metainfo len, ret:%d readLen:%d", REPO_ID(pRepo), ret, readLen); return -1; } taosDecodeFixedU32(buf, &tlen); + tsdbInfo("vgId:%d, metainfo len:%d is received", REPO_ID(pRepo), tlen); if (tlen == 0) { pSynch->pmf = NULL; return 0; } if (tsdbMakeRoom((void **)(&SYNC_BUFFER(pSynch)), tlen) < 0) { + tsdbError("vgId:%d, failed to makeroom while recv metainfo since %s", REPO_ID(pRepo), tstrerror(terrno)); return -1; } - if (taosReadMsg(pSynch->socketFd, SYNC_BUFFER(pSynch), tlen) < tlen) { - // TODO + ret = taosReadMsg(pSynch->socketFd, SYNC_BUFFER(pSynch), tlen); + if (ret != tlen) { terrno = TAOS_SYSTEM_ERROR(errno); + tsdbError("vgId:%d, failed to recv metainfo, ret:%d readLen:%d", REPO_ID(pRepo), ret, tlen); return -1; } + tsdbInfo("vgId:%d, metainfo is received", REPO_ID(pRepo)); if (!taosCheckChecksumWhole((uint8_t *)SYNC_BUFFER(pSynch), tlen)) { - // TODO terrno = TSDB_CODE_TDB_MESSED_MSG; + tsdbError("vgId:%d, failed to checksum while recv metainfo since %s", REPO_ID(pRepo), tstrerror(terrno)); return -1; } @@ -268,34 +301,38 @@ static int tsdbRecvMetaInfo(SSyncH *pSynch) { return 0; } -static int tsdbSendDecision(SSyncH *pSynch, bool toSend) { - uint8_t decision = toSend; +static int32_t tsdbSendDecision(SSyncH *pSynch, bool toSend) { + STsdbRepo *pRepo = pSynch->pRepo; + uint8_t decision = toSend; - if (taosWriteMsg(pSynch->socketFd, (void *)(&decision), sizeof(decision)) < sizeof(decision)) { - // TODO + int32_t writeLen = sizeof(uint8_t); + int32_t ret = taosWriteMsg(pSynch->socketFd, (void *)(&decision), writeLen); + if (ret != writeLen) { terrno = TAOS_SYSTEM_ERROR(errno); + tsdbError("vgId:%d, failed to send decison, ret:%d writeLen:%d", REPO_ID(pRepo), ret, writeLen); return -1; } return 0; } -static int tsdbRecvDecision(SSyncH *pSynch, bool *toSend) { - uint8_t decision; +static int32_t tsdbRecvDecision(SSyncH *pSynch, bool *toSend) { + STsdbRepo *pRepo = pSynch->pRepo; + uint8_t decision = 0; - if (taosReadMsg(pSynch->socketFd, (void *)(&decision), sizeof(decision)) < sizeof(decision)) { - // TODO + int32_t readLen = sizeof(uint8_t); + int32_t ret = taosReadMsg(pSynch->socketFd, (void *)(&decision), readLen); + if (ret != readLen) { terrno = TAOS_SYSTEM_ERROR(errno); + tsdbError("vgId:%d, failed to recv decison, ret:%d readLen:%d", REPO_ID(pRepo), ret, readLen); return -1; } *toSend = decision; - return 0; } -// ========== SYNC DATA FILE SET ARRAY API -static int tsdbSyncSendDFileSetArray(SSyncH *pSynch) { +static int32_t tsdbSyncSendDFileSetArray(SSyncH *pSynch) { STsdbRepo *pRepo = pSynch->pRepo; STsdbFS * pfs = REPO_FS(pRepo); SFSIter fsiter; @@ -306,12 +343,13 @@ static int tsdbSyncSendDFileSetArray(SSyncH *pSynch) { do { pSet = tsdbFSIterNext(&fsiter); if (tsdbSyncSendDFileSet(pSynch, pSet) < 0) { - // TODO + tsdbError("vgId:%d, failed to send fileset:%d since %s", REPO_ID(pRepo), pSet->fid, tstrerror(terrno)); return -1; } // No more file set to send, jut break if (pSet == NULL) { + tsdbInfo("vgId:%d, no filesets any more", REPO_ID(pRepo)); break; } } while (true); @@ -319,7 +357,7 @@ static int tsdbSyncSendDFileSetArray(SSyncH *pSynch) { return 0; } -static int tsdbSyncRecvDFileSetArray(SSyncH *pSynch) { +static int32_t tsdbSyncRecvDFileSetArray(SSyncH *pSynch) { STsdbRepo *pRepo = pSynch->pRepo; STsdbFS * pfs = REPO_FS(pRepo); SFSIter fsiter; @@ -329,34 +367,44 @@ static int tsdbSyncRecvDFileSetArray(SSyncH *pSynch) { pLSet = tsdbFSIterNext(&fsiter); if (tsdbRecvDFileSetInfo(pSynch) < 0) { - // TODO + tsdbError("vgId:%d, failed to recv fileset since %s", REPO_ID(pRepo), tstrerror(terrno)); return -1; } while (true) { - if (pLSet == NULL && pSynch->pdf == NULL) break; + if (pLSet == NULL && pSynch->pdf == NULL) { + tsdbInfo("vgId:%d, all filesets is disposed", REPO_ID(pRepo)); + break; + } else { + tsdbInfo("vgId:%d, fileset local:%d remote:%d, will be disposed", REPO_ID(pRepo), pLSet != NULL ? pLSet->fid : -1, + pSynch->pdf != NULL ? pSynch->pdf->fid : -1); + } if (pLSet && (pSynch->pdf == NULL || pLSet->fid < pSynch->pdf->fid)) { // remote not has pLSet->fid set, just remove local (do nothing to remote the fset) + tsdbInfo("vgId:%d, fileset:%d smaller than remote:%d, remove it", REPO_ID(pRepo), pLSet->fid, pSynch->pdf->fid); pLSet = tsdbFSIterNext(&fsiter); } else { if (pLSet && pSynch->pdf && pLSet->fid == pSynch->pdf->fid && tsdbIsTowFSetSame(pLSet, pSynch->pdf)) { // Just keep local files and notify remote not to send + tsdbInfo("vgId:%d, fileset:%d is same and no need to recv", REPO_ID(pRepo), pLSet->fid); + if (tsdbUpdateDFileSet(pfs, pLSet) < 0) { - // TODO + tsdbError("vgId:%d, failed to update fileset since %s", REPO_ID(pRepo), tstrerror(terrno)); return -1; } if (tsdbSendDecision(pSynch, false) < 0) { - // TODO + tsdbError("vgId:%d, filed to send decision since %s", REPO_ID(pRepo), tstrerror(terrno)); return -1; } } else { // Need to copy from remote + tsdbInfo("vgId:%d, fileset:%d will be received", REPO_ID(pRepo), pSynch->pdf->fid); // Notify remote to send there file here if (tsdbSendDecision(pSynch, true) < 0) { - // TODO + tsdbError("vgId:%d, failed to send decision since %s", REPO_ID(pRepo), tstrerror(terrno)); return -1; } @@ -367,6 +415,7 @@ static int tsdbSyncRecvDFileSetArray(SSyncH *pSynch) { tfsAllocDisk(tsdbGetFidLevel(pSynch->pdf->fid, &(pSynch->rtn)), &(did.level), &(did.id)); if (did.level == TFS_UNDECIDED_LEVEL) { terrno = TSDB_CODE_TDB_NO_AVAIL_DISK; + tsdbError("vgId:%d, failed allc disk since %s", REPO_ID(pRepo), tstrerror(terrno)); return -1; } @@ -374,34 +423,45 @@ static int tsdbSyncRecvDFileSetArray(SSyncH *pSynch) { // Create new FSET if (tsdbCreateDFileSet(&fset, false) < 0) { - // TODO + tsdbError("vgId:%d, failed to create fileset since %s", REPO_ID(pRepo), tstrerror(terrno)); return -1; } for (TSDB_FILE_T ftype = 0; ftype < TSDB_FILE_MAX; ftype++) { SDFile *pDFile = TSDB_DFILE_IN_SET(&fset, ftype); // local file SDFile *pRDFile = TSDB_DFILE_IN_SET(pSynch->pdf, ftype); // remote file - if (taosCopyFds(pSynch->socketFd, pDFile->fd, pRDFile->info.size) < pRDFile->info.size) { - // TODO + + tsdbInfo("vgId:%d, file:%s will be received, osize:%" PRIu64 " rsize:%" PRIu64, REPO_ID(pRepo), + pDFile->f.aname, pDFile->info.size, pRDFile->info.size); + + int32_t writeLen = pRDFile->info.size; + int32_t ret = taosCopyFds(pSynch->socketFd, pDFile->fd, writeLen); + if (ret != writeLen) { terrno = TAOS_SYSTEM_ERROR(errno); + tsdbError("vgId:%d, failed to recv file:%s since %s, ret:%d writeLen:%d", REPO_ID(pRepo), pDFile->f.aname, + tstrerror(terrno), ret, writeLen); tsdbCloseDFileSet(&fset); tsdbRemoveDFileSet(&fset); return -1; } + // Update new file info pDFile->info = pRDFile->info; + tsdbInfo("vgId:%d, file:%s is received, size:%d", REPO_ID(pRepo), pDFile->f.aname, writeLen); } tsdbCloseDFileSet(&fset); if (tsdbUpdateDFileSet(pfs, &fset) < 0) { - // TODO + tsdbInfo("vgId:%d, fileset:%d failed to update since %s", REPO_ID(pRepo), fset.fid, tstrerror(terrno)); return -1; } + + tsdbInfo("vgId:%d, fileset:%d is received", REPO_ID(pRepo), pSynch->pdf->fid); } // Move forward if (tsdbRecvDFileSetInfo(pSynch) < 0) { - // TODO + tsdbError("vgId:%d, failed to recv fileset since %s", REPO_ID(pRepo), tstrerror(terrno)); return -1; } @@ -455,11 +515,12 @@ static bool tsdbIsTowFSetSame(SDFileSet *pSet1, SDFileSet *pSet2) { return true; } -static int tsdbSyncSendDFileSet(SSyncH *pSynch, SDFileSet *pSet) { +static int32_t tsdbSyncSendDFileSet(SSyncH *pSynch, SDFileSet *pSet) { + STsdbRepo *pRepo = pSynch->pRepo; bool toSend = false; if (tsdbSendDFileSetInfo(pSynch, pSet) < 0) { - // TODO + tsdbError("vgId:%d, failed to send fileset:%d info since %s", REPO_ID(pRepo), pSet->fid, tstrerror(terrno)); return -1; } @@ -469,37 +530,56 @@ static int tsdbSyncSendDFileSet(SSyncH *pSynch, SDFileSet *pSet) { } if (tsdbRecvDecision(pSynch, &toSend) < 0) { + tsdbError("vgId:%d, failed to recv decision while send fileset:%d since %s", REPO_ID(pRepo), pSet->fid, + tstrerror(terrno)); return -1; } if (toSend) { + tsdbInfo("vgId:%d, fileset:%d will be sent", REPO_ID(pRepo), pSet->fid); + for (TSDB_FILE_T ftype = 0; ftype < TSDB_FILE_MAX; ftype++) { SDFile df = *TSDB_DFILE_IN_SET(pSet, ftype); - + if (tsdbOpenDFile(&df, O_RDONLY) < 0) { + tsdbError("vgId:%d, failed to file:%s since %s", REPO_ID(pRepo), df.f.aname, tstrerror(terrno)); return -1; } - if (taosSendFile(pSynch->socketFd, TSDB_FILE_FD(&df), 0, df.info.size) < df.info.size) { + int32_t writeLen = df.info.size; + tsdbInfo("vgId:%d, file:%s will be sent, size:%d", REPO_ID(pRepo), df.f.aname, writeLen); + + int32_t ret = taosSendFile(pSynch->socketFd, TSDB_FILE_FD(&df), 0, writeLen); + if (ret != writeLen) { + terrno = TAOS_SYSTEM_ERROR(errno); + tsdbError("vgId:%d, failed to send file:%s since %s, ret:%d writeLen:%d", REPO_ID(pRepo), df.f.aname, + tstrerror(terrno), ret, writeLen); tsdbCloseDFile(&df); return -1; } + tsdbInfo("vgId:%d, file:%s is sent", REPO_ID(pRepo), df.f.aname); tsdbCloseDFile(&df); } + + tsdbInfo("vgId:%d, fileset:%d is sent", REPO_ID(pRepo), pSet->fid); + } else { + tsdbInfo("vgId:%d, fileset:%d is same, no need to send", REPO_ID(pRepo), pSet->fid); } return 0; } -static int tsdbSendDFileSetInfo(SSyncH *pSynch, SDFileSet *pSet) { - uint32_t tlen = 0; +static int32_t tsdbSendDFileSetInfo(SSyncH *pSynch, SDFileSet *pSet) { + STsdbRepo *pRepo = pSynch->pRepo; + uint32_t tlen = 0; if (pSet) { tlen = tsdbEncodeDFileSetEx(NULL, pSet) + sizeof(TSCKSUM); } if (tsdbMakeRoom((void **)(&SYNC_BUFFER(pSynch)), tlen + sizeof(tlen)) < 0) { + tsdbError("vgId:%d, failed to makeroom while send fileinfo since %s", REPO_ID(pRepo), tstrerror(terrno)); return -1; } @@ -511,42 +591,52 @@ static int tsdbSendDFileSetInfo(SSyncH *pSynch, SDFileSet *pSet) { taosCalcChecksumAppend(0, (uint8_t *)tptr, tlen); } - if (taosWriteMsg(pSynch->socketFd, SYNC_BUFFER(pSynch), tlen + sizeof(tlen)) < tlen + sizeof(tlen)) { - // TODO + int32_t writeLen = tlen + sizeof(uint32_t); + int32_t ret = taosWriteMsg(pSynch->socketFd, SYNC_BUFFER(pSynch), writeLen); + if (ret != writeLen) { terrno = TAOS_SYSTEM_ERROR(errno); + tsdbError("vgId:%d, failed to send fileinfo, ret:%d writeLen:%d", REPO_ID(pRepo), ret, writeLen); return -1; } return 0; } -static int tsdbRecvDFileSetInfo(SSyncH *pSynch) { - uint32_t tlen; - char buf[64] = "\0"; +static int32_t tsdbRecvDFileSetInfo(SSyncH *pSynch) { + STsdbRepo *pRepo = pSynch->pRepo; + uint32_t tlen; + char buf[64] = {0}; - if (taosReadMsg(pSynch->socketFd, buf, sizeof(tlen)) < sizeof(tlen)) { + int32_t readLen = sizeof(uint32_t); + int32_t ret = taosReadMsg(pSynch->socketFd, buf, readLen); + if (ret != readLen) { terrno = TAOS_SYSTEM_ERROR(errno); return -1; } taosDecodeFixedU32(buf, &tlen); + tsdbInfo("vgId:%d, fileinfo len:%d is received", REPO_ID(pRepo), tlen); if (tlen == 0) { pSynch->pdf = NULL; return 0; } if (tsdbMakeRoom((void **)(&SYNC_BUFFER(pSynch)), tlen) < 0) { + tsdbError("vgId:%d, failed to makeroom while recv fileinfo since %s", REPO_ID(pRepo), tstrerror(terrno)); return -1; } - if (taosReadMsg(pSynch->socketFd, SYNC_BUFFER(pSynch), tlen) < tlen) { + ret = taosReadMsg(pSynch->socketFd, SYNC_BUFFER(pSynch), tlen); + if (ret != tlen) { terrno = TAOS_SYSTEM_ERROR(errno); + tsdbError("vgId:%d, failed to recv fileinfo, ret:%d readLen:%d", REPO_ID(pRepo), ret, tlen); return -1; } if (!taosCheckChecksumWhole((uint8_t *)SYNC_BUFFER(pSynch), tlen)) { terrno = TSDB_CODE_TDB_MESSED_MSG; + tsdbError("vgId:%d, failed to checksum while recv fileinfo since %s", REPO_ID(pRepo), tstrerror(terrno)); return -1; }