diff --git a/src/query/inc/qExecutor.h b/src/query/inc/qExecutor.h index b66385add47dd093311aa3376f887d60cf070346..c91b835980429176483c23850e9887fe7aa48d5a 100644 --- a/src/query/inc/qExecutor.h +++ b/src/query/inc/qExecutor.h @@ -283,13 +283,12 @@ enum OPERATOR_TYPE_E { OP_Arithmetic = 7, OP_Groupby = 8, OP_Limit = 9, - OP_Offset = 10, - OP_TimeWindow = 11, - OP_SessionWindow = 12, - OP_Fill = 13, - OP_MultiTableAggregate = 14, - OP_MultiTableTimeInterval = 15, - OP_DummyInput = 16, //TODO remove it after fully refactor. + OP_TimeWindow = 10, + OP_SessionWindow = 11, + OP_Fill = 12, + OP_MultiTableAggregate = 13, + OP_MultiTableTimeInterval = 14, + OP_DummyInput = 15, //TODO remove it after fully refactor. }; typedef struct SOperatorInfo { diff --git a/src/query/src/qExecutor.c b/src/query/src/qExecutor.c index d80912db484843203faf83a0b4556edb4be50c51..0c5686b21612457cdd0f2000a5806bbaab2b69cb 100644 --- a/src/query/src/qExecutor.c +++ b/src/query/src/qExecutor.c @@ -163,7 +163,6 @@ static SOperatorInfo* createTableSeqScanOperator(void* pTsdbQueryHandle, SQueryR static SOperatorInfo* createAggregateOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput); static SOperatorInfo* createArithOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput); static SOperatorInfo* createLimitOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream); -static SOperatorInfo* createOffsetOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream); static SOperatorInfo* createTimeIntervalOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput); static SOperatorInfo* createSWindowOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput); static SOperatorInfo* createFillOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput); @@ -1739,11 +1738,6 @@ static int32_t setupQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv, int32_t numOf break; } - case OP_Offset: { - pRuntimeEnv->proot = createOffsetOperatorInfo(pRuntimeEnv, pRuntimeEnv->proot); - break; - } - case OP_Fill: { SOperatorInfo* pInfo = pRuntimeEnv->proot; pRuntimeEnv->proot = createFillOperatorInfo(pRuntimeEnv, pInfo, pInfo->pExpr, pInfo->numOfOutput); @@ -4535,53 +4529,25 @@ static SSDataBlock* doArithmeticOperation(void* param) { } static SSDataBlock* doLimit(void* param) { - SOperatorInfo* pOperator = (SOperatorInfo*) param; + SOperatorInfo* pOperator = (SOperatorInfo*)param; if (pOperator->status == OP_EXEC_DONE) { return NULL; } SLimitOperatorInfo* pInfo = pOperator->info; - - SSDataBlock* pBlock = pOperator->upstream->exec(pOperator->upstream); - if (pBlock == NULL) { - setQueryStatus(pOperator->pRuntimeEnv, QUERY_COMPLETED); - pOperator->status = OP_EXEC_DONE; - return NULL; - } - - if (pInfo->total + pBlock->info.rows >= pInfo->limit) { - pBlock->info.rows = (int32_t) (pInfo->limit - pInfo->total); - - pInfo->total = pInfo->limit; - - setQueryStatus(pOperator->pRuntimeEnv, QUERY_COMPLETED); - pOperator->status = OP_EXEC_DONE; - } else { - pInfo->total += pBlock->info.rows; - } - - return pBlock; -} - -// TODO add log -static SSDataBlock* doOffset(void* param) { - SOperatorInfo *pOperator = (SOperatorInfo *)param; - if (pOperator->status == OP_EXEC_DONE) { - return NULL; - } - SQueryRuntimeEnv* pRuntimeEnv = pOperator->pRuntimeEnv; + SSDataBlock* pBlock = NULL; while (1) { - SSDataBlock *pBlock = pOperator->upstream->exec(pOperator->upstream); + pBlock = pOperator->upstream->exec(pOperator->upstream); if (pBlock == NULL) { - setQueryStatus(pRuntimeEnv, QUERY_COMPLETED); + setQueryStatus(pOperator->pRuntimeEnv, QUERY_COMPLETED); pOperator->status = OP_EXEC_DONE; return NULL; } if (pRuntimeEnv->currentOffset == 0) { - return pBlock; + break; } else if (pRuntimeEnv->currentOffset >= pBlock->info.rows) { pRuntimeEnv->currentOffset -= pBlock->info.rows; } else { @@ -4589,16 +4555,28 @@ static SSDataBlock* doOffset(void* param) { pBlock->info.rows = remain; for (int32_t i = 0; i < pBlock->info.numOfCols; ++i) { - SColumnInfoData *pColInfoData = taosArrayGet(pBlock->pDataBlock, i); + SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, i); int16_t bytes = pColInfoData->info.bytes; memmove(pColInfoData->pData, pColInfoData->pData + bytes * pRuntimeEnv->currentOffset, remain * bytes); } pRuntimeEnv->currentOffset = 0; - return pBlock; + break; } } + + if (pInfo->total + pBlock->info.rows >= pInfo->limit) { + pBlock->info.rows = (int32_t)(pInfo->limit - pInfo->total); + pInfo->total = pInfo->limit; + + setQueryStatus(pOperator->pRuntimeEnv, QUERY_COMPLETED); + pOperator->status = OP_EXEC_DONE; + } else { + pInfo->total += pBlock->info.rows; + } + + return pBlock; } static SSDataBlock* doIntervalAgg(void* param) { @@ -5025,24 +5003,6 @@ SOperatorInfo* createLimitOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorI return pOperator; } -SOperatorInfo* createOffsetOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream) { - SOffsetOperatorInfo* pInfo = calloc(1, sizeof(SOffsetOperatorInfo)); - - pInfo->offset = pRuntimeEnv->pQueryAttr->limit.offset; - SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo)); - - pOperator->name = "OffsetOperator"; - pOperator->operatorType = OP_Offset; - pOperator->blockingOptr = false; - pOperator->status = OP_IN_EXECUTING; - pOperator->upstream = upstream; - pOperator->exec = doOffset; - pOperator->info = pInfo; - pOperator->pRuntimeEnv = pRuntimeEnv; - - return pOperator; -} - SOperatorInfo* createTimeIntervalOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput) { STableIntervalOperatorInfo* pInfo = calloc(1, sizeof(STableIntervalOperatorInfo)); diff --git a/src/query/src/qPlan.c b/src/query/src/qPlan.c index 7429004289de202770241764d7d5f5f12b671c0f..8ddb2bbd612ed1c68332741264e7ade4a601c7c0 100644 --- a/src/query/src/qPlan.c +++ b/src/query/src/qPlan.c @@ -116,12 +116,8 @@ SArray* createExecOperatorPlan(SQueryAttr* pQueryAttr) { taosArrayPush(plan, &op); } - if (pQueryAttr->limit.offset > 0) { - op = OP_Offset; - taosArrayPush(plan, &op); - } - if (pQueryAttr->limit.limit > 0) { + if (pQueryAttr->limit.limit > 0 || pQueryAttr->limit.offset > 0) { op = OP_Limit; taosArrayPush(plan, &op); }