提交 437eb93a 编写于 作者: H Haojun Liao

fix(stream): fix error while fill history exists.

上级 daafe240
......@@ -554,13 +554,15 @@ static int32_t addSourceTasksForMultiLevelStream(SMnode* pMnode, SQueryPlan* pPl
sdbRelease(pSdb, pVgroup);
return code;
}
setHTasksId(pSourceTaskList, pHSourceTaskList);
}
sdbRelease(pSdb, pVgroup);
}
if (pStream->conf.fillHistory) {
setHTasksId(pSourceTaskList, pHSourceTaskList);
}
return TSDB_CODE_SUCCESS;
}
......
......@@ -343,7 +343,7 @@ static void waitForTaskTobeIdle(SStreamTask* pTask, SStreamTask* pStreamTask) {
double el = (taosGetTimestampMs() - st) / 1000.0;
if (el > 0) {
qDebug("s-task:%s wait for stream task:%s for %.2fs to execute all data in inputQ", pTask->id.idStr,
qDebug("s-task:%s wait for stream task:%s for %.2fs to handle all data in inputQ", pTask->id.idStr,
pStreamTask->id.idStr, el);
}
}
......
......@@ -479,22 +479,28 @@ static void tryLaunchHistoryTask(void* param, void* tmrId) {
// abort the timer if intend to stop task
SStreamTask* pHTask = streamMetaAcquireTask(pMeta, pTask->historyTaskId.taskId);
if (pHTask == NULL && pTask->status.taskStatus == TASK_STATUS__NORMAL) {
qWarn("s-task:%s vgId:%d failed to launch history task:0x%x, since it may not be built or have been destroyed",
pTask->id.idStr, pMeta->vgId, pTask->historyTaskId.taskId);
if (pHTask == NULL && (!streamTaskShouldStop(&pTask->status))) {
const char* pStatus = streamGetTaskStatusStr(pTask->status.taskStatus);
qWarn(
"s-task:%s vgId:%d status:%s failed to launch history task:0x%x, since it may not be built, or have been "
"destroyed, or should stop exec",
pTask->id.idStr, pMeta->vgId, pStatus, pTask->historyTaskId.taskId);
taosTmrReset(tryLaunchHistoryTask, 100, pInfo, streamEnv.timer, &pTask->timer);
streamMetaReleaseTask(pMeta, pTask);
return;
}
doCheckDownstreamStatus(pTask, pHTask);
if (pHTask != NULL) {
doCheckDownstreamStatus(pTask, pHTask);
streamMetaReleaseTask(pMeta, pHTask);
}
// not in timer anymore
pTask->status.timerActive = 0;
streamMetaReleaseTask(pMeta, pHTask);
streamMetaReleaseTask(pMeta, pTask);
} else {
qError("s-task:0x%x failed to load task", pInfo->taskId);
qError("s-task:0x%x failed to load task, it may have been destoryed", pInfo->taskId);
}
taosMemoryFree(pInfo);
......@@ -664,7 +670,7 @@ void streamPrepareNdoCheckDownstream(SStreamTask* pTask) {
// launch current task
SHistDataRange* pRange = &pTask->dataRange;
int64_t ekey = pRange->window.ekey;
int64_t ekey = pRange->window.ekey + 1;
int64_t ver = pRange->range.minVer;
pRange->window.skey = ekey;
......
......@@ -42,11 +42,7 @@ SStreamTask* tNewStreamTask(int64_t streamId, int8_t taskLevel, int8_t fillHisto
pTask->id.idStr = taosStrdup(buf);
pTask->status.schedStatus = TASK_SCHED_STATUS__INACTIVE;
if (fillHistory) {
pTask->status.taskStatus = TASK_STATUS__SCAN_HISTORY;
} else {
pTask->status.taskStatus = TASK_STATUS__NORMAL;
}
pTask->status.taskStatus = TASK_STATUS__SCAN_HISTORY;
pTask->inputStatus = TASK_INPUT_STATUS__NORMAL;
pTask->outputStatus = TASK_OUTPUT_STATUS__NORMAL;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册