提交 b434ac2f 编写于 作者: B Benguang Zhao

enh: improve walScanLogGetLastVer

上级 0b798123
......@@ -47,9 +47,7 @@ static FORCE_INLINE int walBuildTmpMetaName(SWal* pWal, char* buf) {
}
static FORCE_INLINE int64_t walScanLogGetLastVer(SWal* pWal, int32_t fileIdx) {
int32_t sz = taosArrayGetSize(pWal->fileInfoSet);
terrno = TSDB_CODE_SUCCESS;
int32_t sz = taosArrayGetSize(pWal->fileInfoSet);
SWalFileInfo* pFileInfo = taosArrayGet(pWal->fileInfoSet, fileIdx);
char fnameStr[WAL_FILE_LEN];
walBuildLogName(pWal, pFileInfo->firstVer, fnameStr);
......@@ -74,13 +72,12 @@ static FORCE_INLINE int64_t walScanLogGetLastVer(SWal* pWal, int32_t fileIdx) {
int64_t capacity = 0;
int64_t readSize = 0;
char* buf = NULL;
bool firstTrial = pFileInfo->fileSize < fileSize;
int64_t offset = TMIN(pFileInfo->fileSize, fileSize);
int64_t offsetForward = offset - stepSize + walCkHeadSz - 1;
int64_t offsetBackward = offset;
int64_t retVer = -1;
int64_t lastEntryBeginOffset = 0;
int64_t lastEntryEndOffset = 0;
int64_t recordLen = 0;
bool forwardStage = false;
// check recover size
if (2 * tsWalFsyncDataSizeLimit + offset < end) {
......@@ -91,14 +88,8 @@ static FORCE_INLINE int64_t walScanLogGetLastVer(SWal* pWal, int32_t fileIdx) {
// search for the valid last WAL entry, e.g. block by block
while (1) {
offset = (firstTrial) ? TMIN(fileSize, offsetForward + stepSize - walCkHeadSz + 1)
: TMAX(0, offsetBackward - stepSize + walCkHeadSz - 1);
offset = (lastEntryEndOffset > 0) ? offset : TMAX(0, offset - stepSize + walCkHeadSz - 1);
end = TMIN(offset + stepSize, fileSize);
if (firstTrial) {
offsetForward = offset;
} else {
offsetBackward = offset;
}
readSize = end - offset;
capacity = readSize + sizeof(magic);
......@@ -129,7 +120,16 @@ static FORCE_INLINE int64_t walScanLogGetLastVer(SWal* pWal, int32_t fileIdx) {
int64_t pos = 0;
SWalCkHead* logContent = NULL;
while ((candidate = tmemmem(haystack, readSize - (haystack - buf), (char*)&magic, sizeof(magic))) != NULL) {
while (true) {
forwardStage = (lastEntryEndOffset > 0 || offset == 0);
terrno = TSDB_CODE_SUCCESS;
if (forwardStage) {
candidate = (readSize - (haystack - buf)) > 0 ? haystack : NULL;
} else {
candidate = tmemmem(haystack, readSize - (haystack - buf), (char*)&magic, sizeof(magic));
}
if (candidate == NULL) break;
pos = candidate - buf;
// validate head
......@@ -137,13 +137,20 @@ static FORCE_INLINE int64_t walScanLogGetLastVer(SWal* pWal, int32_t fileIdx) {
if (len < walCkHeadSz) {
break;
}
logContent = (SWalCkHead*)(buf + pos);
if (logContent->magic != WAL_MAGIC) {
terrno = TSDB_CODE_WAL_FILE_CORRUPTED;
ASSERT(forwardStage);
wError("vgId:%d, wal magic is corrupted. offset:%" PRId64 ", file:%s ", pWal->cfg.vgId, offset + pos, fnameStr);
break;
}
if (walValidHeadCksum(logContent) != 0) {
terrno = TSDB_CODE_WAL_CHKSUM_MISMATCH;
wWarn("vgId:%d, failed to validate checksum of wal entry header. offset:%" PRId64 ", file:%s", pWal->cfg.vgId,
offset + pos, fnameStr);
haystack = buf + pos + 1;
if (firstTrial) {
if (forwardStage) {
break;
} else {
continue;
......@@ -151,9 +158,9 @@ static FORCE_INLINE int64_t walScanLogGetLastVer(SWal* pWal, int32_t fileIdx) {
}
// validate body
int64_t size = walCkHeadSz + logContent->head.bodyLen;
if (len < size) {
int64_t extraSize = size - len;
recordLen = walCkHeadSz + logContent->head.bodyLen;
if (len < recordLen) {
int64_t extraSize = recordLen - len;
if (capacity < readSize + extraSize + sizeof(magic)) {
capacity += extraSize;
void* ptr = taosMemoryRealloc(buf, capacity);
......@@ -184,7 +191,7 @@ static FORCE_INLINE int64_t walScanLogGetLastVer(SWal* pWal, int32_t fileIdx) {
wWarn("vgId:%d, failed to validate checksum of wal entry body. offset:%" PRId64 ", file:%s", pWal->cfg.vgId,
offset + pos, fnameStr);
haystack = buf + pos + 1;
if (firstTrial) {
if (forwardStage) {
break;
} else {
continue;
......@@ -194,21 +201,14 @@ static FORCE_INLINE int64_t walScanLogGetLastVer(SWal* pWal, int32_t fileIdx) {
// found one
retVer = logContent->head.version;
lastEntryBeginOffset = offset + pos;
lastEntryEndOffset = offset + pos + sizeof(SWalCkHead) + logContent->head.bodyLen;
lastEntryEndOffset = offset + pos + recordLen;
// try next
haystack = buf + pos + 1;
haystack = buf + pos + recordLen;
}
if (end == fileSize) firstTrial = false;
if (firstTrial) {
if (terrno == TSDB_CODE_SUCCESS) {
continue;
} else {
firstTrial = false;
}
}
if (retVer >= 0 || offset == 0) break;
offset = (lastEntryEndOffset > 0) ? lastEntryEndOffset : offset;
if (forwardStage && (terrno != TSDB_CODE_SUCCESS || end == fileSize)) break;
}
if (retVer < 0) {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册