未验证 提交 bb39d94f 编写于 作者: H Haojun Liao 提交者: GitHub

Merge pull request #22357 from taosdata/fix/liaohj

other: merge fix from 3.0 to main.
...@@ -1277,7 +1277,6 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) { ...@@ -1277,7 +1277,6 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) {
if (done) { if (done) {
pTask->tsInfo.step2Start = taosGetTimestampMs(); pTask->tsInfo.step2Start = taosGetTimestampMs();
streamTaskEndScanWAL(pTask); streamTaskEndScanWAL(pTask);
streamMetaReleaseTask(pMeta, pTask);
} else { } else {
STimeWindow* pWindow = &pTask->dataRange.window; STimeWindow* pWindow = &pTask->dataRange.window;
tqDebug("s-task:%s level:%d verRange:%" PRId64 " - %" PRId64 " window:%" PRId64 "-%" PRId64 tqDebug("s-task:%s level:%d verRange:%" PRId64 " - %" PRId64 " window:%" PRId64 "-%" PRId64
...@@ -1303,13 +1302,11 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) { ...@@ -1303,13 +1302,11 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) {
streamSetStatusNormal(pTask); streamSetStatusNormal(pTask);
} }
// 4. 1) transfer the ownership of executor state, 2) update the scan data range for source task.
// 5. resume the related stream task.
streamMetaReleaseTask(pMeta, pTask);
streamMetaReleaseTask(pMeta, pStreamTask);
tqStartStreamTasks(pTq); tqStartStreamTasks(pTq);
} }
streamMetaReleaseTask(pMeta, pTask);
streamMetaReleaseTask(pMeta, pStreamTask);
} else { } else {
// todo update the chkInfo version for current task. // todo update the chkInfo version for current task.
// this task has an associated history stream task, so we need to scan wal from the end version of // this task has an associated history stream task, so we need to scan wal from the end version of
...@@ -1515,7 +1512,7 @@ int32_t tqProcessTaskRunReq(STQ* pTq, SRpcMsg* pMsg) { ...@@ -1515,7 +1512,7 @@ int32_t tqProcessTaskRunReq(STQ* pTq, SRpcMsg* pMsg) {
if (pTask != NULL) { if (pTask != NULL) {
// even in halt status, the data in inputQ must be processed // even in halt status, the data in inputQ must be processed
int8_t st = pTask->status.taskStatus; int8_t st = pTask->status.taskStatus;
if (st == TASK_STATUS__NORMAL || st == TASK_STATUS__SCAN_HISTORY/* || st == TASK_STATUS__SCAN_HISTORY_WAL*/) { if (st == TASK_STATUS__NORMAL || st == TASK_STATUS__SCAN_HISTORY) {
tqDebug("vgId:%d s-task:%s start to process block from inputQ, last chk point:%" PRId64, vgId, pTask->id.idStr, tqDebug("vgId:%d s-task:%s start to process block from inputQ, last chk point:%" PRId64, vgId, pTask->id.idStr,
pTask->chkInfo.version); pTask->chkInfo.version);
streamProcessRunReq(pTask); streamProcessRunReq(pTask);
...@@ -1528,8 +1525,9 @@ int32_t tqProcessTaskRunReq(STQ* pTq, SRpcMsg* pMsg) { ...@@ -1528,8 +1525,9 @@ int32_t tqProcessTaskRunReq(STQ* pTq, SRpcMsg* pMsg) {
streamMetaReleaseTask(pTq->pStreamMeta, pTask); streamMetaReleaseTask(pTq->pStreamMeta, pTask);
tqStartStreamTasks(pTq); tqStartStreamTasks(pTq);
return 0; return 0;
} else { } else { // NOTE: pTask->status.schedStatus is not updated since it is not be handled by the run exec.
tqError("vgId:%d failed to found s-task, taskId:%d", vgId, taskId); // todo add one function to handle this
tqError("vgId:%d failed to found s-task, taskId:0x%x may have been dropped", vgId, taskId);
return -1; return -1;
} }
} }
......
...@@ -1046,7 +1046,7 @@ int32_t qStreamInfoResetTimewindowFilter(qTaskInfo_t tinfo) { ...@@ -1046,7 +1046,7 @@ int32_t qStreamInfoResetTimewindowFilter(qTaskInfo_t tinfo) {
SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo; SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo;
STimeWindow* pWindow = &pTaskInfo->streamInfo.fillHistoryWindow; STimeWindow* pWindow = &pTaskInfo->streamInfo.fillHistoryWindow;
qDebug("%s set remove scan-history filter window:%" PRId64 "-%" PRId64 ", new window:%" PRId64 "-%" PRId64, qDebug("%s remove scan-history filter window:%" PRId64 "-%" PRId64 ", set new window:%" PRId64 "-%" PRId64,
GET_TASKID(pTaskInfo), pWindow->skey, pWindow->ekey, INT64_MIN, INT64_MAX); GET_TASKID(pTaskInfo), pWindow->skey, pWindow->ekey, INT64_MIN, INT64_MAX);
pWindow->skey = INT64_MIN; pWindow->skey = INT64_MIN;
......
...@@ -21,6 +21,7 @@ ...@@ -21,6 +21,7 @@
#define MAX_STREAM_RESULT_DUMP_THRESHOLD 100 #define MAX_STREAM_RESULT_DUMP_THRESHOLD 100
static int32_t updateCheckPointInfo(SStreamTask* pTask); static int32_t updateCheckPointInfo(SStreamTask* pTask);
static int32_t streamDoTransferStateToStreamTask(SStreamTask* pTask);
bool streamTaskShouldStop(const SStreamStatus* pStatus) { bool streamTaskShouldStop(const SStreamStatus* pStatus) {
int32_t status = atomic_load_8((int8_t*)&pStatus->taskStatus); int32_t status = atomic_load_8((int8_t*)&pStatus->taskStatus);
...@@ -544,8 +545,11 @@ int32_t streamExecForAll(SStreamTask* pTask) { ...@@ -544,8 +545,11 @@ int32_t streamExecForAll(SStreamTask* pTask) {
return 0; return 0;
} }
// the task may be set dropping/stopping, while it is still in the task queue, therefore, the sched-status can not
// be updated by tryExec function, therefore, the schedStatus will always be the TASK_SCHED_STATUS__WAITING.
bool streamTaskIsIdle(const SStreamTask* pTask) { bool streamTaskIsIdle(const SStreamTask* pTask) {
return (pTask->status.schedStatus == TASK_SCHED_STATUS__INACTIVE); return (pTask->status.schedStatus == TASK_SCHED_STATUS__INACTIVE || pTask->status.taskStatus == TASK_STATUS__STOP ||
pTask->status.taskStatus == TASK_STATUS__DROPPING);
} }
int32_t streamTaskEndScanWAL(SStreamTask* pTask) { int32_t streamTaskEndScanWAL(SStreamTask* pTask) {
......
...@@ -330,7 +330,8 @@ int32_t streamMetaUnregisterTask(SStreamMeta* pMeta, int32_t taskId) { ...@@ -330,7 +330,8 @@ int32_t streamMetaUnregisterTask(SStreamMeta* pMeta, int32_t taskId) {
} }
taosWUnLockLatch(&pMeta->lock); taosWUnLockLatch(&pMeta->lock);
qDebug("s-task:0x%x set task status:%s", taskId, streamGetTaskStatusStr(TASK_STATUS__DROPPING)); qDebug("s-task:0x%x set task status:%s and start to unregister it", taskId,
streamGetTaskStatusStr(TASK_STATUS__DROPPING));
while (1) { while (1) {
taosRLockLatch(&pMeta->lock); taosRLockLatch(&pMeta->lock);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册