提交 f6ae9cf9 编写于 作者: H Haojun Liao

fix(stream): fetch all data before paused and dump to sink node.

上级 b8386037
......@@ -590,7 +590,7 @@ bool streamTaskIsIdle(const SStreamTask* pTask);
int32_t streamTaskEndScanWAL(SStreamTask* pTask);
SStreamChildEpInfo * streamTaskGetUpstreamTaskEpInfo(SStreamTask* pTask, int32_t taskId);
int32_t streamScanExec(SStreamTask* pTask, int32_t batchSz);
int32_t streamScanExec(SStreamTask* pTask, int32_t batchSize);
char* createStreamTaskIdStr(int64_t streamId, int32_t taskId);
......
......@@ -647,23 +647,33 @@ int32_t qExecTask(qTaskInfo_t tinfo, SSDataBlock** pRes, uint64_t* useconds) {
*pRes = NULL;
int64_t curOwner = 0;
if ((curOwner = atomic_val_compare_exchange_64(&pTaskInfo->owner, 0, threadId)) != 0) {
// todo extract method
taosRLockLatch(&pTaskInfo->lock);
bool isKilled = isTaskKilled(pTaskInfo);
if (isKilled) {
clearStreamBlock(pTaskInfo->pRoot);
qDebug("%s already killed, abort", GET_TASKID(pTaskInfo));
taosRUnLockLatch(&pTaskInfo->lock);
return TSDB_CODE_SUCCESS;
}
if (pTaskInfo->owner != 0) {
qError("%s-%p execTask is now executed by thread:%p", GET_TASKID(pTaskInfo), pTaskInfo, (void*)curOwner);
pTaskInfo->code = TSDB_CODE_QRY_IN_EXEC;
taosRUnLockLatch(&pTaskInfo->lock);
return pTaskInfo->code;
}
pTaskInfo->owner = threadId;
taosRUnLockLatch(&pTaskInfo->lock);
if (pTaskInfo->cost.start == 0) {
pTaskInfo->cost.start = taosGetTimestampUs();
}
if (isTaskKilled(pTaskInfo)) {
clearStreamBlock(pTaskInfo->pRoot);
atomic_store_64(&pTaskInfo->owner, 0);
qDebug("%s already killed, abort", GET_TASKID(pTaskInfo));
return TSDB_CODE_SUCCESS;
}
// error occurs, record the error code and return to client
int32_t ret = setjmp(pTaskInfo->env);
if (ret != TSDB_CODE_SUCCESS) {
......@@ -767,11 +777,13 @@ int32_t qKillTask(qTaskInfo_t tinfo, int32_t rspCode) {
qDebug("%s sync killed execTask", GET_TASKID(pTaskInfo));
setTaskKilled(pTaskInfo, TSDB_CODE_TSC_QUERY_KILLED);
taosWLockLatch(&pTaskInfo->lock);
while (qTaskIsExecuting(pTaskInfo)) {
taosMsleep(10);
}
pTaskInfo->code = rspCode;
taosWUnLockLatch(&pTaskInfo->lock);
return TSDB_CODE_SUCCESS;
}
......
......@@ -162,7 +162,7 @@ static int32_t streamTaskExecImpl(SStreamTask* pTask, SStreamQueueItem* pItem, i
return code;
}
int32_t streamScanExec(SStreamTask* pTask, int32_t batchSz) {
int32_t streamScanExec(SStreamTask* pTask, int32_t batchSize) {
ASSERT(pTask->info.taskLevel == TASK_LEVEL__SOURCE);
int32_t code = TSDB_CODE_SUCCESS;
void* exec = pTask->exec.pExecutor;
......@@ -174,7 +174,7 @@ int32_t streamScanExec(SStreamTask* pTask, int32_t batchSz) {
if (streamTaskShouldPause(&pTask->status)) {
double el = (taosGetTimestampMs() - pTask->tsInfo.step1Start) / 1000.0;
qDebug("s-task:%s paused from the scan-history task, elapsed time:%.2fsec", pTask->id.idStr, el);
return 0;
break;
}
SArray* pRes = taosArrayInit(0, sizeof(SSDataBlock));
......@@ -190,10 +190,6 @@ int32_t streamScanExec(SStreamTask* pTask, int32_t batchSz) {
return 0;
}
if (streamTaskShouldPause(&pTask->status)) {
break;
}
SSDataBlock* output = NULL;
uint64_t ts = 0;
code = qExecTask(exec, &output, &ts);
......@@ -203,13 +199,9 @@ int32_t streamScanExec(SStreamTask* pTask, int32_t batchSz) {
}
// the generated results before fill-history task been paused, should be dispatched to sink node
if (output == NULL && qStreamRecoverScanFinished(exec)) {
finished = true;
if (output == NULL) {
finished = qStreamRecoverScanFinished(exec);
break;
} else {
if (output == NULL) {
ASSERT(0);
}
}
SSDataBlock block = {0};
......@@ -218,8 +210,9 @@ int32_t streamScanExec(SStreamTask* pTask, int32_t batchSz) {
taosArrayPush(pRes, &block);
numOfBlocks++;
qDebug("s-task:%s scan exec numOfBlocks:%d, limit:%d", pTask->id.idStr, numOfBlocks, batchSz);
if (numOfBlocks >= batchSz) {
if (numOfBlocks >= batchSize || code != TSDB_CODE_SUCCESS) {
qDebug("s-task:%s scan exec numOfBlocks:%d, limit:%d, code:%s", pTask->id.idStr, numOfBlocks, batchSize,
tstrerror(code));
break;
}
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册