提交 f0d1f393 编写于 作者: H Haojun Liao

fix(query): add limit/offset implementation in exchange operator.

上级 6756c651
......@@ -1479,10 +1479,12 @@ static int32_t colDataMoveVarData(SColumnInfoData* pColInfoData, size_t start, s
}
beigin++;
}
if (dataOffset > 0) {
memmove(pColInfoData->pData, pColInfoData->pData + dataOffset, dataLen);
memmove(pColInfoData->varmeta.offset, &pColInfoData->varmeta.offset[start], (end - start) * sizeof(int32_t));
}
memmove(pColInfoData->varmeta.offset, &pColInfoData->varmeta.offset[start], (end - start) * sizeof(int32_t));
return dataLen;
}
......
......@@ -247,6 +247,16 @@ typedef struct SLoadRemoteDataInfo {
uint64_t totalElapsed; // total elapsed time
} SLoadRemoteDataInfo;
typedef struct SLimitInfo {
SLimit limit;
SLimit slimit;
uint64_t currentGroupId;
int64_t remainGroupOffset;
int64_t numOfOutputGroups;
int64_t remainOffset;
int64_t numOfOutputRows;
} SLimitInfo;
typedef struct SExchangeInfo {
SArray* pSources;
SArray* pSourceDataInfo;
......@@ -257,6 +267,7 @@ typedef struct SExchangeInfo {
int32_t current;
SLoadRemoteDataInfo loadInfo;
uint64_t self;
SLimitInfo limitInfo;
} SExchangeInfo;
typedef struct SColMatchInfo {
......@@ -542,15 +553,7 @@ typedef struct SProjectOperatorInfo {
SNode* pFilterNode; // filter info, which is push down by optimizer
SSDataBlock* existDataBlock;
SArray* pPseudoColInfo;
SLimit limit;
SLimit slimit;
uint64_t groupId;
int64_t curSOffset;
int64_t curGroupOutput;
int64_t curOffset;
int64_t curOutput;
SLimitInfo limitInfo;
} SProjectOperatorInfo;
typedef struct SIndefOperatorInfo {
......@@ -791,6 +794,9 @@ int32_t initAggInfo(SExprSupp *pSup, SAggSupporter* pAggSup, SExprInfo* pExprInf
const char* pkey);
void initResultSizeInfo(SOperatorInfo* pOperator, int32_t numOfRows);
void doBuildResultDatablock(SOperatorInfo* pOperator, SOptrBasicInfo* pbInfo, SGroupResInfo* pGroupResInfo, SDiskbasedBuf* pBuf);
int32_t handleLimitOffset(SOperatorInfo *pOperator, SLimitInfo* pLimitInfo, SSDataBlock* pBlock, bool holdDataInBuf);
bool hasLimitOffsetInfo(SLimitInfo* pLimitInfo);
void initLimitInfo(const SNode* pLimit, const SNode* pSLimit, SLimitInfo* pLimitInfo);
void doApplyFunctions(SExecTaskInfo* taskInfo, SqlFunctionCtx* pCtx, STimeWindow* pWin, SColumnInfoData* pTimeWindowData, int32_t offset,
int32_t forwardStep, TSKEY* tsCol, int32_t numOfTotal, int32_t numOfOutput, int32_t order);
......@@ -837,7 +843,7 @@ SOperatorInfo* createAggregateOperatorInfo(SOperatorInfo* downstream, SExprInfo*
SOperatorInfo* createIndefinitOutputOperatorInfo(SOperatorInfo* downstream, SPhysiNode *pNode, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createProjectOperatorInfo(SOperatorInfo* downstream, SProjectPhysiNode* pProjPhyNode, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createSortOperatorInfo(SOperatorInfo* downstream, SSortPhysiNode* pSortPhyNode, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createSortOperatorInfo(SOperatorInfo* downstream, SSortPhysiNode* pSortNode, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createMultiwayMergeOperatorInfo(SOperatorInfo** dowStreams, size_t numStreams, SMergePhysiNode* pMergePhysiNode, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createSortedMergeOperatorInfo(SOperatorInfo** downstream, int32_t numOfDownstream, SExprInfo* pExprInfo, int32_t num, SArray* pSortInfo, SArray* pGroupInfo, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createLastrowScanOperator(SLastRowScanPhysiNode* pTableScanNode, SReadHandle* readHandle, SExecTaskInfo* pTaskInfo);
......
......@@ -898,4 +898,23 @@ STimeWindow getActiveTimeWindow(SDiskbasedBuf* pBuf, SResultRowInfo* pResultRowI
}
return w;
}
bool hasLimitOffsetInfo(SLimitInfo* pLimitInfo) {
return (pLimitInfo->limit.limit != -1 || pLimitInfo->limit.offset != -1 || pLimitInfo->slimit.limit != -1 ||
pLimitInfo->slimit.offset != -1);
}
static int64_t getLimit(const SNode* pLimit) { return NULL == pLimit ? -1 : ((SLimitNode*)pLimit)->limit; }
static int64_t getOffset(const SNode* pLimit) { return NULL == pLimit ? -1 : ((SLimitNode*)pLimit)->offset; }
void initLimitInfo(const SNode* pLimit, const SNode* pSLimit, SLimitInfo* pLimitInfo) {
SLimit limit = {.limit = getLimit(pLimit), .offset = getOffset(pLimit)};
SLimit slimit = {.limit = getLimit(pSLimit), .offset = getOffset(pSLimit)};
pLimitInfo->limit = limit;
pLimitInfo->slimit= slimit;
pLimitInfo->remainOffset = limit.offset;
pLimitInfo->remainGroupOffset = slimit.offset;
}
\ No newline at end of file
......@@ -43,6 +43,11 @@
#define GET_FORWARD_DIRECTION_FACTOR(ord) (((ord) == TSDB_ORDER_ASC) ? QUERY_ASC_FORWARD_STEP : QUERY_DESC_FORWARD_STEP)
enum {
PROJECT_RETRIEVE_CONTINUE = 0x1,
PROJECT_RETRIEVE_DONE = 0x2,
};
#if 0
static UNUSED_FUNC void *u_malloc (size_t __size) {
uint32_t v = taosRand();
......@@ -2340,7 +2345,7 @@ static int32_t prepareLoadRemoteData(SOperatorInfo* pOperator) {
return TSDB_CODE_SUCCESS;
}
static SSDataBlock* doLoadRemoteData(SOperatorInfo* pOperator) {
static SSDataBlock* doLoadRemoteDataImpl(SOperatorInfo* pOperator) {
SExchangeInfo* pExchangeInfo = pOperator->info;
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
......@@ -2366,6 +2371,44 @@ static SSDataBlock* doLoadRemoteData(SOperatorInfo* pOperator) {
}
}
static SSDataBlock* doLoadRemoteData(SOperatorInfo* pOperator) {
SExchangeInfo* pExchangeInfo = pOperator->info;
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
if (pOperator->status == OP_EXEC_DONE) {
return NULL;
}
while(1) {
SSDataBlock* pBlock = doLoadRemoteDataImpl(pOperator);
if (pBlock == NULL) {
return NULL;
}
ASSERT(pBlock == pExchangeInfo->pResult);
SLimitInfo* pLimitInfo = &pExchangeInfo->limitInfo;
if (hasLimitOffsetInfo(pLimitInfo)) {
int32_t status = handleLimitOffset(pOperator, pLimitInfo, pExchangeInfo->pResult, false);
if (status == PROJECT_RETRIEVE_CONTINUE) {
continue;
} else if (status == PROJECT_RETRIEVE_DONE) {
size_t rows = pExchangeInfo->pResult->info.rows;
pExchangeInfo->limitInfo.numOfOutputRows += rows;
if (rows == 0) {
doSetOperatorCompleted(pOperator);
return NULL;
} else {
return pExchangeInfo->pResult;
}
}
} else {
return pExchangeInfo->pResult;
}
}
}
static int32_t initDataSource(int32_t numOfSources, SExchangeInfo* pInfo, const char* id) {
pInfo->pSourceDataInfo = taosArrayInit(numOfSources, sizeof(SSourceDataInfo));
if (pInfo->pSourceDataInfo == NULL) {
......@@ -2405,6 +2448,7 @@ static int32_t initExchangeOperator(SExchangePhysiNode* pExNode, SExchangeInfo*
taosArrayPush(pInfo->pSources, pNode);
}
initLimitInfo(pExNode->node.pLimit, pExNode->node.pSlimit, &pInfo->limitInfo);
pInfo->self = taosAddRef(exchangeObjRefPool, pInfo);
return initDataSource(numOfSources, pInfo, id);
......@@ -3148,68 +3192,60 @@ int32_t aggDecodeResultRow(SOperatorInfo* pOperator, char* result) {
return TDB_CODE_SUCCESS;
}
enum {
PROJECT_RETRIEVE_CONTINUE = 0x1,
PROJECT_RETRIEVE_DONE = 0x2,
};
static int32_t handleLimitOffset(SOperatorInfo* pOperator, SSDataBlock* pBlock) {
SProjectOperatorInfo* pProjectInfo = pOperator->info;
SOptrBasicInfo* pInfo = &pProjectInfo->binfo;
SSDataBlock* pRes = pInfo->pRes;
if (pProjectInfo->curSOffset > 0) {
if (pProjectInfo->groupId == 0) { // it is the first group
pProjectInfo->groupId = pBlock->info.groupId;
blockDataCleanup(pInfo->pRes);
int32_t handleLimitOffset(SOperatorInfo* pOperator, SLimitInfo* pLimitInfo, SSDataBlock* pBlock, bool holdDataInBuf) {
if (pLimitInfo->remainGroupOffset > 0) {
if (pLimitInfo->currentGroupId == 0) { // it is the first group
pLimitInfo->currentGroupId = pBlock->info.groupId;
blockDataCleanup(pBlock);
return PROJECT_RETRIEVE_CONTINUE;
} else if (pProjectInfo->groupId != pBlock->info.groupId) {
pProjectInfo->curSOffset -= 1;
} else if (pLimitInfo->currentGroupId != pBlock->info.groupId) {
// now it is the data from a new group
pLimitInfo->remainGroupOffset -= 1;
// ignore data block in current group
if (pProjectInfo->curSOffset > 0) {
blockDataCleanup(pInfo->pRes);
if (pLimitInfo->remainGroupOffset > 0) {
blockDataCleanup(pBlock);
return PROJECT_RETRIEVE_CONTINUE;
}
}
// set current group id of the project operator
pProjectInfo->groupId = pBlock->info.groupId;
pLimitInfo->currentGroupId = pBlock->info.groupId;
}
if (pProjectInfo->groupId != 0 && pProjectInfo->groupId != pBlock->info.groupId) {
pProjectInfo->curGroupOutput += 1;
if ((pProjectInfo->slimit.limit > 0) && (pProjectInfo->slimit.limit <= pProjectInfo->curGroupOutput)) {
if (pLimitInfo->currentGroupId != 0 && pLimitInfo->currentGroupId != pBlock->info.groupId) {
pLimitInfo->numOfOutputGroups += 1;
if ((pLimitInfo->slimit.limit > 0) && (pLimitInfo->slimit.limit <= pLimitInfo->numOfOutputGroups)) {
pOperator->status = OP_EXEC_DONE;
blockDataCleanup(pRes);
blockDataCleanup(pBlock);
return PROJECT_RETRIEVE_DONE;
}
// reset the value for a new group data
pProjectInfo->curOffset = 0;
pProjectInfo->curOutput = 0;
pLimitInfo->numOfOutputRows = 0;
pLimitInfo->remainOffset = pLimitInfo->limit.offset;
}
// here we reach the start position, according to the limit/offset requirements.
// set current group id
pProjectInfo->groupId = pBlock->info.groupId;
pLimitInfo->currentGroupId = pBlock->info.groupId;
if (pProjectInfo->curOffset >= pRes->info.rows) {
pProjectInfo->curOffset -= pRes->info.rows;
blockDataCleanup(pRes);
if (pLimitInfo->remainOffset >= pBlock->info.rows) {
pLimitInfo->remainOffset -= pBlock->info.rows;
blockDataCleanup(pBlock);
return PROJECT_RETRIEVE_CONTINUE;
} else if (pProjectInfo->curOffset < pRes->info.rows && pProjectInfo->curOffset > 0) {
blockDataTrimFirstNRows(pRes, pProjectInfo->curOffset);
pProjectInfo->curOffset = 0;
} else if (pLimitInfo->remainOffset < pBlock->info.rows && pLimitInfo->remainOffset > 0) {
blockDataTrimFirstNRows(pBlock, pLimitInfo->remainOffset);
pLimitInfo->remainOffset = 0;
}
// check for the limitation in each group
if (pProjectInfo->limit.limit >= 0 && pProjectInfo->curOutput + pRes->info.rows >= pProjectInfo->limit.limit) {
int32_t keepRows = (int32_t)(pProjectInfo->limit.limit - pProjectInfo->curOutput);
blockDataKeepFirstNRows(pRes, keepRows);
if (pProjectInfo->slimit.limit > 0 && pProjectInfo->slimit.limit <= pProjectInfo->curGroupOutput) {
if (pLimitInfo->limit.limit >= 0 && pLimitInfo->numOfOutputRows + pBlock->info.rows >= pLimitInfo->limit.limit) {
int32_t keepRows = (int32_t)(pLimitInfo->limit.limit - pLimitInfo->numOfOutputRows);
blockDataKeepFirstNRows(pBlock, keepRows);
if (pLimitInfo->slimit.limit > 0 && pLimitInfo->slimit.limit <= pLimitInfo->numOfOutputGroups) {
pOperator->status = OP_EXEC_DONE;
}
......@@ -3219,8 +3255,8 @@ static int32_t handleLimitOffset(SOperatorInfo* pOperator, SSDataBlock* pBlock)
// todo optimize performance
// If there are slimit/soffset value exists, multi-round result can not be packed into one group, since the
// they may not belong to the same group the limit/offset value is not valid in this case.
if (pRes->info.rows >= pOperator->resultInfo.threshold || pProjectInfo->slimit.offset != -1 ||
pProjectInfo->slimit.limit != -1) {
if ((!holdDataInBuf) || (pBlock->info.rows >= pOperator->resultInfo.threshold) || pLimitInfo->slimit.offset != -1 ||
pLimitInfo->slimit.limit != -1) {
return PROJECT_RETRIEVE_DONE;
} else { // not full enough, continue to accumulate the output data in the buffer.
return PROJECT_RETRIEVE_CONTINUE;
......@@ -3306,7 +3342,7 @@ static SSDataBlock* doProjectOperation(SOperatorInfo* pOperator) {
longjmp(pTaskInfo->env, code);
}
int32_t status = handleLimitOffset(pOperator, pBlock);
int32_t status = handleLimitOffset(pOperator, &pProjectInfo->limitInfo, pInfo->pRes, true);
// filter shall be applied after apply functions and limit/offset on the result
doFilter(pProjectInfo->pFilterNode, pInfo->pRes);
......@@ -3318,9 +3354,9 @@ static SSDataBlock* doProjectOperation(SOperatorInfo* pOperator) {
}
}
pProjectInfo->curOutput += pInfo->pRes->info.rows;
size_t rows = pInfo->pRes->info.rows;
pProjectInfo->limitInfo.numOfOutputRows += rows;
pOperator->resultInfo.totalRows += rows;
if (pOperator->cost.openCost == 0) {
......@@ -3764,10 +3800,6 @@ static SArray* setRowTsColumnOutputInfo(SqlFunctionCtx* pCtx, int32_t numOfCols)
return pList;
}
static int64_t getLimit(SNode* pLimit) { return NULL == pLimit ? -1 : ((SLimitNode*)pLimit)->limit; }
static int64_t getOffset(SNode* pLimit) { return NULL == pLimit ? -1 : ((SLimitNode*)pLimit)->offset; }
SOperatorInfo* createProjectOperatorInfo(SOperatorInfo* downstream, SProjectPhysiNode* pProjPhyNode,
SExecTaskInfo* pTaskInfo) {
SProjectOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SProjectOperatorInfo));
......@@ -3780,13 +3812,8 @@ SOperatorInfo* createProjectOperatorInfo(SOperatorInfo* downstream, SProjectPhys
SExprInfo* pExprInfo = createExprInfo(pProjPhyNode->pProjections, NULL, &numOfCols);
SSDataBlock* pResBlock = createResDataBlock(pProjPhyNode->node.pOutputDataBlockDesc);
SLimit limit = {.limit = getLimit(pProjPhyNode->node.pLimit), .offset = getOffset(pProjPhyNode->node.pLimit)};
SLimit slimit = {.limit = getLimit(pProjPhyNode->node.pSlimit), .offset = getOffset(pProjPhyNode->node.pSlimit)};
initLimitInfo(pProjPhyNode->node.pLimit, pProjPhyNode->node.pSlimit, &pInfo->limitInfo);
pInfo->limit = limit;
pInfo->slimit = slimit;
pInfo->curOffset = limit.offset;
pInfo->curSOffset = slimit.offset;
pInfo->binfo.pRes = pResBlock;
pInfo->pFilterNode = pProjPhyNode->node.pConditions;
......
......@@ -22,31 +22,31 @@ static int32_t getExplainExecInfo(SOperatorInfo* pOptr, void** pOptrExplain
static void destroyOrderOperatorInfo(void* param, int32_t numOfOutput);
SOperatorInfo* createSortOperatorInfo(SOperatorInfo* downstream, SSortPhysiNode* pSortPhyNode,
SExecTaskInfo* pTaskInfo) {
// todo add limit/offset impl
SOperatorInfo* createSortOperatorInfo(SOperatorInfo* downstream, SSortPhysiNode* pSortNode, SExecTaskInfo* pTaskInfo) {
SSortOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SSortOperatorInfo));
SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
if (pInfo == NULL || pOperator == NULL /* || rowSize > 100 * 1024 * 1024*/) {
goto _error;
}
SDataBlockDescNode* pDescNode = pSortPhyNode->node.pOutputDataBlockDesc;
SDataBlockDescNode* pDescNode = pSortNode->node.pOutputDataBlockDesc;
int32_t numOfCols = 0;
SSDataBlock* pResBlock = createResDataBlock(pDescNode);
SExprInfo* pExprInfo = createExprInfo(pSortPhyNode->pExprs, NULL, &numOfCols);
SExprInfo* pExprInfo = createExprInfo(pSortNode->pExprs, NULL, &numOfCols);
int32_t numOfOutputCols = 0;
SArray* pColMatchColInfo =
extractColMatchInfo(pSortPhyNode->pTargets, pDescNode, &numOfOutputCols, COL_MATCH_FROM_SLOT_ID);
extractColMatchInfo(pSortNode->pTargets, pDescNode, &numOfOutputCols, COL_MATCH_FROM_SLOT_ID);
pOperator->exprSupp.pCtx = createSqlFunctionCtx(pExprInfo, numOfCols, &pOperator->exprSupp.rowEntryInfoOffset);
pInfo->binfo.pRes = pResBlock;
initResultSizeInfo(pOperator, 1024);
pInfo->pSortInfo = createSortInfo(pSortPhyNode->pSortKeys);
pInfo->pCondition = pSortPhyNode->node.pConditions;
pInfo->pSortInfo = createSortInfo(pSortNode->pSortKeys);
pInfo->pCondition = pSortNode->node.pConditions;
pInfo->pColMatchInfo = pColMatchColInfo;
pOperator->name = "SortOperator";
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_SORT;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册