提交 450c5b44 编写于 作者: H Haojun Liao

fix(tmq): quit the scan asap.

上级 71d26202
...@@ -755,8 +755,8 @@ int32_t qKillTask(qTaskInfo_t tinfo, int32_t rspCode) { ...@@ -755,8 +755,8 @@ int32_t qKillTask(qTaskInfo_t tinfo, int32_t rspCode) {
return TSDB_CODE_QRY_INVALID_QHANDLE; return TSDB_CODE_QRY_INVALID_QHANDLE;
} }
qDebug("%s execTask async killed", GET_TASKID(pTaskInfo)); qDebug("%s sync killed execTask", GET_TASKID(pTaskInfo));
setTaskKilled(pTaskInfo, rspCode); setTaskKilled(pTaskInfo, TSDB_CODE_TSC_QUERY_KILLED);
while(qTaskIsExecuting(pTaskInfo)) { while(qTaskIsExecuting(pTaskInfo)) {
taosMsleep(10); taosMsleep(10);
......
...@@ -772,7 +772,7 @@ static SSDataBlock* doTableScan(SOperatorInfo* pOperator) { ...@@ -772,7 +772,7 @@ static SSDataBlock* doTableScan(SOperatorInfo* pOperator) {
while (1) { while (1) {
SSDataBlock* result = doGroupedTableScan(pOperator); SSDataBlock* result = doGroupedTableScan(pOperator);
if (result || (pOperator->status == OP_EXEC_DONE)) { if (result || (pOperator->status == OP_EXEC_DONE) || isTaskKilled(pTaskInfo)) {
return result; return result;
} }
...@@ -1666,6 +1666,8 @@ static SSDataBlock* doQueueScan(SOperatorInfo* pOperator) { ...@@ -1666,6 +1666,8 @@ static SSDataBlock* doQueueScan(SOperatorInfo* pOperator) {
if (pTaskInfo->streamInfo.prepareStatus.type == TMQ_OFFSET__LOG) { if (pTaskInfo->streamInfo.prepareStatus.type == TMQ_OFFSET__LOG) {
while (1) { while (1) {
SFetchRet ret = {0}; SFetchRet ret = {0};
terrno = 0;
if (tqNextBlock(pInfo->tqReader, &ret) < 0) { if (tqNextBlock(pInfo->tqReader, &ret) < 0) {
// if the end is reached, terrno is 0 // if the end is reached, terrno is 0
if (terrno != 0) { if (terrno != 0) {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册