From b83860372cc04a084c9dedb8c002577d8a271a55 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Tue, 1 Aug 2023 00:50:30 +0800 Subject: [PATCH] fix(stream): dump results to sink node before paused. --- source/libs/executor/src/executor.c | 5 ----- source/libs/stream/src/streamExec.c | 5 ++++- 2 files changed, 4 insertions(+), 6 deletions(-) diff --git a/source/libs/executor/src/executor.c b/source/libs/executor/src/executor.c index e4ddf9ca6c..231653c728 100644 --- a/source/libs/executor/src/executor.c +++ b/source/libs/executor/src/executor.c @@ -186,11 +186,6 @@ void qSetTaskId(qTaskInfo_t tinfo, uint64_t taskId, uint64_t queryId) { doSetTaskId(pTaskInfo->pRoot, &pTaskInfo->storageAPI); } -//void qSetTaskCode(qTaskInfo_t tinfo, int32_t code) { -// SExecTaskInfo* pTaskInfo = tinfo; -// pTaskInfo->code = code; -//} - int32_t qSetStreamOpOpen(qTaskInfo_t tinfo) { if (tinfo == NULL) { return TSDB_CODE_APP_ERROR; diff --git a/source/libs/stream/src/streamExec.c b/source/libs/stream/src/streamExec.c index d1dff0f2e7..4e39f1448a 100644 --- a/source/libs/stream/src/streamExec.c +++ b/source/libs/stream/src/streamExec.c @@ -196,10 +196,13 @@ int32_t streamScanExec(SStreamTask* pTask, int32_t batchSz) { SSDataBlock* output = NULL; uint64_t ts = 0; - if (qExecTask(exec, &output, &ts) < 0) { + code = qExecTask(exec, &output, &ts); + if (code != TSDB_CODE_TSC_QUERY_KILLED && code != TSDB_CODE_SUCCESS) { + qError("%s scan-history data error occurred code:%s, continue scan", pTask->id.idStr, tstrerror(code)); continue; } + // the generated results before fill-history task been paused, should be dispatched to sink node if (output == NULL && qStreamRecoverScanFinished(exec)) { finished = true; break; -- GitLab