未验证 提交 f0e2dd38 编写于 作者: S Shengliang Guan 提交者: GitHub

Merge pull request #5269 from taosdata/hotfix/sf

[TD-3064]<fix>: fix wal file size(>2G) overflow with int32_t
...@@ -25,7 +25,8 @@ void taosGetTmpfilePath(const char *fileNamePrefix, char *dstPath) { ...@@ -25,7 +25,8 @@ void taosGetTmpfilePath(const char *fileNamePrefix, char *dstPath) {
char tmpPath[PATH_MAX]; char tmpPath[PATH_MAX];
int32_t len = strlen(tsTempDir); int32_t len = strlen(tsTempDir);
memcpy(tmpPath, tsTempDir, len); memcpy(tmpPath, tsTempDir, len);
static uint64_t seqId = 0;
if (tmpPath[len - 1] != '/') { if (tmpPath[len - 1] != '/') {
tmpPath[len++] = '/'; tmpPath[len++] = '/';
} }
...@@ -36,8 +37,10 @@ void taosGetTmpfilePath(const char *fileNamePrefix, char *dstPath) { ...@@ -36,8 +37,10 @@ void taosGetTmpfilePath(const char *fileNamePrefix, char *dstPath) {
strcat(tmpPath, "-%d-%s"); strcat(tmpPath, "-%d-%s");
} }
char rand[8] = {0}; char rand[32] = {0};
taosRandStr(rand, tListLen(rand) - 1);
sprintf(rand, "%"PRIu64, atomic_add_fetch_64(&seqId, 1));
snprintf(dstPath, PATH_MAX, tmpPath, getpid(), rand); snprintf(dstPath, PATH_MAX, tmpPath, getpid(), rand);
} }
......
...@@ -170,14 +170,14 @@ static int32_t syncReadOneWalRecord(int32_t sfd, SWalHead *pHead) { ...@@ -170,14 +170,14 @@ static int32_t syncReadOneWalRecord(int32_t sfd, SWalHead *pHead) {
return sizeof(SWalHead) + pHead->len; return sizeof(SWalHead) + pHead->len;
} }
static int32_t syncRetrieveLastWal(SSyncPeer *pPeer, char *name, uint64_t fversion, int64_t offset) { static int64_t syncRetrieveLastWal(SSyncPeer *pPeer, char *name, uint64_t fversion, int64_t offset) {
int32_t sfd = open(name, O_RDONLY | O_BINARY); int32_t sfd = open(name, O_RDONLY | O_BINARY);
if (sfd < 0) { if (sfd < 0) {
sError("%s, failed to open wal:%s for retrieve since:%s", pPeer->id, name, tstrerror(errno)); sError("%s, failed to open wal:%s for retrieve since:%s", pPeer->id, name, tstrerror(errno));
return -1; return -1;
} }
int32_t code = (int32_t)taosLSeek(sfd, offset, SEEK_SET); int64_t code = taosLSeek(sfd, offset, SEEK_SET);
if (code < 0) { if (code < 0) {
sError("%s, failed to seek %" PRId64 " in wal:%s for retrieve since:%s", pPeer->id, offset, name, tstrerror(errno)); sError("%s, failed to seek %" PRId64 " in wal:%s for retrieve since:%s", pPeer->id, offset, name, tstrerror(errno));
close(sfd); close(sfd);
...@@ -187,7 +187,7 @@ static int32_t syncRetrieveLastWal(SSyncPeer *pPeer, char *name, uint64_t fversi ...@@ -187,7 +187,7 @@ static int32_t syncRetrieveLastWal(SSyncPeer *pPeer, char *name, uint64_t fversi
sDebug("%s, retrieve last wal:%s, offset:%" PRId64 " fver:%" PRIu64, pPeer->id, name, offset, fversion); sDebug("%s, retrieve last wal:%s, offset:%" PRId64 " fver:%" PRIu64, pPeer->id, name, offset, fversion);
SWalHead *pHead = malloc(SYNC_MAX_SIZE); SWalHead *pHead = malloc(SYNC_MAX_SIZE);
int32_t bytes = 0; int64_t bytes = 0;
while (1) { while (1) {
code = syncReadOneWalRecord(sfd, pHead); code = syncReadOneWalRecord(sfd, pHead);
...@@ -198,13 +198,13 @@ static int32_t syncRetrieveLastWal(SSyncPeer *pPeer, char *name, uint64_t fversi ...@@ -198,13 +198,13 @@ static int32_t syncRetrieveLastWal(SSyncPeer *pPeer, char *name, uint64_t fversi
if (code == 0) { if (code == 0) {
code = bytes; code = bytes;
sDebug("%s, read to the end of wal, bytes:%d", pPeer->id, bytes); sDebug("%s, read to the end of wal, bytes:%" PRId64, pPeer->id, bytes);
break; break;
} }
sDebug("%s, last wal is forwarded, hver:%" PRIu64, pPeer->id, pHead->version); sTrace("%s, last wal is forwarded, hver:%" PRIu64, pPeer->id, pHead->version);
int32_t wsize = code; int32_t wsize = (int32_t)code;
int32_t ret = taosWriteMsg(pPeer->syncFd, pHead, wsize); int32_t ret = taosWriteMsg(pPeer->syncFd, pHead, wsize);
if (ret != wsize) { if (ret != wsize) {
code = -1; code = -1;
...@@ -228,7 +228,7 @@ static int32_t syncRetrieveLastWal(SSyncPeer *pPeer, char *name, uint64_t fversi ...@@ -228,7 +228,7 @@ static int32_t syncRetrieveLastWal(SSyncPeer *pPeer, char *name, uint64_t fversi
return code; return code;
} }
static int32_t syncProcessLastWal(SSyncPeer *pPeer, char *wname, int64_t index) { static int64_t syncProcessLastWal(SSyncPeer *pPeer, char *wname, int64_t index) {
SSyncNode *pNode = pPeer->pSyncNode; SSyncNode *pNode = pPeer->pSyncNode;
int32_t once = 0; // last WAL has once ever been processed int32_t once = 0; // last WAL has once ever been processed
int64_t offset = 0; int64_t offset = 0;
...@@ -243,9 +243,9 @@ static int32_t syncProcessLastWal(SSyncPeer *pPeer, char *wname, int64_t index) ...@@ -243,9 +243,9 @@ static int32_t syncProcessLastWal(SSyncPeer *pPeer, char *wname, int64_t index)
if (syncAreFilesModified(pNode, pPeer)) return -1; if (syncAreFilesModified(pNode, pPeer)) return -1;
if (syncGetWalVersion(pNode, pPeer) < 0) return -1; if (syncGetWalVersion(pNode, pPeer) < 0) return -1;
int32_t bytes = syncRetrieveLastWal(pPeer, fname, fversion, offset); int64_t bytes = syncRetrieveLastWal(pPeer, fname, fversion, offset);
if (bytes < 0) { if (bytes < 0) {
sDebug("%s, failed to retrieve last wal", pPeer->id); sDebug("%s, failed to retrieve last wal, bytes:%" PRId64, pPeer->id, bytes);
return bytes; return bytes;
} }
...@@ -263,7 +263,7 @@ static int32_t syncProcessLastWal(SSyncPeer *pPeer, char *wname, int64_t index) ...@@ -263,7 +263,7 @@ static int32_t syncProcessLastWal(SSyncPeer *pPeer, char *wname, int64_t index)
// if all data up to fversion is read out, it is over // if all data up to fversion is read out, it is over
if (pPeer->sversion >= fversion && fversion > 0) { if (pPeer->sversion >= fversion && fversion > 0) {
sDebug("%s, data up to fver:%" PRIu64 " has been read out, bytes:%d sver:%" PRIu64, pPeer->id, fversion, bytes, sDebug("%s, data up to fver:%" PRIu64 " has been read out, bytes:%" PRId64 " sver:%" PRIu64, pPeer->id, fversion, bytes,
pPeer->sversion); pPeer->sversion);
return 0; return 0;
} }
...@@ -277,19 +277,19 @@ static int32_t syncProcessLastWal(SSyncPeer *pPeer, char *wname, int64_t index) ...@@ -277,19 +277,19 @@ static int32_t syncProcessLastWal(SSyncPeer *pPeer, char *wname, int64_t index)
// if bytes > 0, file is updated, or fversion is not reached but file still open, read again // if bytes > 0, file is updated, or fversion is not reached but file still open, read again
once = 1; once = 1;
offset += bytes; offset += bytes;
sDebug("%s, continue retrieve last wal, bytes:%d offset:%" PRId64 " sver:%" PRIu64 " fver:%" PRIu64, pPeer->id, sDebug("%s, continue retrieve last wal, bytes:%" PRId64 " offset:%" PRId64 " sver:%" PRIu64 " fver:%" PRIu64, pPeer->id,
bytes, offset, pPeer->sversion, fversion); bytes, offset, pPeer->sversion, fversion);
} }
return -1; return -1;
} }
static int32_t syncRetrieveWal(SSyncPeer *pPeer) { static int64_t syncRetrieveWal(SSyncPeer *pPeer) {
SSyncNode * pNode = pPeer->pSyncNode; SSyncNode * pNode = pPeer->pSyncNode;
char fname[TSDB_FILENAME_LEN * 3]; char fname[TSDB_FILENAME_LEN * 3];
char wname[TSDB_FILENAME_LEN * 2]; char wname[TSDB_FILENAME_LEN * 2];
int32_t size; int32_t size;
int32_t code = -1; int64_t code = -1;
int64_t index = 0; int64_t index = 0;
while (1) { while (1) {
...@@ -297,7 +297,7 @@ static int32_t syncRetrieveWal(SSyncPeer *pPeer) { ...@@ -297,7 +297,7 @@ static int32_t syncRetrieveWal(SSyncPeer *pPeer) {
wname[0] = 0; wname[0] = 0;
code = (*pNode->getWalInfoFp)(pNode->vgId, wname, &index); code = (*pNode->getWalInfoFp)(pNode->vgId, wname, &index);
if (code < 0) { if (code < 0) {
sError("%s, failed to get wal info since:%s, code:0x%x", pPeer->id, strerror(errno), code); sError("%s, failed to get wal info since:%s, code:0x%" PRIx64, pPeer->id, strerror(errno), code);
break; break;
} }
...@@ -309,6 +309,7 @@ static int32_t syncRetrieveWal(SSyncPeer *pPeer) { ...@@ -309,6 +309,7 @@ static int32_t syncRetrieveWal(SSyncPeer *pPeer) {
if (code == 0) { // last wal if (code == 0) { // last wal
code = syncProcessLastWal(pPeer, wname, index); code = syncProcessLastWal(pPeer, wname, index);
sInfo("%s, last wal processed, code:%" PRId64, pPeer->id, code);
break; break;
} }
...@@ -319,7 +320,7 @@ static int32_t syncRetrieveWal(SSyncPeer *pPeer) { ...@@ -319,7 +320,7 @@ static int32_t syncRetrieveWal(SSyncPeer *pPeer) {
struct stat fstat; struct stat fstat;
if (stat(fname, &fstat) < 0) { if (stat(fname, &fstat) < 0) {
code = -1; code = -1;
sDebug("%s, failed to stat wal:%s for retrieve since %s, code:0x%x", pPeer->id, fname, strerror(errno), code); sDebug("%s, failed to stat wal:%s for retrieve since %s, code:0x%" PRIx64, pPeer->id, fname, strerror(errno), code);
break; break;
} }
...@@ -329,14 +330,14 @@ static int32_t syncRetrieveWal(SSyncPeer *pPeer) { ...@@ -329,14 +330,14 @@ static int32_t syncRetrieveWal(SSyncPeer *pPeer) {
int32_t sfd = open(fname, O_RDONLY | O_BINARY); int32_t sfd = open(fname, O_RDONLY | O_BINARY);
if (sfd < 0) { if (sfd < 0) {
code = -1; code = -1;
sError("%s, failed to open wal:%s for retrieve since %s, code:0x%x", pPeer->id, fname, strerror(errno), code); sError("%s, failed to open wal:%s for retrieve since %s, code:0x%" PRIx64, pPeer->id, fname, strerror(errno), code);
break; break;
} }
code = (int32_t)taosSendFile(pPeer->syncFd, sfd, NULL, size); code = (int32_t)taosSendFile(pPeer->syncFd, sfd, NULL, size);
close(sfd); close(sfd);
if (code < 0) { if (code < 0) {
sError("%s, failed to send wal:%s for retrieve since %s, code:0x%x", pPeer->id, fname, strerror(errno), code); sError("%s, failed to send wal:%s for retrieve since %s, code:0x%" PRIx64, pPeer->id, fname, strerror(errno), code);
break; break;
} }
...@@ -357,7 +358,7 @@ static int32_t syncRetrieveWal(SSyncPeer *pPeer) { ...@@ -357,7 +358,7 @@ static int32_t syncRetrieveWal(SSyncPeer *pPeer) {
code = -1; code = -1;
} }
} else { } else {
sError("%s, failed to send wal since %s, code:0x%x", pPeer->id, strerror(errno), code); sError("%s, failed to send wal since %s, code:0x%" PRIx64, pPeer->id, strerror(errno), code);
} }
return code; return code;
...@@ -404,9 +405,9 @@ static int32_t syncRetrieveDataStepByStep(SSyncPeer *pPeer) { ...@@ -404,9 +405,9 @@ static int32_t syncRetrieveDataStepByStep(SSyncPeer *pPeer) {
if (pPeer->sversion == 0) pPeer->sversion = 1; if (pPeer->sversion == 0) pPeer->sversion = 1;
sInfo("%s, start to retrieve wals", pPeer->id); sInfo("%s, start to retrieve wals", pPeer->id);
int32_t code = syncRetrieveWal(pPeer); int64_t code = syncRetrieveWal(pPeer);
if (code != 0) { if (code < 0) {
sError("%s, failed to retrieve wals, code:0x%x", pPeer->id, code); sError("%s, failed to retrieve wals, code:0x%" PRIx64, pPeer->id, code);
return -1; return -1;
} }
......
...@@ -171,7 +171,9 @@ static void *taosThreadToOpenNewFile(void *param) { ...@@ -171,7 +171,9 @@ static void *taosThreadToOpenNewFile(void *param) {
int32_t fd = open(name, O_WRONLY | O_CREAT | O_TRUNC, S_IRWXU | S_IRWXG | S_IRWXO); int32_t fd = open(name, O_WRONLY | O_CREAT | O_TRUNC, S_IRWXU | S_IRWXG | S_IRWXO);
if (fd < 0) { if (fd < 0) {
uError("open new log file fail! fd:%d reason:%s", fd, strerror(errno)); tsLogObj.openInProgress = 0;
tsLogObj.lines = tsLogObj.maxLines - 1000;
uError("open new log file fail! fd:%d reason:%s, reuse lastlog", fd, strerror(errno));
return NULL; return NULL;
} }
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册