From dbdd70aca16a1415efab789d3d44c1af7b1e5d7c Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Mon, 23 Nov 2020 11:15:08 +0000 Subject: [PATCH] TD-2196 --- src/sync/src/syncRestore.c | 31 ++++++++++++++++++++++++------- src/sync/src/syncRetrieve.c | 34 ++++++++++++++++++++++++++-------- 2 files changed, 50 insertions(+), 15 deletions(-) diff --git a/src/sync/src/syncRestore.c b/src/sync/src/syncRestore.c index 466c39060a..69da1d71fa 100644 --- a/src/sync/src/syncRestore.c +++ b/src/sync/src/syncRestore.c @@ -65,7 +65,10 @@ static int32_t syncRestoreFile(SSyncPeer *pPeer, uint64_t *fversion) { while (1) { // read file info int32_t ret = taosReadMsg(pPeer->syncFd, &(minfo), sizeof(minfo)); - if (ret < 0) break; + if (ret < 0) { + sError("%s, failed to read file info while restore file since %s", pPeer->id, strerror(errno)); + break; + } // if no more file from master, break; if (minfo.name[0] == 0 || minfo.magic == 0) { @@ -92,8 +95,11 @@ static int32_t syncRestoreFile(SSyncPeer *pPeer, uint64_t *fversion) { fileAck.sync = (sinfo.magic != minfo.magic || sinfo.name[0] == 0) ? 1 : 0; // send file ack - ret = taosWriteMsg(pPeer->syncFd, &(fileAck), sizeof(fileAck)); - if (ret < 0) break; + ret = taosWriteMsg(pPeer->syncFd, &fileAck, sizeof(fileAck)); + if (ret < 0) { + sError("%s, failed to write file:%s ack while restore file since %s", pPeer->id, minfo.name, strerror(errno)); + break; + } // if sync is not required, continue if (fileAck.sync == 0) { @@ -108,14 +114,17 @@ static int32_t syncRestoreFile(SSyncPeer *pPeer, uint64_t *fversion) { int32_t dfd = open(name, O_WRONLY | O_CREAT | O_TRUNC, S_IRWXU | S_IRWXG | S_IRWXO); if (dfd < 0) { - sError("%s, failed to open file:%s", pPeer->id, name); + sError("%s, failed to open file:%s while restore file since %s", pPeer->id, minfo.name, strerror(errno)); break; } ret = taosCopyFds(pPeer->syncFd, dfd, minfo.size); fsync(dfd); close(dfd); - if (ret < 0) break; + if (ret < 0) { + sError("%s, failed to copy file:%s while restore file since %s", pPeer->id, minfo.name, strerror(errno)); + break; + } fileChanged = true; sDebug("%s, %s is received, size:%" PRId64, pPeer->id, minfo.name, minfo.size); @@ -125,6 +134,7 @@ static int32_t syncRestoreFile(SSyncPeer *pPeer, uint64_t *fversion) { // data file is changed, code shall be set to 1 *fversion = minfo.fversion; code = 1; + sDebug("%s, file changed while restore file", pPeer->id); } if (code < 0) { @@ -146,15 +156,22 @@ static int32_t syncRestoreWal(SSyncPeer *pPeer) { while (1) { ret = taosReadMsg(pPeer->syncFd, pHead, sizeof(SWalHead)); - if (ret < 0) break; + if (ret < 0) { + sError("%s, failed to read walhead while restore wal since %s", pPeer->id, strerror(errno)); + break; + } if (pHead->len == 0) { + sDebug("%s, wal is synced over", pPeer->id); code = 0; break; } // wal sync over ret = taosReadMsg(pPeer->syncFd, pHead->cont, pHead->len); - if (ret < 0) break; + if (ret < 0) { + sError("%s, failed to read walcont, len:%d while restore wal since %s", pPeer->id, pHead->len, strerror(errno)); + break; + } sTrace("%s, restore a record, qtype:wal len:%d hver:%" PRIu64, pPeer->id, pHead->len, pHead->version); diff --git a/src/sync/src/syncRetrieve.c b/src/sync/src/syncRetrieve.c index 04d4ce32cb..58d09d080e 100644 --- a/src/sync/src/syncRetrieve.c +++ b/src/sync/src/syncRetrieve.c @@ -114,7 +114,10 @@ static int32_t syncRetrieveFile(SSyncPeer *pPeer) { // send the file info int32_t ret = taosWriteMsg(pPeer->syncFd, &(fileInfo), sizeof(fileInfo)); - if (ret < 0) break; + if (ret < 0) { + sError("%s, failed to write file:%s info while retrieve file since %s", pPeer->id, fileInfo.name, strerror(errno)); + break; + } // if no file anymore, break if (fileInfo.magic == 0 || fileInfo.name[0] == 0) { @@ -124,8 +127,11 @@ static int32_t syncRetrieveFile(SSyncPeer *pPeer) { } // wait for the ack from peer - ret = taosReadMsg(pPeer->syncFd, &(fileAck), sizeof(fileAck)); - if (ret < 0) break; + ret = taosReadMsg(pPeer->syncFd, &fileAck, sizeof(fileAck)); + if (ret < 0) { + sError("%s, failed to read file:%s ack while retrieve file since %s", pPeer->id, fileInfo.name, strerror(errno)); + break; + } // set the peer sync version pPeer->sversion = fileInfo.fversion; @@ -134,7 +140,10 @@ static int32_t syncRetrieveFile(SSyncPeer *pPeer) { snprintf(name, sizeof(name), "%s/%s", pNode->path, fileInfo.name); // add the file into watch list - if (syncAddIntoWatchList(pPeer, name) < 0) break; + if (syncAddIntoWatchList(pPeer, name) < 0) { + sError("%s, failed to watch file:%s while retrieve file since %s", pPeer->id, fileInfo.name, strerror(errno)); + break; + } // if sync is not required, continue if (fileAck.sync == 0) { @@ -145,21 +154,30 @@ static int32_t syncRetrieveFile(SSyncPeer *pPeer) { // send the file to peer int32_t sfd = open(name, O_RDONLY); - if (sfd < 0) break; + if (sfd < 0) { + sError("%s, failed to open file:%s while retrieve file since %s", pPeer->id, fileInfo.name, strerror(errno)); + break; + } ret = taosSendFile(pPeer->syncFd, sfd, NULL, fileInfo.size); close(sfd); - if (ret < 0) break; + if (ret < 0) { + sError("%s, failed to send file:%s while retrieve file since %s", pPeer->id, fileInfo.name, strerror(errno)); + break; + } sDebug("%s, %s is sent, size:%" PRId64, pPeer->id, name, fileInfo.size); fileInfo.index++; // check if processed files are modified - if (syncAreFilesModified(pPeer) != 0) break; + if (syncAreFilesModified(pPeer) != 0) { + sInfo("%s, file:%s are modified while retrieve file since %s", pPeer->id, fileInfo.name, strerror(errno)); + break; + } } if (code < 0) { - sError("%s, failed to retrieve file since %s", pPeer->id, strerror(errno)); + sError("%s, failed to retrieve file", pPeer->id); } return code; -- GitLab