提交 b09ee742 编写于 作者: H Haojun Liao

refactor(query): check table in uid order when handling the last block, and some internal refactor.

上级 374968d0
...@@ -129,13 +129,19 @@ typedef struct SFileBlockDumpInfo { ...@@ -129,13 +129,19 @@ typedef struct SFileBlockDumpInfo {
bool allDumped; bool allDumped;
} SFileBlockDumpInfo; } SFileBlockDumpInfo;
typedef struct SUidOrderCheckInfo {
uint64_t* tableUidList; // access table uid list in uid ascending order list
int32_t currentIndex; // index in table uid list
} SUidOrderCheckInfo;
typedef struct SReaderStatus { typedef struct SReaderStatus {
bool loadFromFile; // check file stage bool loadFromFile; // check file stage
bool composedDataBlock; // the returned data block is a composed block or not bool composedDataBlock; // the returned data block is a composed block or not
SHashObj* pTableMap; // SHash<STableBlockScanInfo> SHashObj* pTableMap; // SHash<STableBlockScanInfo>
STableBlockScanInfo* pTableIter; // table iterator used in building in-memory buffer data blocks. STableBlockScanInfo* pTableIter; // table iterator used in building in-memory buffer data blocks.
SUidOrderCheckInfo uidCheckInfo; // check all table in uid order
SFileBlockDumpInfo fBlockDumpInfo; SFileBlockDumpInfo fBlockDumpInfo;
SDFileSet* pCurrentFileset; // current opened file set SDFileSet* pCurrentFileset; // current opened file set
SBlockData fileBlockData; SBlockData fileBlockData;
SFilesetIter fileIter; SFilesetIter fileIter;
SDataBlockIter blockIter; SDataBlockIter blockIter;
...@@ -573,7 +579,7 @@ static void cleanupTableScanInfo(SHashObj* pTableMap) { ...@@ -573,7 +579,7 @@ static void cleanupTableScanInfo(SHashObj* pTableMap) {
} }
// reset the index in last block when handing a new file // reset the index in last block when handing a new file
px->indexInBlockL = -1; px->indexInBlockL = DEFAULT_ROW_INDEX_VAL;
tMapDataClear(&px->mapData); tMapDataClear(&px->mapData);
taosArrayClear(px->pBlockList); taosArrayClear(px->pBlockList);
} }
...@@ -1810,7 +1816,8 @@ static void setAllRowsChecked(SLastBlockReader *pLastBlockReader) { ...@@ -1810,7 +1816,8 @@ static void setAllRowsChecked(SLastBlockReader *pLastBlockReader) {
} }
static bool nextRowInLastBlock(SLastBlockReader *pLastBlockReader, STableBlockScanInfo* pBlockScanInfo) { static bool nextRowInLastBlock(SLastBlockReader *pLastBlockReader, STableBlockScanInfo* pBlockScanInfo) {
int32_t step = (pLastBlockReader->order == TSDB_ORDER_ASC) ? 1 : -1; bool asc = ASCENDING_TRAVERSE(pLastBlockReader->order);
int32_t step = (asc) ? 1 : -1;
if (*pLastBlockReader->rowIndex == ALL_ROWS_CHECKED_INDEX) { if (*pLastBlockReader->rowIndex == ALL_ROWS_CHECKED_INDEX) {
return false; return false;
} }
...@@ -1819,8 +1826,20 @@ static bool nextRowInLastBlock(SLastBlockReader *pLastBlockReader, STableBlockSc ...@@ -1819,8 +1826,20 @@ static bool nextRowInLastBlock(SLastBlockReader *pLastBlockReader, STableBlockSc
SBlockData* pBlockData = &pLastBlockReader->lastBlockData; SBlockData* pBlockData = &pLastBlockReader->lastBlockData;
for(int32_t i = *(pLastBlockReader->rowIndex); i < pBlockData->nRow && i >= 0; i += step) { for(int32_t i = *(pLastBlockReader->rowIndex); i < pBlockData->nRow && i >= 0; i += step) {
if (pBlockData->aUid != NULL && pBlockData->aUid[i] != pLastBlockReader->uid) { if (pBlockData->aUid != NULL) {
continue; if (asc) {
if (pBlockData->aUid[i] < pLastBlockReader->uid) {
continue;
} else if (pBlockData->aUid[i] > pLastBlockReader->uid) {
break;
}
} else {
if (pBlockData->aUid[i] > pLastBlockReader->uid) {
continue;
} else if (pBlockData->aUid[i] < pLastBlockReader->uid) {
break;
}
}
} }
int64_t ts = pBlockData->aTSKEY[i]; int64_t ts = pBlockData->aTSKEY[i];
...@@ -2279,22 +2298,75 @@ static int32_t doLoadRelatedLastBlock(SLastBlockReader* pLastBlockReader, STable ...@@ -2279,22 +2298,75 @@ static int32_t doLoadRelatedLastBlock(SLastBlockReader* pLastBlockReader, STable
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
static int32_t uidComparFunc(const void* p1, const void* p2) {
uint64_t pu1 = *(uint64_t*) p1;
uint64_t pu2 = *(uint64_t*) p2;
if (pu1 == pu2) {
return 0;
} else {
return (pu1 > pu2)? -1:1;
}
}
static int32_t initOrderCheckInfo(SUidOrderCheckInfo* pOrderCheckInfo, SReaderStatus* pStatus) {
if (pOrderCheckInfo->tableUidList == NULL) {
int32_t total = taosHashGetSize(pStatus->pTableMap);
pOrderCheckInfo->currentIndex = 0;
pOrderCheckInfo->tableUidList = taosMemoryMalloc(total * sizeof(uint64_t));
if (pOrderCheckInfo->tableUidList == NULL) {
return TSDB_CODE_OUT_OF_MEMORY;
}
int32_t index = 0;
void* p = taosHashIterate(pStatus->pTableMap, NULL);
while(p != NULL) {
STableBlockScanInfo* pScanInfo = p;
pOrderCheckInfo->tableUidList[index++] = pScanInfo->uid;
p = taosHashIterate(pStatus->pTableMap, p);
}
taosSort(pOrderCheckInfo->tableUidList, total, sizeof(uint64_t), uidComparFunc);
uint64_t uid = pOrderCheckInfo->tableUidList[0];
pStatus->pTableIter = taosHashGet(pStatus->pTableMap, &uid, sizeof(uid));
} else {
if (pStatus->pTableIter == NULL) {
uint64_t uid = pOrderCheckInfo->tableUidList[pOrderCheckInfo->currentIndex];
pStatus->pTableIter = taosHashGet(pStatus->pTableMap, &uid, sizeof(uid));
}
}
return TSDB_CODE_SUCCESS;
}
static bool moveToNextTable(SUidOrderCheckInfo *pOrderedCheckInfo, SReaderStatus* pStatus) {
pOrderedCheckInfo->currentIndex += 1;
if (pOrderedCheckInfo->currentIndex >= taosHashGetSize(pStatus->pTableMap)) {
pStatus->pTableIter = NULL;
return false;
}
uint64_t uid = pOrderedCheckInfo->tableUidList[pOrderedCheckInfo->currentIndex];
pStatus->pTableIter = taosHashGet(pStatus->pTableMap, &uid, sizeof(uid));
ASSERT(pStatus->pTableIter != NULL);
return true;
}
static int32_t doLoadLastBlockSequentially(STsdbReader* pReader) { static int32_t doLoadLastBlockSequentially(STsdbReader* pReader) {
SReaderStatus* pStatus = &pReader->status; SReaderStatus* pStatus = &pReader->status;
SLastBlockReader* pLastBlockReader = pStatus->fileIter.pLastBlockReader; SLastBlockReader* pLastBlockReader = pStatus->fileIter.pLastBlockReader;
while(1) { SUidOrderCheckInfo *pOrderedCheckInfo = &pStatus->uidCheckInfo;
if (pStatus->pTableIter == NULL) { int32_t code = initOrderCheckInfo(pOrderedCheckInfo, pStatus);
pStatus->pTableIter = taosHashIterate(pStatus->pTableMap, NULL); if (code != TSDB_CODE_SUCCESS) {
if (pStatus->pTableIter == NULL) { return code;
return TSDB_CODE_SUCCESS; }
}
}
while(1) {
// load the last data block of current table // load the last data block of current table
// todo opt perf by avoiding load last block repeatly
STableBlockScanInfo* pScanInfo = pStatus->pTableIter; STableBlockScanInfo* pScanInfo = pStatus->pTableIter;
int32_t code = doLoadRelatedLastBlock(pLastBlockReader, pScanInfo, pReader); code = doLoadRelatedLastBlock(pLastBlockReader, pScanInfo, pReader);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
return code; return code;
} }
...@@ -2302,19 +2374,20 @@ static int32_t doLoadLastBlockSequentially(STsdbReader* pReader) { ...@@ -2302,19 +2374,20 @@ static int32_t doLoadLastBlockSequentially(STsdbReader* pReader) {
if (pLastBlockReader->currentBlockIndex != -1) { if (pLastBlockReader->currentBlockIndex != -1) {
initLastBlockReader(pLastBlockReader, pScanInfo->uid, &pScanInfo->indexInBlockL); initLastBlockReader(pLastBlockReader, pScanInfo->uid, &pScanInfo->indexInBlockL);
int32_t index = pScanInfo->indexInBlockL; int32_t index = pScanInfo->indexInBlockL;
if (index == DEFAULT_ROW_INDEX_VAL || index == pLastBlockReader->lastBlockData.nRow) { if (index == DEFAULT_ROW_INDEX_VAL || index == pLastBlockReader->lastBlockData.nRow) {
bool hasData = nextRowInLastBlock(pLastBlockReader, pScanInfo); bool hasData = nextRowInLastBlock(pLastBlockReader, pScanInfo);
if (!hasData) { // current table does not have rows in last block, try next table if (!hasData) { // current table does not have rows in last block, try next table
pStatus->pTableIter = taosHashIterate(pStatus->pTableMap, pStatus->pTableIter); bool hasNexTable = moveToNextTable(pOrderedCheckInfo, pStatus);
if (pStatus->pTableIter == NULL) { if (!hasNexTable) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
continue; continue;
} }
} }
} else { // no data in last block, try next table } else { // no data in last block, try next table
pStatus->pTableIter = taosHashIterate(pStatus->pTableMap, pStatus->pTableIter); bool hasNexTable = moveToNextTable(pOrderedCheckInfo, pStatus);
if (pStatus->pTableIter == NULL) { if (!hasNexTable) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
continue; continue;
...@@ -2330,8 +2403,8 @@ static int32_t doLoadLastBlockSequentially(STsdbReader* pReader) { ...@@ -2330,8 +2403,8 @@ static int32_t doLoadLastBlockSequentially(STsdbReader* pReader) {
} }
// current table is exhausted, let's try next table // current table is exhausted, let's try next table
pStatus->pTableIter = taosHashIterate(pStatus->pTableMap, pStatus->pTableIter); bool hasNexTable = moveToNextTable(pOrderedCheckInfo, pStatus);
if (pStatus->pTableIter == NULL) { if (!hasNexTable) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
} }
...@@ -3343,6 +3416,8 @@ void tsdbReaderClose(STsdbReader* pReader) { ...@@ -3343,6 +3416,8 @@ void tsdbReaderClose(STsdbReader* pReader) {
tsdbDataFReaderClose(&pReader->pFileReader); tsdbDataFReaderClose(&pReader->pFileReader);
} }
taosMemoryFree(pReader->status.uidCheckInfo.tableUidList);
SFilesetIter* pFilesetIter = &pReader->status.fileIter; SFilesetIter* pFilesetIter = &pReader->status.fileIter;
if (pFilesetIter->pLastBlockReader != NULL) { if (pFilesetIter->pLastBlockReader != NULL) {
tBlockDataDestroy(&pFilesetIter->pLastBlockReader->lastBlockData, true); tBlockDataDestroy(&pFilesetIter->pLastBlockReader->lastBlockData, true);
......
...@@ -207,6 +207,7 @@ typedef struct SExprSupp { ...@@ -207,6 +207,7 @@ typedef struct SExprSupp {
typedef struct SOperatorInfo { typedef struct SOperatorInfo {
uint16_t operatorType; uint16_t operatorType;
int16_t resultDataBlockId;
bool blocking; // block operator or not bool blocking; // block operator or not
uint8_t status; // denote if current operator is completed uint8_t status; // denote if current operator is completed
char* name; // name, for debug purpose char* name; // name, for debug purpose
...@@ -218,7 +219,6 @@ typedef struct SOperatorInfo { ...@@ -218,7 +219,6 @@ typedef struct SOperatorInfo {
struct SOperatorInfo** pDownstream; // downstram pointer list struct SOperatorInfo** pDownstream; // downstram pointer list
int32_t numOfDownstream; // number of downstream. The value is always ONE expect for join operator int32_t numOfDownstream; // number of downstream. The value is always ONE expect for join operator
SOperatorFpSet fpSet; SOperatorFpSet fpSet;
int16_t resultDataBlockId;
} SOperatorInfo; } SOperatorInfo;
typedef enum { typedef enum {
...@@ -908,21 +908,15 @@ SOperatorInfo* createIndefinitOutputOperatorInfo(SOperatorInfo* downstream, SPhy ...@@ -908,21 +908,15 @@ SOperatorInfo* createIndefinitOutputOperatorInfo(SOperatorInfo* downstream, SPhy
SOperatorInfo* createProjectOperatorInfo(SOperatorInfo* downstream, SProjectPhysiNode* pProjPhyNode, SExecTaskInfo* pTaskInfo); SOperatorInfo* createProjectOperatorInfo(SOperatorInfo* downstream, SProjectPhysiNode* pProjPhyNode, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createSortOperatorInfo(SOperatorInfo* downstream, SSortPhysiNode* pSortNode, SExecTaskInfo* pTaskInfo); SOperatorInfo* createSortOperatorInfo(SOperatorInfo* downstream, SSortPhysiNode* pSortNode, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createMultiwayMergeOperatorInfo(SOperatorInfo** dowStreams, size_t numStreams, SMergePhysiNode* pMergePhysiNode, SExecTaskInfo* pTaskInfo); SOperatorInfo* createMultiwayMergeOperatorInfo(SOperatorInfo** dowStreams, size_t numStreams, SMergePhysiNode* pMergePhysiNode, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createSortedMergeOperatorInfo(SOperatorInfo** downstream, int32_t numOfDownstream, SExprInfo* pExprInfo, int32_t num, SArray* pSortInfo, SArray* pGroupInfo, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createLastrowScanOperator(SLastRowScanPhysiNode* pTableScanNode, SReadHandle* readHandle, SExecTaskInfo* pTaskInfo); SOperatorInfo* createLastrowScanOperator(SLastRowScanPhysiNode* pTableScanNode, SReadHandle* readHandle, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols,
SSDataBlock* pResBlock, SInterval* pInterval, int32_t primaryTsSlotId, SSDataBlock* pResBlock, SInterval* pInterval, int32_t primaryTsSlotId,
STimeWindowAggSupp* pTwAggSupp, SIntervalPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo, bool isStream); STimeWindowAggSupp* pTwAggSupp, SIntervalPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo, bool isStream);
SOperatorInfo* createMergeIntervalOperatorInfo(SOperatorInfo* downstream, SMergeIntervalPhysiNode* pIntervalPhyNode,
SOperatorInfo* createMergeIntervalOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, SExecTaskInfo* pTaskInfo);
SSDataBlock* pResBlock, SInterval* pInterval, int32_t primaryTsSlotId, SOperatorInfo* createMergeAlignedIntervalOperatorInfo(SOperatorInfo* downstream, SMergeAlignedIntervalPhysiNode* pNode,
bool mergeResultBlock, SExecTaskInfo* pTaskInfo); SExecTaskInfo* pTaskInfo);
SOperatorInfo* createMergeAlignedIntervalOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols,
SSDataBlock* pResBlock, SInterval* pInterval, int32_t primaryTsSlotId,
SNode* pCondition, bool mergeResultBlocks, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pPhyNode, SOperatorInfo* createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pPhyNode,
SExecTaskInfo* pTaskInfo, int32_t numOfChild); SExecTaskInfo* pTaskInfo, int32_t numOfChild);
SOperatorInfo* createSessionAggOperatorInfo(SOperatorInfo* downstream, SSessionWinodwPhysiNode* pSessionNode, SOperatorInfo* createSessionAggOperatorInfo(SOperatorInfo* downstream, SSessionWinodwPhysiNode* pSessionNode,
...@@ -930,22 +924,16 @@ SOperatorInfo* createSessionAggOperatorInfo(SOperatorInfo* downstream, SSessionW ...@@ -930,22 +924,16 @@ SOperatorInfo* createSessionAggOperatorInfo(SOperatorInfo* downstream, SSessionW
SOperatorInfo* createGroupOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, SOperatorInfo* createGroupOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols,
SSDataBlock* pResultBlock, SArray* pGroupColList, SNode* pCondition, SSDataBlock* pResultBlock, SArray* pGroupColList, SNode* pCondition,
SExprInfo* pScalarExprInfo, int32_t numOfScalarExpr, SExecTaskInfo* pTaskInfo); SExprInfo* pScalarExprInfo, int32_t numOfScalarExpr, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createDataBlockInfoScanOperator(void* dataReader, SReadHandle* readHandle, uint64_t uid, SBlockDistScanPhysiNode* pBlockScanNode, SOperatorInfo* createDataBlockInfoScanOperator(void* dataReader, SReadHandle* readHandle, uint64_t uid,
SExecTaskInfo* pTaskInfo); SBlockDistScanPhysiNode* pBlockScanNode, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createStreamScanOperatorInfo(SReadHandle* pHandle, STableScanPhysiNode* pTableScanNode, SNode* pTagCond, SOperatorInfo* createStreamScanOperatorInfo(SReadHandle* pHandle, STableScanPhysiNode* pTableScanNode, SNode* pTagCond,
SExecTaskInfo* pTaskInfo); SExecTaskInfo* pTaskInfo);
SOperatorInfo* createFillOperatorInfo(SOperatorInfo* downstream, SFillPhysiNode* pPhyFillNode, SExecTaskInfo* pTaskInfo); SOperatorInfo* createFillOperatorInfo(SOperatorInfo* downstream, SFillPhysiNode* pPhyFillNode, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createStatewindowOperatorInfo(SOperatorInfo* downstream, SStateWinodwPhysiNode* pStateNode, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createStatewindowOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExpr, int32_t numOfCols,
SSDataBlock* pResBlock, STimeWindowAggSupp *pTwAggSupp, int32_t tsSlotId,
SColumn* pStateKeyCol, SNode* pCondition, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createPartitionOperatorInfo(SOperatorInfo* downstream, SPartitionPhysiNode* pPartNode, SExecTaskInfo* pTaskInfo); SOperatorInfo* createPartitionOperatorInfo(SOperatorInfo* downstream, SPartitionPhysiNode* pPartNode, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createTimeSliceOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pNode, SExecTaskInfo* pTaskInfo); SOperatorInfo* createTimeSliceOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pNode, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createMergeJoinOperatorInfo(SOperatorInfo** pDownstream, int32_t numOfDownstream, SSortMergeJoinPhysiNode* pJoinNode, SOperatorInfo* createMergeJoinOperatorInfo(SOperatorInfo** pDownstream, int32_t numOfDownstream, SSortMergeJoinPhysiNode* pJoinNode,
SExecTaskInfo* pTaskInfo); SExecTaskInfo* pTaskInfo);
...@@ -956,10 +944,6 @@ SOperatorInfo* createStreamFinalSessionAggOperatorInfo(SOperatorInfo* downstream ...@@ -956,10 +944,6 @@ SOperatorInfo* createStreamFinalSessionAggOperatorInfo(SOperatorInfo* downstream
SOperatorInfo* createStreamStateAggOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo); SOperatorInfo* createStreamStateAggOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo);
#if 0
SOperatorInfo* createTableSeqScanOperatorInfo(void* pTsdbReadHandle, STaskRuntimeEnv* pRuntimeEnv);
#endif
int32_t projectApplyFunctions(SExprInfo* pExpr, SSDataBlock* pResult, SSDataBlock* pSrcBlock, SqlFunctionCtx* pCtx, int32_t projectApplyFunctions(SExprInfo* pExpr, SSDataBlock* pResult, SSDataBlock* pSrcBlock, SqlFunctionCtx* pCtx,
int32_t numOfOutput, SArray* pPseudoList); int32_t numOfOutput, SArray* pPseudoList);
......
...@@ -4024,7 +4024,7 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo ...@@ -4024,7 +4024,7 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
pTableListInfo, pTagCond, pTagIndexCond, GET_TASKID(pTaskInfo)); pTableListInfo, pTagCond, pTagIndexCond, GET_TASKID(pTaskInfo));
if (code) { if (code) {
pTaskInfo->code = code; pTaskInfo->code = code;
qError("failed to createScanTableListInfo, code: %s", tstrerror(code)); qError("failed to createScanTableListInfo, code:%s, %s", tstrerror(code), GET_TASKID(pTaskInfo));
return NULL; return NULL;
} }
...@@ -4162,9 +4162,9 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo ...@@ -4162,9 +4162,9 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
if (ops[i] == NULL) { if (ops[i] == NULL) {
taosMemoryFree(ops); taosMemoryFree(ops);
return NULL; return NULL;
} else {
ops[i]->resultDataBlockId = pChildNode->pOutputDataBlockDesc->dataBlockId;
} }
ops[i]->resultDataBlockId = pChildNode->pOutputDataBlockDesc->dataBlockId;
} }
SOperatorInfo* pOptr = NULL; SOperatorInfo* pOptr = NULL;
...@@ -4216,37 +4216,10 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo ...@@ -4216,37 +4216,10 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
} else if (QUERY_NODE_PHYSICAL_PLAN_MERGE_ALIGNED_INTERVAL == type) { } else if (QUERY_NODE_PHYSICAL_PLAN_MERGE_ALIGNED_INTERVAL == type) {
SMergeAlignedIntervalPhysiNode* pIntervalPhyNode = (SMergeAlignedIntervalPhysiNode*)pPhyNode; SMergeAlignedIntervalPhysiNode* pIntervalPhyNode = (SMergeAlignedIntervalPhysiNode*)pPhyNode;
pOptr = createMergeAlignedIntervalOperatorInfo(ops[0], pIntervalPhyNode, pTaskInfo);
SExprInfo* pExprInfo = createExprInfo(pIntervalPhyNode->window.pFuncs, NULL, &num);
SSDataBlock* pResBlock = createResDataBlock(pPhyNode->pOutputDataBlockDesc);
SInterval interval = {.interval = pIntervalPhyNode->interval,
.sliding = pIntervalPhyNode->sliding,
.intervalUnit = pIntervalPhyNode->intervalUnit,
.slidingUnit = pIntervalPhyNode->slidingUnit,
.offset = pIntervalPhyNode->offset,
.precision = ((SColumnNode*)pIntervalPhyNode->window.pTspk)->node.resType.precision};
int32_t tsSlotId = ((SColumnNode*)pIntervalPhyNode->window.pTspk)->slotId;
pOptr = createMergeAlignedIntervalOperatorInfo(ops[0], pExprInfo, num, pResBlock, &interval, tsSlotId,
pPhyNode->pConditions, pIntervalPhyNode->window.mergeDataBlock,
pTaskInfo);
} else if (QUERY_NODE_PHYSICAL_PLAN_MERGE_INTERVAL == type) { } else if (QUERY_NODE_PHYSICAL_PLAN_MERGE_INTERVAL == type) {
SMergeIntervalPhysiNode* pIntervalPhyNode = (SMergeIntervalPhysiNode*)pPhyNode; SMergeIntervalPhysiNode* pIntervalPhyNode = (SMergeIntervalPhysiNode*)pPhyNode;
pOptr = createMergeIntervalOperatorInfo(ops[0], pIntervalPhyNode, pTaskInfo);
SExprInfo* pExprInfo = createExprInfo(pIntervalPhyNode->window.pFuncs, NULL, &num);
SSDataBlock* pResBlock = createResDataBlock(pPhyNode->pOutputDataBlockDesc);
SInterval interval = {.interval = pIntervalPhyNode->interval,
.sliding = pIntervalPhyNode->sliding,
.intervalUnit = pIntervalPhyNode->intervalUnit,
.slidingUnit = pIntervalPhyNode->slidingUnit,
.offset = pIntervalPhyNode->offset,
.precision = ((SColumnNode*)pIntervalPhyNode->window.pTspk)->node.resType.precision};
int32_t tsSlotId = ((SColumnNode*)pIntervalPhyNode->window.pTspk)->slotId;
pOptr = createMergeIntervalOperatorInfo(ops[0], pExprInfo, num, pResBlock, &interval, tsSlotId,
pIntervalPhyNode->window.mergeDataBlock, pTaskInfo);
} else if (QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_INTERVAL == type) { } else if (QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_INTERVAL == type) {
int32_t children = 0; int32_t children = 0;
pOptr = createStreamFinalIntervalOperatorInfo(ops[0], pPhyNode, pTaskInfo, children); pOptr = createStreamFinalIntervalOperatorInfo(ops[0], pPhyNode, pTaskInfo, children);
...@@ -4275,17 +4248,7 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo ...@@ -4275,17 +4248,7 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
pOptr = createPartitionOperatorInfo(ops[0], (SPartitionPhysiNode*)pPhyNode, pTaskInfo); pOptr = createPartitionOperatorInfo(ops[0], (SPartitionPhysiNode*)pPhyNode, pTaskInfo);
} else if (QUERY_NODE_PHYSICAL_PLAN_MERGE_STATE == type) { } else if (QUERY_NODE_PHYSICAL_PLAN_MERGE_STATE == type) {
SStateWinodwPhysiNode* pStateNode = (SStateWinodwPhysiNode*)pPhyNode; SStateWinodwPhysiNode* pStateNode = (SStateWinodwPhysiNode*)pPhyNode;
pOptr = createStatewindowOperatorInfo(ops[0], pStateNode, pTaskInfo);
STimeWindowAggSupp as = {.waterMark = pStateNode->window.watermark, .calTrigger = pStateNode->window.triggerType};
SExprInfo* pExprInfo = createExprInfo(pStateNode->window.pFuncs, NULL, &num);
SSDataBlock* pResBlock = createResDataBlock(pPhyNode->pOutputDataBlockDesc);
int32_t tsSlotId = ((SColumnNode*)pStateNode->window.pTspk)->slotId;
SColumnNode* pColNode = (SColumnNode*)((STargetNode*)pStateNode->pStateKey)->pExpr;
SColumn col = extractColumnFromColumnNode(pColNode);
pOptr = createStatewindowOperatorInfo(ops[0], pExprInfo, num, pResBlock, &as, tsSlotId, &col, pPhyNode->pConditions,
pTaskInfo);
} else if (QUERY_NODE_PHYSICAL_PLAN_STREAM_STATE == type) { } else if (QUERY_NODE_PHYSICAL_PLAN_STREAM_STATE == type) {
pOptr = createStreamStateAggOperatorInfo(ops[0], pPhyNode, pTaskInfo); pOptr = createStreamStateAggOperatorInfo(ops[0], pPhyNode, pTaskInfo);
} else if (QUERY_NODE_PHYSICAL_PLAN_MERGE_JOIN == type) { } else if (QUERY_NODE_PHYSICAL_PLAN_MERGE_JOIN == type) {
...@@ -4299,8 +4262,12 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo ...@@ -4299,8 +4262,12 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
} else { } else {
ASSERT(0); ASSERT(0);
} }
taosMemoryFree(ops); taosMemoryFree(ops);
if (pOptr) pOptr->resultDataBlockId = pPhyNode->pOutputDataBlockDesc->dataBlockId; if (pOptr) {
pOptr->resultDataBlockId = pPhyNode->pOutputDataBlockDesc->dataBlockId;
}
return pOptr; return pOptr;
} }
......
...@@ -1900,8 +1900,6 @@ SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SExprInfo* ...@@ -1900,8 +1900,6 @@ SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SExprInfo*
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_HASH_INTERVAL; pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_HASH_INTERVAL;
pOperator->blocking = true; pOperator->blocking = true;
pOperator->status = OP_NOT_OPENED; pOperator->status = OP_NOT_OPENED;
pOperator->exprSupp.pExprInfo = pExprInfo;
pOperator->exprSupp.numOfExprs = numOfCols;
pOperator->info = pInfo; pOperator->info = pInfo;
pOperator->fpSet = createOperatorFpSet(doOpenIntervalAgg, doBuildIntervalResult, doStreamIntervalAgg, NULL, pOperator->fpSet = createOperatorFpSet(doOpenIntervalAgg, doBuildIntervalResult, doStreamIntervalAgg, NULL,
...@@ -2691,20 +2689,26 @@ _error: ...@@ -2691,20 +2689,26 @@ _error:
return NULL; return NULL;
} }
SOperatorInfo* createStatewindowOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExpr, int32_t numOfCols, SOperatorInfo* createStatewindowOperatorInfo(SOperatorInfo* downstream, SStateWinodwPhysiNode* pStateNode,
SSDataBlock* pResBlock, STimeWindowAggSupp* pTwAggSup, int32_t tsSlotId, SExecTaskInfo* pTaskInfo) {
SColumn* pStateKeyCol, SNode* pCondition, SExecTaskInfo* pTaskInfo) {
SStateWindowOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SStateWindowOperatorInfo)); SStateWindowOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SStateWindowOperatorInfo));
SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo)); SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
if (pInfo == NULL || pOperator == NULL) { if (pInfo == NULL || pOperator == NULL) {
goto _error; goto _error;
} }
pInfo->stateCol = *pStateKeyCol; int32_t num = 0;
SExprInfo* pExprInfo = createExprInfo(pStateNode->window.pFuncs, NULL, &num);
SSDataBlock* pResBlock = createResDataBlock(pStateNode->window.node.pOutputDataBlockDesc);
int32_t tsSlotId = ((SColumnNode*)pStateNode->window.pTspk)->slotId;
SColumnNode* pColNode = (SColumnNode*)((STargetNode*)pStateNode->pStateKey)->pExpr;
pInfo->stateCol = extractColumnFromColumnNode(pColNode);
pInfo->stateKey.type = pInfo->stateCol.type; pInfo->stateKey.type = pInfo->stateCol.type;
pInfo->stateKey.bytes = pInfo->stateCol.bytes; pInfo->stateKey.bytes = pInfo->stateCol.bytes;
pInfo->stateKey.pData = taosMemoryCalloc(1, pInfo->stateCol.bytes); pInfo->stateKey.pData = taosMemoryCalloc(1, pInfo->stateCol.bytes);
pInfo->pCondition = pCondition; pInfo->pCondition = pStateNode->window.node.pConditions;
if (pInfo->stateKey.pData == NULL) { if (pInfo->stateKey.pData == NULL) {
goto _error; goto _error;
} }
...@@ -2712,16 +2716,15 @@ SOperatorInfo* createStatewindowOperatorInfo(SOperatorInfo* downstream, SExprInf ...@@ -2712,16 +2716,15 @@ SOperatorInfo* createStatewindowOperatorInfo(SOperatorInfo* downstream, SExprInf
size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES; size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES;
initResultSizeInfo(&pOperator->resultInfo, 4096); initResultSizeInfo(&pOperator->resultInfo, 4096);
int32_t code = initAggInfo(&pOperator->exprSupp, &pInfo->aggSup, pExpr, numOfCols, keyBufSize, pTaskInfo->id.str); int32_t code = initAggInfo(&pOperator->exprSupp, &pInfo->aggSup, pExprInfo, num, keyBufSize, pTaskInfo->id.str);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
goto _error; goto _error;
} }
initBasicInfo(&pInfo->binfo, pResBlock); initBasicInfo(&pInfo->binfo, pResBlock);
initResultRowInfo(&pInfo->binfo.resultRowInfo); initResultRowInfo(&pInfo->binfo.resultRowInfo);
pInfo->twAggSup = *pTwAggSup; pInfo->twAggSup = (STimeWindowAggSupp){.waterMark = pStateNode->window.watermark, .calTrigger = pStateNode->window.triggerType};;
initExecTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pTaskInfo->window); initExecTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pTaskInfo->window);
pInfo->tsSlotId = tsSlotId; pInfo->tsSlotId = tsSlotId;
...@@ -2729,8 +2732,6 @@ SOperatorInfo* createStatewindowOperatorInfo(SOperatorInfo* downstream, SExprInf ...@@ -2729,8 +2732,6 @@ SOperatorInfo* createStatewindowOperatorInfo(SOperatorInfo* downstream, SExprInf
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_MERGE_STATE; pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_MERGE_STATE;
pOperator->blocking = true; pOperator->blocking = true;
pOperator->status = OP_NOT_OPENED; pOperator->status = OP_NOT_OPENED;
pOperator->exprSupp.pExprInfo = pExpr;
pOperator->exprSupp.numOfExprs = numOfCols;
pOperator->pTaskInfo = pTaskInfo; pOperator->pTaskInfo = pTaskInfo;
pOperator->info = pInfo; pOperator->info = pInfo;
...@@ -2804,8 +2805,6 @@ SOperatorInfo* createSessionAggOperatorInfo(SOperatorInfo* downstream, SSessionW ...@@ -2804,8 +2805,6 @@ SOperatorInfo* createSessionAggOperatorInfo(SOperatorInfo* downstream, SSessionW
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_MERGE_SESSION; pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_MERGE_SESSION;
pOperator->blocking = true; pOperator->blocking = true;
pOperator->status = OP_NOT_OPENED; pOperator->status = OP_NOT_OPENED;
pOperator->exprSupp.pExprInfo = pExprInfo;
pOperator->exprSupp.numOfExprs = numOfCols;
pOperator->info = pInfo; pOperator->info = pInfo;
pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doSessionWindowAgg, NULL, NULL, pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doSessionWindowAgg, NULL, NULL,
...@@ -3440,8 +3439,6 @@ SOperatorInfo* createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream, ...@@ -3440,8 +3439,6 @@ SOperatorInfo* createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream,
pOperator->operatorType = pPhyNode->type; pOperator->operatorType = pPhyNode->type;
pOperator->blocking = true; pOperator->blocking = true;
pOperator->status = OP_NOT_OPENED; pOperator->status = OP_NOT_OPENED;
pOperator->exprSupp.pExprInfo = pExprInfo;
pOperator->exprSupp.numOfExprs = numOfCols;
pOperator->info = pInfo; pOperator->info = pInfo;
pOperator->fpSet = pOperator->fpSet =
...@@ -3627,8 +3624,6 @@ SOperatorInfo* createStreamSessionAggOperatorInfo(SOperatorInfo* downstream, SPh ...@@ -3627,8 +3624,6 @@ SOperatorInfo* createStreamSessionAggOperatorInfo(SOperatorInfo* downstream, SPh
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_STREAM_SESSION; pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_STREAM_SESSION;
pOperator->blocking = true; pOperator->blocking = true;
pOperator->status = OP_NOT_OPENED; pOperator->status = OP_NOT_OPENED;
pOperator->exprSupp.pExprInfo = pExprInfo;
pOperator->exprSupp.numOfExprs = numOfCols;
pOperator->info = pInfo; pOperator->info = pInfo;
pOperator->fpSet = pOperator->fpSet =
createOperatorFpSet(operatorDummyOpenFn, doStreamSessionAgg, NULL, NULL, destroyStreamSessionAggOperatorInfo, createOperatorFpSet(operatorDummyOpenFn, doStreamSessionAgg, NULL, NULL, destroyStreamSessionAggOperatorInfo,
...@@ -4889,8 +4884,6 @@ SOperatorInfo* createStreamStateAggOperatorInfo(SOperatorInfo* downstream, SPhys ...@@ -4889,8 +4884,6 @@ SOperatorInfo* createStreamStateAggOperatorInfo(SOperatorInfo* downstream, SPhys
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_STREAM_STATE; pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_STREAM_STATE;
pOperator->blocking = true; pOperator->blocking = true;
pOperator->status = OP_NOT_OPENED; pOperator->status = OP_NOT_OPENED;
pOperator->exprSupp.numOfExprs = numOfCols;
pOperator->exprSupp.pExprInfo = pExprInfo;
pOperator->pTaskInfo = pTaskInfo; pOperator->pTaskInfo = pTaskInfo;
pOperator->info = pInfo; pOperator->info = pInfo;
pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doStreamStateAgg, NULL, NULL, pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doStreamStateAgg, NULL, NULL,
...@@ -5106,9 +5099,7 @@ static SSDataBlock* mergeAlignedIntervalAgg(SOperatorInfo* pOperator) { ...@@ -5106,9 +5099,7 @@ static SSDataBlock* mergeAlignedIntervalAgg(SOperatorInfo* pOperator) {
return (rows == 0) ? NULL : pRes; return (rows == 0) ? NULL : pRes;
} }
SOperatorInfo* createMergeAlignedIntervalOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, SOperatorInfo* createMergeAlignedIntervalOperatorInfo(SOperatorInfo* downstream, SMergeAlignedIntervalPhysiNode* pNode,
int32_t numOfCols, SSDataBlock* pResBlock, SInterval* pInterval,
int32_t primaryTsSlotId, SNode* pCondition, bool mergeResultBlock,
SExecTaskInfo* pTaskInfo) { SExecTaskInfo* pTaskInfo) {
SMergeAlignedIntervalAggOperatorInfo* miaInfo = taosMemoryCalloc(1, sizeof(SMergeAlignedIntervalAggOperatorInfo)); SMergeAlignedIntervalAggOperatorInfo* miaInfo = taosMemoryCalloc(1, sizeof(SMergeAlignedIntervalAggOperatorInfo));
SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo)); SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
...@@ -5121,24 +5112,33 @@ SOperatorInfo* createMergeAlignedIntervalOperatorInfo(SOperatorInfo* downstream, ...@@ -5121,24 +5112,33 @@ SOperatorInfo* createMergeAlignedIntervalOperatorInfo(SOperatorInfo* downstream,
goto _error; goto _error;
} }
int32_t num = 0;
SExprInfo* pExprInfo = createExprInfo(pNode->window.pFuncs, NULL, &num);
SSDataBlock* pResBlock = createResDataBlock(pNode->window.node.pOutputDataBlockDesc);
SInterval interval = {.interval = pNode->interval,
.sliding = pNode->sliding,
.intervalUnit = pNode->intervalUnit,
.slidingUnit = pNode->slidingUnit,
.offset = pNode->offset,
.precision = ((SColumnNode*)pNode->window.pTspk)->node.resType.precision};
SIntervalAggOperatorInfo* iaInfo = miaInfo->intervalAggOperatorInfo; SIntervalAggOperatorInfo* iaInfo = miaInfo->intervalAggOperatorInfo;
SExprSupp* pSup = &pOperator->exprSupp; SExprSupp* pSup = &pOperator->exprSupp;
miaInfo->pCondition = pCondition; miaInfo->pCondition = pNode->window.node.pConditions;
miaInfo->curTs = INT64_MIN; miaInfo->curTs = INT64_MIN;
iaInfo->win = pTaskInfo->window;
iaInfo->win = pTaskInfo->window; iaInfo->inputOrder = TSDB_ORDER_ASC;
iaInfo->inputOrder = TSDB_ORDER_ASC; iaInfo->interval = interval;
iaInfo->interval = *pInterval; iaInfo->execModel = pTaskInfo->execModel;
iaInfo->execModel = pTaskInfo->execModel; iaInfo->primaryTsIndex = ((SColumnNode*)pNode->window.pTspk)->slotId;
iaInfo->primaryTsIndex = primaryTsSlotId; iaInfo->binfo.mergeResultBlock = pNode->window.mergeDataBlock;
iaInfo->binfo.mergeResultBlock = mergeResultBlock;
size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES; size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES;
initResultSizeInfo(&pOperator->resultInfo, 4096); initResultSizeInfo(&pOperator->resultInfo, 4096);
int32_t code = int32_t code = initAggInfo(&pOperator->exprSupp, &iaInfo->aggSup, pExprInfo, num, keyBufSize, pTaskInfo->id.str);
initAggInfo(&pOperator->exprSupp, &iaInfo->aggSup, pExprInfo, numOfCols, keyBufSize, pTaskInfo->id.str);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
goto _error; goto _error;
} }
...@@ -5146,7 +5146,7 @@ SOperatorInfo* createMergeAlignedIntervalOperatorInfo(SOperatorInfo* downstream, ...@@ -5146,7 +5146,7 @@ SOperatorInfo* createMergeAlignedIntervalOperatorInfo(SOperatorInfo* downstream,
initBasicInfo(&iaInfo->binfo, pResBlock); initBasicInfo(&iaInfo->binfo, pResBlock);
initExecTimeWindowInfo(&iaInfo->twAggSup.timeWindowData, &iaInfo->win); initExecTimeWindowInfo(&iaInfo->twAggSup.timeWindowData, &iaInfo->win);
iaInfo->timeWindowInterpo = timeWindowinterpNeeded(pSup->pCtx, numOfCols, iaInfo); iaInfo->timeWindowInterpo = timeWindowinterpNeeded(pSup->pCtx, num, iaInfo);
if (iaInfo->timeWindowInterpo) { if (iaInfo->timeWindowInterpo) {
iaInfo->binfo.resultRowInfo.openWindow = tdListNew(sizeof(SResultRowPosition)); iaInfo->binfo.resultRowInfo.openWindow = tdListNew(sizeof(SResultRowPosition));
} }
...@@ -5154,14 +5154,12 @@ SOperatorInfo* createMergeAlignedIntervalOperatorInfo(SOperatorInfo* downstream, ...@@ -5154,14 +5154,12 @@ SOperatorInfo* createMergeAlignedIntervalOperatorInfo(SOperatorInfo* downstream,
initResultRowInfo(&iaInfo->binfo.resultRowInfo); initResultRowInfo(&iaInfo->binfo.resultRowInfo);
blockDataEnsureCapacity(iaInfo->binfo.pRes, pOperator->resultInfo.capacity); blockDataEnsureCapacity(iaInfo->binfo.pRes, pOperator->resultInfo.capacity);
pOperator->name = "TimeMergeAlignedIntervalAggOperator"; pOperator->name = "TimeMergeAlignedIntervalAggOperator";
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_MERGE_ALIGNED_INTERVAL; pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_MERGE_ALIGNED_INTERVAL;
pOperator->blocking = false; pOperator->blocking = false;
pOperator->status = OP_NOT_OPENED; pOperator->status = OP_NOT_OPENED;
pOperator->exprSupp.pExprInfo = pExprInfo; pOperator->pTaskInfo = pTaskInfo;
pOperator->pTaskInfo = pTaskInfo; pOperator->info = miaInfo;
pOperator->exprSupp.numOfExprs = numOfCols;
pOperator->info = miaInfo;
pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, mergeAlignedIntervalAgg, NULL, NULL, pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, mergeAlignedIntervalAgg, NULL, NULL,
destroyMergeAlignedIntervalOperatorInfo, NULL, NULL, NULL); destroyMergeAlignedIntervalOperatorInfo, NULL, NULL, NULL);
...@@ -5418,57 +5416,64 @@ static SSDataBlock* doMergeIntervalAgg(SOperatorInfo* pOperator) { ...@@ -5418,57 +5416,64 @@ static SSDataBlock* doMergeIntervalAgg(SOperatorInfo* pOperator) {
return (rows == 0) ? NULL : pRes; return (rows == 0) ? NULL : pRes;
} }
SOperatorInfo* createMergeIntervalOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, SOperatorInfo* createMergeIntervalOperatorInfo(SOperatorInfo* downstream, SMergeIntervalPhysiNode* pIntervalPhyNode,
SSDataBlock* pResBlock, SInterval* pInterval, int32_t primaryTsSlotId, SExecTaskInfo* pTaskInfo) {
bool mergeBlock, SExecTaskInfo* pTaskInfo) { SMergeIntervalAggOperatorInfo* pMergeIntervalInfo = taosMemoryCalloc(1, sizeof(SMergeIntervalAggOperatorInfo));
SMergeIntervalAggOperatorInfo* miaInfo = taosMemoryCalloc(1, sizeof(SMergeIntervalAggOperatorInfo));
SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo)); SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
if (miaInfo == NULL || pOperator == NULL) { if (pMergeIntervalInfo == NULL || pOperator == NULL) {
goto _error; goto _error;
} }
miaInfo->groupIntervals = tdListNew(sizeof(SGroupTimeWindow)); int32_t num = 0;
SExprInfo* pExprInfo = createExprInfo(pIntervalPhyNode->window.pFuncs, NULL, &num);
SSDataBlock* pResBlock = createResDataBlock(pIntervalPhyNode->window.node.pOutputDataBlockDesc);
SInterval interval = {.interval = pIntervalPhyNode->interval,
.sliding = pIntervalPhyNode->sliding,
.intervalUnit = pIntervalPhyNode->intervalUnit,
.slidingUnit = pIntervalPhyNode->slidingUnit,
.offset = pIntervalPhyNode->offset,
.precision = ((SColumnNode*)pIntervalPhyNode->window.pTspk)->node.resType.precision};
SIntervalAggOperatorInfo* iaInfo = &miaInfo->intervalAggOperatorInfo; pMergeIntervalInfo->groupIntervals = tdListNew(sizeof(SGroupTimeWindow));
iaInfo->win = pTaskInfo->window;
iaInfo->inputOrder = TSDB_ORDER_ASC;
iaInfo->interval = *pInterval;
iaInfo->execModel = pTaskInfo->execModel;
iaInfo->binfo.mergeResultBlock = mergeBlock;
iaInfo->primaryTsIndex = primaryTsSlotId; SIntervalAggOperatorInfo* pIntervalInfo = &pMergeIntervalInfo->intervalAggOperatorInfo;
pIntervalInfo->win = pTaskInfo->window;
pIntervalInfo->inputOrder = TSDB_ORDER_ASC;
pIntervalInfo->interval = interval;
pIntervalInfo->execModel = pTaskInfo->execModel;
pIntervalInfo->binfo.mergeResultBlock = pIntervalPhyNode->window.mergeDataBlock;
pIntervalInfo->primaryTsIndex = ((SColumnNode*)pIntervalPhyNode->window.pTspk)->slotId;
SExprSupp* pExprSupp = &pOperator->exprSupp; SExprSupp* pExprSupp = &pOperator->exprSupp;
size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES; size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES;
initResultSizeInfo(&pOperator->resultInfo, 4096); initResultSizeInfo(&pOperator->resultInfo, 4096);
int32_t code = initAggInfo(pExprSupp, &iaInfo->aggSup, pExprInfo, numOfCols, keyBufSize, pTaskInfo->id.str); int32_t code = initAggInfo(pExprSupp, &pIntervalInfo->aggSup, pExprInfo, num, keyBufSize, pTaskInfo->id.str);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
goto _error; goto _error;
} }
initBasicInfo(&iaInfo->binfo, pResBlock); initBasicInfo(&pIntervalInfo->binfo, pResBlock);
initExecTimeWindowInfo(&iaInfo->twAggSup.timeWindowData, &iaInfo->win); initExecTimeWindowInfo(&pIntervalInfo->twAggSup.timeWindowData, &pIntervalInfo->win);
iaInfo->timeWindowInterpo = timeWindowinterpNeeded(pExprSupp->pCtx, numOfCols, iaInfo); pIntervalInfo->timeWindowInterpo = timeWindowinterpNeeded(pExprSupp->pCtx, num, pIntervalInfo);
if (iaInfo->timeWindowInterpo) { if (pIntervalInfo->timeWindowInterpo) {
iaInfo->binfo.resultRowInfo.openWindow = tdListNew(sizeof(SResultRowPosition)); pIntervalInfo->binfo.resultRowInfo.openWindow = tdListNew(sizeof(SResultRowPosition));
if (iaInfo->binfo.resultRowInfo.openWindow == NULL) { if (pIntervalInfo->binfo.resultRowInfo.openWindow == NULL) {
goto _error; goto _error;
} }
} }
initResultRowInfo(&iaInfo->binfo.resultRowInfo); initResultRowInfo(&pIntervalInfo->binfo.resultRowInfo);
pOperator->name = "TimeMergeIntervalAggOperator"; pOperator->name = "TimeMergeIntervalAggOperator";
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_MERGE_INTERVAL; pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_MERGE_INTERVAL;
pOperator->blocking = false; pOperator->blocking = false;
pOperator->status = OP_NOT_OPENED; pOperator->status = OP_NOT_OPENED;
pOperator->exprSupp.pExprInfo = pExprInfo; pOperator->pTaskInfo = pTaskInfo;
pOperator->pTaskInfo = pTaskInfo; pOperator->info = pMergeIntervalInfo;
pOperator->exprSupp.numOfExprs = numOfCols;
pOperator->info = miaInfo;
pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doMergeIntervalAgg, NULL, NULL, pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doMergeIntervalAgg, NULL, NULL,
destroyMergeIntervalOperatorInfo, NULL, NULL, NULL); destroyMergeIntervalOperatorInfo, NULL, NULL, NULL);
...@@ -5481,7 +5486,7 @@ SOperatorInfo* createMergeIntervalOperatorInfo(SOperatorInfo* downstream, SExprI ...@@ -5481,7 +5486,7 @@ SOperatorInfo* createMergeIntervalOperatorInfo(SOperatorInfo* downstream, SExprI
return pOperator; return pOperator;
_error: _error:
destroyMergeIntervalOperatorInfo(miaInfo); destroyMergeIntervalOperatorInfo(pMergeIntervalInfo);
taosMemoryFreeClear(pOperator); taosMemoryFreeClear(pOperator);
pTaskInfo->code = code; pTaskInfo->code = code;
return NULL; return NULL;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册