提交 13feb7da 编写于 作者: H Haojun Liao

refactor(query): do some internal refactor.

上级 9b40ec72
......@@ -625,6 +625,7 @@ int32_t setSDataBlockFromFetchRsp(SSDataBlock* pRes, SLoadRemoteDataInfo* pLoadI
int32_t compLen, int32_t numOfOutput, int64_t startTs, uint64_t* total,
SArray* pColList);
void getAlignQueryTimeWindow(SInterval* pInterval, int32_t precision, int64_t key, STimeWindow* win);
int32_t getTableScanOrder(SOperatorInfo* pOperator);
void doSetOperatorCompleted(SOperatorInfo* pOperator);
void doFilter(const SNode* pFilterNode, SSDataBlock* pBlock);
......
......@@ -207,7 +207,6 @@ static void destroyAggOperatorInfo(void* param, int32_t numOfOutput);
static void destroyIntervalOperatorInfo(void* param, int32_t numOfOutput);
static void destroyExchangeOperatorInfo(void* param, int32_t numOfOutput);
static void destroyConditionOperatorInfo(void* param, int32_t numOfOutput);
static void destroyOperatorInfo(SOperatorInfo* pOperator);
static void destroySysTableScannerOperatorInfo(void* param, int32_t numOfOutput);
......@@ -4677,7 +4676,18 @@ _error:
return NULL;
}
//static int32_t getTableScanOrder(STableScanInfo* pTableScanInfo) { return pTableScanInfo->order; }
int32_t getTableScanOrder(SOperatorInfo* pOperator) {
if (pOperator->operatorType != QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN) {
if (pOperator->pDownstream[0] != NULL) {
return getTableScanOrder(pOperator->pDownstream[0]);
} else {
return TSDB_ORDER_ASC;
}
}
STableScanInfo* pTableScanInfo = pOperator->info;
return pTableScanInfo->cond.order;
}
// this is a blocking operator
static int32_t doOpenAggregateOptr(SOperatorInfo* pOperator) {
......@@ -4881,6 +4891,76 @@ bool aggDecodeResultRow(SOperatorInfo* pOperator, SAggSupporter* pSup, SOptrBasi
return true;
}
enum {
PROJECT_RETRIEVE_CONTINUE = 0x1,
PROJECT_RETRIEVE_DONE = 0x2,
};
static int32_t handleLimitOffset(SOperatorInfo* pOperator, SSDataBlock* pBlock) {
SProjectOperatorInfo* pProjectInfo = pOperator->info;
SOptrBasicInfo* pInfo = &pProjectInfo->binfo;
SSDataBlock* pRes = pInfo->pRes;
if (pProjectInfo->curSOffset > 0) {
if (pProjectInfo->groupId == 0) { // it is the first group
pProjectInfo->groupId = pBlock->info.groupId;
blockDataCleanup(pInfo->pRes);
return PROJECT_RETRIEVE_CONTINUE;
} else if (pProjectInfo->groupId != pBlock->info.groupId) {
pProjectInfo->curSOffset -= 1;
// ignore data block in current group
if (pProjectInfo->curSOffset > 0) {
blockDataCleanup(pInfo->pRes);
return PROJECT_RETRIEVE_CONTINUE;
}
}
// set current group id of the project operator
pProjectInfo->groupId = pBlock->info.groupId;
}
if (pProjectInfo->groupId != 0 && pProjectInfo->groupId != pBlock->info.groupId) {
pProjectInfo->curGroupOutput += 1;
if ((pProjectInfo->slimit.limit > 0) && (pProjectInfo->slimit.limit <= pProjectInfo->curGroupOutput)) {
pOperator->status = OP_EXEC_DONE;
blockDataCleanup(pRes);
return PROJECT_RETRIEVE_DONE;
}
// reset the value for a new group data
pProjectInfo->curOffset = 0;
pProjectInfo->curOutput = 0;
}
// here we reach the start position, according to the limit/offset requirements.
// set current group id
pProjectInfo->groupId = pBlock->info.groupId;
if (pProjectInfo->curOffset >= pRes->info.rows) {
pProjectInfo->curOffset -= pRes->info.rows;
blockDataCleanup(pRes);
return PROJECT_RETRIEVE_CONTINUE;
} else if (pProjectInfo->curOffset < pRes->info.rows && pProjectInfo->curOffset > 0) {
blockDataTrimFirstNRows(pRes, pProjectInfo->curOffset);
pProjectInfo->curOffset = 0;
}
if (pRes->info.rows >= pOperator->resultInfo.threshold) {
// check for the limitation in each group
if (pProjectInfo->limit.limit > 0 && pProjectInfo->curOutput + pRes->info.rows >= pProjectInfo->limit.limit) {
pRes->info.rows = (int32_t)(pProjectInfo->limit.limit - pProjectInfo->curOutput);
}
return PROJECT_RETRIEVE_DONE;
} else { // not full enough, continue to accumulate the output data in the buffer.
return PROJECT_RETRIEVE_CONTINUE;
}
}
static SSDataBlock* doProjectOperation(SOperatorInfo* pOperator, bool* newgroup) {
SProjectOperatorInfo* pProjectInfo = pOperator->info;
SOptrBasicInfo* pInfo = &pProjectInfo->binfo;
......@@ -4949,63 +5029,22 @@ static SSDataBlock* doProjectOperation(SOperatorInfo* pOperator, bool* newgroup)
// }
// the pDataBlock are always the same one, no need to call this again
setInputDataBlock(pOperator, pInfo->pCtx, pBlock, TSDB_ORDER_ASC, false);
int32_t order = getTableScanOrder(pOperator->pDownstream[0]);
setInputDataBlock(pOperator, pInfo->pCtx, pBlock, order, false);
blockDataEnsureCapacity(pInfo->pRes, pInfo->pRes->info.rows + pBlock->info.rows);
projectApplyFunctions(pOperator->pExpr, pInfo->pRes, pBlock, pInfo->pCtx, pOperator->numOfOutput,
pProjectInfo->pPseudoColInfo);
if (pProjectInfo->curSOffset > 0) {
if (pProjectInfo->groupId == 0) { // it is the first group
pProjectInfo->groupId = pBlock->info.groupId;
blockDataCleanup(pInfo->pRes);
continue;
} else if (pProjectInfo->groupId != pBlock->info.groupId) {
pProjectInfo->curSOffset -= 1;
// ignore data block in current group
if (pProjectInfo->curSOffset > 0) {
blockDataCleanup(pInfo->pRes);
continue;
}
}
pProjectInfo->groupId = pBlock->info.groupId;
}
if (pProjectInfo->groupId != 0 && pProjectInfo->groupId != pBlock->info.groupId) {
pProjectInfo->curGroupOutput += 1;
if ((pProjectInfo->slimit.limit > 0) && (pProjectInfo->slimit.limit <= pProjectInfo->curGroupOutput)) {
pOperator->status = OP_EXEC_DONE;
return NULL;
}
// reset the value for a new group data
pProjectInfo->curOffset = 0;
pProjectInfo->curOutput = 0;
}
pProjectInfo->groupId = pBlock->info.groupId;
// todo extract method
if (pProjectInfo->curOffset < pInfo->pRes->info.rows && pProjectInfo->curOffset > 0) {
blockDataTrimFirstNRows(pInfo->pRes, pProjectInfo->curOffset);
pProjectInfo->curOffset = 0;
} else if (pProjectInfo->curOffset >= pInfo->pRes->info.rows) {
pProjectInfo->curOffset -= pInfo->pRes->info.rows;
blockDataCleanup(pInfo->pRes);
int32_t status = handleLimitOffset(pOperator, pBlock);
if (status == PROJECT_RETRIEVE_CONTINUE) {
continue;
}
if (pRes->info.rows >= pOperator->resultInfo.threshold) {
} else if (status == PROJECT_RETRIEVE_DONE) {
break;
}
}
if (pProjectInfo->limit.limit > 0 && pProjectInfo->curOutput + pInfo->pRes->info.rows >= pProjectInfo->limit.limit) {
pInfo->pRes->info.rows = (int32_t)(pProjectInfo->limit.limit - pProjectInfo->curOutput);
}
pProjectInfo->curOutput += pInfo->pRes->info.rows;
// copyTsColoum(pRes, pInfo->pCtx, pOperator->numOfOutput);
......
......@@ -309,7 +309,6 @@ static SSDataBlock* doTableScan(SOperatorInfo* pOperator, bool* newgroup) {
return NULL;
}
// SResultRowInfo* pResultRowInfo = pTableScanInfo->pResultRowInfo;
*newgroup = false;
while (pTableScanInfo->current < pTableScanInfo->scanInfo.numOfAsc) {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册