提交 9adc1ed8 编写于 作者: H Haojun Liao

refactor: do some internal refactor.

上级 0799a5ea
...@@ -73,7 +73,7 @@ typedef struct SResKeyPos { ...@@ -73,7 +73,7 @@ typedef struct SResKeyPos {
} SResKeyPos; } SResKeyPos;
typedef struct SResultRowInfo { typedef struct SResultRowInfo {
SResultRowPosition *pPosition; SResultRowPosition *pPosition; // todo remove this
int32_t size; // number of result set int32_t size; // number of result set
int32_t capacity; // max capacity int32_t capacity; // max capacity
SResultRowPosition cur; SResultRowPosition cur;
......
...@@ -425,7 +425,7 @@ typedef struct STimeWindowSupp { ...@@ -425,7 +425,7 @@ typedef struct STimeWindowSupp {
SColumnInfoData timeWindowData; // query time window info for scalar function execution. SColumnInfoData timeWindowData; // query time window info for scalar function execution.
} STimeWindowAggSupp; } STimeWindowAggSupp;
typedef struct STableIntervalOperatorInfo { typedef struct SIntervalAggOperatorInfo {
SOptrBasicInfo binfo; // basic info SOptrBasicInfo binfo; // basic info
SGroupResInfo groupResInfo; // multiple results build supporter SGroupResInfo groupResInfo; // multiple results build supporter
SInterval interval; // interval info SInterval interval; // interval info
...@@ -440,7 +440,7 @@ typedef struct STableIntervalOperatorInfo { ...@@ -440,7 +440,7 @@ typedef struct STableIntervalOperatorInfo {
SArray* pUpdatedWindow; // updated time window due to the input data block from the downstream operator. SArray* pUpdatedWindow; // updated time window due to the input data block from the downstream operator.
STimeWindowAggSupp twAggSup; STimeWindowAggSupp twAggSup;
struct SFillInfo* pFillInfo; // fill info struct SFillInfo* pFillInfo; // fill info
} STableIntervalOperatorInfo; } SIntervalAggOperatorInfo;
typedef struct SAggOperatorInfo { typedef struct SAggOperatorInfo {
SOptrBasicInfo binfo; SOptrBasicInfo binfo;
...@@ -533,6 +533,7 @@ typedef struct SSessionAggOperatorInfo { ...@@ -533,6 +533,7 @@ typedef struct SSessionAggOperatorInfo {
SWindowRowsSup winSup; SWindowRowsSup winSup;
bool reptScan; // next round scan bool reptScan; // next round scan
int64_t gap; // session window gap int64_t gap; // session window gap
int32_t tsSlotId; // primary timestamp slot id
STimeWindowAggSupp twAggSup; STimeWindowAggSupp twAggSup;
} SSessionAggOperatorInfo; } SSessionAggOperatorInfo;
...@@ -619,8 +620,8 @@ int32_t appendDownstream(SOperatorInfo* p, SOperatorInfo** pDownstream, int32_t ...@@ -619,8 +620,8 @@ int32_t appendDownstream(SOperatorInfo* p, SOperatorInfo** pDownstream, int32_t
int32_t initAggInfo(SOptrBasicInfo* pBasicInfo, SAggSupporter* pAggSup, SExprInfo* pExprInfo, int32_t numOfCols, int32_t initAggInfo(SOptrBasicInfo* pBasicInfo, SAggSupporter* pAggSup, SExprInfo* pExprInfo, int32_t numOfCols,
SSDataBlock* pResultBlock, size_t keyBufSize, const char* pkey); SSDataBlock* pResultBlock, size_t keyBufSize, const char* pkey);
void initResultSizeInfo(SOperatorInfo* pOperator, int32_t numOfRows); void initResultSizeInfo(SOperatorInfo* pOperator, int32_t numOfRows);
void doBuildResultDatablock(SSDataBlock* pBlock, SGroupResInfo* pGroupResInfo, SExprInfo* pExprInfo, void doBuildResultDatablock(SOptrBasicInfo *pbInfo, SGroupResInfo* pGroupResInfo, SExprInfo* pExprInfo, SDiskbasedBuf* pBuf);
SDiskbasedBuf* pBuf, int32_t* rowCellOffset, SqlFunctionCtx* pCtx);
void finalizeMultiTupleQueryResult(SqlFunctionCtx* pCtx, int32_t numOfOutput, SDiskbasedBuf* pBuf, void finalizeMultiTupleQueryResult(SqlFunctionCtx* pCtx, int32_t numOfOutput, SDiskbasedBuf* pBuf,
SResultRowInfo* pResultRowInfo, int32_t* rowCellInfoOffset); SResultRowInfo* pResultRowInfo, int32_t* rowCellInfoOffset);
void doApplyFunctions(SqlFunctionCtx* pCtx, STimeWindow* pWin, SColumnInfoData* pTimeWindowData, int32_t offset, void doApplyFunctions(SqlFunctionCtx* pCtx, STimeWindow* pWin, SColumnInfoData* pTimeWindowData, int32_t offset,
...@@ -673,7 +674,8 @@ SOperatorInfo* createStreamIntervalOperatorInfo(SOperatorInfo* downstream, SExpr ...@@ -673,7 +674,8 @@ SOperatorInfo* createStreamIntervalOperatorInfo(SOperatorInfo* downstream, SExpr
SSDataBlock* pResBlock, SInterval* pInterval, int32_t primaryTsSlotId, SSDataBlock* pResBlock, SInterval* pInterval, int32_t primaryTsSlotId,
STimeWindowAggSupp *pTwAggSupp, const STableGroupInfo* pTableGroupInfo, SExecTaskInfo* pTaskInfo); STimeWindowAggSupp *pTwAggSupp, const STableGroupInfo* pTableGroupInfo, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createSessionAggOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, SOperatorInfo* createSessionAggOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols,
SSDataBlock* pResBlock, int64_t gap, STimeWindowAggSupp *pTwAggSupp, SExecTaskInfo* pTaskInfo); SSDataBlock* pResBlock, int64_t gap, int32_t tsSlotId, STimeWindowAggSupp* pTwAggSupp,
SExecTaskInfo* pTaskInfo);
SOperatorInfo* createGroupOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, SOperatorInfo* createGroupOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols,
SSDataBlock* pResultBlock, SArray* pGroupColList, SNode* pCondition, SSDataBlock* pResultBlock, SArray* pGroupColList, SNode* pCondition,
SExprInfo* pScalarExprInfo, int32_t numOfScalarExpr, SExecTaskInfo* pTaskInfo, SExprInfo* pScalarExprInfo, int32_t numOfScalarExpr, SExecTaskInfo* pTaskInfo,
......
...@@ -2246,10 +2246,13 @@ int32_t doCopyToSDataBlock(SSDataBlock* pBlock, SExprInfo* pExprInfo, SDiskbased ...@@ -2246,10 +2246,13 @@ int32_t doCopyToSDataBlock(SSDataBlock* pBlock, SExprInfo* pExprInfo, SDiskbased
return 0; return 0;
} }
void doBuildResultDatablock(SSDataBlock* pBlock, SGroupResInfo* pGroupResInfo, SExprInfo* pExprInfo, void doBuildResultDatablock(SOptrBasicInfo *pbInfo, SGroupResInfo* pGroupResInfo, SExprInfo* pExprInfo, SDiskbasedBuf* pBuf) {
SDiskbasedBuf* pBuf, int32_t* rowCellOffset, SqlFunctionCtx* pCtx) {
assert(pGroupResInfo->currentGroup <= pGroupResInfo->totalGroup); assert(pGroupResInfo->currentGroup <= pGroupResInfo->totalGroup);
int32_t* rowCellOffset = pbInfo->rowCellInfoOffset;
SSDataBlock* pBlock = pbInfo->pRes;
SqlFunctionCtx* pCtx = pbInfo->pCtx;
blockDataCleanup(pBlock); blockDataCleanup(pBlock);
if (!hasRemainDataInCurrentGroup(pGroupResInfo)) { if (!hasRemainDataInCurrentGroup(pGroupResInfo)) {
return; return;
...@@ -3881,8 +3884,7 @@ static SSDataBlock* getAggregateResult(SOperatorInfo* pOperator, bool* newgroup) ...@@ -3881,8 +3884,7 @@ static SSDataBlock* getAggregateResult(SOperatorInfo* pOperator, bool* newgroup)
} }
blockDataEnsureCapacity(pInfo->pRes, pOperator->resultInfo.capacity); blockDataEnsureCapacity(pInfo->pRes, pOperator->resultInfo.capacity);
doBuildResultDatablock(pInfo->pRes, &pAggInfo->groupResInfo, pOperator->pExpr, pAggInfo->aggSup.pResultBuf, doBuildResultDatablock(pInfo, &pAggInfo->groupResInfo, pOperator->pExpr, pAggInfo->aggSup.pResultBuf);
pInfo->rowCellInfoOffset, pInfo->pCtx);
if (pInfo->pRes->info.rows == 0 || !hasRemainDataInCurrentGroup(&pAggInfo->groupResInfo)) { if (pInfo->pRes->info.rows == 0 || !hasRemainDataInCurrentGroup(&pAggInfo->groupResInfo)) {
doSetOperatorCompleted(pOperator); doSetOperatorCompleted(pOperator);
} }
...@@ -4976,9 +4978,10 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo ...@@ -4976,9 +4978,10 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
STimeWindowAggSupp as = {.waterMark = pIntervalPhyNode->window.watermark, STimeWindowAggSupp as = {.waterMark = pIntervalPhyNode->window.watermark,
.calTrigger = pIntervalPhyNode->window.triggerType}; .calTrigger = pIntervalPhyNode->window.triggerType};
int32_t primaryTsSlotId = ((SColumnNode*)pIntervalPhyNode->window.pTspk)->slotId; int32_t tsSlotId = ((SColumnNode*)pIntervalPhyNode->window.pTspk)->slotId;
pOptr = createIntervalOperatorInfo(ops[0], pExprInfo, num, pResBlock, &interval, primaryTsSlotId, &as, pOptr = createIntervalOperatorInfo(ops[0], pExprInfo, num, pResBlock, &interval, tsSlotId, &as, pTableGroupInfo,
pTableGroupInfo, pTaskInfo); pTaskInfo);
} else if (QUERY_NODE_PHYSICAL_PLAN_SORT == type) { } else if (QUERY_NODE_PHYSICAL_PLAN_SORT == type) {
SSortPhysiNode* pSortPhyNode = (SSortPhysiNode*)pPhyNode; SSortPhysiNode* pSortPhyNode = (SSortPhysiNode*)pPhyNode;
...@@ -4992,9 +4995,11 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo ...@@ -4992,9 +4995,11 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
STimeWindowAggSupp as = {.waterMark = pSessionNode->window.watermark, STimeWindowAggSupp as = {.waterMark = pSessionNode->window.watermark,
.calTrigger = pSessionNode->window.triggerType}; .calTrigger = pSessionNode->window.triggerType};
int32_t tsSlotId = ((SColumnNode*)pSessionNode->window.pTspk)->slotId;
SExprInfo* pExprInfo = createExprInfo(pSessionNode->window.pFuncs, NULL, &num); SExprInfo* pExprInfo = createExprInfo(pSessionNode->window.pFuncs, NULL, &num);
SSDataBlock* pResBlock = createResDataBlock(pPhyNode->pOutputDataBlockDesc); SSDataBlock* pResBlock = createResDataBlock(pPhyNode->pOutputDataBlockDesc);
pOptr = createSessionAggOperatorInfo(ops[0], pExprInfo, num, pResBlock, pSessionNode->gap, &as, pTaskInfo); pOptr = createSessionAggOperatorInfo(ops[0], pExprInfo, num, pResBlock, pSessionNode->gap, tsSlotId, &as, pTaskInfo);
} else if (QUERY_NODE_PHYSICAL_PLAN_PARTITION == type) { } else if (QUERY_NODE_PHYSICAL_PLAN_PARTITION == type) {
SPartitionPhysiNode* pPartNode = (SPartitionPhysiNode*)pPhyNode; SPartitionPhysiNode* pPartNode = (SPartitionPhysiNode*)pPhyNode;
SArray* pColList = extractPartitionColInfo(pPartNode->pPartitionKeys); SArray* pColList = extractPartitionColInfo(pPartNode->pPartitionKeys);
...@@ -5021,7 +5026,7 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo ...@@ -5021,7 +5026,7 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
SSDataBlock* pResBlock = createResDataBlock(pPhyNode->pOutputDataBlockDesc); SSDataBlock* pResBlock = createResDataBlock(pPhyNode->pOutputDataBlockDesc);
SExprInfo* pExprInfo = createExprInfo(pFillNode->pTargets, NULL, &num); SExprInfo* pExprInfo = createExprInfo(pFillNode->pTargets, NULL, &num);
SInterval* pInterval = &((STableIntervalOperatorInfo*)ops[0]->info)->interval; SInterval* pInterval = &((SIntervalAggOperatorInfo*)ops[0]->info)->interval;
pOptr = createFillOperatorInfo(ops[0], pExprInfo, num, pInterval, &pFillNode->timeRange, pResBlock, pFillNode->mode, (SNodeListNode*)pFillNode->pValues, false, pTaskInfo); pOptr = createFillOperatorInfo(ops[0], pExprInfo, num, pInterval, &pFillNode->timeRange, pResBlock, pFillNode->mode, (SNodeListNode*)pFillNode->pValues, false, pTaskInfo);
} else { } else {
ASSERT(0); ASSERT(0);
......
...@@ -265,7 +265,7 @@ static SSDataBlock* hashGroupbyAggregate(SOperatorInfo* pOperator, bool* newgrou ...@@ -265,7 +265,7 @@ static SSDataBlock* hashGroupbyAggregate(SOperatorInfo* pOperator, bool* newgrou
SSDataBlock* pRes = pInfo->binfo.pRes; SSDataBlock* pRes = pInfo->binfo.pRes;
if (pOperator->status == OP_RES_TO_RETURN) { if (pOperator->status == OP_RES_TO_RETURN) {
doBuildResultDatablock(pRes, &pInfo->groupResInfo, pOperator->pExpr, pInfo->aggSup.pResultBuf, pInfo->binfo.rowCellInfoOffset, pInfo->binfo.pCtx); doBuildResultDatablock(&pInfo->binfo, &pInfo->groupResInfo, pOperator->pExpr, pInfo->aggSup.pResultBuf);
if (pRes->info.rows == 0 || !hasRemainDataInCurrentGroup(&pInfo->groupResInfo)) { if (pRes->info.rows == 0 || !hasRemainDataInCurrentGroup(&pInfo->groupResInfo)) {
pOperator->status = OP_EXEC_DONE; pOperator->status = OP_EXEC_DONE;
} }
...@@ -311,7 +311,7 @@ static SSDataBlock* hashGroupbyAggregate(SOperatorInfo* pOperator, bool* newgrou ...@@ -311,7 +311,7 @@ static SSDataBlock* hashGroupbyAggregate(SOperatorInfo* pOperator, bool* newgrou
initGroupedResultInfo(&pInfo->groupResInfo, pInfo->aggSup.pResultRowHashTable, false); initGroupedResultInfo(&pInfo->groupResInfo, pInfo->aggSup.pResultRowHashTable, false);
while(1) { while(1) {
doBuildResultDatablock(pRes, &pInfo->groupResInfo, pOperator->pExpr, pInfo->aggSup.pResultBuf, pInfo->binfo.rowCellInfoOffset, pInfo->binfo.pCtx); doBuildResultDatablock(&pInfo->binfo, &pInfo->groupResInfo, pOperator->pExpr, pInfo->aggSup.pResultBuf);
doFilter(pInfo->pCondition, pRes); doFilter(pInfo->pCondition, pRes);
bool hasRemain = hasRemainDataInCurrentGroup(&pInfo->groupResInfo); bool hasRemain = hasRemainDataInCurrentGroup(&pInfo->groupResInfo);
......
...@@ -456,7 +456,7 @@ static bool setTimeWindowInterpolationEndTs(SOperatorInfo* pOperatorInfo, SqlFun ...@@ -456,7 +456,7 @@ static bool setTimeWindowInterpolationEndTs(SOperatorInfo* pOperatorInfo, SqlFun
} }
static int32_t getNextQualifiedWindow(SInterval* pInterval, STimeWindow* pNext, SDataBlockInfo* pDataBlockInfo, static int32_t getNextQualifiedWindow(SInterval* pInterval, STimeWindow* pNext, SDataBlockInfo* pDataBlockInfo,
TSKEY* primaryKeys, int32_t prevPosition, STableIntervalOperatorInfo* pInfo) { TSKEY* primaryKeys, int32_t prevPosition, SIntervalAggOperatorInfo* pInfo) {
int32_t order = pInfo->order; int32_t order = pInfo->order;
bool ascQuery = (order == TSDB_ORDER_ASC); bool ascQuery = (order == TSDB_ORDER_ASC);
...@@ -609,7 +609,7 @@ static void saveDataBlockLastRow(char** pRow, SArray* pDataBlock, int32_t rowInd ...@@ -609,7 +609,7 @@ static void saveDataBlockLastRow(char** pRow, SArray* pDataBlock, int32_t rowInd
static SArray* hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResultRowInfo, SSDataBlock* pSDataBlock, static SArray* hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResultRowInfo, SSDataBlock* pSDataBlock,
int32_t tableGroupId) { int32_t tableGroupId) {
STableIntervalOperatorInfo* pInfo = (STableIntervalOperatorInfo*)pOperatorInfo->info; SIntervalAggOperatorInfo* pInfo = (SIntervalAggOperatorInfo*)pOperatorInfo->info;
SExecTaskInfo* pTaskInfo = pOperatorInfo->pTaskInfo; SExecTaskInfo* pTaskInfo = pOperatorInfo->pTaskInfo;
int32_t numOfOutput = pOperatorInfo->numOfOutput; int32_t numOfOutput = pOperatorInfo->numOfOutput;
...@@ -759,7 +759,7 @@ static int32_t doOpenIntervalAgg(SOperatorInfo* pOperator) { ...@@ -759,7 +759,7 @@ static int32_t doOpenIntervalAgg(SOperatorInfo* pOperator) {
} }
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
STableIntervalOperatorInfo* pInfo = pOperator->info; SIntervalAggOperatorInfo* pInfo = pOperator->info;
int32_t order = TSDB_ORDER_ASC; int32_t order = TSDB_ORDER_ASC;
// STimeWindow win = {0}; // STimeWindow win = {0};
...@@ -893,8 +893,7 @@ static SSDataBlock* doStateWindowAgg(SOperatorInfo* pOperator, bool* newgroup) { ...@@ -893,8 +893,7 @@ static SSDataBlock* doStateWindowAgg(SOperatorInfo* pOperator, bool* newgroup) {
SOptrBasicInfo* pBInfo = &pInfo->binfo; SOptrBasicInfo* pBInfo = &pInfo->binfo;
if (pOperator->status == OP_RES_TO_RETURN) { if (pOperator->status == OP_RES_TO_RETURN) {
doBuildResultDatablock(pBInfo->pRes, &pInfo->groupResInfo, pOperator->pExpr, pInfo->aggSup.pResultBuf, doBuildResultDatablock(pBInfo, &pInfo->groupResInfo, pOperator->pExpr, pInfo->aggSup.pResultBuf);
pBInfo->rowCellInfoOffset, pInfo->binfo.pCtx);
if (pBInfo->pRes->info.rows == 0 || !hasRemainDataInCurrentGroup(&pInfo->groupResInfo)) { if (pBInfo->pRes->info.rows == 0 || !hasRemainDataInCurrentGroup(&pInfo->groupResInfo)) {
doSetOperatorCompleted(pOperator); doSetOperatorCompleted(pOperator);
return NULL; return NULL;
...@@ -927,8 +926,7 @@ static SSDataBlock* doStateWindowAgg(SOperatorInfo* pOperator, bool* newgroup) { ...@@ -927,8 +926,7 @@ static SSDataBlock* doStateWindowAgg(SOperatorInfo* pOperator, bool* newgroup) {
initGroupedResultInfo(&pInfo->groupResInfo, pInfo->aggSup.pResultRowHashTable, true); initGroupedResultInfo(&pInfo->groupResInfo, pInfo->aggSup.pResultRowHashTable, true);
blockDataEnsureCapacity(pBInfo->pRes, pOperator->resultInfo.capacity); blockDataEnsureCapacity(pBInfo->pRes, pOperator->resultInfo.capacity);
doBuildResultDatablock(pBInfo->pRes, &pInfo->groupResInfo, pOperator->pExpr, pInfo->aggSup.pResultBuf, doBuildResultDatablock(pBInfo, &pInfo->groupResInfo, pOperator->pExpr, pInfo->aggSup.pResultBuf);
pBInfo->rowCellInfoOffset, pInfo->binfo.pCtx);
if (pBInfo->pRes->info.rows == 0 || !hasRemainDataInCurrentGroup(&pInfo->groupResInfo)) { if (pBInfo->pRes->info.rows == 0 || !hasRemainDataInCurrentGroup(&pInfo->groupResInfo)) {
doSetOperatorCompleted(pOperator); doSetOperatorCompleted(pOperator);
} }
...@@ -937,7 +935,7 @@ static SSDataBlock* doStateWindowAgg(SOperatorInfo* pOperator, bool* newgroup) { ...@@ -937,7 +935,7 @@ static SSDataBlock* doStateWindowAgg(SOperatorInfo* pOperator, bool* newgroup) {
} }
static SSDataBlock* doBuildIntervalResult(SOperatorInfo* pOperator, bool* newgroup) { static SSDataBlock* doBuildIntervalResult(SOperatorInfo* pOperator, bool* newgroup) {
STableIntervalOperatorInfo* pInfo = pOperator->info; SIntervalAggOperatorInfo* pInfo = pOperator->info;
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
if (pOperator->status == OP_EXEC_DONE) { if (pOperator->status == OP_EXEC_DONE) {
...@@ -955,8 +953,7 @@ static SSDataBlock* doBuildIntervalResult(SOperatorInfo* pOperator, bool* newgro ...@@ -955,8 +953,7 @@ static SSDataBlock* doBuildIntervalResult(SOperatorInfo* pOperator, bool* newgro
} }
blockDataEnsureCapacity(pBlock, pOperator->resultInfo.capacity); blockDataEnsureCapacity(pBlock, pOperator->resultInfo.capacity);
doBuildResultDatablock(pBlock, &pInfo->groupResInfo, pOperator->pExpr, pInfo->aggSup.pResultBuf, doBuildResultDatablock(&pInfo->binfo, &pInfo->groupResInfo, pOperator->pExpr, pInfo->aggSup.pResultBuf);
pInfo->binfo.rowCellInfoOffset, pInfo->binfo.pCtx);
if (pBlock->info.rows == 0 || !hasRemainDataInCurrentGroup(&pInfo->groupResInfo)) { if (pBlock->info.rows == 0 || !hasRemainDataInCurrentGroup(&pInfo->groupResInfo)) {
doSetOperatorCompleted(pOperator); doSetOperatorCompleted(pOperator);
...@@ -967,7 +964,7 @@ static SSDataBlock* doBuildIntervalResult(SOperatorInfo* pOperator, bool* newgro ...@@ -967,7 +964,7 @@ static SSDataBlock* doBuildIntervalResult(SOperatorInfo* pOperator, bool* newgro
} }
// todo merged with the build group result. // todo merged with the build group result.
static void finalizeUpdatedResult(SqlFunctionCtx* pCtx, int32_t numOfOutput, SDiskbasedBuf* pBuf, SArray* pUpdateList, static void finalizeUpdatedResult(int32_t numOfOutput, SDiskbasedBuf* pBuf, SArray* pUpdateList,
int32_t* rowCellInfoOffset) { int32_t* rowCellInfoOffset) {
size_t num = taosArrayGetSize(pUpdateList); size_t num = taosArrayGetSize(pUpdateList);
...@@ -976,22 +973,11 @@ static void finalizeUpdatedResult(SqlFunctionCtx* pCtx, int32_t numOfOutput, SDi ...@@ -976,22 +973,11 @@ static void finalizeUpdatedResult(SqlFunctionCtx* pCtx, int32_t numOfOutput, SDi
SFilePage* bufPage = getBufPage(pBuf, pPos->pos.pageId); SFilePage* bufPage = getBufPage(pBuf, pPos->pos.pageId);
SResultRow* pRow = (SResultRow*)((char*)bufPage + pPos->pos.offset); SResultRow* pRow = (SResultRow*)((char*)bufPage + pPos->pos.offset);
//
for (int32_t j = 0; j < numOfOutput; ++j) { for (int32_t j = 0; j < numOfOutput; ++j) {
pCtx[j].resultInfo = getResultCell(pRow, j, rowCellInfoOffset); SResultRowEntryInfo* pEntry = getResultCell(pRow, j, rowCellInfoOffset);
// if (pRow->numOfRows < pEntry->numOfRes) {
struct SResultRowEntryInfo* pResInfo = pCtx[j].resultInfo; pRow->numOfRows = pEntry->numOfRes;
// if (isRowEntryCompleted(pResInfo) && isRowEntryInitialized(pResInfo)) {
// continue;
// }
//
// if (pCtx[j].fpSet.process) { // TODO set the dummy function.
//// pCtx[j].fpSet.finalize(&pCtx[j]);
// pResInfo->initialized = true;
// }
//
if (pRow->numOfRows < pResInfo->numOfRes) {
pRow->numOfRows = pResInfo->numOfRes;
} }
} }
...@@ -1000,7 +986,7 @@ static void finalizeUpdatedResult(SqlFunctionCtx* pCtx, int32_t numOfOutput, SDi ...@@ -1000,7 +986,7 @@ static void finalizeUpdatedResult(SqlFunctionCtx* pCtx, int32_t numOfOutput, SDi
} }
static SSDataBlock* doStreamIntervalAgg(SOperatorInfo* pOperator, bool* newgroup) { static SSDataBlock* doStreamIntervalAgg(SOperatorInfo* pOperator, bool* newgroup) {
STableIntervalOperatorInfo* pInfo = pOperator->info; SIntervalAggOperatorInfo* pInfo = pOperator->info;
int32_t order = TSDB_ORDER_ASC; int32_t order = TSDB_ORDER_ASC;
if (pOperator->status == OP_EXEC_DONE) { if (pOperator->status == OP_EXEC_DONE) {
...@@ -1008,8 +994,7 @@ static SSDataBlock* doStreamIntervalAgg(SOperatorInfo* pOperator, bool* newgroup ...@@ -1008,8 +994,7 @@ static SSDataBlock* doStreamIntervalAgg(SOperatorInfo* pOperator, bool* newgroup
} }
if (pOperator->status == OP_RES_TO_RETURN) { if (pOperator->status == OP_RES_TO_RETURN) {
doBuildResultDatablock(pInfo->binfo.pRes, &pInfo->groupResInfo, pOperator->pExpr, pInfo->aggSup.pResultBuf, doBuildResultDatablock(&pInfo->binfo, &pInfo->groupResInfo, pOperator->pExpr, pInfo->aggSup.pResultBuf);
pInfo->binfo.rowCellInfoOffset, pInfo->binfo.pCtx);
if (pInfo->binfo.pRes->info.rows == 0 || !hasRemainDataInCurrentGroup(&pInfo->groupResInfo)) { if (pInfo->binfo.pRes->info.rows == 0 || !hasRemainDataInCurrentGroup(&pInfo->groupResInfo)) {
pOperator->status = OP_EXEC_DONE; pOperator->status = OP_EXEC_DONE;
} }
...@@ -1040,13 +1025,11 @@ static SSDataBlock* doStreamIntervalAgg(SOperatorInfo* pOperator, bool* newgroup ...@@ -1040,13 +1025,11 @@ static SSDataBlock* doStreamIntervalAgg(SOperatorInfo* pOperator, bool* newgroup
pUpdated = hashIntervalAgg(pOperator, &pInfo->binfo.resultRowInfo, pBlock, 0); pUpdated = hashIntervalAgg(pOperator, &pInfo->binfo.resultRowInfo, pBlock, 0);
} }
finalizeUpdatedResult(pInfo->binfo.pCtx, pOperator->numOfOutput, pInfo->aggSup.pResultBuf, pUpdated, finalizeUpdatedResult(pOperator->numOfOutput, pInfo->aggSup.pResultBuf, pUpdated, pInfo->binfo.rowCellInfoOffset);
pInfo->binfo.rowCellInfoOffset);
initMultiResInfoFromArrayList(&pInfo->groupResInfo, pUpdated); initMultiResInfoFromArrayList(&pInfo->groupResInfo, pUpdated);
blockDataEnsureCapacity(pInfo->binfo.pRes, pOperator->resultInfo.capacity); blockDataEnsureCapacity(pInfo->binfo.pRes, pOperator->resultInfo.capacity);
doBuildResultDatablock(pInfo->binfo.pRes, &pInfo->groupResInfo, pOperator->pExpr, pInfo->aggSup.pResultBuf, doBuildResultDatablock(&pInfo->binfo, &pInfo->groupResInfo, pOperator->pExpr, pInfo->aggSup.pResultBuf);
pInfo->binfo.rowCellInfoOffset, pInfo->binfo.pCtx);
ASSERT(pInfo->binfo.pRes->info.rows > 0); ASSERT(pInfo->binfo.pRes->info.rows > 0);
pOperator->status = OP_RES_TO_RETURN; pOperator->status = OP_RES_TO_RETURN;
...@@ -1061,7 +1044,7 @@ static void destroyStateWindowOperatorInfo(void* param, int32_t numOfOutput) { ...@@ -1061,7 +1044,7 @@ static void destroyStateWindowOperatorInfo(void* param, int32_t numOfOutput) {
} }
void destroyIntervalOperatorInfo(void* param, int32_t numOfOutput) { void destroyIntervalOperatorInfo(void* param, int32_t numOfOutput) {
STableIntervalOperatorInfo* pInfo = (STableIntervalOperatorInfo*)param; SIntervalAggOperatorInfo* pInfo = (SIntervalAggOperatorInfo*)param;
doDestroyBasicInfo(&pInfo->binfo, numOfOutput); doDestroyBasicInfo(&pInfo->binfo, numOfOutput);
cleanupAggSup(&pInfo->aggSup); cleanupAggSup(&pInfo->aggSup);
} }
...@@ -1070,26 +1053,26 @@ SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SExprInfo* ...@@ -1070,26 +1053,26 @@ SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SExprInfo*
SSDataBlock* pResBlock, SInterval* pInterval, int32_t primaryTsSlotId, SSDataBlock* pResBlock, SInterval* pInterval, int32_t primaryTsSlotId,
STimeWindowAggSupp* pTwAggSupp, const STableGroupInfo* pTableGroupInfo, STimeWindowAggSupp* pTwAggSupp, const STableGroupInfo* pTableGroupInfo,
SExecTaskInfo* pTaskInfo) { SExecTaskInfo* pTaskInfo) {
STableIntervalOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(STableIntervalOperatorInfo)); SIntervalAggOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SIntervalAggOperatorInfo));
SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo)); SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
if (pInfo == NULL || pOperator == NULL) { if (pInfo == NULL || pOperator == NULL) {
goto _error; goto _error;
} }
pInfo->order = TSDB_ORDER_ASC; pInfo->order = TSDB_ORDER_ASC;
pInfo->interval = *pInterval; pInfo->interval = *pInterval;
// pInfo->execModel = OPTR_EXEC_MODEL_STREAM; // pInfo->execModel = OPTR_EXEC_MODEL_STREAM;
pInfo->execModel = pTaskInfo->execModel; pInfo->execModel = pTaskInfo->execModel;
pInfo->win = pTaskInfo->window; pInfo->win = pTaskInfo->window;
pInfo->twAggSup = *pTwAggSupp; pInfo->twAggSup = *pTwAggSupp;
pInfo->primaryTsIndex = primaryTsSlotId; pInfo->primaryTsIndex = primaryTsSlotId;
int32_t numOfRows = 4096; size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES;
size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES; initResultSizeInfo(pOperator, 4096);
initResultSizeInfo(pOperator, numOfRows);
int32_t code = int32_t code =
initAggInfo(&pInfo->binfo, &pInfo->aggSup, pExprInfo, numOfCols, pResBlock, keyBufSize, pTaskInfo->id.str); initAggInfo(&pInfo->binfo, &pInfo->aggSup, pExprInfo, numOfCols, pResBlock, keyBufSize, pTaskInfo->id.str);
initExecTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pInfo->win); initExecTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pInfo->win);
// pInfo->pTableQueryInfo = initTableQueryInfo(pTableGroupInfo); // pInfo->pTableQueryInfo = initTableQueryInfo(pTableGroupInfo);
...@@ -1099,14 +1082,14 @@ SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SExprInfo* ...@@ -1099,14 +1082,14 @@ SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SExprInfo*
initResultRowInfo(&pInfo->binfo.resultRowInfo, (int32_t)1); initResultRowInfo(&pInfo->binfo.resultRowInfo, (int32_t)1);
pOperator->name = "TimeIntervalAggOperator"; pOperator->name = "TimeIntervalAggOperator";
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_INTERVAL; pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_INTERVAL;
pOperator->blockingOptr = true; pOperator->blockingOptr = true;
pOperator->status = OP_NOT_OPENED; pOperator->status = OP_NOT_OPENED;
pOperator->pExpr = pExprInfo; pOperator->pExpr = pExprInfo;
pOperator->pTaskInfo = pTaskInfo; pOperator->pTaskInfo = pTaskInfo;
pOperator->numOfOutput = numOfCols; pOperator->numOfOutput = numOfCols;
pOperator->info = pInfo; pOperator->info = pInfo;
pOperator->fpSet = createOperatorFpSet(doOpenIntervalAgg, doBuildIntervalResult, doStreamIntervalAgg, NULL, pOperator->fpSet = createOperatorFpSet(doOpenIntervalAgg, doBuildIntervalResult, doStreamIntervalAgg, NULL,
destroyIntervalOperatorInfo, aggEncodeResultRow, aggDecodeResultRow, NULL); destroyIntervalOperatorInfo, aggEncodeResultRow, aggDecodeResultRow, NULL);
...@@ -1130,7 +1113,7 @@ SOperatorInfo* createStreamIntervalOperatorInfo(SOperatorInfo* downstream, SExpr ...@@ -1130,7 +1113,7 @@ SOperatorInfo* createStreamIntervalOperatorInfo(SOperatorInfo* downstream, SExpr
SSDataBlock* pResBlock, SInterval* pInterval, int32_t primaryTsSlotId, SSDataBlock* pResBlock, SInterval* pInterval, int32_t primaryTsSlotId,
STimeWindowAggSupp* pTwAggSupp, const STableGroupInfo* pTableGroupInfo, STimeWindowAggSupp* pTwAggSupp, const STableGroupInfo* pTableGroupInfo,
SExecTaskInfo* pTaskInfo) { SExecTaskInfo* pTaskInfo) {
STableIntervalOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(STableIntervalOperatorInfo)); SIntervalAggOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SIntervalAggOperatorInfo));
SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo)); SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
if (pInfo == NULL || pOperator == NULL) { if (pInfo == NULL || pOperator == NULL) {
goto _error; goto _error;
...@@ -1189,8 +1172,7 @@ SOperatorInfo* createStreamIntervalOperatorInfo(SOperatorInfo* downstream, SExpr ...@@ -1189,8 +1172,7 @@ SOperatorInfo* createStreamIntervalOperatorInfo(SOperatorInfo* downstream, SExpr
static void doSessionWindowAggImpl(SOperatorInfo* pOperator, SSessionAggOperatorInfo* pInfo, SSDataBlock* pBlock) { static void doSessionWindowAggImpl(SOperatorInfo* pOperator, SSessionAggOperatorInfo* pInfo, SSDataBlock* pBlock) {
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
// todo find the correct time stamp column slot SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, pInfo->tsSlotId);
SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, 0);
bool masterScan = true; bool masterScan = true;
int32_t numOfOutput = pOperator->numOfOutput; int32_t numOfOutput = pOperator->numOfOutput;
...@@ -1266,8 +1248,7 @@ static SSDataBlock* doSessionWindowAgg(SOperatorInfo* pOperator, bool* newgroup) ...@@ -1266,8 +1248,7 @@ static SSDataBlock* doSessionWindowAgg(SOperatorInfo* pOperator, bool* newgroup)
SOptrBasicInfo* pBInfo = &pInfo->binfo; SOptrBasicInfo* pBInfo = &pInfo->binfo;
if (pOperator->status == OP_RES_TO_RETURN) { if (pOperator->status == OP_RES_TO_RETURN) {
doBuildResultDatablock(pBInfo->pRes, &pInfo->groupResInfo, pOperator->pExpr, pInfo->aggSup.pResultBuf, doBuildResultDatablock(pBInfo, &pInfo->groupResInfo, pOperator->pExpr, pInfo->aggSup.pResultBuf);
pBInfo->rowCellInfoOffset, pInfo->binfo.pCtx);
if (pBInfo->pRes->info.rows == 0 || !hasRemainDataInCurrentGroup(&pInfo->groupResInfo)) { if (pBInfo->pRes->info.rows == 0 || !hasRemainDataInCurrentGroup(&pInfo->groupResInfo)) {
doSetOperatorCompleted(pOperator); doSetOperatorCompleted(pOperator);
return NULL; return NULL;
...@@ -1300,8 +1281,7 @@ static SSDataBlock* doSessionWindowAgg(SOperatorInfo* pOperator, bool* newgroup) ...@@ -1300,8 +1281,7 @@ static SSDataBlock* doSessionWindowAgg(SOperatorInfo* pOperator, bool* newgroup)
initGroupedResultInfo(&pInfo->groupResInfo, pInfo->aggSup.pResultRowHashTable, true); initGroupedResultInfo(&pInfo->groupResInfo, pInfo->aggSup.pResultRowHashTable, true);
blockDataEnsureCapacity(pBInfo->pRes, pOperator->resultInfo.capacity); blockDataEnsureCapacity(pBInfo->pRes, pOperator->resultInfo.capacity);
doBuildResultDatablock(pBInfo->pRes, &pInfo->groupResInfo, pOperator->pExpr, pInfo->aggSup.pResultBuf, doBuildResultDatablock(pBInfo, &pInfo->groupResInfo, pOperator->pExpr, pInfo->aggSup.pResultBuf);
pBInfo->rowCellInfoOffset, pInfo->binfo.pCtx);
if (pBInfo->pRes->info.rows == 0 || !hasRemainDataInCurrentGroup(&pInfo->groupResInfo)) { if (pBInfo->pRes->info.rows == 0 || !hasRemainDataInCurrentGroup(&pInfo->groupResInfo)) {
doSetOperatorCompleted(pOperator); doSetOperatorCompleted(pOperator);
} }
...@@ -1435,7 +1415,7 @@ void destroySWindowOperatorInfo(void* param, int32_t numOfOutput) { ...@@ -1435,7 +1415,7 @@ void destroySWindowOperatorInfo(void* param, int32_t numOfOutput) {
} }
SOperatorInfo* createSessionAggOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, SOperatorInfo* createSessionAggOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols,
SSDataBlock* pResBlock, int64_t gap, STimeWindowAggSupp* pTwAggSupp, SSDataBlock* pResBlock, int64_t gap, int32_t tsSlotId, STimeWindowAggSupp* pTwAggSupp,
SExecTaskInfo* pTaskInfo) { SExecTaskInfo* pTaskInfo) {
SSessionAggOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SSessionAggOperatorInfo)); SSessionAggOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SSessionAggOperatorInfo));
SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo)); SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
...@@ -1443,10 +1423,9 @@ SOperatorInfo* createSessionAggOperatorInfo(SOperatorInfo* downstream, SExprInfo ...@@ -1443,10 +1423,9 @@ SOperatorInfo* createSessionAggOperatorInfo(SOperatorInfo* downstream, SExprInfo
goto _error; goto _error;
} }
int32_t numOfRows = 4096; size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES;
size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES; initResultSizeInfo(pOperator, 4096);
initResultSizeInfo(pOperator, numOfRows);
int32_t code = int32_t code =
initAggInfo(&pInfo->binfo, &pInfo->aggSup, pExprInfo, numOfCols, pResBlock, keyBufSize, pTaskInfo->id.str); initAggInfo(&pInfo->binfo, &pInfo->aggSup, pExprInfo, numOfCols, pResBlock, keyBufSize, pTaskInfo->id.str);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
...@@ -1457,17 +1436,18 @@ SOperatorInfo* createSessionAggOperatorInfo(SOperatorInfo* downstream, SExprInfo ...@@ -1457,17 +1436,18 @@ SOperatorInfo* createSessionAggOperatorInfo(SOperatorInfo* downstream, SExprInfo
initResultRowInfo(&pInfo->binfo.resultRowInfo, 8); initResultRowInfo(&pInfo->binfo.resultRowInfo, 8);
initExecTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pTaskInfo->window); initExecTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pTaskInfo->window);
pInfo->gap = gap; pInfo->tsSlotId = tsSlotId;
pInfo->binfo.pRes = pResBlock; pInfo->gap = gap;
pInfo->winSup.prevTs = INT64_MIN; pInfo->binfo.pRes = pResBlock;
pInfo->reptScan = false; pInfo->winSup.prevTs = INT64_MIN;
pOperator->name = "SessionWindowAggOperator"; pInfo->reptScan = false;
pOperator->name = "SessionWindowAggOperator";
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_SESSION_WINDOW; pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_SESSION_WINDOW;
pOperator->blockingOptr = true; pOperator->blockingOptr = true;
pOperator->status = OP_NOT_OPENED; pOperator->status = OP_NOT_OPENED;
pOperator->pExpr = pExprInfo; pOperator->pExpr = pExprInfo;
pOperator->numOfOutput = numOfCols; pOperator->numOfOutput = numOfCols;
pOperator->info = pInfo; pOperator->info = pInfo;
pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doSessionWindowAgg, NULL, NULL, pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doSessionWindowAgg, NULL, NULL,
destroySWindowOperatorInfo, aggEncodeResultRow, aggDecodeResultRow, NULL); destroySWindowOperatorInfo, aggEncodeResultRow, aggDecodeResultRow, NULL);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册