diff --git a/include/libs/wal/wal.h b/include/libs/wal/wal.h index 5c7a726d696397153134a639041e93cd8d355e5a..09198aa038c9f0267b141aae0555c661de13e876 100644 --- a/include/libs/wal/wal.h +++ b/include/libs/wal/wal.h @@ -195,7 +195,7 @@ SWalReader *walOpenReader(SWal *, SWalFilterCond *pCond); void walCloseReader(SWalReader *pRead); void walReadReset(SWalReader *pReader); int32_t walReadVer(SWalReader *pRead, int64_t ver); -int32_t walReadSeekVer(SWalReader *pRead, int64_t ver); +int32_t walReaderSeekVer(SWalReader *pRead, int64_t ver); int32_t walNextValidMsg(SWalReader *pRead); int64_t walReaderGetCurrentVer(const SWalReader* pReader); int64_t walReaderGetValidFirstVer(const SWalReader* pReader); diff --git a/source/dnode/vnode/src/tq/tqRead.c b/source/dnode/vnode/src/tq/tqRead.c index ea099373d7358d34ee790d83e9b6c854c61f9574..d4db43dd21c35200c5df63edd69f00ae1ee799ce 100644 --- a/source/dnode/vnode/src/tq/tqRead.c +++ b/source/dnode/vnode/src/tq/tqRead.c @@ -294,7 +294,7 @@ void tqCloseReader(STqReader* pReader) { } int32_t tqSeekVer(STqReader* pReader, int64_t ver, const char* id) { - if (walReadSeekVer(pReader->pWalReader, ver) < 0) { + if (walReaderSeekVer(pReader->pWalReader, ver) < 0) { return -1; } tqDebug("wal reader seek to ver:%"PRId64" %s", ver, id); diff --git a/source/dnode/vnode/src/tq/tqRestore.c b/source/dnode/vnode/src/tq/tqRestore.c index 4e18e570b5c8949822cf2f8b8892e888ce91166a..c855d323fc18901f540b718090b7c676946a2d6f 100644 --- a/source/dnode/vnode/src/tq/tqRestore.c +++ b/source/dnode/vnode/src/tq/tqRestore.c @@ -112,18 +112,25 @@ int32_t createStreamRunReq(SStreamMeta* pStreamMeta, bool* pScanIdle) { pTask->chkInfo.currentVer = firstVer; tqWarn("vgId:%d s-task:%s ver earlier than the first ver of wal range %" PRId64 ", forward to %" PRId64, vgId, pTask->id.idStr, firstVer, pTask->chkInfo.currentVer); - } - int64_t currentVer = walReaderGetCurrentVer(pTask->exec.pWalReader); - if (currentVer != pTask->chkInfo.currentVer) { - int32_t code = walReadSeekVer(pTask->exec.pWalReader, pTask->chkInfo.currentVer); - if (code != TSDB_CODE_SUCCESS) { // no data in wal, quit + // todo need retry if failed + int32_t code = walReaderSeekVer(pTask->exec.pWalReader, pTask->chkInfo.currentVer); + if (code != TSDB_CODE_SUCCESS) { streamMetaReleaseTask(pStreamMeta, pTask); continue; } - - // append the data for the stream - tqDebug("vgId:%d s-task:%s wal reader seek to ver:%" PRId64, vgId, pTask->id.idStr, pTask->chkInfo.currentVer); + } else { + int64_t currentVer = walReaderGetCurrentVer(pTask->exec.pWalReader); + if (currentVer != pTask->chkInfo.currentVer) { + int32_t code = walReaderSeekVer(pTask->exec.pWalReader, pTask->chkInfo.currentVer); + if (code != TSDB_CODE_SUCCESS) { // no data in wal, quit + streamMetaReleaseTask(pStreamMeta, pTask); + continue; + } + + // append the data for the stream + tqDebug("vgId:%d s-task:%s wal reader seek to ver:%" PRId64, vgId, pTask->id.idStr, pTask->chkInfo.currentVer); + } } SPackedData packData = {0}; diff --git a/source/libs/wal/src/walRead.c b/source/libs/wal/src/walRead.c index 4a6be5bfb71c631221bbd9802301cacd77624906..66c32437a3c95e19cd86a122b5c9d8a51caefd13 100644 --- a/source/libs/wal/src/walRead.c +++ b/source/libs/wal/src/walRead.c @@ -184,6 +184,7 @@ int32_t walReadSeekVerImpl(SWalReader *pReader, int64_t ver) { terrno = TSDB_CODE_WAL_INVALID_VER; return -1; } + if (pReader->curFileFirstVer != pRet->firstVer) { // error code was set inner if (walReadChangeFile(pReader, pRet->firstVer) < 0) { @@ -203,7 +204,7 @@ int32_t walReadSeekVerImpl(SWalReader *pReader, int64_t ver) { return 0; } -int32_t walReadSeekVer(SWalReader *pReader, int64_t ver) { +int32_t walReaderSeekVer(SWalReader *pReader, int64_t ver) { SWal *pWal = pReader->pWal; if (ver == pReader->curVersion) { wDebug("vgId:%d, wal index:%" PRId64 " match, no need to reset", pReader->pWal->cfg.vgId, ver); @@ -233,7 +234,7 @@ static int32_t walFetchHeadNew(SWalReader *pRead, int64_t fetchVer) { wDebug("vgId:%d, wal starts to fetch head, index:%" PRId64, pRead->pWal->cfg.vgId, fetchVer); if (pRead->curVersion != fetchVer) { - if (walReadSeekVer(pRead, fetchVer) < 0) { + if (walReaderSeekVer(pRead, fetchVer) < 0) { return -1; } seeked = true; @@ -336,7 +337,7 @@ int32_t walFetchHead(SWalReader *pRead, int64_t ver, SWalCkHead *pHead) { } if (pRead->curVersion != ver) { - code = walReadSeekVer(pRead, ver); + code = walReaderSeekVer(pRead, ver); if (code < 0) { // pRead->curVersion = ver; // pRead->curInvalid = 1; @@ -471,7 +472,7 @@ int32_t walReadVer(SWalReader *pReader, int64_t ver) { taosThreadMutexLock(&pReader->mutex); if (pReader->curVersion != ver) { - if (walReadSeekVer(pReader, ver) < 0) { + if (walReaderSeekVer(pReader, ver) < 0) { wError("vgId:%d, unexpected wal log, index:%" PRId64 ", since %s", pReader->pWal->cfg.vgId, ver, terrstr()); taosThreadMutexUnlock(&pReader->mutex); return -1;