提交 9055f637 编写于 作者: B Benguang Zhao

fix: ftruncate WAL log after the last valid WAL record properly in walScanLogGetLastVer

上级 76865258
......@@ -45,12 +45,11 @@ static FORCE_INLINE int walBuildTmpMetaName(SWal* pWal, char* buf) {
static FORCE_INLINE int64_t walScanLogGetLastVer(SWal* pWal) {
int32_t sz = taosArrayGetSize(pWal->fileInfoSet);
ASSERT(sz > 0);
#if 0
for (int i = 0; i < sz; i++) {
SWalFileInfo* pFileInfo = taosArrayGet(pWal->fileInfoSet, i);
if (sz <= 0) {
wError("No WAL log file found.");
terrno = TSDB_CODE_WAL_FILE_CORRUPTED;
return -1;
}
#endif
SWalFileInfo* pLastFileInfo = taosArrayGet(pWal->fileInfoSet, sz - 1);
char fnameStr[WAL_FILE_LEN];
......@@ -61,7 +60,7 @@ static FORCE_INLINE int64_t walScanLogGetLastVer(SWal* pWal) {
int32_t readSize = TMIN(WAL_SCAN_BUF_SIZE, fileSize);
pLastFileInfo->fileSize = fileSize;
TdFilePtr pFile = taosOpenFile(fnameStr, TD_FILE_READ);
TdFilePtr pFile = taosOpenFile(fnameStr, TD_FILE_READ | TD_FILE_WRITE);
if (pFile == NULL) {
terrno = TAOS_SYSTEM_ERROR(errno);
return -1;
......@@ -69,28 +68,30 @@ static FORCE_INLINE int64_t walScanLogGetLastVer(SWal* pWal) {
uint64_t magic = WAL_MAGIC;
char* buf = taosMemoryMalloc(readSize + 5);
char* buf = taosMemoryMalloc(readSize + sizeof(uint64_t));
if (buf == NULL) {
taosCloseFile(&pFile);
terrno = TSDB_CODE_WAL_OUT_OF_MEMORY;
return -1;
goto _err;
}
int64_t offset;
offset = taosLSeekFile(pFile, -readSize, SEEK_END);
if (readSize != taosReadFile(pFile, buf, readSize)) {
taosMemoryFree(buf);
taosCloseFile(&pFile);
terrno = TAOS_SYSTEM_ERROR(errno);
return -1;
goto _err;
}
int64_t walCkHeadSz = sizeof(SWalCkHead);
char* found = NULL;
while (1) {
char* haystack = buf;
char* candidate;
char* candidate = NULL;
while ((candidate = tmemmem(haystack, readSize - (haystack - buf), (char*)&magic, sizeof(uint64_t))) != NULL) {
// read and validate
int64_t len = readSize - (candidate - buf);
if (len < walCkHeadSz) {
break;
}
SWalCkHead* logContent = (SWalCkHead*)candidate;
if (walValidHeadCksum(logContent) == 0 && walValidBodyCksum(logContent) == 0) {
found = candidate;
......@@ -98,35 +99,26 @@ static FORCE_INLINE int64_t walScanLogGetLastVer(SWal* pWal) {
haystack = candidate + 1;
}
if (found || offset == 0) break;
offset = TMIN(0, offset - readSize + sizeof(uint64_t));
int64_t offset2 = taosLSeekFile(pFile, offset, SEEK_SET);
ASSERT(offset == offset2);
if (readSize != taosReadFile(pFile, buf, readSize)) {
taosMemoryFree(buf);
taosCloseFile(&pFile);
// go backwards, i.e. by at most one WAL scan buf size
offset = TMAX(0, offset - readSize + walCkHeadSz);
int64_t ret = taosLSeekFile(pFile, offset, SEEK_SET);
if (ret < 0) {
wError("failed to lseek file due to %s. offset:%lld", strerror(errno), offset);
terrno = TAOS_SYSTEM_ERROR(errno);
return -1;
goto _err;
}
#if 0
if (found == buf) {
SWalCkHead* logContent = (SWalCkHead*)found;
if (walValidHeadCksum(logContent) != 0 || walValidBodyCksum(logContent) != 0) {
// file has to be deleted
taosMemoryFree(buf);
taosCloseFile(&pFile);
terrno = TSDB_CODE_WAL_FILE_CORRUPTED;
return -1;
}
ASSERT(offset == ret);
if (readSize != taosReadFile(pFile, buf, readSize)) {
terrno = TAOS_SYSTEM_ERROR(errno);
goto _err;
}
#endif
}
if (found == NULL) {
// file corrupted, no complete log
// TODO delete and search in previous files
/*ASSERT(0);*/
wError("WAL log file corrupted: no valid WAL record found. file: %s", fnameStr);
terrno = TSDB_CODE_WAL_FILE_CORRUPTED;
return -1;
goto _err;
}
// truncate file
......@@ -137,15 +129,28 @@ static FORCE_INLINE int64_t walScanLogGetLastVer(SWal* pWal) {
if (lastEntryEndOffset != fileSize) {
wWarn("vgId:%d repair meta truncate file %s to %ld, orig size %ld", pWal->cfg.vgId, fnameStr, lastEntryEndOffset,
fileSize);
taosFtruncateFile(pFile, lastEntryEndOffset);
if (taosFtruncateFile(pFile, lastEntryEndOffset) < 0) {
wError("failed to truncate file due to %s. file:%s", strerror(errno), fnameStr);
terrno = TAOS_SYSTEM_ERROR(errno);
goto _err;
}
if (taosFsyncFile(pFile) < 0) {
wError("failed to fsync file due to %s. file:%s", strerror(errno), fnameStr);
terrno = TAOS_SYSTEM_ERROR(errno);
goto _err;
}
((SWalFileInfo*)taosArrayGetLast(pWal->fileInfoSet))->fileSize = lastEntryEndOffset;
pWal->totSize -= (fileSize - lastEntryEndOffset);
}
taosCloseFile(&pFile);
taosMemoryFree(buf);
return retVer;
_err:
taosCloseFile(&pFile);
taosMemoryFree(buf);
return -1;
}
int walCheckAndRepairMeta(SWal* pWal) {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册