diff --git a/include/libs/wal/wal.h b/include/libs/wal/wal.h index b51289de5e46e05d58d5af8a8b130db292439103..ad7e4e104a396cb54342850f12596b48baedb830 100644 --- a/include/libs/wal/wal.h +++ b/include/libs/wal/wal.h @@ -138,6 +138,8 @@ typedef struct { int8_t enableRef; } SWalFilterCond; +typedef struct SWalReader SWalReader; + // todo hide this struct typedef struct SWalReader { SWal *pWal; @@ -198,6 +200,7 @@ int32_t walReadVer(SWalReader *pRead, int64_t ver); int32_t walReadSeekVer(SWalReader *pRead, int64_t ver); int32_t walNextValidMsg(SWalReader *pRead); int64_t walReaderGetCurrentVer(const SWalReader* pReader); +int64_t walReaderGetValidFirstVer(const SWalReader* pReader); // only for tq usage void walSetReaderCapacity(SWalReader *pRead, int32_t capacity); diff --git a/source/dnode/vnode/src/tq/tqRestore.c b/source/dnode/vnode/src/tq/tqRestore.c index c8b598763d2bd265c382a411d2c323ad23a3cb93..0eb9a92f87e285cb044295d23edad8ad9fab328d 100644 --- a/source/dnode/vnode/src/tq/tqRestore.c +++ b/source/dnode/vnode/src/tq/tqRestore.c @@ -121,17 +121,27 @@ int32_t createStreamRunReq(SStreamMeta* pStreamMeta, bool* pScanIdle) { *pScanIdle = false; // seek the stored version and extract data from WAL - int32_t code = walReadSeekVer(pTask->exec.pWalReader, pTask->chkInfo.currentVer); - if (code != TSDB_CODE_SUCCESS) { // no data in wal, quit - streamMetaReleaseTask(pStreamMeta, pTask); - continue; + int64_t firstVer = walReaderGetValidFirstVer(pTask->exec.pWalReader); + if (pTask->chkInfo.currentVer < firstVer) { + pTask->chkInfo.currentVer = firstVer; + tqWarn("vgId:%d s-task:%s ver earlier than the first ver of wal range %" PRId64 ", forward to %" PRId64, vgId, + pTask->id.idStr, firstVer, pTask->chkInfo.currentVer); } - // append the data for the stream - tqDebug("vgId:%d s-task:%s wal reader seek to ver:%" PRId64, vgId, pTask->id.idStr, pTask->chkInfo.currentVer); + int64_t currentVer = walReaderGetCurrentVer(pTask->exec.pWalReader); + if (currentVer != pTask->chkInfo.currentVer) { + int32_t code = walReadSeekVer(pTask->exec.pWalReader, pTask->chkInfo.currentVer); + if (code != TSDB_CODE_SUCCESS) { // no data in wal, quit + streamMetaReleaseTask(pStreamMeta, pTask); + continue; + } + + // append the data for the stream + tqDebug("vgId:%d s-task:%s wal reader seek to ver:%" PRId64, vgId, pTask->id.idStr, pTask->chkInfo.currentVer); + } SPackedData packData = {0}; - code = extractSubmitMsgFromWal(pTask->exec.pWalReader, &packData); + int32_t code = extractSubmitMsgFromWal(pTask->exec.pWalReader, &packData); if (code != TSDB_CODE_SUCCESS) { // failed, continue streamMetaReleaseTask(pStreamMeta, pTask); continue; diff --git a/source/libs/wal/src/walRead.c b/source/libs/wal/src/walRead.c index dc3ff3e6de14b632a2f4a76927dfbae823e1652a..b185e7f00ab1996faf68eb1bf00674e4170349a6 100644 --- a/source/libs/wal/src/walRead.c +++ b/source/libs/wal/src/walRead.c @@ -101,6 +101,7 @@ int32_t walNextValidMsg(SWalReader *pReader) { } int64_t walReaderGetCurrentVer(const SWalReader *pReader) { return pReader->curVersion; } +int64_t walReaderGetValidFirstVer(const SWalReader *pReader) { return walGetFirstVer(pReader->pWal); } static int64_t walReadSeekFilePos(SWalReader *pReader, int64_t fileFirstVer, int64_t ver) { int64_t ret = 0;