提交 ac00e1d5 编写于 作者: H Haojun Liao

fix(stream): fetch all data before paused and dump to sink node.

上级 0e3fd527
...@@ -590,7 +590,7 @@ bool streamTaskIsIdle(const SStreamTask* pTask); ...@@ -590,7 +590,7 @@ bool streamTaskIsIdle(const SStreamTask* pTask);
int32_t streamTaskEndScanWAL(SStreamTask* pTask); int32_t streamTaskEndScanWAL(SStreamTask* pTask);
SStreamChildEpInfo * streamTaskGetUpstreamTaskEpInfo(SStreamTask* pTask, int32_t taskId); SStreamChildEpInfo * streamTaskGetUpstreamTaskEpInfo(SStreamTask* pTask, int32_t taskId);
int32_t streamScanExec(SStreamTask* pTask, int32_t batchSz); int32_t streamScanExec(SStreamTask* pTask, int32_t batchSize);
char* createStreamTaskIdStr(int64_t streamId, int32_t taskId); char* createStreamTaskIdStr(int64_t streamId, int32_t taskId);
......
...@@ -647,23 +647,33 @@ int32_t qExecTask(qTaskInfo_t tinfo, SSDataBlock** pRes, uint64_t* useconds) { ...@@ -647,23 +647,33 @@ int32_t qExecTask(qTaskInfo_t tinfo, SSDataBlock** pRes, uint64_t* useconds) {
*pRes = NULL; *pRes = NULL;
int64_t curOwner = 0; int64_t curOwner = 0;
if ((curOwner = atomic_val_compare_exchange_64(&pTaskInfo->owner, 0, threadId)) != 0) {
// todo extract method
taosRLockLatch(&pTaskInfo->lock);
bool isKilled = isTaskKilled(pTaskInfo);
if (isKilled) {
clearStreamBlock(pTaskInfo->pRoot);
qDebug("%s already killed, abort", GET_TASKID(pTaskInfo));
taosRUnLockLatch(&pTaskInfo->lock);
return TSDB_CODE_SUCCESS;
}
if (pTaskInfo->owner != 0) {
qError("%s-%p execTask is now executed by thread:%p", GET_TASKID(pTaskInfo), pTaskInfo, (void*)curOwner); qError("%s-%p execTask is now executed by thread:%p", GET_TASKID(pTaskInfo), pTaskInfo, (void*)curOwner);
pTaskInfo->code = TSDB_CODE_QRY_IN_EXEC; pTaskInfo->code = TSDB_CODE_QRY_IN_EXEC;
taosRUnLockLatch(&pTaskInfo->lock);
return pTaskInfo->code; return pTaskInfo->code;
} }
pTaskInfo->owner = threadId;
taosRUnLockLatch(&pTaskInfo->lock);
if (pTaskInfo->cost.start == 0) { if (pTaskInfo->cost.start == 0) {
pTaskInfo->cost.start = taosGetTimestampUs(); pTaskInfo->cost.start = taosGetTimestampUs();
} }
if (isTaskKilled(pTaskInfo)) {
clearStreamBlock(pTaskInfo->pRoot);
atomic_store_64(&pTaskInfo->owner, 0);
qDebug("%s already killed, abort", GET_TASKID(pTaskInfo));
return TSDB_CODE_SUCCESS;
}
// error occurs, record the error code and return to client // error occurs, record the error code and return to client
int32_t ret = setjmp(pTaskInfo->env); int32_t ret = setjmp(pTaskInfo->env);
if (ret != TSDB_CODE_SUCCESS) { if (ret != TSDB_CODE_SUCCESS) {
...@@ -767,11 +777,13 @@ int32_t qKillTask(qTaskInfo_t tinfo, int32_t rspCode) { ...@@ -767,11 +777,13 @@ int32_t qKillTask(qTaskInfo_t tinfo, int32_t rspCode) {
qDebug("%s sync killed execTask", GET_TASKID(pTaskInfo)); qDebug("%s sync killed execTask", GET_TASKID(pTaskInfo));
setTaskKilled(pTaskInfo, TSDB_CODE_TSC_QUERY_KILLED); setTaskKilled(pTaskInfo, TSDB_CODE_TSC_QUERY_KILLED);
taosWLockLatch(&pTaskInfo->lock);
while (qTaskIsExecuting(pTaskInfo)) { while (qTaskIsExecuting(pTaskInfo)) {
taosMsleep(10); taosMsleep(10);
} }
pTaskInfo->code = rspCode; pTaskInfo->code = rspCode;
taosWUnLockLatch(&pTaskInfo->lock);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
......
...@@ -163,7 +163,7 @@ static int32_t streamTaskExecImpl(SStreamTask* pTask, SStreamQueueItem* pItem, i ...@@ -163,7 +163,7 @@ static int32_t streamTaskExecImpl(SStreamTask* pTask, SStreamQueueItem* pItem, i
return code; return code;
} }
int32_t streamScanExec(SStreamTask* pTask, int32_t batchSz) { int32_t streamScanExec(SStreamTask* pTask, int32_t batchSize) {
ASSERT(pTask->info.taskLevel == TASK_LEVEL__SOURCE); ASSERT(pTask->info.taskLevel == TASK_LEVEL__SOURCE);
int32_t code = TSDB_CODE_SUCCESS; int32_t code = TSDB_CODE_SUCCESS;
void* exec = pTask->exec.pExecutor; void* exec = pTask->exec.pExecutor;
...@@ -175,7 +175,7 @@ int32_t streamScanExec(SStreamTask* pTask, int32_t batchSz) { ...@@ -175,7 +175,7 @@ int32_t streamScanExec(SStreamTask* pTask, int32_t batchSz) {
if (streamTaskShouldPause(&pTask->status)) { if (streamTaskShouldPause(&pTask->status)) {
double el = (taosGetTimestampMs() - pTask->tsInfo.step1Start) / 1000.0; double el = (taosGetTimestampMs() - pTask->tsInfo.step1Start) / 1000.0;
qDebug("s-task:%s paused from the scan-history task, elapsed time:%.2fsec", pTask->id.idStr, el); qDebug("s-task:%s paused from the scan-history task, elapsed time:%.2fsec", pTask->id.idStr, el);
return 0; break;
} }
SArray* pRes = taosArrayInit(0, sizeof(SSDataBlock)); SArray* pRes = taosArrayInit(0, sizeof(SSDataBlock));
...@@ -191,10 +191,6 @@ int32_t streamScanExec(SStreamTask* pTask, int32_t batchSz) { ...@@ -191,10 +191,6 @@ int32_t streamScanExec(SStreamTask* pTask, int32_t batchSz) {
return 0; return 0;
} }
if (streamTaskShouldPause(&pTask->status)) {
break;
}
SSDataBlock* output = NULL; SSDataBlock* output = NULL;
uint64_t ts = 0; uint64_t ts = 0;
code = qExecTask(exec, &output, &ts); code = qExecTask(exec, &output, &ts);
...@@ -204,13 +200,9 @@ int32_t streamScanExec(SStreamTask* pTask, int32_t batchSz) { ...@@ -204,13 +200,9 @@ int32_t streamScanExec(SStreamTask* pTask, int32_t batchSz) {
} }
// the generated results before fill-history task been paused, should be dispatched to sink node // the generated results before fill-history task been paused, should be dispatched to sink node
if (output == NULL && qStreamRecoverScanFinished(exec)) { if (output == NULL) {
finished = true; finished = qStreamRecoverScanFinished(exec);
break; break;
} else {
if (output == NULL) {
ASSERT(0);
}
} }
SSDataBlock block = {0}; SSDataBlock block = {0};
...@@ -219,8 +211,9 @@ int32_t streamScanExec(SStreamTask* pTask, int32_t batchSz) { ...@@ -219,8 +211,9 @@ int32_t streamScanExec(SStreamTask* pTask, int32_t batchSz) {
taosArrayPush(pRes, &block); taosArrayPush(pRes, &block);
numOfBlocks++; numOfBlocks++;
qDebug("s-task:%s scan exec numOfBlocks:%d, limit:%d", pTask->id.idStr, numOfBlocks, batchSz); if (numOfBlocks >= batchSize || code != TSDB_CODE_SUCCESS) {
if (numOfBlocks >= batchSz) { qDebug("s-task:%s scan exec numOfBlocks:%d, limit:%d, code:%s", pTask->id.idStr, numOfBlocks, batchSize,
tstrerror(code));
break; break;
} }
} }
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册