diff --git a/source/common/src/tdatablock.c b/source/common/src/tdatablock.c index cd21de52edffad82caefa4523c0a4cad415369c8..3da12d97f0a0359569183639d7dca55672918a76 100644 --- a/source/common/src/tdatablock.c +++ b/source/common/src/tdatablock.c @@ -1479,10 +1479,12 @@ static int32_t colDataMoveVarData(SColumnInfoData* pColInfoData, size_t start, s } beigin++; } + if (dataOffset > 0) { memmove(pColInfoData->pData, pColInfoData->pData + dataOffset, dataLen); - memmove(pColInfoData->varmeta.offset, &pColInfoData->varmeta.offset[start], (end - start) * sizeof(int32_t)); } + + memmove(pColInfoData->varmeta.offset, &pColInfoData->varmeta.offset[start], (end - start) * sizeof(int32_t)); return dataLen; } diff --git a/source/libs/executor/inc/executorimpl.h b/source/libs/executor/inc/executorimpl.h index cd50a13eddafae18fd4a407e69ef4b668b39fc28..2ffb1be2603fcf12aca305a95f8661310e50b5ad 100644 --- a/source/libs/executor/inc/executorimpl.h +++ b/source/libs/executor/inc/executorimpl.h @@ -247,6 +247,16 @@ typedef struct SLoadRemoteDataInfo { uint64_t totalElapsed; // total elapsed time } SLoadRemoteDataInfo; +typedef struct SLimitInfo { + SLimit limit; + SLimit slimit; + uint64_t currentGroupId; + int64_t remainGroupOffset; + int64_t numOfOutputGroups; + int64_t remainOffset; + int64_t numOfOutputRows; +} SLimitInfo; + typedef struct SExchangeInfo { SArray* pSources; SArray* pSourceDataInfo; @@ -257,6 +267,7 @@ typedef struct SExchangeInfo { int32_t current; SLoadRemoteDataInfo loadInfo; uint64_t self; + SLimitInfo limitInfo; } SExchangeInfo; typedef struct SColMatchInfo { @@ -542,15 +553,7 @@ typedef struct SProjectOperatorInfo { SNode* pFilterNode; // filter info, which is push down by optimizer SSDataBlock* existDataBlock; SArray* pPseudoColInfo; - SLimit limit; - SLimit slimit; - - uint64_t groupId; - int64_t curSOffset; - int64_t curGroupOutput; - - int64_t curOffset; - int64_t curOutput; + SLimitInfo limitInfo; } SProjectOperatorInfo; typedef struct SIndefOperatorInfo { @@ -791,6 +794,9 @@ int32_t initAggInfo(SExprSupp *pSup, SAggSupporter* pAggSup, SExprInfo* pExprInf const char* pkey); void initResultSizeInfo(SOperatorInfo* pOperator, int32_t numOfRows); void doBuildResultDatablock(SOperatorInfo* pOperator, SOptrBasicInfo* pbInfo, SGroupResInfo* pGroupResInfo, SDiskbasedBuf* pBuf); +int32_t handleLimitOffset(SOperatorInfo *pOperator, SLimitInfo* pLimitInfo, SSDataBlock* pBlock, bool holdDataInBuf); +bool hasLimitOffsetInfo(SLimitInfo* pLimitInfo); +void initLimitInfo(const SNode* pLimit, const SNode* pSLimit, SLimitInfo* pLimitInfo); void doApplyFunctions(SExecTaskInfo* taskInfo, SqlFunctionCtx* pCtx, STimeWindow* pWin, SColumnInfoData* pTimeWindowData, int32_t offset, int32_t forwardStep, TSKEY* tsCol, int32_t numOfTotal, int32_t numOfOutput, int32_t order); @@ -837,7 +843,7 @@ SOperatorInfo* createAggregateOperatorInfo(SOperatorInfo* downstream, SExprInfo* SOperatorInfo* createIndefinitOutputOperatorInfo(SOperatorInfo* downstream, SPhysiNode *pNode, SExecTaskInfo* pTaskInfo); SOperatorInfo* createProjectOperatorInfo(SOperatorInfo* downstream, SProjectPhysiNode* pProjPhyNode, SExecTaskInfo* pTaskInfo); -SOperatorInfo* createSortOperatorInfo(SOperatorInfo* downstream, SSortPhysiNode* pSortPhyNode, SExecTaskInfo* pTaskInfo); +SOperatorInfo* createSortOperatorInfo(SOperatorInfo* downstream, SSortPhysiNode* pSortNode, SExecTaskInfo* pTaskInfo); SOperatorInfo* createMultiwayMergeOperatorInfo(SOperatorInfo** dowStreams, size_t numStreams, SMergePhysiNode* pMergePhysiNode, SExecTaskInfo* pTaskInfo); SOperatorInfo* createSortedMergeOperatorInfo(SOperatorInfo** downstream, int32_t numOfDownstream, SExprInfo* pExprInfo, int32_t num, SArray* pSortInfo, SArray* pGroupInfo, SExecTaskInfo* pTaskInfo); SOperatorInfo* createLastrowScanOperator(SLastRowScanPhysiNode* pTableScanNode, SReadHandle* readHandle, SExecTaskInfo* pTaskInfo); diff --git a/source/libs/executor/src/executil.c b/source/libs/executor/src/executil.c index 061ac239050a06cc7021475d53fb66692bd770b0..17484ec84b14b836ca1942afe8f9aca13872d23b 100644 --- a/source/libs/executor/src/executil.c +++ b/source/libs/executor/src/executil.c @@ -898,4 +898,23 @@ STimeWindow getActiveTimeWindow(SDiskbasedBuf* pBuf, SResultRowInfo* pResultRowI } return w; +} + +bool hasLimitOffsetInfo(SLimitInfo* pLimitInfo) { + return (pLimitInfo->limit.limit != -1 || pLimitInfo->limit.offset != -1 || pLimitInfo->slimit.limit != -1 || + pLimitInfo->slimit.offset != -1); +} + + +static int64_t getLimit(const SNode* pLimit) { return NULL == pLimit ? -1 : ((SLimitNode*)pLimit)->limit; } +static int64_t getOffset(const SNode* pLimit) { return NULL == pLimit ? -1 : ((SLimitNode*)pLimit)->offset; } + +void initLimitInfo(const SNode* pLimit, const SNode* pSLimit, SLimitInfo* pLimitInfo) { + SLimit limit = {.limit = getLimit(pLimit), .offset = getOffset(pLimit)}; + SLimit slimit = {.limit = getLimit(pSLimit), .offset = getOffset(pSLimit)}; + + pLimitInfo->limit = limit; + pLimitInfo->slimit= slimit; + pLimitInfo->remainOffset = limit.offset; + pLimitInfo->remainGroupOffset = slimit.offset; } \ No newline at end of file diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index b8612960e4bf06279e8a73406de229502db754ee..933beb9479dfc06ced090434ac70df128924d138 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -43,6 +43,11 @@ #define GET_FORWARD_DIRECTION_FACTOR(ord) (((ord) == TSDB_ORDER_ASC) ? QUERY_ASC_FORWARD_STEP : QUERY_DESC_FORWARD_STEP) +enum { + PROJECT_RETRIEVE_CONTINUE = 0x1, + PROJECT_RETRIEVE_DONE = 0x2, +}; + #if 0 static UNUSED_FUNC void *u_malloc (size_t __size) { uint32_t v = taosRand(); @@ -2340,7 +2345,7 @@ static int32_t prepareLoadRemoteData(SOperatorInfo* pOperator) { return TSDB_CODE_SUCCESS; } -static SSDataBlock* doLoadRemoteData(SOperatorInfo* pOperator) { +static SSDataBlock* doLoadRemoteDataImpl(SOperatorInfo* pOperator) { SExchangeInfo* pExchangeInfo = pOperator->info; SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; @@ -2366,6 +2371,44 @@ static SSDataBlock* doLoadRemoteData(SOperatorInfo* pOperator) { } } +static SSDataBlock* doLoadRemoteData(SOperatorInfo* pOperator) { + SExchangeInfo* pExchangeInfo = pOperator->info; + SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; + + if (pOperator->status == OP_EXEC_DONE) { + return NULL; + } + + while(1) { + SSDataBlock* pBlock = doLoadRemoteDataImpl(pOperator); + if (pBlock == NULL) { + return NULL; + } + + ASSERT(pBlock == pExchangeInfo->pResult); + + SLimitInfo* pLimitInfo = &pExchangeInfo->limitInfo; + if (hasLimitOffsetInfo(pLimitInfo)) { + int32_t status = handleLimitOffset(pOperator, pLimitInfo, pExchangeInfo->pResult, false); + if (status == PROJECT_RETRIEVE_CONTINUE) { + continue; + } else if (status == PROJECT_RETRIEVE_DONE) { + size_t rows = pExchangeInfo->pResult->info.rows; + pExchangeInfo->limitInfo.numOfOutputRows += rows; + + if (rows == 0) { + doSetOperatorCompleted(pOperator); + return NULL; + } else { + return pExchangeInfo->pResult; + } + } + } else { + return pExchangeInfo->pResult; + } + } +} + static int32_t initDataSource(int32_t numOfSources, SExchangeInfo* pInfo, const char* id) { pInfo->pSourceDataInfo = taosArrayInit(numOfSources, sizeof(SSourceDataInfo)); if (pInfo->pSourceDataInfo == NULL) { @@ -2405,6 +2448,7 @@ static int32_t initExchangeOperator(SExchangePhysiNode* pExNode, SExchangeInfo* taosArrayPush(pInfo->pSources, pNode); } + initLimitInfo(pExNode->node.pLimit, pExNode->node.pSlimit, &pInfo->limitInfo); pInfo->self = taosAddRef(exchangeObjRefPool, pInfo); return initDataSource(numOfSources, pInfo, id); @@ -3148,68 +3192,60 @@ int32_t aggDecodeResultRow(SOperatorInfo* pOperator, char* result) { return TDB_CODE_SUCCESS; } -enum { - PROJECT_RETRIEVE_CONTINUE = 0x1, - PROJECT_RETRIEVE_DONE = 0x2, -}; - -static int32_t handleLimitOffset(SOperatorInfo* pOperator, SSDataBlock* pBlock) { - SProjectOperatorInfo* pProjectInfo = pOperator->info; - SOptrBasicInfo* pInfo = &pProjectInfo->binfo; - SSDataBlock* pRes = pInfo->pRes; - - if (pProjectInfo->curSOffset > 0) { - if (pProjectInfo->groupId == 0) { // it is the first group - pProjectInfo->groupId = pBlock->info.groupId; - blockDataCleanup(pInfo->pRes); +int32_t handleLimitOffset(SOperatorInfo* pOperator, SLimitInfo* pLimitInfo, SSDataBlock* pBlock, bool holdDataInBuf) { + if (pLimitInfo->remainGroupOffset > 0) { + if (pLimitInfo->currentGroupId == 0) { // it is the first group + pLimitInfo->currentGroupId = pBlock->info.groupId; + blockDataCleanup(pBlock); return PROJECT_RETRIEVE_CONTINUE; - } else if (pProjectInfo->groupId != pBlock->info.groupId) { - pProjectInfo->curSOffset -= 1; + } else if (pLimitInfo->currentGroupId != pBlock->info.groupId) { + // now it is the data from a new group + pLimitInfo->remainGroupOffset -= 1; // ignore data block in current group - if (pProjectInfo->curSOffset > 0) { - blockDataCleanup(pInfo->pRes); + if (pLimitInfo->remainGroupOffset > 0) { + blockDataCleanup(pBlock); return PROJECT_RETRIEVE_CONTINUE; } } // set current group id of the project operator - pProjectInfo->groupId = pBlock->info.groupId; + pLimitInfo->currentGroupId = pBlock->info.groupId; } - if (pProjectInfo->groupId != 0 && pProjectInfo->groupId != pBlock->info.groupId) { - pProjectInfo->curGroupOutput += 1; - if ((pProjectInfo->slimit.limit > 0) && (pProjectInfo->slimit.limit <= pProjectInfo->curGroupOutput)) { + if (pLimitInfo->currentGroupId != 0 && pLimitInfo->currentGroupId != pBlock->info.groupId) { + pLimitInfo->numOfOutputGroups += 1; + if ((pLimitInfo->slimit.limit > 0) && (pLimitInfo->slimit.limit <= pLimitInfo->numOfOutputGroups)) { pOperator->status = OP_EXEC_DONE; - blockDataCleanup(pRes); + blockDataCleanup(pBlock); return PROJECT_RETRIEVE_DONE; } // reset the value for a new group data - pProjectInfo->curOffset = 0; - pProjectInfo->curOutput = 0; + pLimitInfo->numOfOutputRows = 0; + pLimitInfo->remainOffset = pLimitInfo->limit.offset; } // here we reach the start position, according to the limit/offset requirements. // set current group id - pProjectInfo->groupId = pBlock->info.groupId; + pLimitInfo->currentGroupId = pBlock->info.groupId; - if (pProjectInfo->curOffset >= pRes->info.rows) { - pProjectInfo->curOffset -= pRes->info.rows; - blockDataCleanup(pRes); + if (pLimitInfo->remainOffset >= pBlock->info.rows) { + pLimitInfo->remainOffset -= pBlock->info.rows; + blockDataCleanup(pBlock); return PROJECT_RETRIEVE_CONTINUE; - } else if (pProjectInfo->curOffset < pRes->info.rows && pProjectInfo->curOffset > 0) { - blockDataTrimFirstNRows(pRes, pProjectInfo->curOffset); - pProjectInfo->curOffset = 0; + } else if (pLimitInfo->remainOffset < pBlock->info.rows && pLimitInfo->remainOffset > 0) { + blockDataTrimFirstNRows(pBlock, pLimitInfo->remainOffset); + pLimitInfo->remainOffset = 0; } // check for the limitation in each group - if (pProjectInfo->limit.limit >= 0 && pProjectInfo->curOutput + pRes->info.rows >= pProjectInfo->limit.limit) { - int32_t keepRows = (int32_t)(pProjectInfo->limit.limit - pProjectInfo->curOutput); - blockDataKeepFirstNRows(pRes, keepRows); - if (pProjectInfo->slimit.limit > 0 && pProjectInfo->slimit.limit <= pProjectInfo->curGroupOutput) { + if (pLimitInfo->limit.limit >= 0 && pLimitInfo->numOfOutputRows + pBlock->info.rows >= pLimitInfo->limit.limit) { + int32_t keepRows = (int32_t)(pLimitInfo->limit.limit - pLimitInfo->numOfOutputRows); + blockDataKeepFirstNRows(pBlock, keepRows); + if (pLimitInfo->slimit.limit > 0 && pLimitInfo->slimit.limit <= pLimitInfo->numOfOutputGroups) { pOperator->status = OP_EXEC_DONE; } @@ -3219,8 +3255,8 @@ static int32_t handleLimitOffset(SOperatorInfo* pOperator, SSDataBlock* pBlock) // todo optimize performance // If there are slimit/soffset value exists, multi-round result can not be packed into one group, since the // they may not belong to the same group the limit/offset value is not valid in this case. - if (pRes->info.rows >= pOperator->resultInfo.threshold || pProjectInfo->slimit.offset != -1 || - pProjectInfo->slimit.limit != -1) { + if ((!holdDataInBuf) || (pBlock->info.rows >= pOperator->resultInfo.threshold) || pLimitInfo->slimit.offset != -1 || + pLimitInfo->slimit.limit != -1) { return PROJECT_RETRIEVE_DONE; } else { // not full enough, continue to accumulate the output data in the buffer. return PROJECT_RETRIEVE_CONTINUE; @@ -3306,7 +3342,7 @@ static SSDataBlock* doProjectOperation(SOperatorInfo* pOperator) { longjmp(pTaskInfo->env, code); } - int32_t status = handleLimitOffset(pOperator, pBlock); + int32_t status = handleLimitOffset(pOperator, &pProjectInfo->limitInfo, pInfo->pRes, true); // filter shall be applied after apply functions and limit/offset on the result doFilter(pProjectInfo->pFilterNode, pInfo->pRes); @@ -3318,9 +3354,9 @@ static SSDataBlock* doProjectOperation(SOperatorInfo* pOperator) { } } - pProjectInfo->curOutput += pInfo->pRes->info.rows; - size_t rows = pInfo->pRes->info.rows; + pProjectInfo->limitInfo.numOfOutputRows += rows; + pOperator->resultInfo.totalRows += rows; if (pOperator->cost.openCost == 0) { @@ -3764,10 +3800,6 @@ static SArray* setRowTsColumnOutputInfo(SqlFunctionCtx* pCtx, int32_t numOfCols) return pList; } -static int64_t getLimit(SNode* pLimit) { return NULL == pLimit ? -1 : ((SLimitNode*)pLimit)->limit; } - -static int64_t getOffset(SNode* pLimit) { return NULL == pLimit ? -1 : ((SLimitNode*)pLimit)->offset; } - SOperatorInfo* createProjectOperatorInfo(SOperatorInfo* downstream, SProjectPhysiNode* pProjPhyNode, SExecTaskInfo* pTaskInfo) { SProjectOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SProjectOperatorInfo)); @@ -3780,13 +3812,8 @@ SOperatorInfo* createProjectOperatorInfo(SOperatorInfo* downstream, SProjectPhys SExprInfo* pExprInfo = createExprInfo(pProjPhyNode->pProjections, NULL, &numOfCols); SSDataBlock* pResBlock = createResDataBlock(pProjPhyNode->node.pOutputDataBlockDesc); - SLimit limit = {.limit = getLimit(pProjPhyNode->node.pLimit), .offset = getOffset(pProjPhyNode->node.pLimit)}; - SLimit slimit = {.limit = getLimit(pProjPhyNode->node.pSlimit), .offset = getOffset(pProjPhyNode->node.pSlimit)}; + initLimitInfo(pProjPhyNode->node.pLimit, pProjPhyNode->node.pSlimit, &pInfo->limitInfo); - pInfo->limit = limit; - pInfo->slimit = slimit; - pInfo->curOffset = limit.offset; - pInfo->curSOffset = slimit.offset; pInfo->binfo.pRes = pResBlock; pInfo->pFilterNode = pProjPhyNode->node.pConditions; diff --git a/source/libs/executor/src/sortoperator.c b/source/libs/executor/src/sortoperator.c index 229d7dd67096b16b50d869d4a304d31f2a2aea68..a3b79d959790c68ce8865496d5998d1e518d0b53 100644 --- a/source/libs/executor/src/sortoperator.c +++ b/source/libs/executor/src/sortoperator.c @@ -22,31 +22,31 @@ static int32_t getExplainExecInfo(SOperatorInfo* pOptr, void** pOptrExplain static void destroyOrderOperatorInfo(void* param, int32_t numOfOutput); -SOperatorInfo* createSortOperatorInfo(SOperatorInfo* downstream, SSortPhysiNode* pSortPhyNode, - SExecTaskInfo* pTaskInfo) { +// todo add limit/offset impl +SOperatorInfo* createSortOperatorInfo(SOperatorInfo* downstream, SSortPhysiNode* pSortNode, SExecTaskInfo* pTaskInfo) { SSortOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SSortOperatorInfo)); SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo)); if (pInfo == NULL || pOperator == NULL /* || rowSize > 100 * 1024 * 1024*/) { goto _error; } - SDataBlockDescNode* pDescNode = pSortPhyNode->node.pOutputDataBlockDesc; + SDataBlockDescNode* pDescNode = pSortNode->node.pOutputDataBlockDesc; int32_t numOfCols = 0; SSDataBlock* pResBlock = createResDataBlock(pDescNode); - SExprInfo* pExprInfo = createExprInfo(pSortPhyNode->pExprs, NULL, &numOfCols); + SExprInfo* pExprInfo = createExprInfo(pSortNode->pExprs, NULL, &numOfCols); int32_t numOfOutputCols = 0; SArray* pColMatchColInfo = - extractColMatchInfo(pSortPhyNode->pTargets, pDescNode, &numOfOutputCols, COL_MATCH_FROM_SLOT_ID); + extractColMatchInfo(pSortNode->pTargets, pDescNode, &numOfOutputCols, COL_MATCH_FROM_SLOT_ID); pOperator->exprSupp.pCtx = createSqlFunctionCtx(pExprInfo, numOfCols, &pOperator->exprSupp.rowEntryInfoOffset); pInfo->binfo.pRes = pResBlock; initResultSizeInfo(pOperator, 1024); - pInfo->pSortInfo = createSortInfo(pSortPhyNode->pSortKeys); - pInfo->pCondition = pSortPhyNode->node.pConditions; + pInfo->pSortInfo = createSortInfo(pSortNode->pSortKeys); + pInfo->pCondition = pSortNode->node.pConditions; pInfo->pColMatchInfo = pColMatchColInfo; pOperator->name = "SortOperator"; pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_SORT;