提交 fae4f2c4 编写于 作者: H Haojun Liao

refactor: do some internal refactor.

上级 c9a1b3ba
...@@ -704,9 +704,10 @@ void doBuildResultDatablock(SOperatorInfo* pOperator, SOptrBasicInfo* pbInfo, SG ...@@ -704,9 +704,10 @@ void doBuildResultDatablock(SOperatorInfo* pOperator, SOptrBasicInfo* pbInfo, SG
SDiskbasedBuf* pBuf); SDiskbasedBuf* pBuf);
bool hasLimitOffsetInfo(SLimitInfo* pLimitInfo); bool hasLimitOffsetInfo(SLimitInfo* pLimitInfo);
bool hasSlimitOffsetInfo(SLimitInfo* pLimitInfo);
void initLimitInfo(const SNode* pLimit, const SNode* pSLimit, SLimitInfo* pLimitInfo); void initLimitInfo(const SNode* pLimit, const SNode* pSLimit, SLimitInfo* pLimitInfo);
void resetLimitInfoForNextGroup(SLimitInfo* pLimitInfo); void resetLimitInfoForNextGroup(SLimitInfo* pLimitInfo);
bool applyLimitOffset(SLimitInfo* pLimitInfo, SSDataBlock* pBlock, SExecTaskInfo* pTaskInfo, SOperatorInfo* pOperator); bool applyLimitOffset(SLimitInfo* pLimitInfo, SSDataBlock* pBlock, SExecTaskInfo* pTaskInfo);
void applyAggFunctionOnPartialTuples(SExecTaskInfo* taskInfo, SqlFunctionCtx* pCtx, SColumnInfoData* pTimeWindowData, void applyAggFunctionOnPartialTuples(SExecTaskInfo* taskInfo, SqlFunctionCtx* pCtx, SColumnInfoData* pTimeWindowData,
int32_t offset, int32_t forwardStep, int32_t numOfTotal, int32_t numOfOutput); int32_t offset, int32_t forwardStep, int32_t numOfTotal, int32_t numOfOutput);
......
...@@ -707,6 +707,8 @@ int32_t prepareLoadRemoteData(SOperatorInfo* pOperator) { ...@@ -707,6 +707,8 @@ int32_t prepareLoadRemoteData(SOperatorInfo* pOperator) {
} }
int32_t handleLimitOffset(SOperatorInfo* pOperator, SLimitInfo* pLimitInfo, SSDataBlock* pBlock, bool holdDataInBuf) { int32_t handleLimitOffset(SOperatorInfo* pOperator, SLimitInfo* pLimitInfo, SSDataBlock* pBlock, bool holdDataInBuf) {
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
if (pLimitInfo->remainGroupOffset > 0) { if (pLimitInfo->remainGroupOffset > 0) {
if (pLimitInfo->currentGroupId == 0) { // it is the first group if (pLimitInfo->currentGroupId == 0) { // it is the first group
pLimitInfo->currentGroupId = pBlock->info.id.groupId; pLimitInfo->currentGroupId = pBlock->info.id.groupId;
...@@ -750,36 +752,20 @@ int32_t handleLimitOffset(SOperatorInfo* pOperator, SLimitInfo* pLimitInfo, SSDa ...@@ -750,36 +752,20 @@ int32_t handleLimitOffset(SOperatorInfo* pOperator, SLimitInfo* pLimitInfo, SSDa
// set current group id // set current group id
pLimitInfo->currentGroupId = pBlock->info.id.groupId; pLimitInfo->currentGroupId = pBlock->info.id.groupId;
if (pLimitInfo->remainOffset >= pBlock->info.rows) { bool limitReached = applyLimitOffset(pLimitInfo, pBlock, pTaskInfo);
pLimitInfo->remainOffset -= pBlock->info.rows; if (pBlock->info.rows == 0) {
blockDataCleanup(pBlock);
return PROJECT_RETRIEVE_CONTINUE; return PROJECT_RETRIEVE_CONTINUE;
} else if (pLimitInfo->remainOffset < pBlock->info.rows && pLimitInfo->remainOffset > 0) { } else {
blockDataTrimFirstNRows(pBlock, pLimitInfo->remainOffset); if (limitReached && (pLimitInfo->slimit.limit > 0 && pLimitInfo->slimit.limit <= pLimitInfo->numOfOutputGroups)) {
pLimitInfo->remainOffset = 0;
}
// check for the limitation in each group
if (pLimitInfo->limit.limit >= 0 && pLimitInfo->numOfOutputRows + pBlock->info.rows >= pLimitInfo->limit.limit) {
int32_t keepRows = (int32_t)(pLimitInfo->limit.limit - pLimitInfo->numOfOutputRows);
blockDataKeepFirstNRows(pBlock, keepRows);
if (pLimitInfo->slimit.limit > 0 && pLimitInfo->slimit.limit <= pLimitInfo->numOfOutputGroups) {
setOperatorCompleted(pOperator); setOperatorCompleted(pOperator);
} else { return PROJECT_RETRIEVE_DONE;
// current group limitation is reached, and future blocks of this group need to be discarded.
if (pBlock->info.rows == 0) {
return PROJECT_RETRIEVE_CONTINUE;
}
} }
return PROJECT_RETRIEVE_DONE;
} }
// todo optimize performance // todo optimize performance
// If there are slimit/soffset value exists, multi-round result can not be packed into one group, since the // If there are slimit/soffset value exists, multi-round result can not be packed into one group, since the
// they may not belong to the same group the limit/offset value is not valid in this case. // they may not belong to the same group the limit/offset value is not valid in this case.
if ((!holdDataInBuf) || (pBlock->info.rows >= pOperator->resultInfo.threshold) || pLimitInfo->slimit.offset != -1 || if ((!holdDataInBuf) || (pBlock->info.rows >= pOperator->resultInfo.threshold) || hasSlimitOffsetInfo(pLimitInfo)) {
pLimitInfo->slimit.limit != -1) {
return PROJECT_RETRIEVE_DONE; return PROJECT_RETRIEVE_DONE;
} else { // not full enough, continue to accumulate the output data in the buffer. } else { // not full enough, continue to accumulate the output data in the buffer.
return PROJECT_RETRIEVE_CONTINUE; return PROJECT_RETRIEVE_CONTINUE;
......
...@@ -1749,6 +1749,10 @@ bool hasLimitOffsetInfo(SLimitInfo* pLimitInfo) { ...@@ -1749,6 +1749,10 @@ bool hasLimitOffsetInfo(SLimitInfo* pLimitInfo) {
pLimitInfo->slimit.offset != -1); pLimitInfo->slimit.offset != -1);
} }
bool hasSlimitOffsetInfo(SLimitInfo* pLimitInfo) {
return (pLimitInfo->slimit.limit != -1 || pLimitInfo->slimit.offset != -1);
}
void initLimitInfo(const SNode* pLimit, const SNode* pSLimit, SLimitInfo* pLimitInfo) { void initLimitInfo(const SNode* pLimit, const SNode* pSLimit, SLimitInfo* pLimitInfo) {
SLimit limit = {.limit = getLimit(pLimit), .offset = getOffset(pLimit)}; SLimit limit = {.limit = getLimit(pLimit), .offset = getOffset(pLimit)};
SLimit slimit = {.limit = getLimit(pSLimit), .offset = getOffset(pSLimit)}; SLimit slimit = {.limit = getLimit(pSLimit), .offset = getOffset(pSLimit)};
......
...@@ -185,36 +185,15 @@ static int32_t doIngroupLimitOffset(SLimitInfo* pLimitInfo, uint64_t groupId, SS ...@@ -185,36 +185,15 @@ static int32_t doIngroupLimitOffset(SLimitInfo* pLimitInfo, uint64_t groupId, SS
SOperatorInfo* pOperator) { SOperatorInfo* pOperator) {
// set current group id // set current group id
pLimitInfo->currentGroupId = groupId; pLimitInfo->currentGroupId = groupId;
bool limitReached = applyLimitOffset(pLimitInfo, pBlock, pOperator->pTaskInfo);
if (pLimitInfo->remainOffset >= pBlock->info.rows) { if (pBlock->info.rows == 0) {
pLimitInfo->remainOffset -= pBlock->info.rows;
blockDataCleanup(pBlock);
return PROJECT_RETRIEVE_CONTINUE; return PROJECT_RETRIEVE_CONTINUE;
} else if (pLimitInfo->remainOffset < pBlock->info.rows && pLimitInfo->remainOffset > 0) { } else {
blockDataTrimFirstNRows(pBlock, pLimitInfo->remainOffset); if (limitReached && (pLimitInfo->slimit.limit > 0 && pLimitInfo->slimit.limit <= pLimitInfo->numOfOutputGroups)) {
pLimitInfo->remainOffset = 0;
}
// check for the limitation in each group
if (pLimitInfo->limit.limit >= 0 && pLimitInfo->numOfOutputRows + pBlock->info.rows >= pLimitInfo->limit.limit) {
int32_t keepRows = (int32_t)(pLimitInfo->limit.limit - pLimitInfo->numOfOutputRows);
blockDataKeepFirstNRows(pBlock, keepRows);
// TODO: optimize it later when partition by + limit
// all retrieved requirement has been fulfilled, let's finish this
if ((pLimitInfo->slimit.limit == -1 && pLimitInfo->currentGroupId == 0) ||
(pLimitInfo->slimit.limit > 0 && pLimitInfo->slimit.limit <= pLimitInfo->numOfOutputGroups)) {
setOperatorCompleted(pOperator); setOperatorCompleted(pOperator);
} else {
// Even current group is done, there may be many vgroups remain existed, and we need to continue to retrieve data
// from next group. So let's continue this retrieve process
if (keepRows == 0) {
return PROJECT_RETRIEVE_CONTINUE;
}
} }
} }
pLimitInfo->numOfOutputRows += pBlock->info.rows;
return PROJECT_RETRIEVE_DONE; return PROJECT_RETRIEVE_DONE;
} }
......
...@@ -256,12 +256,11 @@ static void doSetTagColumnData(STableScanBase* pTableScanInfo, SSDataBlock* pBlo ...@@ -256,12 +256,11 @@ static void doSetTagColumnData(STableScanBase* pTableScanInfo, SSDataBlock* pBlo
} }
} }
// todo handle the slimit info bool applyLimitOffset(SLimitInfo* pLimitInfo, SSDataBlock* pBlock, SExecTaskInfo* pTaskInfo) {
bool applyLimitOffset(SLimitInfo* pLimitInfo, SSDataBlock* pBlock, SExecTaskInfo* pTaskInfo, SOperatorInfo* pOperator) {
SLimit* pLimit = &pLimitInfo->limit; SLimit* pLimit = &pLimitInfo->limit;
const char* id = GET_TASKID(pTaskInfo); const char* id = GET_TASKID(pTaskInfo);
if (pLimit->offset > 0 && pLimitInfo->remainOffset > 0) { if (pLimitInfo->remainOffset > 0) {
if (pLimitInfo->remainOffset >= pBlock->info.rows) { if (pLimitInfo->remainOffset >= pBlock->info.rows) {
pLimitInfo->remainOffset -= pBlock->info.rows; pLimitInfo->remainOffset -= pBlock->info.rows;
blockDataEmpty(pBlock); blockDataEmpty(pBlock);
...@@ -276,12 +275,14 @@ bool applyLimitOffset(SLimitInfo* pLimitInfo, SSDataBlock* pBlock, SExecTaskInfo ...@@ -276,12 +275,14 @@ bool applyLimitOffset(SLimitInfo* pLimitInfo, SSDataBlock* pBlock, SExecTaskInfo
if (pLimit->limit != -1 && pLimit->limit <= (pLimitInfo->numOfOutputRows + pBlock->info.rows)) { if (pLimit->limit != -1 && pLimit->limit <= (pLimitInfo->numOfOutputRows + pBlock->info.rows)) {
// limit the output rows // limit the output rows
int32_t keep = (int32_t)(pLimit->limit - pLimitInfo->numOfOutputRows); int32_t keep = (int32_t)(pLimit->limit - pLimitInfo->numOfOutputRows);
blockDataKeepFirstNRows(pBlock, keep); blockDataKeepFirstNRows(pBlock, keep);
pLimitInfo->numOfOutputRows += pBlock->info.rows;
qDebug("output limit %" PRId64 " has reached, %s", pLimit->limit, id); qDebug("output limit %" PRId64 " has reached, %s", pLimit->limit, id);
return true; return true;
} }
pLimitInfo->numOfOutputRows += pBlock->info.rows;
return false; return false;
} }
...@@ -393,13 +394,12 @@ static int32_t loadDataBlock(SOperatorInfo* pOperator, STableScanBase* pTableSca ...@@ -393,13 +394,12 @@ static int32_t loadDataBlock(SOperatorInfo* pOperator, STableScanBase* pTableSca
} }
} }
bool limitReached = applyLimitOffset(&pTableScanInfo->limitInfo, pBlock, pTaskInfo, pOperator); bool limitReached = applyLimitOffset(&pTableScanInfo->limitInfo, pBlock, pTaskInfo);
if (limitReached) { // set operator flag is done if (limitReached) { // set operator flag is done
setOperatorCompleted(pOperator); setOperatorCompleted(pOperator);
} }
pCost->totalRows += pBlock->info.rows; pCost->totalRows += pBlock->info.rows;
pTableScanInfo->limitInfo.numOfOutputRows = pCost->totalRows;
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
...@@ -2714,9 +2714,7 @@ SSDataBlock* getSortedTableMergeScanBlockData(SSortHandle* pHandle, SSDataBlock* ...@@ -2714,9 +2714,7 @@ SSDataBlock* getSortedTableMergeScanBlockData(SSortHandle* pHandle, SSDataBlock*
} }
} }
applyLimitOffset(&pInfo->limitInfo, pResBlock, pTaskInfo, pOperator); applyLimitOffset(&pInfo->limitInfo, pResBlock, pTaskInfo);
pInfo->limitInfo.numOfOutputRows += pResBlock->info.rows;
qDebug("%s get sorted row block, rows:%d, limit:%"PRId64, GET_TASKID(pTaskInfo), pResBlock->info.rows, qDebug("%s get sorted row block, rows:%d, limit:%"PRId64, GET_TASKID(pTaskInfo), pResBlock->info.rows,
pInfo->limitInfo.numOfOutputRows); pInfo->limitInfo.numOfOutputRows);
......
...@@ -222,6 +222,7 @@ SSDataBlock* doSort(SOperatorInfo* pOperator) { ...@@ -222,6 +222,7 @@ SSDataBlock* doSort(SOperatorInfo* pOperator) {
T_LONG_JMP(pTaskInfo->env, code); T_LONG_JMP(pTaskInfo->env, code);
} }
// multi-group case not handle here
SSDataBlock* pBlock = NULL; SSDataBlock* pBlock = NULL;
while (1) { while (1) {
pBlock = getSortedBlockData(pInfo->pSortHandle, pInfo->binfo.pRes, pOperator->resultInfo.capacity, pBlock = getSortedBlockData(pInfo->pSortHandle, pInfo->binfo.pRes, pOperator->resultInfo.capacity,
...@@ -236,28 +237,13 @@ SSDataBlock* doSort(SOperatorInfo* pOperator) { ...@@ -236,28 +237,13 @@ SSDataBlock* doSort(SOperatorInfo* pOperator) {
continue; continue;
} }
// todo add the limit/offset info bool limitReached = applyLimitOffset(&pInfo->limitInfo, pBlock, pTaskInfo);
if (pInfo->limitInfo.remainOffset > 0) { if (limitReached) {
if (pInfo->limitInfo.remainOffset >= blockDataGetNumOfRows(pBlock)) { resetLimitInfoForNextGroup(&pInfo->limitInfo);
pInfo->limitInfo.remainOffset -= pBlock->info.rows;
continue;
}
blockDataTrimFirstNRows(pBlock, pInfo->limitInfo.remainOffset);
pInfo->limitInfo.remainOffset = 0;
}
if (pInfo->limitInfo.limit.limit > 0 &&
pInfo->limitInfo.limit.limit <= pInfo->limitInfo.numOfOutputRows + blockDataGetNumOfRows(pBlock)) {
int32_t remain = pInfo->limitInfo.limit.limit - pInfo->limitInfo.numOfOutputRows;
blockDataKeepFirstNRows(pBlock, remain);
} }
size_t numOfRows = blockDataGetNumOfRows(pBlock); pOperator->resultInfo.totalRows += pBlock->info.rows;
pInfo->limitInfo.numOfOutputRows += numOfRows; if (pBlock->info.rows > 0) {
pOperator->resultInfo.totalRows += numOfRows;
if (numOfRows > 0) {
break; break;
} }
} }
...@@ -680,7 +666,7 @@ SSDataBlock* getMultiwaySortedBlockData(SSortHandle* pHandle, SSDataBlock* pData ...@@ -680,7 +666,7 @@ SSDataBlock* getMultiwaySortedBlockData(SSortHandle* pHandle, SSDataBlock* pData
break; break;
} }
bool limitReached = applyLimitOffset(&pInfo->limitInfo, p, pTaskInfo, pOperator); bool limitReached = applyLimitOffset(&pInfo->limitInfo, p, pTaskInfo);
if (limitReached) { if (limitReached) {
resetLimitInfoForNextGroup(&pInfo->limitInfo); resetLimitInfoForNextGroup(&pInfo->limitInfo);
} }
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册