提交 0fcfc142 编写于 作者: M Minglei Jin

[TD-3064]<fix>: fix wal file size(>2G) overflow with int32_t

上级 8e8437ad
...@@ -170,14 +170,14 @@ static int32_t syncReadOneWalRecord(int32_t sfd, SWalHead *pHead) { ...@@ -170,14 +170,14 @@ static int32_t syncReadOneWalRecord(int32_t sfd, SWalHead *pHead) {
return sizeof(SWalHead) + pHead->len; return sizeof(SWalHead) + pHead->len;
} }
static int32_t syncRetrieveLastWal(SSyncPeer *pPeer, char *name, uint64_t fversion, int64_t offset) { static int64_t syncRetrieveLastWal(SSyncPeer *pPeer, char *name, uint64_t fversion, int64_t offset) {
int32_t sfd = open(name, O_RDONLY | O_BINARY); int32_t sfd = open(name, O_RDONLY | O_BINARY);
if (sfd < 0) { if (sfd < 0) {
sError("%s, failed to open wal:%s for retrieve since:%s", pPeer->id, name, tstrerror(errno)); sError("%s, failed to open wal:%s for retrieve since:%s", pPeer->id, name, tstrerror(errno));
return -1; return -1;
} }
int32_t code = (int32_t)taosLSeek(sfd, offset, SEEK_SET); int64_t code = taosLSeek(sfd, offset, SEEK_SET);
if (code < 0) { if (code < 0) {
sError("%s, failed to seek %" PRId64 " in wal:%s for retrieve since:%s", pPeer->id, offset, name, tstrerror(errno)); sError("%s, failed to seek %" PRId64 " in wal:%s for retrieve since:%s", pPeer->id, offset, name, tstrerror(errno));
close(sfd); close(sfd);
...@@ -187,7 +187,7 @@ static int32_t syncRetrieveLastWal(SSyncPeer *pPeer, char *name, uint64_t fversi ...@@ -187,7 +187,7 @@ static int32_t syncRetrieveLastWal(SSyncPeer *pPeer, char *name, uint64_t fversi
sDebug("%s, retrieve last wal:%s, offset:%" PRId64 " fver:%" PRIu64, pPeer->id, name, offset, fversion); sDebug("%s, retrieve last wal:%s, offset:%" PRId64 " fver:%" PRIu64, pPeer->id, name, offset, fversion);
SWalHead *pHead = malloc(SYNC_MAX_SIZE); SWalHead *pHead = malloc(SYNC_MAX_SIZE);
int32_t bytes = 0; int64_t bytes = 0;
while (1) { while (1) {
code = syncReadOneWalRecord(sfd, pHead); code = syncReadOneWalRecord(sfd, pHead);
...@@ -198,13 +198,13 @@ static int32_t syncRetrieveLastWal(SSyncPeer *pPeer, char *name, uint64_t fversi ...@@ -198,13 +198,13 @@ static int32_t syncRetrieveLastWal(SSyncPeer *pPeer, char *name, uint64_t fversi
if (code == 0) { if (code == 0) {
code = bytes; code = bytes;
sDebug("%s, read to the end of wal, bytes:%d", pPeer->id, bytes); sDebug("%s, read to the end of wal, bytes:%" PRId64, pPeer->id, bytes);
break; break;
} }
sTrace("%s, last wal is forwarded, hver:%" PRIu64, pPeer->id, pHead->version); sTrace("%s, last wal is forwarded, hver:%" PRIu64, pPeer->id, pHead->version);
int32_t wsize = code; int32_t wsize = (int32_t)code;
int32_t ret = taosWriteMsg(pPeer->syncFd, pHead, wsize); int32_t ret = taosWriteMsg(pPeer->syncFd, pHead, wsize);
if (ret != wsize) { if (ret != wsize) {
code = -1; code = -1;
...@@ -228,7 +228,7 @@ static int32_t syncRetrieveLastWal(SSyncPeer *pPeer, char *name, uint64_t fversi ...@@ -228,7 +228,7 @@ static int32_t syncRetrieveLastWal(SSyncPeer *pPeer, char *name, uint64_t fversi
return code; return code;
} }
static int32_t syncProcessLastWal(SSyncPeer *pPeer, char *wname, int64_t index) { static int64_t syncProcessLastWal(SSyncPeer *pPeer, char *wname, int64_t index) {
SSyncNode *pNode = pPeer->pSyncNode; SSyncNode *pNode = pPeer->pSyncNode;
int32_t once = 0; // last WAL has once ever been processed int32_t once = 0; // last WAL has once ever been processed
int64_t offset = 0; int64_t offset = 0;
...@@ -243,9 +243,9 @@ static int32_t syncProcessLastWal(SSyncPeer *pPeer, char *wname, int64_t index) ...@@ -243,9 +243,9 @@ static int32_t syncProcessLastWal(SSyncPeer *pPeer, char *wname, int64_t index)
if (syncAreFilesModified(pNode, pPeer)) return -1; if (syncAreFilesModified(pNode, pPeer)) return -1;
if (syncGetWalVersion(pNode, pPeer) < 0) return -1; if (syncGetWalVersion(pNode, pPeer) < 0) return -1;
int32_t bytes = syncRetrieveLastWal(pPeer, fname, fversion, offset); int64_t bytes = syncRetrieveLastWal(pPeer, fname, fversion, offset);
if (bytes < 0) { if (bytes < 0) {
sDebug("%s, failed to retrieve last wal, bytes:%d", pPeer->id, bytes); sDebug("%s, failed to retrieve last wal, bytes:%" PRId64, pPeer->id, bytes);
return bytes; return bytes;
} }
...@@ -263,7 +263,7 @@ static int32_t syncProcessLastWal(SSyncPeer *pPeer, char *wname, int64_t index) ...@@ -263,7 +263,7 @@ static int32_t syncProcessLastWal(SSyncPeer *pPeer, char *wname, int64_t index)
// if all data up to fversion is read out, it is over // if all data up to fversion is read out, it is over
if (pPeer->sversion >= fversion && fversion > 0) { if (pPeer->sversion >= fversion && fversion > 0) {
sDebug("%s, data up to fver:%" PRIu64 " has been read out, bytes:%d sver:%" PRIu64, pPeer->id, fversion, bytes, sDebug("%s, data up to fver:%" PRIu64 " has been read out, bytes:%" PRId64 " sver:%" PRIu64, pPeer->id, fversion, bytes,
pPeer->sversion); pPeer->sversion);
return 0; return 0;
} }
...@@ -277,19 +277,19 @@ static int32_t syncProcessLastWal(SSyncPeer *pPeer, char *wname, int64_t index) ...@@ -277,19 +277,19 @@ static int32_t syncProcessLastWal(SSyncPeer *pPeer, char *wname, int64_t index)
// if bytes > 0, file is updated, or fversion is not reached but file still open, read again // if bytes > 0, file is updated, or fversion is not reached but file still open, read again
once = 1; once = 1;
offset += bytes; offset += bytes;
sDebug("%s, continue retrieve last wal, bytes:%d offset:%" PRId64 " sver:%" PRIu64 " fver:%" PRIu64, pPeer->id, sDebug("%s, continue retrieve last wal, bytes:%" PRId64 " offset:%" PRId64 " sver:%" PRIu64 " fver:%" PRIu64, pPeer->id,
bytes, offset, pPeer->sversion, fversion); bytes, offset, pPeer->sversion, fversion);
} }
return -1; return -1;
} }
static int32_t syncRetrieveWal(SSyncPeer *pPeer) { static int64_t syncRetrieveWal(SSyncPeer *pPeer) {
SSyncNode * pNode = pPeer->pSyncNode; SSyncNode * pNode = pPeer->pSyncNode;
char fname[TSDB_FILENAME_LEN * 3]; char fname[TSDB_FILENAME_LEN * 3];
char wname[TSDB_FILENAME_LEN * 2]; char wname[TSDB_FILENAME_LEN * 2];
int32_t size; int32_t size;
int32_t code = -1; int64_t code = -1;
int64_t index = 0; int64_t index = 0;
while (1) { while (1) {
...@@ -297,7 +297,7 @@ static int32_t syncRetrieveWal(SSyncPeer *pPeer) { ...@@ -297,7 +297,7 @@ static int32_t syncRetrieveWal(SSyncPeer *pPeer) {
wname[0] = 0; wname[0] = 0;
code = (*pNode->getWalInfoFp)(pNode->vgId, wname, &index); code = (*pNode->getWalInfoFp)(pNode->vgId, wname, &index);
if (code < 0) { if (code < 0) {
sError("%s, failed to get wal info since:%s, code:0x%x", pPeer->id, strerror(errno), code); sError("%s, failed to get wal info since:%s, code:0x%" PRIx64, pPeer->id, strerror(errno), code);
break; break;
} }
...@@ -309,7 +309,7 @@ static int32_t syncRetrieveWal(SSyncPeer *pPeer) { ...@@ -309,7 +309,7 @@ static int32_t syncRetrieveWal(SSyncPeer *pPeer) {
if (code == 0) { // last wal if (code == 0) { // last wal
code = syncProcessLastWal(pPeer, wname, index); code = syncProcessLastWal(pPeer, wname, index);
sInfo("%s, last wal processed, code:%d", pPeer->id, code); sInfo("%s, last wal processed, code:%" PRId64, pPeer->id, code);
break; break;
} }
...@@ -320,7 +320,7 @@ static int32_t syncRetrieveWal(SSyncPeer *pPeer) { ...@@ -320,7 +320,7 @@ static int32_t syncRetrieveWal(SSyncPeer *pPeer) {
struct stat fstat; struct stat fstat;
if (stat(fname, &fstat) < 0) { if (stat(fname, &fstat) < 0) {
code = -1; code = -1;
sDebug("%s, failed to stat wal:%s for retrieve since %s, code:0x%x", pPeer->id, fname, strerror(errno), code); sDebug("%s, failed to stat wal:%s for retrieve since %s, code:0x%" PRIx64, pPeer->id, fname, strerror(errno), code);
break; break;
} }
...@@ -330,14 +330,14 @@ static int32_t syncRetrieveWal(SSyncPeer *pPeer) { ...@@ -330,14 +330,14 @@ static int32_t syncRetrieveWal(SSyncPeer *pPeer) {
int32_t sfd = open(fname, O_RDONLY | O_BINARY); int32_t sfd = open(fname, O_RDONLY | O_BINARY);
if (sfd < 0) { if (sfd < 0) {
code = -1; code = -1;
sError("%s, failed to open wal:%s for retrieve since %s, code:0x%x", pPeer->id, fname, strerror(errno), code); sError("%s, failed to open wal:%s for retrieve since %s, code:0x%" PRIx64, pPeer->id, fname, strerror(errno), code);
break; break;
} }
code = (int32_t)taosSendFile(pPeer->syncFd, sfd, NULL, size); code = (int32_t)taosSendFile(pPeer->syncFd, sfd, NULL, size);
close(sfd); close(sfd);
if (code < 0) { if (code < 0) {
sError("%s, failed to send wal:%s for retrieve since %s, code:0x%x", pPeer->id, fname, strerror(errno), code); sError("%s, failed to send wal:%s for retrieve since %s, code:0x%" PRIx64, pPeer->id, fname, strerror(errno), code);
break; break;
} }
...@@ -358,7 +358,7 @@ static int32_t syncRetrieveWal(SSyncPeer *pPeer) { ...@@ -358,7 +358,7 @@ static int32_t syncRetrieveWal(SSyncPeer *pPeer) {
code = -1; code = -1;
} }
} else { } else {
sError("%s, failed to send wal since %s, code:0x%x", pPeer->id, strerror(errno), code); sError("%s, failed to send wal since %s, code:0x%" PRIx64, pPeer->id, strerror(errno), code);
} }
return code; return code;
...@@ -405,9 +405,9 @@ static int32_t syncRetrieveDataStepByStep(SSyncPeer *pPeer) { ...@@ -405,9 +405,9 @@ static int32_t syncRetrieveDataStepByStep(SSyncPeer *pPeer) {
if (pPeer->sversion == 0) pPeer->sversion = 1; if (pPeer->sversion == 0) pPeer->sversion = 1;
sInfo("%s, start to retrieve wals", pPeer->id); sInfo("%s, start to retrieve wals", pPeer->id);
int32_t code = syncRetrieveWal(pPeer); int64_t code = syncRetrieveWal(pPeer);
if (code != 0) { if (code < 0) {
sError("%s, failed to retrieve wals, code:0x%x", pPeer->id, code); sError("%s, failed to retrieve wals, code:0x%" PRIx64, pPeer->id, code);
return -1; return -1;
} }
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册