提交 41474e40 编写于 作者: L Liu Jicong

enh(wal): auto fix

上级 14c4ff1d
...@@ -97,7 +97,7 @@ int32_t create_stream() { ...@@ -97,7 +97,7 @@ int32_t create_stream() {
pRes = taos_query(pConn, pRes = taos_query(pConn,
/*"create stream stream1 trigger at_once watermark 10s into outstb as select _wstart start, avg(k) from st1 partition by tbname interval(10s)");*/ /*"create stream stream1 trigger at_once watermark 10s into outstb as select _wstart start, avg(k) from st1 partition by tbname interval(10s)");*/
"create stream stream2 into outstb subtable(concat(concat(concat('prefix_', tname), '_suffix'), cast(k1 as varchar(20)))) as select _wstart wstart, avg(k) from st1 partition by tbname tname, a k1 interval(10s);"); "create stream stream2 into outstb subtable(concat(concat(concat('prefix_', tname), '_suffix_'), cast(k1 as varchar(20)))) as select _wstart wstart, avg(k) from st1 partition by tbname tname, a k1 interval(10s);");
if (taos_errno(pRes) != 0) { if (taos_errno(pRes) != 0) {
printf("failed to create stream stream1, reason:%s\n", taos_errstr(pRes)); printf("failed to create stream stream1, reason:%s\n", taos_errstr(pRes));
return -1; return -1;
......
...@@ -150,6 +150,7 @@ int walCheckAndRepairMeta(SWal* pWal) { ...@@ -150,6 +150,7 @@ int walCheckAndRepairMeta(SWal* pWal) {
const char* idxPattern = "^[0-9]+.idx$"; const char* idxPattern = "^[0-9]+.idx$";
regex_t logRegPattern; regex_t logRegPattern;
regex_t idxRegPattern; regex_t idxRegPattern;
bool fixed = false;
regcomp(&logRegPattern, logPattern, REG_EXTENDED); regcomp(&logRegPattern, logPattern, REG_EXTENDED);
regcomp(&idxRegPattern, idxPattern, REG_EXTENDED); regcomp(&idxRegPattern, idxPattern, REG_EXTENDED);
...@@ -206,6 +207,77 @@ int walCheckAndRepairMeta(SWal* pWal) { ...@@ -206,6 +207,77 @@ int walCheckAndRepairMeta(SWal* pWal) {
actualFileNum = taosArrayGetSize(pLogInfoArray); actualFileNum = taosArrayGetSize(pLogInfoArray);
#endif #endif
{
int32_t i = 0, j = 0;
while (i < actualFileNum && j < metaFileNum) {
SWalFileInfo* pActualFile = taosArrayGet(actualLog, i);
SWalFileInfo* pMetaFile = taosArrayGet(pWal->fileInfoSet, j);
if (pActualFile->firstVer < pMetaFile->firstVer) {
char fNameStr[WAL_FILE_LEN];
walBuildLogName(pWal, pActualFile->firstVer, fNameStr);
taosRemoveFile(fNameStr);
walBuildIdxName(pWal, pActualFile->firstVer, fNameStr);
taosRemoveFile(fNameStr);
i++;
} else if (pActualFile->firstVer > pMetaFile->firstVer) {
taosArrayRemove(pWal->fileInfoSet, j);
metaFileNum--;
} else {
i++;
j++;
}
}
if (i == actualFileNum && j == metaFileNum) {
if (j > 0) {
SWalFileInfo* pLastInfo = taosArrayGet(pWal->fileInfoSet, j - 1);
int64_t fsize = 0;
char fNameStr[WAL_FILE_LEN];
walBuildLogName(pWal, pLastInfo->firstVer, fNameStr);
taosStatFile(fNameStr, &fsize, NULL);
if (pLastInfo->fileSize != fsize) {
fixed = true;
pLastInfo->fileSize = fsize;
pLastInfo->lastVer = walScanLogGetLastVer(pWal);
}
}
} else {
fixed = true;
while (i < actualFileNum) {
SWalFileInfo* pActualFile = taosArrayGet(actualLog, i);
char fNameStr[WAL_FILE_LEN];
walBuildLogName(pWal, pActualFile->firstVer, fNameStr);
taosStatFile(fNameStr, &pActualFile->fileSize, NULL);
if (pActualFile->fileSize == 0) {
ASSERT(i == actualFileNum - 1);
taosRemoveFile(fNameStr);
walBuildIdxName(pWal, pActualFile->firstVer, fNameStr);
taosRemoveFile(fNameStr);
break;
}
if (i < actualFileNum - 1) {
pActualFile->lastVer = ((SWalFileInfo*)taosArrayGet(actualLog, i + 1))->firstVer - 1;
taosArrayPush(pWal->fileInfoSet, pActualFile);
i++;
} else {
pActualFile = taosArrayPush(pWal->fileInfoSet, pActualFile);
pActualFile->lastVer = walScanLogGetLastVer(pWal);
if (pActualFile->lastVer == -1) {
taosRemoveFile(fNameStr);
walBuildIdxName(pWal, pActualFile->firstVer, fNameStr);
taosRemoveFile(fNameStr);
taosArrayPop(pWal->fileInfoSet);
}
break;
}
}
}
}
#if 0
if (metaFileNum > actualFileNum) { if (metaFileNum > actualFileNum) {
taosArrayPopFrontBatch(pWal->fileInfoSet, metaFileNum - actualFileNum); taosArrayPopFrontBatch(pWal->fileInfoSet, metaFileNum - actualFileNum);
} else if (metaFileNum < actualFileNum) { } else if (metaFileNum < actualFileNum) {
...@@ -214,32 +286,30 @@ int walCheckAndRepairMeta(SWal* pWal) { ...@@ -214,32 +286,30 @@ int walCheckAndRepairMeta(SWal* pWal) {
taosArrayPush(pWal->fileInfoSet, pFileInfo); taosArrayPush(pWal->fileInfoSet, pFileInfo);
} }
} }
#endif
taosArrayDestroy(actualLog); taosArrayDestroy(actualLog);
actualFileNum = taosArrayGetSize(pWal->fileInfoSet);
pWal->writeCur = actualFileNum - 1; pWal->writeCur = actualFileNum - 1;
if (actualFileNum > 0) {
pWal->vers.firstVer = ((SWalFileInfo*)taosArrayGet(pWal->fileInfoSet, 0))->firstVer;
SWalFileInfo* pLastFileInfo = taosArrayGet(pWal->fileInfoSet, actualFileNum - 1);
char fnameStr[WAL_FILE_LEN];
walBuildLogName(pWal, pLastFileInfo->firstVer, fnameStr);
int64_t fileSize = 0;
taosStatFile(fnameStr, &fileSize, NULL);
/*ASSERT(fileSize != 0);*/
if (metaFileNum != actualFileNum || pLastFileInfo->fileSize != fileSize) {
pLastFileInfo->fileSize = fileSize;
pWal->vers.lastVer = walScanLogGetLastVer(pWal);
((SWalFileInfo*)taosArrayGetLast(pWal->fileInfoSet))->lastVer = pWal->vers.lastVer;
ASSERT(pWal->vers.lastVer != -1);
int code = walSaveMeta(pWal); if (actualFileNum > 0) {
if (code < 0) { int64_t fLastVer = ((SWalFileInfo*)taosArrayGet(pWal->fileInfoSet, pWal->writeCur))->lastVer;
return -1; if (fLastVer != -1 && pWal->vers.lastVer != fLastVer) {
} fixed = true;
pWal->vers.lastVer = fLastVer;
}
int64_t fFirstVer = ((SWalFileInfo*)taosArrayGet(pWal->fileInfoSet, 0))->firstVer;
if (fFirstVer != pWal->vers.firstVer) {
fixed = true;
pWal->vers.firstVer = fFirstVer;
} }
} }
if (fixed) {
walSaveMeta(pWal);
}
return 0; return 0;
} }
...@@ -530,6 +600,11 @@ int walLoadMeta(SWal* pWal) { ...@@ -530,6 +600,11 @@ int walLoadMeta(SWal* pWal) {
// read metafile // read metafile
int64_t fileSize = 0; int64_t fileSize = 0;
taosStatFile(fnameStr, &fileSize, NULL); taosStatFile(fnameStr, &fileSize, NULL);
if (fileSize == 0) {
taosRemoveFile(fnameStr);
wDebug("vgId:%d wal find empty meta ver %d", pWal->cfg.vgId, metaVer);
return -1;
}
int size = (int)fileSize; int size = (int)fileSize;
char* buf = taosMemoryMalloc(size + 5); char* buf = taosMemoryMalloc(size + 5);
if (buf == NULL) { if (buf == NULL) {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册