diff --git a/src/query/src/qExecutor.c b/src/query/src/qExecutor.c index fca59c9033024d771a377235cdc05369901d81db..4e9805acddc55cf07bd1a88a94a8d867a07b75f4 100644 --- a/src/query/src/qExecutor.c +++ b/src/query/src/qExecutor.c @@ -6359,6 +6359,19 @@ static SSDataBlock* hashGroupbyAggregate(void* param, bool* newgroup) { return pInfo->binfo.pRes; } +static void doHandleRemainBlockForNewGroupImpl(SFillOperatorInfo *pInfo, SQueryRuntimeEnv* pRuntimeEnv, bool* newgroup) { + pInfo->totalInputRows = pInfo->existNewGroupBlock->info.rows; + int64_t ekey = Q_STATUS_EQUAL(pRuntimeEnv->status, QUERY_COMPLETED)?pRuntimeEnv->pQueryAttr->window.ekey:pInfo->existNewGroupBlock->info.window.ekey; + taosResetFillInfo(pInfo->pFillInfo, pInfo->pFillInfo->start); + + taosFillSetStartInfo(pInfo->pFillInfo, pInfo->existNewGroupBlock->info.rows, ekey); + taosFillSetInputDataBlock(pInfo->pFillInfo, pInfo->existNewGroupBlock); + + doFillTimeIntervalGapsInResults(pInfo->pFillInfo, pInfo->pRes, pRuntimeEnv->resultInfo.capacity, pInfo->p); + pInfo->existNewGroupBlock = NULL; + *newgroup = true; +} + static void doHandleRemainBlockFromNewGroup(SFillOperatorInfo *pInfo, SQueryRuntimeEnv *pRuntimeEnv, bool *newgroup) { if (taosFillHasMoreResults(pInfo->pFillInfo)) { *newgroup = false; @@ -6370,16 +6383,7 @@ static void doHandleRemainBlockFromNewGroup(SFillOperatorInfo *pInfo, SQueryRunt // handle the cached new group data block if (pInfo->existNewGroupBlock) { - pInfo->totalInputRows = pInfo->existNewGroupBlock->info.rows; - int64_t ekey = Q_STATUS_EQUAL(pRuntimeEnv->status, QUERY_COMPLETED)?pRuntimeEnv->pQueryAttr->window.ekey:pInfo->existNewGroupBlock->info.window.ekey; - taosResetFillInfo(pInfo->pFillInfo, pInfo->pFillInfo->start); - - taosFillSetStartInfo(pInfo->pFillInfo, pInfo->existNewGroupBlock->info.rows, ekey); - taosFillSetInputDataBlock(pInfo->pFillInfo, pInfo->existNewGroupBlock); - - doFillTimeIntervalGapsInResults(pInfo->pFillInfo, pInfo->pRes, pRuntimeEnv->resultInfo.capacity, pInfo->p); - pInfo->existNewGroupBlock = NULL; - *newgroup = true; + doHandleRemainBlockForNewGroupImpl(pInfo, pRuntimeEnv, newgroup); } } @@ -6398,26 +6402,6 @@ static SSDataBlock* doFill(void* param, bool* newgroup) { if (pInfo->pRes->info.rows > pRuntimeEnv->resultInfo.threshold || (!pInfo->multigroupResult && pInfo->pRes->info.rows > 0)) { return pInfo->pRes; } -// if (taosFillHasMoreResults(pInfo->pFillInfo)) { -// *newgroup = false; -// doFillTimeIntervalGapsInResults(pInfo->pFillInfo, pInfo->pRes, (int32_t)pRuntimeEnv->resultInfo.capacity); -// return pInfo->pRes; -// } -// -// // handle the cached new group data block -// if (pInfo->existNewGroupBlock) { -// pInfo->totalInputRows = pInfo->existNewGroupBlock->info.rows; -// int64_t ekey = Q_STATUS_EQUAL(pRuntimeEnv->status, QUERY_COMPLETED)?pRuntimeEnv->pQueryAttr->window.ekey:pInfo->existNewGroupBlock->info.window.ekey; -// taosResetFillInfo(pInfo->pFillInfo, pInfo->pFillInfo->start); -// -// taosFillSetStartInfo(pInfo->pFillInfo, pInfo->existNewGroupBlock->info.rows, ekey); -// taosFillSetInputDataBlock(pInfo->pFillInfo, pInfo->existNewGroupBlock); -// -// doFillTimeIntervalGapsInResults(pInfo->pFillInfo, pInfo->pRes, pRuntimeEnv->resultInfo.capacity); -// pInfo->existNewGroupBlock = NULL; -// *newgroup = true; -// return (pInfo->pRes->info.rows > 0)? pInfo->pRes:NULL; -// } while(1) { publishOperatorProfEvent(pOperator->upstream[0], QUERY_PROF_BEFORE_OPERATOR_EXEC); @@ -6465,45 +6449,13 @@ static SSDataBlock* doFill(void* param, bool* newgroup) { return pInfo->pRes; } -// if (taosFillHasMoreResults(pInfo->pFillInfo)) { -// *newgroup = false; -// doFillTimeIntervalGapsInResults(pInfo->pFillInfo, pInfo->pRes, (int32_t)pRuntimeEnv->resultInfo.capacity); -// return pInfo->pRes; -// } -// -// // handle the cached new group data block -// if (pInfo->existNewGroupBlock) { -// pInfo->totalInputRows = pInfo->existNewGroupBlock->info.rows; -// int64_t ekey = Q_STATUS_EQUAL(pRuntimeEnv->status, QUERY_COMPLETED)?pRuntimeEnv->pQueryAttr->window.ekey:pInfo->existNewGroupBlock->info.window.ekey; -// taosResetFillInfo(pInfo->pFillInfo, pInfo->pFillInfo->start); -// -// taosFillSetStartInfo(pInfo->pFillInfo, pInfo->existNewGroupBlock->info.rows, ekey); -// taosFillSetInputDataBlock(pInfo->pFillInfo, pInfo->existNewGroupBlock); -// -// doFillTimeIntervalGapsInResults(pInfo->pFillInfo, pInfo->pRes, pRuntimeEnv->resultInfo.capacity); -// pInfo->existNewGroupBlock = NULL; -// *newgroup = true; -// -// if (pInfo->pRes->info.rows > pRuntimeEnv->resultInfo.threshold) { -// return pInfo->pRes; -// } -// -//// return (pInfo->pRes->info.rows > 0)? pInfo->pRes:NULL; -// } - } else if (pInfo->existNewGroupBlock) { // try next group - pInfo->totalInputRows = pInfo->existNewGroupBlock->info.rows; - int64_t ekey = pInfo->existNewGroupBlock->info.window.ekey; - taosResetFillInfo(pInfo->pFillInfo, pInfo->pFillInfo->start); - - taosFillSetStartInfo(pInfo->pFillInfo, pInfo->existNewGroupBlock->info.rows, ekey); - taosFillSetInputDataBlock(pInfo->pFillInfo, pInfo->existNewGroupBlock); - - doFillTimeIntervalGapsInResults(pInfo->pFillInfo, pInfo->pRes, pRuntimeEnv->resultInfo.capacity, pInfo->p); - pInfo->existNewGroupBlock = NULL; - *newgroup = true; + assert(pBlock != NULL); + doHandleRemainBlockForNewGroupImpl(pInfo, pRuntimeEnv, newgroup); - return (pInfo->pRes->info.rows > 0) ? pInfo->pRes : NULL; + if (pInfo->pRes->info.rows > pRuntimeEnv->resultInfo.threshold) { + return pInfo->pRes; + } } else { return NULL; }