diff --git a/source/dnode/vnode/src/tq/tqRestore.c b/source/dnode/vnode/src/tq/tqRestore.c index ff9f95d5fadf74a846d4e9b7c184c16793736ec1..6e3036a66ba73c60b421ad73a2a6fb7335a3803f 100644 --- a/source/dnode/vnode/src/tq/tqRestore.c +++ b/source/dnode/vnode/src/tq/tqRestore.c @@ -61,9 +61,10 @@ static int32_t doSetOffsetForWalReader(SStreamTask *pTask, int32_t vgId) { // seek the stored version and extract data from WAL int64_t firstVer = walReaderGetValidFirstVer(pTask->exec.pWalReader); if (pTask->chkInfo.currentVer < firstVer) { + tqWarn("vgId:%d s-task:%s ver:%"PRId64" earlier than the first ver of wal range %" PRId64 ", forward to %" PRId64, vgId, + pTask->id.idStr, pTask->chkInfo.currentVer, firstVer, firstVer); + pTask->chkInfo.currentVer = firstVer; - tqWarn("vgId:%d s-task:%s ver earlier than the first ver of wal range %" PRId64 ", forward to %" PRId64, vgId, - pTask->id.idStr, firstVer, pTask->chkInfo.currentVer); // todo need retry if failed int32_t code = walReaderSeekVer(pTask->exec.pWalReader, pTask->chkInfo.currentVer); diff --git a/source/libs/executor/CMakeLists.txt b/source/libs/executor/CMakeLists.txt index 8b3d04e32c35e7143ccdc46fdcdf9f56ec09d0fc..4adff71d8518af05677a34fc90534783a6cbc508 100644 --- a/source/libs/executor/CMakeLists.txt +++ b/source/libs/executor/CMakeLists.txt @@ -1,10 +1,9 @@ aux_source_directory(src EXECUTOR_SRC) -#add_library(executor ${EXECUTOR_SRC}) add_library(executor STATIC ${EXECUTOR_SRC}) target_link_libraries(executor - PRIVATE os util common function parser planner qcom vnode scalar nodes index stream + PRIVATE os util common function parser planner qcom vnode scalar nodes index ) target_include_directories( diff --git a/source/libs/stream/src/streamExec.c b/source/libs/stream/src/streamExec.c index 55474541ede5d51236b3e53098ed480d9ce4ad0c..637d661343b3dba38657d820911b273763e7fcbe 100644 --- a/source/libs/stream/src/streamExec.c +++ b/source/libs/stream/src/streamExec.c @@ -163,7 +163,6 @@ int32_t streamScanExec(SStreamTask* pTask, int32_t batchSz) { int32_t code = 0; ASSERT(pTask->taskLevel == TASK_LEVEL__SOURCE); - void* exec = pTask->exec.pExecutor; qSetStreamOpOpen(exec); @@ -404,7 +403,7 @@ int32_t streamExecForAll(SStreamTask* pTask) { { // set input - void* pExecutor = pTask->exec.pExecutor; + void* pExecutor = pTask->exec.pExecutor; const SStreamQueueItem* pItem = pInput; if (pItem->type == STREAM_INPUT__GET_RES) {