未验证 提交 6365d500 编写于 作者: H Haojun Liao 提交者: GitHub

Merge pull request #22271 from taosdata/fix/3_liaohj

fix(stream): dump results to sink node before paused.
......@@ -590,7 +590,7 @@ bool streamTaskIsIdle(const SStreamTask* pTask);
int32_t streamTaskEndScanWAL(SStreamTask* pTask);
SStreamChildEpInfo * streamTaskGetUpstreamTaskEpInfo(SStreamTask* pTask, int32_t taskId);
int32_t streamScanExec(SStreamTask* pTask, int32_t batchSz);
int32_t streamScanExec(SStreamTask* pTask, int32_t batchSize);
char* createStreamTaskIdStr(int64_t streamId, int32_t taskId);
......
......@@ -186,11 +186,6 @@ void qSetTaskId(qTaskInfo_t tinfo, uint64_t taskId, uint64_t queryId) {
doSetTaskId(pTaskInfo->pRoot, &pTaskInfo->storageAPI);
}
//void qSetTaskCode(qTaskInfo_t tinfo, int32_t code) {
// SExecTaskInfo* pTaskInfo = tinfo;
// pTaskInfo->code = code;
//}
int32_t qSetStreamOpOpen(qTaskInfo_t tinfo) {
if (tinfo == NULL) {
return TSDB_CODE_APP_ERROR;
......@@ -652,23 +647,33 @@ int32_t qExecTask(qTaskInfo_t tinfo, SSDataBlock** pRes, uint64_t* useconds) {
*pRes = NULL;
int64_t curOwner = 0;
if ((curOwner = atomic_val_compare_exchange_64(&pTaskInfo->owner, 0, threadId)) != 0) {
// todo extract method
taosRLockLatch(&pTaskInfo->lock);
bool isKilled = isTaskKilled(pTaskInfo);
if (isKilled) {
clearStreamBlock(pTaskInfo->pRoot);
qDebug("%s already killed, abort", GET_TASKID(pTaskInfo));
taosRUnLockLatch(&pTaskInfo->lock);
return TSDB_CODE_SUCCESS;
}
if (pTaskInfo->owner != 0) {
qError("%s-%p execTask is now executed by thread:%p", GET_TASKID(pTaskInfo), pTaskInfo, (void*)curOwner);
pTaskInfo->code = TSDB_CODE_QRY_IN_EXEC;
taosRUnLockLatch(&pTaskInfo->lock);
return pTaskInfo->code;
}
pTaskInfo->owner = threadId;
taosRUnLockLatch(&pTaskInfo->lock);
if (pTaskInfo->cost.start == 0) {
pTaskInfo->cost.start = taosGetTimestampUs();
}
if (isTaskKilled(pTaskInfo)) {
clearStreamBlock(pTaskInfo->pRoot);
atomic_store_64(&pTaskInfo->owner, 0);
qDebug("%s already killed, abort", GET_TASKID(pTaskInfo));
return TSDB_CODE_SUCCESS;
}
// error occurs, record the error code and return to client
int32_t ret = setjmp(pTaskInfo->env);
if (ret != TSDB_CODE_SUCCESS) {
......@@ -772,11 +777,13 @@ int32_t qKillTask(qTaskInfo_t tinfo, int32_t rspCode) {
qDebug("%s sync killed execTask", GET_TASKID(pTaskInfo));
setTaskKilled(pTaskInfo, TSDB_CODE_TSC_QUERY_KILLED);
taosWLockLatch(&pTaskInfo->lock);
while (qTaskIsExecuting(pTaskInfo)) {
taosMsleep(10);
}
pTaskInfo->code = rspCode;
taosWUnLockLatch(&pTaskInfo->lock);
return TSDB_CODE_SUCCESS;
}
......
......@@ -162,7 +162,7 @@ static int32_t streamTaskExecImpl(SStreamTask* pTask, SStreamQueueItem* pItem, i
return code;
}
int32_t streamScanExec(SStreamTask* pTask, int32_t batchSz) {
int32_t streamScanExec(SStreamTask* pTask, int32_t batchSize) {
ASSERT(pTask->info.taskLevel == TASK_LEVEL__SOURCE);
int32_t code = TSDB_CODE_SUCCESS;
void* exec = pTask->exec.pExecutor;
......@@ -174,7 +174,7 @@ int32_t streamScanExec(SStreamTask* pTask, int32_t batchSz) {
if (streamTaskShouldPause(&pTask->status)) {
double el = (taosGetTimestampMs() - pTask->tsInfo.step1Start) / 1000.0;
qDebug("s-task:%s paused from the scan-history task, elapsed time:%.2fsec", pTask->id.idStr, el);
return 0;
break;
}
SArray* pRes = taosArrayInit(0, sizeof(SSDataBlock));
......@@ -190,23 +190,18 @@ int32_t streamScanExec(SStreamTask* pTask, int32_t batchSz) {
return 0;
}
if (streamTaskShouldPause(&pTask->status)) {
break;
}
SSDataBlock* output = NULL;
uint64_t ts = 0;
if (qExecTask(exec, &output, &ts) < 0) {
code = qExecTask(exec, &output, &ts);
if (code != TSDB_CODE_TSC_QUERY_KILLED && code != TSDB_CODE_SUCCESS) {
qError("%s scan-history data error occurred code:%s, continue scan", pTask->id.idStr, tstrerror(code));
continue;
}
if (output == NULL && qStreamRecoverScanFinished(exec)) {
finished = true;
// the generated results before fill-history task been paused, should be dispatched to sink node
if (output == NULL) {
finished = qStreamRecoverScanFinished(exec);
break;
} else {
if (output == NULL) {
ASSERT(0);
}
}
SSDataBlock block = {0};
......@@ -214,9 +209,8 @@ int32_t streamScanExec(SStreamTask* pTask, int32_t batchSz) {
block.info.childId = pTask->info.selfChildId;
taosArrayPush(pRes, &block);
numOfBlocks++;
qDebug("s-task:%s scan exec numOfBlocks:%d, limit:%d", pTask->id.idStr, numOfBlocks, batchSz);
if (numOfBlocks >= batchSz) {
if ((++numOfBlocks) >= batchSize) {
qDebug("s-task:%s scan exec numOfBlocks:%d, output limit:%d reached", pTask->id.idStr, numOfBlocks, batchSize);
break;
}
}
......@@ -310,11 +304,12 @@ static int32_t streamDoTransferStateToStreamTask(SStreamTask* pTask) {
STimeWindow* pTimeWindow = &pStreamTask->dataRange.window;
// todo. the dropping status should be append to the status after the halt completed.
// It must be halted for a source stream task, since when the related scan-history-data task start scan the history
// for the step 2. For a agg task
// for the step 2.
int8_t status = pStreamTask->status.taskStatus;
if (pStreamTask->info.taskLevel == TASK_LEVEL__SOURCE) {
ASSERT(status == TASK_STATUS__HALT);
ASSERT(status == TASK_STATUS__HALT || status == TASK_STATUS__DROPPING);
} else {
ASSERT(status == TASK_STATUS__SCAN_HISTORY);
pStreamTask->status.taskStatus = TASK_STATUS__HALT;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册