From 0fcfc142a235cb42395ebb2c26f38b7ef383877d Mon Sep 17 00:00:00 2001 From: Minglei Jin Date: Wed, 24 Feb 2021 23:21:04 +0800 Subject: [PATCH] [TD-3064]: fix wal file size(>2G) overflow with int32_t --- src/sync/src/syncRetrieve.c | 42 ++++++++++++++++++------------------- 1 file changed, 21 insertions(+), 21 deletions(-) diff --git a/src/sync/src/syncRetrieve.c b/src/sync/src/syncRetrieve.c index 17513258b1..be4073760d 100644 --- a/src/sync/src/syncRetrieve.c +++ b/src/sync/src/syncRetrieve.c @@ -170,14 +170,14 @@ static int32_t syncReadOneWalRecord(int32_t sfd, SWalHead *pHead) { return sizeof(SWalHead) + pHead->len; } -static int32_t syncRetrieveLastWal(SSyncPeer *pPeer, char *name, uint64_t fversion, int64_t offset) { +static int64_t syncRetrieveLastWal(SSyncPeer *pPeer, char *name, uint64_t fversion, int64_t offset) { int32_t sfd = open(name, O_RDONLY | O_BINARY); if (sfd < 0) { sError("%s, failed to open wal:%s for retrieve since:%s", pPeer->id, name, tstrerror(errno)); return -1; } - int32_t code = (int32_t)taosLSeek(sfd, offset, SEEK_SET); + int64_t code = taosLSeek(sfd, offset, SEEK_SET); if (code < 0) { sError("%s, failed to seek %" PRId64 " in wal:%s for retrieve since:%s", pPeer->id, offset, name, tstrerror(errno)); close(sfd); @@ -187,7 +187,7 @@ static int32_t syncRetrieveLastWal(SSyncPeer *pPeer, char *name, uint64_t fversi sDebug("%s, retrieve last wal:%s, offset:%" PRId64 " fver:%" PRIu64, pPeer->id, name, offset, fversion); SWalHead *pHead = malloc(SYNC_MAX_SIZE); - int32_t bytes = 0; + int64_t bytes = 0; while (1) { code = syncReadOneWalRecord(sfd, pHead); @@ -198,13 +198,13 @@ static int32_t syncRetrieveLastWal(SSyncPeer *pPeer, char *name, uint64_t fversi if (code == 0) { code = bytes; - sDebug("%s, read to the end of wal, bytes:%d", pPeer->id, bytes); + sDebug("%s, read to the end of wal, bytes:%" PRId64, pPeer->id, bytes); break; } sTrace("%s, last wal is forwarded, hver:%" PRIu64, pPeer->id, pHead->version); - int32_t wsize = code; + int32_t wsize = (int32_t)code; int32_t ret = taosWriteMsg(pPeer->syncFd, pHead, wsize); if (ret != wsize) { code = -1; @@ -228,7 +228,7 @@ static int32_t syncRetrieveLastWal(SSyncPeer *pPeer, char *name, uint64_t fversi return code; } -static int32_t syncProcessLastWal(SSyncPeer *pPeer, char *wname, int64_t index) { +static int64_t syncProcessLastWal(SSyncPeer *pPeer, char *wname, int64_t index) { SSyncNode *pNode = pPeer->pSyncNode; int32_t once = 0; // last WAL has once ever been processed int64_t offset = 0; @@ -243,9 +243,9 @@ static int32_t syncProcessLastWal(SSyncPeer *pPeer, char *wname, int64_t index) if (syncAreFilesModified(pNode, pPeer)) return -1; if (syncGetWalVersion(pNode, pPeer) < 0) return -1; - int32_t bytes = syncRetrieveLastWal(pPeer, fname, fversion, offset); + int64_t bytes = syncRetrieveLastWal(pPeer, fname, fversion, offset); if (bytes < 0) { - sDebug("%s, failed to retrieve last wal, bytes:%d", pPeer->id, bytes); + sDebug("%s, failed to retrieve last wal, bytes:%" PRId64, pPeer->id, bytes); return bytes; } @@ -263,7 +263,7 @@ static int32_t syncProcessLastWal(SSyncPeer *pPeer, char *wname, int64_t index) // if all data up to fversion is read out, it is over if (pPeer->sversion >= fversion && fversion > 0) { - sDebug("%s, data up to fver:%" PRIu64 " has been read out, bytes:%d sver:%" PRIu64, pPeer->id, fversion, bytes, + sDebug("%s, data up to fver:%" PRIu64 " has been read out, bytes:%" PRId64 " sver:%" PRIu64, pPeer->id, fversion, bytes, pPeer->sversion); return 0; } @@ -277,19 +277,19 @@ static int32_t syncProcessLastWal(SSyncPeer *pPeer, char *wname, int64_t index) // if bytes > 0, file is updated, or fversion is not reached but file still open, read again once = 1; offset += bytes; - sDebug("%s, continue retrieve last wal, bytes:%d offset:%" PRId64 " sver:%" PRIu64 " fver:%" PRIu64, pPeer->id, + sDebug("%s, continue retrieve last wal, bytes:%" PRId64 " offset:%" PRId64 " sver:%" PRIu64 " fver:%" PRIu64, pPeer->id, bytes, offset, pPeer->sversion, fversion); } return -1; } -static int32_t syncRetrieveWal(SSyncPeer *pPeer) { +static int64_t syncRetrieveWal(SSyncPeer *pPeer) { SSyncNode * pNode = pPeer->pSyncNode; char fname[TSDB_FILENAME_LEN * 3]; char wname[TSDB_FILENAME_LEN * 2]; int32_t size; - int32_t code = -1; + int64_t code = -1; int64_t index = 0; while (1) { @@ -297,7 +297,7 @@ static int32_t syncRetrieveWal(SSyncPeer *pPeer) { wname[0] = 0; code = (*pNode->getWalInfoFp)(pNode->vgId, wname, &index); if (code < 0) { - sError("%s, failed to get wal info since:%s, code:0x%x", pPeer->id, strerror(errno), code); + sError("%s, failed to get wal info since:%s, code:0x%" PRIx64, pPeer->id, strerror(errno), code); break; } @@ -309,7 +309,7 @@ static int32_t syncRetrieveWal(SSyncPeer *pPeer) { if (code == 0) { // last wal code = syncProcessLastWal(pPeer, wname, index); - sInfo("%s, last wal processed, code:%d", pPeer->id, code); + sInfo("%s, last wal processed, code:%" PRId64, pPeer->id, code); break; } @@ -320,7 +320,7 @@ static int32_t syncRetrieveWal(SSyncPeer *pPeer) { struct stat fstat; if (stat(fname, &fstat) < 0) { code = -1; - sDebug("%s, failed to stat wal:%s for retrieve since %s, code:0x%x", pPeer->id, fname, strerror(errno), code); + sDebug("%s, failed to stat wal:%s for retrieve since %s, code:0x%" PRIx64, pPeer->id, fname, strerror(errno), code); break; } @@ -330,14 +330,14 @@ static int32_t syncRetrieveWal(SSyncPeer *pPeer) { int32_t sfd = open(fname, O_RDONLY | O_BINARY); if (sfd < 0) { code = -1; - sError("%s, failed to open wal:%s for retrieve since %s, code:0x%x", pPeer->id, fname, strerror(errno), code); + sError("%s, failed to open wal:%s for retrieve since %s, code:0x%" PRIx64, pPeer->id, fname, strerror(errno), code); break; } code = (int32_t)taosSendFile(pPeer->syncFd, sfd, NULL, size); close(sfd); if (code < 0) { - sError("%s, failed to send wal:%s for retrieve since %s, code:0x%x", pPeer->id, fname, strerror(errno), code); + sError("%s, failed to send wal:%s for retrieve since %s, code:0x%" PRIx64, pPeer->id, fname, strerror(errno), code); break; } @@ -358,7 +358,7 @@ static int32_t syncRetrieveWal(SSyncPeer *pPeer) { code = -1; } } else { - sError("%s, failed to send wal since %s, code:0x%x", pPeer->id, strerror(errno), code); + sError("%s, failed to send wal since %s, code:0x%" PRIx64, pPeer->id, strerror(errno), code); } return code; @@ -405,9 +405,9 @@ static int32_t syncRetrieveDataStepByStep(SSyncPeer *pPeer) { if (pPeer->sversion == 0) pPeer->sversion = 1; sInfo("%s, start to retrieve wals", pPeer->id); - int32_t code = syncRetrieveWal(pPeer); - if (code != 0) { - sError("%s, failed to retrieve wals, code:0x%x", pPeer->id, code); + int64_t code = syncRetrieveWal(pPeer); + if (code < 0) { + sError("%s, failed to retrieve wals, code:0x%" PRIx64, pPeer->id, code); return -1; } -- GitLab