From bbafebfbeef323b9308e685baee4eaa900d4276e Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Tue, 1 Dec 2020 11:52:43 +0800 Subject: [PATCH] TD-1926 --- src/sync/src/syncRetrieve.c | 72 ++++++++++++++++++++++++------------- 1 file changed, 47 insertions(+), 25 deletions(-) diff --git a/src/sync/src/syncRetrieve.c b/src/sync/src/syncRetrieve.c index 44d2680b1a..36b197dd46 100644 --- a/src/sync/src/syncRetrieve.c +++ b/src/sync/src/syncRetrieve.c @@ -97,6 +97,7 @@ static int32_t syncRetrieveFile(SSyncPeer *pPeer) { while (1) { // retrieve file info fileInfo.name[0] = 0; + fileInfo.size = 0; fileInfo.magic = (*pNode->getFileInfo)(pNode->vgId, fileInfo.name, &fileInfo.index, TAOS_SYNC_MAX_INDEX, &fileInfo.size, &fileInfo.fversion); // fileInfo.size = htonl(size); @@ -105,14 +106,14 @@ static int32_t syncRetrieveFile(SSyncPeer *pPeer) { // send the file info int32_t ret = taosWriteMsg(pPeer->syncFd, &(fileInfo), sizeof(fileInfo)); if (ret < 0) { - code = TAOS_SYSTEM_ERROR(errno); + code = -1; sError("%s, failed to write file:%s info while retrieve file since %s", pPeer->id, fileInfo.name, strerror(errno)); break; } // if no file anymore, break if (fileInfo.magic == 0 || fileInfo.name[0] == 0) { - code = TSDB_CODE_SUCCESS; + code = 0; sDebug("%s, no more files to sync", pPeer->id); break; } @@ -120,7 +121,7 @@ static int32_t syncRetrieveFile(SSyncPeer *pPeer) { // wait for the ack from peer ret = taosReadMsg(pPeer->syncFd, &fileAck, sizeof(fileAck)); if (ret < 0) { - code = TAOS_SYSTEM_ERROR(errno); + code = -1; sError("%s, failed to read file:%s ack while retrieve file since %s", pPeer->id, fileInfo.name, strerror(errno)); break; } @@ -141,7 +142,7 @@ static int32_t syncRetrieveFile(SSyncPeer *pPeer) { // send the file to peer int32_t sfd = open(name, O_RDONLY); if (sfd < 0) { - code = TAOS_SYSTEM_ERROR(errno); + code = -1; sError("%s, failed to open file:%s while retrieve file since %s", pPeer->id, fileInfo.name, strerror(errno)); break; } @@ -149,7 +150,7 @@ static int32_t syncRetrieveFile(SSyncPeer *pPeer) { ret = taosSendFile(pPeer->syncFd, sfd, NULL, fileInfo.size); close(sfd); if (ret < 0) { - code = TAOS_SYSTEM_ERROR(errno); + code = -1; sError("%s, failed to send file:%s while retrieve file since %s", pPeer->id, fileInfo.name, strerror(errno)); break; } @@ -165,7 +166,7 @@ static int32_t syncRetrieveFile(SSyncPeer *pPeer) { } if (code != TSDB_CODE_SUCCESS) { - sError("%s, failed to retrieve file since %s", pPeer->id, tstrerror(code)); + sError("%s, failed to retrieve file, code:0x%x", pPeer->id, code); } return code; @@ -180,7 +181,7 @@ static int32_t syncReadOneWalRecord(int32_t sfd, SWalHead *pHead) { } if (ret == 0) { - sDebug("sfd:%d, read to end of the wal, ret:%d", sfd, ret); + sTrace("sfd:%d, read to the end of file, ret:%d", sfd, ret); return 0; } @@ -254,7 +255,7 @@ static int32_t syncRetrieveLastWal(SSyncPeer *pPeer, char *name, uint64_t fversi if (pHead->version >= fversion && fversion > 0) { code = 0; - sDebug("%s, retrieve wal finished, fver:%" PRIu64, pPeer->id, fversion); + sDebug("%s, retrieve wal finished, hver:%" PRIu64 " fver:%" PRIu64, pPeer->id, pHead->version, fversion); break; } } @@ -294,13 +295,14 @@ static int32_t syncProcessLastWal(SSyncPeer *pPeer, char *wname, int64_t index) if (fversion == 0) { pPeer->sstatus = TAOS_SYNC_STATUS_CACHE; // start to forward pkt fversion = nodeVersion; // must read data to fversion - sDebug("%s, fversion is 0 and set sstatus:%s", pPeer->id, syncStatus[pPeer->sstatus]); + sDebug("%s, set sstatus:%s and fver:%" PRIu64, pPeer->id, syncStatus[pPeer->sstatus], fversion); } } // if all data up to fversion is read out, it is over if (pPeer->sversion >= fversion && fversion > 0) { - sDebug("%s, data up to fver:%" PRIu64 " has been read out, bytes:%d", pPeer->id, fversion, bytes); + sDebug("%s, data up to fver:%" PRIu64 " has been read out, bytes:%d sver:%" PRIu64, pPeer->id, fversion, bytes, + pPeer->sversion); return 0; } @@ -324,7 +326,6 @@ static int32_t syncRetrieveWal(SSyncPeer *pPeer) { char fname[TSDB_FILENAME_LEN * 3]; char wname[TSDB_FILENAME_LEN * 2]; int32_t size; - struct stat fstat; int32_t code = -1; int64_t index = 0; @@ -332,9 +333,14 @@ static int32_t syncRetrieveWal(SSyncPeer *pPeer) { // retrieve wal info wname[0] = 0; code = (*pNode->getWalInfo)(pNode->vgId, wname, &index); - if (code < 0) break; // error + if (code < 0) { + sError("%s, failed to get wal info since:%s, code:0x%x", pPeer->id, strerror(errno), code); + break; + } + if (wname[0] == 0) { // no wal file - sDebug("%s, no wal file", pPeer->id); + code = 0; + sDebug("%s, no wal file anymore", pPeer->id); break; } @@ -346,20 +352,35 @@ static int32_t syncRetrieveWal(SSyncPeer *pPeer) { // get the full path to wal file snprintf(fname, sizeof(fname), "%s/%s", pNode->path, wname); - // send wal file, - // inotify is not required, old wal file won't be modified, even remove is ok - if (stat(fname, &fstat) < 0) break; - size = fstat.st_size; + // send wal file, old wal file won't be modified, even remove is ok + struct stat fstat; + if (stat(fname, &fstat) < 0) { + code = -1; + sDebug("%s, failed to stat wal:%s for retrieve since %s, code:0x%x", pPeer->id, fname, strerror(errno), code); + break; + } + size = fstat.st_size; sDebug("%s, retrieve wal:%s size:%d", pPeer->id, fname, size); + int32_t sfd = open(fname, O_RDONLY); - if (sfd < 0) break; + if (sfd < 0) { + code = -1; + sError("%s, failed to open wal:%s for retrieve since %s, code:0x%x", pPeer->id, fname, strerror(errno), code); + break; + } code = taosSendFile(pPeer->syncFd, sfd, NULL, size); close(sfd); - if (code < 0) break; + if (code < 0) { + sError("%s, failed to send wal:%s for retrieve since %s, code:0x%x", pPeer->id, fname, strerror(errno), code); + break; + } - if (syncAreFilesModified(pNode, pPeer)) break; + if (syncAreFilesModified(pNode, pPeer)) { + code = -1; + break; + } } if (code == 0) { @@ -368,9 +389,9 @@ static int32_t syncRetrieveWal(SSyncPeer *pPeer) { SWalHead walHead; memset(&walHead, 0, sizeof(walHead)); - code = taosWriteMsg(pPeer->syncFd, &walHead, sizeof(walHead)); + taosWriteMsg(pPeer->syncFd, &walHead, sizeof(walHead)); } else { - sError("%s, failed to send wal since %s", pPeer->id, strerror(errno)); + sError("%s, failed to send wal since %s, code:0x%x", pPeer->id, strerror(errno), code); } return code; @@ -410,7 +431,7 @@ static int32_t syncRetrieveDataStepByStep(SSyncPeer *pPeer) { pPeer->sversion = 0; pPeer->sstatus = TAOS_SYNC_STATUS_FILE; sInfo("%s, start to retrieve files, set sstatus:%s", pPeer->id, syncStatus[pPeer->sstatus]); - if (syncRetrieveFile(pPeer) < 0) { + if (syncRetrieveFile(pPeer) != 0) { sError("%s, failed to retrieve files", pPeer->id); return -1; } @@ -419,8 +440,9 @@ static int32_t syncRetrieveDataStepByStep(SSyncPeer *pPeer) { if (pPeer->sversion == 0) pPeer->sversion = 1; sInfo("%s, start to retrieve wals", pPeer->id); - if (syncRetrieveWal(pPeer) < 0) { - sError("%s, failed to retrieve wals", pPeer->id); + int32_t code = syncRetrieveWal(pPeer); + if (code != 0) { + sError("%s, failed to retrieve wals, code:0x%x", pPeer->id, code); return -1; } -- GitLab