提交 d8fb5914 编写于 作者: 5 54liuyao

fix:incorrectly judged that the stream was killed

上级 2a272fb6
...@@ -881,6 +881,7 @@ SExprInfo* createExpr(SNodeList* pNodeList, int32_t* numOfExprs); ...@@ -881,6 +881,7 @@ SExprInfo* createExpr(SNodeList* pNodeList, int32_t* numOfExprs);
void copyResultrowToDataBlock(SExprInfo* pExprInfo, int32_t numOfExprs, SResultRow* pRow, SqlFunctionCtx* pCtx, void copyResultrowToDataBlock(SExprInfo* pExprInfo, int32_t numOfExprs, SResultRow* pRow, SqlFunctionCtx* pCtx,
SSDataBlock* pBlock, const int32_t* rowEntryOffset, SExecTaskInfo* pTaskInfo); SSDataBlock* pBlock, const int32_t* rowEntryOffset, SExecTaskInfo* pTaskInfo);
void doUpdateNumOfRows(SqlFunctionCtx* pCtx, SResultRow* pRow, int32_t numOfExprs, const int32_t* rowEntryOffset); void doUpdateNumOfRows(SqlFunctionCtx* pCtx, SResultRow* pRow, int32_t numOfExprs, const int32_t* rowEntryOffset);
void doClearBufferedBlocks(SStreamScanInfo* pInfo);
#ifdef __cplusplus #ifdef __cplusplus
} }
......
...@@ -93,6 +93,17 @@ static int32_t doSetStreamOpOpen(SOperatorInfo* pOperator, char* id) { ...@@ -93,6 +93,17 @@ static int32_t doSetStreamOpOpen(SOperatorInfo* pOperator, char* id) {
return 0; return 0;
} }
static void clearStreamBlock(SOperatorInfo* pOperator) {
if (pOperator->operatorType != QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN) {
if (pOperator->numOfDownstream == 1) {
return clearStreamBlock(pOperator->pDownstream[0]);
}
} else {
SStreamScanInfo* pInfo = pOperator->info;
doClearBufferedBlocks(pInfo);
}
}
static int32_t doSetStreamBlock(SOperatorInfo* pOperator, void* input, size_t numOfBlocks, int32_t type, char* id) { static int32_t doSetStreamBlock(SOperatorInfo* pOperator, void* input, size_t numOfBlocks, int32_t type, char* id) {
if (pOperator->operatorType != QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN) { if (pOperator->operatorType != QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN) {
if (pOperator->numOfDownstream == 0) { if (pOperator->numOfDownstream == 0) {
...@@ -607,7 +618,8 @@ int32_t qExecTask(qTaskInfo_t tinfo, SSDataBlock** pRes, uint64_t* useconds) { ...@@ -607,7 +618,8 @@ int32_t qExecTask(qTaskInfo_t tinfo, SSDataBlock** pRes, uint64_t* useconds) {
pTaskInfo->cost.start = taosGetTimestampUs(); pTaskInfo->cost.start = taosGetTimestampUs();
} }
if (isTaskKilled(pTaskInfo)) { if (isTaskKilled(pTaskInfo) && pTaskInfo->code != TSDB_CODE_QRY_IN_EXEC) {
clearStreamBlock(pTaskInfo->pRoot);
atomic_store_64(&pTaskInfo->owner, 0); atomic_store_64(&pTaskInfo->owner, 0);
qDebug("%s already killed, abort", GET_TASKID(pTaskInfo)); qDebug("%s already killed, abort", GET_TASKID(pTaskInfo));
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
......
...@@ -945,7 +945,7 @@ SOperatorInfo* createTableSeqScanOperatorInfo(void* pReadHandle, SExecTaskInfo* ...@@ -945,7 +945,7 @@ SOperatorInfo* createTableSeqScanOperatorInfo(void* pReadHandle, SExecTaskInfo*
return pOperator; return pOperator;
} }
static FORCE_INLINE void doClearBufferedBlocks(SStreamScanInfo* pInfo) { FORCE_INLINE void doClearBufferedBlocks(SStreamScanInfo* pInfo) {
taosArrayClear(pInfo->pBlockLists); taosArrayClear(pInfo->pBlockLists);
pInfo->validBlockIndex = 0; pInfo->validBlockIndex = 0;
} }
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册