未验证 提交 87175724 编写于 作者: S Shengliang Guan 提交者: GitHub

Merge pull request #17777 from taosdata/FIX/TS-1984-3.0

fix: fsync wal files and meta if data size not synced yet beyond a threshold
......@@ -126,6 +126,9 @@ extern char tsSmlChildTableName[];
extern char tsSmlTagName[];
extern bool tsSmlDataFormat;
// wal
extern int64_t tsWalFsyncDataSizeLimit;
// internal
extern int32_t tsTransPullupInterval;
extern int32_t tsMqRebalanceInterval;
......
......@@ -43,7 +43,6 @@ extern "C" {
#define WAL_FILE_LEN (WAL_PATH_LEN + 32)
#define WAL_MAGIC 0xFAFBFCFDF4F3F2F1ULL
#define WAL_SCAN_BUF_SIZE (1024 * 1024 * 3)
#define WAL_RECOV_SIZE_LIMIT (100 * WAL_SCAN_BUF_SIZE)
typedef enum {
TAOS_WAL_WRITE = 1,
......@@ -159,6 +158,7 @@ void walCleanUp();
// handle open and ctl
SWal *walOpen(const char *path, SWalCfg *pCfg);
int32_t walAlter(SWal *, SWalCfg *pCfg);
int32_t walPersist(SWal *);
void walClose(SWal *);
// write interfaces
......
......@@ -156,6 +156,9 @@ char tsCompressor[32] = "ZSTD_COMPRESSOR"; // ZSTD_COMPRESSOR or GZIP_COMPR
// udf
bool tsStartUdfd = true;
// wal
int64_t tsWalFsyncDataSizeLimit = (100 * 1024 * 1024L);
// internal
int32_t tsTransPullupInterval = 2;
int32_t tsMqRebalanceInterval = 2;
......@@ -423,6 +426,9 @@ static int32_t taosAddServerCfg(SConfig *pCfg) {
if (cfgAddInt32(pCfg, "uptimeInterval", tsUptimeInterval, 1, 100000, 1) != 0) return -1;
if (cfgAddInt32(pCfg, "queryRsmaTolerance", tsQueryRsmaTolerance, 0, 900000, 0) != 0) return -1;
if (cfgAddInt64(pCfg, "walFsyncDataSizeLimit", tsWalFsyncDataSizeLimit, 100 * 1024 * 1024, INT64_MAX, 0) != 0)
return -1;
if (cfgAddBool(pCfg, "udf", tsStartUdfd, 0) != 0) return -1;
if (cfgAddString(pCfg, "udfdResFuncs", tsUdfdResFuncs, 0) != 0) return -1;
if (cfgAddString(pCfg, "udfdLdLibPath", tsUdfdLdLibPath, 0) != 0) return -1;
......@@ -722,6 +728,8 @@ static int32_t taosSetServerCfg(SConfig *pCfg) {
tsUptimeInterval = cfgGetItem(pCfg, "uptimeInterval")->i32;
tsQueryRsmaTolerance = cfgGetItem(pCfg, "queryRsmaTolerance")->i32;
tsWalFsyncDataSizeLimit = cfgGetItem(pCfg, "walFsyncDataSizeLimit")->i64;
tsStartUdfd = cfgGetItem(pCfg, "udf")->bval;
tstrncpy(tsUdfdResFuncs, cfgGetItem(pCfg, "udfdResFuncs")->str, sizeof(tsUdfdResFuncs));
tstrncpy(tsUdfdLdLibPath, cfgGetItem(pCfg, "udfdLdLibPath")->str, sizeof(tsUdfdLdLibPath));
......
......@@ -212,6 +212,12 @@ int vnodeCommit(SVnode *pVnode) {
vInfo("vgId:%d, start to commit, commit ID:%" PRId64 " version:%" PRId64, TD_VID(pVnode), pVnode->state.commitID,
pVnode->state.applied);
// persist wal before starting
if (walPersist(pVnode->pWal) < 0) {
vError("vgId:%d, failed to persist wal since %s", TD_VID(pVnode), terrstr());
return -1;
}
pVnode->state.commitTerm = pVnode->state.applyTerm;
// save info
......
......@@ -34,6 +34,7 @@ typedef struct {
int64_t createTs;
int64_t closeTs;
int64_t fileSize;
int64_t syncedOffset;
} SWalFileInfo;
typedef struct WalIdxEntry {
......@@ -66,6 +67,12 @@ static inline int64_t walGetLastFileSize(SWal* pWal) {
return pInfo->fileSize;
}
static inline int64_t walGetLastFileCachedSize(SWal* pWal) {
if (taosArrayGetSize(pWal->fileInfoSet) == 0) return 0;
SWalFileInfo* pInfo = (SWalFileInfo*)taosArrayGetLast(pWal->fileInfoSet);
return (pInfo->fileSize - pInfo->syncedOffset);
}
static inline int64_t walGetLastFileFirstVer(SWal* pWal) {
if (taosArrayGetSize(pWal->fileInfoSet) == 0) return -1;
SWalFileInfo* pInfo = (SWalFileInfo*)taosArrayGetLast(pWal->fileInfoSet);
......
......@@ -16,6 +16,7 @@
#include "cJSON.h"
#include "os.h"
#include "taoserror.h"
#include "tglobal.h"
#include "tutil.h"
#include "walInt.h"
......@@ -65,32 +66,43 @@ static FORCE_INLINE int64_t walScanLogGetLastVer(SWal* pWal, int32_t fileIdx) {
// ensure size as non-negative
pFileInfo->fileSize = TMAX(0, pFileInfo->fileSize);
int64_t stepSize = WAL_SCAN_BUF_SIZE;
uint64_t magic = WAL_MAGIC;
int64_t walCkHeadSz = sizeof(SWalCkHead);
int64_t end = fileSize;
int64_t offset = 0;
int64_t capacity = 0;
int64_t readSize = 0;
char* buf = NULL;
int64_t found = -1;
bool firstTrial = pFileInfo->fileSize < fileSize;
int64_t offset = TMIN(pFileInfo->fileSize, fileSize);
int64_t offsetForward = offset - stepSize + walCkHeadSz - 1;
int64_t offsetBackward = offset;
int64_t retVer = -1;
int64_t lastEntryBeginOffset = 0;
int64_t lastEntryEndOffset = 0;
// check recover size
if (2 * tsWalFsyncDataSizeLimit + offset < end) {
wWarn("vgId:%d, possibly corrupted WAL range exceeds size limit (i.e. %" PRId64 " bytes). offset:%" PRId64
", end:%" PRId64 ", file:%s",
pWal->cfg.vgId, 2 * tsWalFsyncDataSizeLimit, offset, end, fnameStr);
}
// search for the valid last WAL entry, e.g. block by block
while (1) {
offset = (firstTrial) ? pFileInfo->fileSize : TMAX(0, end - WAL_SCAN_BUF_SIZE);
offset = (firstTrial) ? TMIN(fileSize, offsetForward + stepSize - walCkHeadSz + 1)
: TMAX(0, offsetBackward - stepSize + walCkHeadSz - 1);
end = TMIN(offset + stepSize, fileSize);
if (firstTrial) {
offsetForward = offset;
} else {
offsetBackward = offset;
}
ASSERT(offset <= end);
readSize = end - offset;
capacity = readSize + sizeof(magic);
int64_t limit = WAL_RECOV_SIZE_LIMIT;
if (limit < readSize) {
wError("vgId:%d, possibly corrupted WAL range exceeds size limit (i.e. %" PRId64 " bytes). offset:%" PRId64
", end:%" PRId64 ", file:%s",
pWal->cfg.vgId, limit, offset, end, fnameStr);
terrno = TSDB_CODE_WAL_SIZE_LIMIT;
goto _err;
}
void* ptr = taosMemoryRealloc(buf, capacity);
if (ptr == NULL) {
terrno = TSDB_CODE_WAL_OUT_OF_MEMORY;
......@@ -127,6 +139,7 @@ static FORCE_INLINE int64_t walScanLogGetLastVer(SWal* pWal, int32_t fileIdx) {
}
logContent = (SWalCkHead*)(buf + pos);
if (walValidHeadCksum(logContent) != 0) {
terrno = TSDB_CODE_WAL_CHKSUM_MISMATCH;
wWarn("vgId:%d, failed to validate checksum of wal entry header. offset:%" PRId64 ", file:%s", pWal->cfg.vgId,
offset + pos, fnameStr);
haystack = buf + pos + 1;
......@@ -179,46 +192,41 @@ static FORCE_INLINE int64_t walScanLogGetLastVer(SWal* pWal, int32_t fileIdx) {
}
// found one
found = pos;
retVer = logContent->head.version;
lastEntryBeginOffset = offset + pos;
lastEntryEndOffset = offset + pos + sizeof(SWalCkHead) + logContent->head.bodyLen;
// try next
haystack = buf + pos + 1;
}
if (found >= 0 || offset == 0) break;
// go backwards, e.g. by at most one WAL scan buf size
end = TMIN(offset + walCkHeadSz - 1, fileSize);
firstTrial = false;
if (end == fileSize) firstTrial = false;
if (firstTrial && terrno == TSDB_CODE_SUCCESS) continue;
if (retVer >= 0 || offset == 0) break;
}
// determine end of last entry
SWalCkHead* lastEntry = (found >= 0) ? (SWalCkHead*)(buf + found) : NULL;
int64_t retVer = -1;
int64_t lastEntryBeginOffset = 0;
int64_t lastEntryEndOffset = 0;
if (lastEntry == NULL) {
if (retVer < 0) {
terrno = TSDB_CODE_WAL_LOG_NOT_EXIST;
} else {
retVer = lastEntry->head.version;
lastEntryBeginOffset = offset + (int64_t)((char*)lastEntry - (char*)buf);
lastEntryEndOffset = lastEntryBeginOffset + sizeof(SWalCkHead) + lastEntry->head.bodyLen;
}
// truncate file
if (lastEntryEndOffset != fileSize) {
wWarn("vgId:%d, repair meta truncate file %s to %" PRId64 ", orig size %" PRId64, pWal->cfg.vgId, fnameStr,
lastEntryEndOffset, fileSize);
if (taosFtruncateFile(pFile, lastEntryEndOffset) < 0) {
wError("failed to truncate file due to %s. file:%s", strerror(errno), fnameStr);
terrno = TAOS_SYSTEM_ERROR(errno);
goto _err;
}
if (taosFsyncFile(pFile) < 0) {
wError("failed to fsync file due to %s. file:%s", strerror(errno), fnameStr);
terrno = TAOS_SYSTEM_ERROR(errno);
goto _err;
}
}
pFileInfo->fileSize = lastEntryEndOffset;
taosCloseFile(&pFile);
......@@ -621,6 +629,7 @@ int walRollFileInfo(SWal* pWal) {
pNewInfo->createTs = ts;
pNewInfo->closeTs = -1;
pNewInfo->fileSize = 0;
pNewInfo->syncedOffset = 0;
taosArrayPush(pArray, pNewInfo);
taosMemoryFree(pNewInfo);
return 0;
......@@ -771,6 +780,12 @@ static int walFindCurMetaVer(SWal* pWal) {
return metaVer;
}
void walUpdateSyncedOffset(SWal* pWal) {
SWalFileInfo* pFileInfo = walGetCurFileInfo(pWal);
if (pFileInfo == NULL) return;
pFileInfo->syncedOffset = pFileInfo->fileSize;
}
int walSaveMeta(SWal* pWal) {
int metaVer = walFindCurMetaVer(pWal);
char fnameStr[WAL_FILE_LEN];
......@@ -790,6 +805,9 @@ int walSaveMeta(SWal* pWal) {
return -1;
}
// update synced offset
(void)walUpdateSyncedOffset(pWal);
// flush to a tmpfile
n = walBuildTmpMetaName(pWal, tmpFnameStr);
ASSERT(n < sizeof(tmpFnameStr) && "Buffer overflow of file name");
......
......@@ -187,6 +187,13 @@ int32_t walAlter(SWal *pWal, SWalCfg *pCfg) {
return 0;
}
int32_t walPersist(SWal *pWal) {
taosThreadMutexLock(&pWal->mutex);
int32_t ret = walSaveMeta(pWal);
taosThreadMutexUnlock(&pWal->mutex);
return ret;
}
void walClose(SWal *pWal) {
taosThreadMutexLock(&pWal->mutex);
(void)walSaveMeta(pWal);
......
......@@ -16,6 +16,7 @@
#include "os.h"
#include "taoserror.h"
#include "tchecksum.h"
#include "tglobal.h"
#include "walInt.h"
int32_t walRestoreFromSnapshot(SWal *pWal, int64_t ver) {
......@@ -252,23 +253,36 @@ static FORCE_INLINE int32_t walCheckAndRoll(SWal *pWal) {
}
}
if (walGetLastFileCachedSize(pWal) > tsWalFsyncDataSizeLimit) {
if (walSaveMeta(pWal) < 0) {
return -1;
}
}
return 0;
}
int32_t walBeginSnapshot(SWal *pWal, int64_t ver) {
taosThreadMutexLock(&pWal->mutex);
pWal->vers.verInSnapshotting = ver;
wDebug("vgId:%d, wal begin snapshot for version %" PRId64 ", first ver %" PRId64 ", last ver %" PRId64,
pWal->cfg.vgId, ver, pWal->vers.firstVer, pWal->vers.lastVer);
// check file rolling
if (pWal->cfg.retentionPeriod == 0) {
taosThreadMutexLock(&pWal->mutex);
if (walGetLastFileSize(pWal) != 0) {
walRollImpl(pWal);
if (walRollImpl(pWal) < 0) {
wError("vgId:%d, failed to roll wal files since %s", pWal->cfg.vgId, terrstr());
goto _err;
}
}
taosThreadMutexUnlock(&pWal->mutex);
}
taosThreadMutexUnlock(&pWal->mutex);
return 0;
_err:
taosThreadMutexUnlock(&pWal->mutex);
return -1;
}
int32_t walEndSnapshot(SWal *pWal) {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册