提交 b8386037 编写于 作者: H Haojun Liao

fix(stream): dump results to sink node before paused.

上级 8c1c17e3
...@@ -186,11 +186,6 @@ void qSetTaskId(qTaskInfo_t tinfo, uint64_t taskId, uint64_t queryId) { ...@@ -186,11 +186,6 @@ void qSetTaskId(qTaskInfo_t tinfo, uint64_t taskId, uint64_t queryId) {
doSetTaskId(pTaskInfo->pRoot, &pTaskInfo->storageAPI); doSetTaskId(pTaskInfo->pRoot, &pTaskInfo->storageAPI);
} }
//void qSetTaskCode(qTaskInfo_t tinfo, int32_t code) {
// SExecTaskInfo* pTaskInfo = tinfo;
// pTaskInfo->code = code;
//}
int32_t qSetStreamOpOpen(qTaskInfo_t tinfo) { int32_t qSetStreamOpOpen(qTaskInfo_t tinfo) {
if (tinfo == NULL) { if (tinfo == NULL) {
return TSDB_CODE_APP_ERROR; return TSDB_CODE_APP_ERROR;
......
...@@ -196,10 +196,13 @@ int32_t streamScanExec(SStreamTask* pTask, int32_t batchSz) { ...@@ -196,10 +196,13 @@ int32_t streamScanExec(SStreamTask* pTask, int32_t batchSz) {
SSDataBlock* output = NULL; SSDataBlock* output = NULL;
uint64_t ts = 0; uint64_t ts = 0;
if (qExecTask(exec, &output, &ts) < 0) { code = qExecTask(exec, &output, &ts);
if (code != TSDB_CODE_TSC_QUERY_KILLED && code != TSDB_CODE_SUCCESS) {
qError("%s scan-history data error occurred code:%s, continue scan", pTask->id.idStr, tstrerror(code));
continue; continue;
} }
// the generated results before fill-history task been paused, should be dispatched to sink node
if (output == NULL && qStreamRecoverScanFinished(exec)) { if (output == NULL && qStreamRecoverScanFinished(exec)) {
finished = true; finished = true;
break; break;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册