diff --git a/examples/c/stream_demo.c b/examples/c/stream_demo.c index 9243b68100d2140fc1102f265876066443ec34ae..46c8297fbac8af6ec7fc3558d690313231569711 100644 --- a/examples/c/stream_demo.c +++ b/examples/c/stream_demo.c @@ -97,7 +97,7 @@ int32_t create_stream() { pRes = taos_query(pConn, /*"create stream stream1 trigger at_once watermark 10s into outstb as select _wstart start, avg(k) from st1 partition by tbname interval(10s)");*/ - "create stream stream2 into outstb subtable(concat(concat(concat('prefix_', tname), '_suffix'), cast(k1 as varchar(20)))) as select _wstart wstart, avg(k) from st1 partition by tbname tname, a k1 interval(10s);"); + "create stream stream2 into outstb subtable(concat(concat(concat('prefix_', tname), '_suffix_'), cast(k1 as varchar(20)))) as select _wstart wstart, avg(k) from st1 partition by tbname tname, a k1 interval(10s);"); if (taos_errno(pRes) != 0) { printf("failed to create stream stream1, reason:%s\n", taos_errstr(pRes)); return -1; diff --git a/source/libs/wal/src/walMeta.c b/source/libs/wal/src/walMeta.c index 233d6a87b86fee566763fdd1c8aec9262008ab42..62f2d51f1b704936b3a1f7dd47963cb7dd0caa5b 100644 --- a/source/libs/wal/src/walMeta.c +++ b/source/libs/wal/src/walMeta.c @@ -150,6 +150,7 @@ int walCheckAndRepairMeta(SWal* pWal) { const char* idxPattern = "^[0-9]+.idx$"; regex_t logRegPattern; regex_t idxRegPattern; + bool fixed = false; regcomp(&logRegPattern, logPattern, REG_EXTENDED); regcomp(&idxRegPattern, idxPattern, REG_EXTENDED); @@ -206,6 +207,77 @@ int walCheckAndRepairMeta(SWal* pWal) { actualFileNum = taosArrayGetSize(pLogInfoArray); #endif + { + int32_t i = 0, j = 0; + while (i < actualFileNum && j < metaFileNum) { + SWalFileInfo* pActualFile = taosArrayGet(actualLog, i); + SWalFileInfo* pMetaFile = taosArrayGet(pWal->fileInfoSet, j); + if (pActualFile->firstVer < pMetaFile->firstVer) { + char fNameStr[WAL_FILE_LEN]; + walBuildLogName(pWal, pActualFile->firstVer, fNameStr); + taosRemoveFile(fNameStr); + walBuildIdxName(pWal, pActualFile->firstVer, fNameStr); + taosRemoveFile(fNameStr); + i++; + } else if (pActualFile->firstVer > pMetaFile->firstVer) { + taosArrayRemove(pWal->fileInfoSet, j); + metaFileNum--; + } else { + i++; + j++; + } + } + if (i == actualFileNum && j == metaFileNum) { + if (j > 0) { + SWalFileInfo* pLastInfo = taosArrayGet(pWal->fileInfoSet, j - 1); + int64_t fsize = 0; + char fNameStr[WAL_FILE_LEN]; + walBuildLogName(pWal, pLastInfo->firstVer, fNameStr); + taosStatFile(fNameStr, &fsize, NULL); + if (pLastInfo->fileSize != fsize) { + fixed = true; + pLastInfo->fileSize = fsize; + pLastInfo->lastVer = walScanLogGetLastVer(pWal); + } + } + } else { + fixed = true; + while (i < actualFileNum) { + SWalFileInfo* pActualFile = taosArrayGet(actualLog, i); + char fNameStr[WAL_FILE_LEN]; + walBuildLogName(pWal, pActualFile->firstVer, fNameStr); + taosStatFile(fNameStr, &pActualFile->fileSize, NULL); + + if (pActualFile->fileSize == 0) { + ASSERT(i == actualFileNum - 1); + taosRemoveFile(fNameStr); + + walBuildIdxName(pWal, pActualFile->firstVer, fNameStr); + taosRemoveFile(fNameStr); + break; + } + + if (i < actualFileNum - 1) { + pActualFile->lastVer = ((SWalFileInfo*)taosArrayGet(actualLog, i + 1))->firstVer - 1; + taosArrayPush(pWal->fileInfoSet, pActualFile); + i++; + } else { + pActualFile = taosArrayPush(pWal->fileInfoSet, pActualFile); + pActualFile->lastVer = walScanLogGetLastVer(pWal); + if (pActualFile->lastVer == -1) { + taosRemoveFile(fNameStr); + + walBuildIdxName(pWal, pActualFile->firstVer, fNameStr); + taosRemoveFile(fNameStr); + taosArrayPop(pWal->fileInfoSet); + } + break; + } + } + } + } + +#if 0 if (metaFileNum > actualFileNum) { taosArrayPopFrontBatch(pWal->fileInfoSet, metaFileNum - actualFileNum); } else if (metaFileNum < actualFileNum) { @@ -214,32 +286,30 @@ int walCheckAndRepairMeta(SWal* pWal) { taosArrayPush(pWal->fileInfoSet, pFileInfo); } } +#endif + taosArrayDestroy(actualLog); + actualFileNum = taosArrayGetSize(pWal->fileInfoSet); pWal->writeCur = actualFileNum - 1; - if (actualFileNum > 0) { - pWal->vers.firstVer = ((SWalFileInfo*)taosArrayGet(pWal->fileInfoSet, 0))->firstVer; - - SWalFileInfo* pLastFileInfo = taosArrayGet(pWal->fileInfoSet, actualFileNum - 1); - char fnameStr[WAL_FILE_LEN]; - walBuildLogName(pWal, pLastFileInfo->firstVer, fnameStr); - int64_t fileSize = 0; - taosStatFile(fnameStr, &fileSize, NULL); - /*ASSERT(fileSize != 0);*/ - - if (metaFileNum != actualFileNum || pLastFileInfo->fileSize != fileSize) { - pLastFileInfo->fileSize = fileSize; - pWal->vers.lastVer = walScanLogGetLastVer(pWal); - ((SWalFileInfo*)taosArrayGetLast(pWal->fileInfoSet))->lastVer = pWal->vers.lastVer; - ASSERT(pWal->vers.lastVer != -1); - int code = walSaveMeta(pWal); - if (code < 0) { - return -1; - } + if (actualFileNum > 0) { + int64_t fLastVer = ((SWalFileInfo*)taosArrayGet(pWal->fileInfoSet, pWal->writeCur))->lastVer; + if (fLastVer != -1 && pWal->vers.lastVer != fLastVer) { + fixed = true; + pWal->vers.lastVer = fLastVer; + } + int64_t fFirstVer = ((SWalFileInfo*)taosArrayGet(pWal->fileInfoSet, 0))->firstVer; + if (fFirstVer != pWal->vers.firstVer) { + fixed = true; + pWal->vers.firstVer = fFirstVer; } } + if (fixed) { + walSaveMeta(pWal); + } + return 0; } @@ -530,6 +600,11 @@ int walLoadMeta(SWal* pWal) { // read metafile int64_t fileSize = 0; taosStatFile(fnameStr, &fileSize, NULL); + if (fileSize == 0) { + taosRemoveFile(fnameStr); + wDebug("vgId:%d wal find empty meta ver %d", pWal->cfg.vgId, metaVer); + return -1; + } int size = (int)fileSize; char* buf = taosMemoryMalloc(size + 5); if (buf == NULL) {