diff --git a/src/query/src/qExecutor.c b/src/query/src/qExecutor.c index 6346e743081a6594fcc9e8d8001ae18e3f90ac92..d2d61f140c5d52cbd5a055cb3950e7e8f2523575 100644 --- a/src/query/src/qExecutor.c +++ b/src/query/src/qExecutor.c @@ -5604,6 +5604,18 @@ static int32_t getTableScanOrder(STableScanInfo* pTableScanInfo) { return pTableScanInfo->order; } +// check all SQLFunctionCtx is completed +static bool allCtxCompleted(SOperatorInfo* pOperator, SQLFunctionCtx* pCtx) { + // only one false, return false + for(int32_t i = 0; i < pOperator->numOfOutput; i++) { + if(pCtx[i].resultInfo == NULL) + return false; + if(!pCtx[i].resultInfo->complete) + return false; + } + return true; +} + // this is a blocking operator static SSDataBlock* doAggregate(void* param, bool* newgroup) { SOperatorInfo* pOperator = (SOperatorInfo*) param; @@ -5642,6 +5654,9 @@ static SSDataBlock* doAggregate(void* param, bool* newgroup) { // the pDataBlock are always the same one, no need to call this again setInputDataBlock(pOperator, pInfo->pCtx, pBlock, order); doAggregateImpl(pOperator, pQueryAttr->window.skey, pInfo->pCtx, pBlock); + // if all pCtx is completed, then query should be over + if(allCtxCompleted(pOperator, pInfo->pCtx)) + break; } doSetOperatorCompleted(pOperator);