未验证 提交 6582a3b4 编写于 作者: H Haojun Liao 提交者: GitHub

Merge pull request #21980 from taosdata/refact/fillhistory

fix(stream): fix error during transferring executor state
......@@ -351,9 +351,13 @@ static void waitForTaskIdle(SStreamTask* pTask, SStreamTask* pStreamTask) {
static int32_t streamTransferStateToStreamTask(SStreamTask* pTask) {
SStreamTask* pStreamTask = streamMetaAcquireTask(pTask->pMeta, pTask->streamTaskId.taskId);
qDebug("s-task:%s scan history task end, update stream task:%s info, transfer exec state", pTask->id.idStr, pStreamTask->id.idStr);
// todo handle stream task is dropped here
if (pStreamTask == NULL) {
qError("s-task:%s failed to find related stream task:0x%x, it may have been destoryed or closed",
pTask->id.idStr, pTask->streamTaskId.taskId);
return TSDB_CODE_STREAM_TASK_NOT_EXIST;
} else {
qDebug("s-task:%s scan history task end, update stream task:%s info, transfer exec state", pTask->id.idStr, pStreamTask->id.idStr);
}
ASSERT(pStreamTask != NULL && pStreamTask->historyTaskId.taskId == pTask->id.taskId);
STimeWindow* pTimeWindow = &pStreamTask->dataRange.window;
......@@ -377,7 +381,7 @@ static int32_t streamTransferStateToStreamTask(SStreamTask* pTask) {
if (pStreamTask->info.taskLevel == TASK_LEVEL__SOURCE) {
// update the scan data range for source task.
qDebug("s-task:%s level:%d stream task window %" PRId64 " - %" PRId64 " update to %" PRId64 " - %" PRId64
", status:%s, sched-status:%d",
", status:%s, sched-status:%d",
pStreamTask->id.idStr, TASK_LEVEL__SOURCE, pTimeWindow->skey, pTimeWindow->ekey, INT64_MIN,
pTimeWindow->ekey, streamGetTaskStatusStr(TASK_STATUS__NORMAL), pStreamTask->status.schedStatus);
} else {
......@@ -473,6 +477,9 @@ int32_t streamExecForAll(SStreamTask* pTask) {
ASSERT(batchSize == 0);
if (pTask->info.fillHistory && pTask->status.transferState) {
int32_t code = streamTransferStateToStreamTask(pTask);
if (code != TSDB_CODE_SUCCESS) { // todo handle this
return 0;
}
}
break;
......@@ -564,7 +571,7 @@ int32_t streamTryExec(SStreamTask* pTask) {
if (schedStatus == TASK_SCHED_STATUS__WAITING) {
int32_t code = streamExecForAll(pTask);
if (code < 0) {
if (code < 0) { // todo this status shoudl be removed
atomic_store_8(&pTask->status.schedStatus, TASK_SCHED_STATUS__FAILED);
return -1;
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册