提交 78d1f75c 编写于 作者: H Haojun Liao

refactor(query): do some internal refactor.

上级 1c52b593
...@@ -15,7 +15,9 @@ ...@@ -15,7 +15,9 @@
#ifndef TDENGINE_QUERYUTIL_H #ifndef TDENGINE_QUERYUTIL_H
#define TDENGINE_QUERYUTIL_H #define TDENGINE_QUERYUTIL_H
#include <libs/function/function.h> #include "function.h"
#include "nodes.h"
#include "plannodes.h"
#include "tbuffer.h" #include "tbuffer.h"
#include "tcommon.h" #include "tcommon.h"
#include "tpagedbuf.h" #include "tpagedbuf.h"
...@@ -77,7 +79,7 @@ typedef struct SResultRowInfo { ...@@ -77,7 +79,7 @@ typedef struct SResultRowInfo {
struct SqlFunctionCtx; struct SqlFunctionCtx;
size_t getResultRowSize(struct SqlFunctionCtx* pCtx, int32_t numOfOutput); size_t getResultRowSize(struct SqlFunctionCtx* pCtx, int32_t numOfOutput);
int32_t initResultRowInfo(SResultRowInfo* pResultRowInfo, int32_t size); void initResultRowInfo(SResultRowInfo* pResultRowInfo);
void cleanupResultRowInfo(SResultRowInfo* pResultRowInfo); void cleanupResultRowInfo(SResultRowInfo* pResultRowInfo);
void closeAllResultRows(SResultRowInfo* pResultRowInfo); void closeAllResultRows(SResultRowInfo* pResultRowInfo);
...@@ -86,7 +88,7 @@ void initResultRow(SResultRow *pResultRow); ...@@ -86,7 +88,7 @@ void initResultRow(SResultRow *pResultRow);
void closeResultRow(SResultRow* pResultRow); void closeResultRow(SResultRow* pResultRow);
bool isResultRowClosed(SResultRow* pResultRow); bool isResultRowClosed(SResultRow* pResultRow);
struct SResultRowEntryInfo* getResultCell(const SResultRow* pRow, int32_t index, const int32_t* offset); struct SResultRowEntryInfo* getResultEntryInfo(const SResultRow* pRow, int32_t index, const int32_t* offset);
static FORCE_INLINE SResultRow *getResultRowByPos(SDiskbasedBuf* pBuf, SResultRowPosition* pos) { static FORCE_INLINE SResultRow *getResultRowByPos(SDiskbasedBuf* pBuf, SResultRowPosition* pos) {
SFilePage* bufPage = (SFilePage*) getBufPage(pBuf, pos->pageId); SFilePage* bufPage = (SFilePage*) getBufPage(pBuf, pos->pageId);
...@@ -98,9 +100,27 @@ void initGroupedResultInfo(SGroupResInfo* pGroupResInfo, SHashObj* pHashmap, ...@@ -98,9 +100,27 @@ void initGroupedResultInfo(SGroupResInfo* pGroupResInfo, SHashObj* pHashmap,
void initMultiResInfoFromArrayList(SGroupResInfo* pGroupResInfo, SArray* pArrayList); void initMultiResInfoFromArrayList(SGroupResInfo* pGroupResInfo, SArray* pArrayList);
void cleanupGroupResInfo(SGroupResInfo* pGroupResInfo); void cleanupGroupResInfo(SGroupResInfo* pGroupResInfo);
bool hashRemainDataInGroupInfo(SGroupResInfo* pGroupResInfo); bool hasDataInGroupInfo(SGroupResInfo* pGroupResInfo);
bool incNextGroup(SGroupResInfo* pGroupResInfo);
int32_t getNumOfTotalRes(SGroupResInfo* pGroupResInfo); int32_t getNumOfTotalRes(SGroupResInfo* pGroupResInfo);
SSDataBlock* createResDataBlock(SDataBlockDescNode* pNode);
int32_t getTableList(void* metaHandle, SScanPhysiNode* pScanNode, STableListInfo* pListInfo, SNode* pTagCond);
SArray* createSortInfo(SNodeList* pNodeList);
SArray* extractPartitionColInfo(SNodeList* pNodeList);
SArray* extractColMatchInfo(SNodeList* pNodeList, SDataBlockDescNode* pOutputNodeList, int32_t* numOfOutputCols, int32_t type);
SExprInfo* createExprInfo(SNodeList* pNodeList, SNodeList* pGroupKeys, int32_t* numOfExprs);
SqlFunctionCtx* createSqlFunctionCtx(SExprInfo* pExprInfo, int32_t numOfOutput, int32_t** rowCellInfoOffset);
void relocateColumnData(SSDataBlock* pBlock, const SArray* pColMatchInfo, SArray* pCols);
void initExecTimeWindowInfo(SColumnInfoData* pColData, STimeWindow* pQueryWindow);
SInterval extractIntervalInfo(const STableScanPhysiNode* pTableScanNode);
SColumn extractColumnFromColumnNode(SColumnNode* pColNode);
int32_t initQueryTableDataCond(SQueryTableDataCond* pCond, const STableScanPhysiNode* pTableScanNode);
void cleanupQueryTableDataCond(SQueryTableDataCond* pCond);
#endif // TDENGINE_QUERYUTIL_H #endif // TDENGINE_QUERYUTIL_H
...@@ -747,43 +747,27 @@ void doBuildResultDatablock(SOperatorInfo* pOperator, SOptrBasicInfo* pbInfo, ...@@ -747,43 +747,27 @@ void doBuildResultDatablock(SOperatorInfo* pOperator, SOptrBasicInfo* pbInfo,
void doApplyFunctions(SExecTaskInfo* taskInfo, SqlFunctionCtx* pCtx, STimeWindow* pWin, SColumnInfoData* pTimeWindowData, int32_t offset, void doApplyFunctions(SExecTaskInfo* taskInfo, SqlFunctionCtx* pCtx, STimeWindow* pWin, SColumnInfoData* pTimeWindowData, int32_t offset,
int32_t forwardStep, TSKEY* tsCol, int32_t numOfTotal, int32_t numOfOutput, int32_t order); int32_t forwardStep, TSKEY* tsCol, int32_t numOfTotal, int32_t numOfOutput, int32_t order);
int32_t setGroupResultOutputBuf(SOptrBasicInfo* binfo, int32_t numOfCols, char* pData, int16_t type, int16_t bytes,
int32_t groupId, SDiskbasedBuf* pBuf, SExecTaskInfo* pTaskInfo, SAggSupporter* pAggSup);
void doDestroyBasicInfo(SOptrBasicInfo* pInfo, int32_t numOfOutput); void doDestroyBasicInfo(SOptrBasicInfo* pInfo, int32_t numOfOutput);
int32_t setDataBlockFromFetchRsp(SSDataBlock* pRes, SLoadRemoteDataInfo* pLoadInfo, int32_t numOfRows, char* pData, int32_t extractDataBlockFromFetchRsp(SSDataBlock* pRes, SLoadRemoteDataInfo* pLoadInfo, int32_t numOfRows, char* pData,
int32_t compLen, int32_t numOfOutput, int64_t startTs, uint64_t* total, int32_t compLen, int32_t numOfOutput, int64_t startTs, uint64_t* total,
SArray* pColList); SArray* pColList);
void getAlignQueryTimeWindow(SInterval* pInterval, int32_t precision, int64_t key, STimeWindow* win); void getAlignQueryTimeWindow(SInterval* pInterval, int32_t precision, int64_t key, STimeWindow* win);
int32_t getTableScanInfo(SOperatorInfo* pOperator, int32_t *order, int32_t* scanFlag); int32_t getTableScanInfo(SOperatorInfo* pOperator, int32_t *order, int32_t* scanFlag);
int32_t getBufferPgSize(int32_t rowSize, uint32_t* defaultPgsz, uint32_t* defaultBufsz); int32_t getBufferPgSize(int32_t rowSize, uint32_t* defaultPgsz, uint32_t* defaultBufsz);
SArray* extractPartitionColInfo(SNodeList* pNodeList);
void doSetOperatorCompleted(SOperatorInfo* pOperator); void doSetOperatorCompleted(SOperatorInfo* pOperator);
void doFilter(const SNode* pFilterNode, SSDataBlock* pBlock); void doFilter(const SNode* pFilterNode, SSDataBlock* pBlock);
SqlFunctionCtx* createSqlFunctionCtx(SExprInfo* pExprInfo, int32_t numOfOutput, int32_t** rowCellInfoOffset);
void relocateColumnData(SSDataBlock* pBlock, const SArray* pColMatchInfo, SArray* pCols);
void initExecTimeWindowInfo(SColumnInfoData* pColData, STimeWindow* pQueryWindow);
void cleanupAggSup(SAggSupporter* pAggSup); void cleanupAggSup(SAggSupporter* pAggSup);
void destroyBasicOperatorInfo(void* param, int32_t numOfOutput); void destroyBasicOperatorInfo(void* param, int32_t numOfOutput);
void appendOneRowToDataBlock(SSDataBlock* pBlock, STupleHandle* pTupleHandle); void appendOneRowToDataBlock(SSDataBlock* pBlock, STupleHandle* pTupleHandle);
void setTbNameColData(void* pMeta, const SSDataBlock* pBlock, SColumnInfoData* pColInfoData, int32_t functionId); void setTbNameColData(void* pMeta, const SSDataBlock* pBlock, SColumnInfoData* pColInfoData, int32_t functionId);
SInterval extractIntervalInfo(const STableScanPhysiNode* pTableScanNode);
SColumn extractColumnFromColumnNode(SColumnNode* pColNode);
SSDataBlock* getSortedBlockData(SSortHandle* pHandle, SSDataBlock* pDataBlock, int32_t capacity, SArray* pColMatchInfo, SSortOperatorInfo* pInfo);
SSDataBlock* loadNextDataBlock(void* param); SSDataBlock* loadNextDataBlock(void* param);
void setResultRowInitCtx(SResultRow* pResult, SqlFunctionCtx* pCtx, int32_t numOfOutput, int32_t* rowCellInfoOffset); void setResultRowInitCtx(SResultRow* pResult, SqlFunctionCtx* pCtx, int32_t numOfOutput, int32_t* rowCellInfoOffset);
SArray* extractColMatchInfo(SNodeList* pNodeList, SDataBlockDescNode* pOutputNodeList, int32_t* numOfOutputCols,
SExecTaskInfo* pTaskInfo, int32_t type);
SExprInfo* createExprInfo(SNodeList* pNodeList, SNodeList* pGroupKeys, int32_t* numOfExprs);
SSDataBlock* createResDataBlock(SDataBlockDescNode* pNode);
int32_t initQueryTableDataCond(SQueryTableDataCond* pCond, const STableScanPhysiNode* pTableScanNode);
void clearupQueryTableDataCond(SQueryTableDataCond* pCond);
SResultRow* doSetResultOutBufByKey(SDiskbasedBuf* pResultBuf, SResultRowInfo* pResultRowInfo, SResultRow* doSetResultOutBufByKey(SDiskbasedBuf* pResultBuf, SResultRowInfo* pResultRowInfo,
char* pData, int16_t bytes, bool masterscan, uint64_t groupId, char* pData, int16_t bytes, bool masterscan, uint64_t groupId,
SExecTaskInfo* pTaskInfo, bool isIntervalQuery, SAggSupporter* pSup); SExecTaskInfo* pTaskInfo, bool isIntervalQuery, SAggSupporter* pSup);
...@@ -799,9 +783,9 @@ SOperatorInfo* createAggregateOperatorInfo(SOperatorInfo* downstream, SExprInfo* ...@@ -799,9 +783,9 @@ SOperatorInfo* createAggregateOperatorInfo(SOperatorInfo* downstream, SExprInfo*
int32_t numOfScalarExpr, SExecTaskInfo* pTaskInfo); int32_t numOfScalarExpr, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createIndefinitOutputOperatorInfo(SOperatorInfo* downstream, SPhysiNode *pNode, SExecTaskInfo* pTaskInfo); SOperatorInfo* createIndefinitOutputOperatorInfo(SOperatorInfo* downstream, SPhysiNode *pNode, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createProjectOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t num, SSDataBlock* pResBlock, SLimit* pLimit, SLimit* pSlimit, SNode* pCondition, SExecTaskInfo* pTaskInfo); SOperatorInfo* createProjectOperatorInfo(SOperatorInfo* downstream, SProjectPhysiNode* pProjPhyNode, SExecTaskInfo* pTaskInfo);
SOperatorInfo *createSortOperatorInfo(SOperatorInfo* downstream, SSDataBlock* pResBlock, SArray* pSortInfo, SExprInfo* pExprInfo, int32_t numOfCols, SOperatorInfo* createSortOperatorInfo(SOperatorInfo* downstream, SSortPhysiNode* pSortPhyNode, SExecTaskInfo* pTaskInfo);
SArray* pIndexMap, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createMultiwaySortMergeOperatorInfo(SOperatorInfo** downStreams, int32_t numStreams, SSDataBlock* pInputBlock, SOperatorInfo* createMultiwaySortMergeOperatorInfo(SOperatorInfo** downStreams, int32_t numStreams, SSDataBlock* pInputBlock,
SSDataBlock* pResBlock, SArray* pSortInfo, SArray* pColMatchColInfo, SSDataBlock* pResBlock, SArray* pSortInfo, SArray* pColMatchColInfo,
SExecTaskInfo* pTaskInfo); SExecTaskInfo* pTaskInfo);
...@@ -842,7 +826,8 @@ SOperatorInfo* createPartitionOperatorInfo(SOperatorInfo* downstream, SPartition ...@@ -842,7 +826,8 @@ SOperatorInfo* createPartitionOperatorInfo(SOperatorInfo* downstream, SPartition
SOperatorInfo* createTimeSliceOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, SOperatorInfo* createTimeSliceOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols,
SSDataBlock* pResultBlock, const SNodeListNode* pValNode, SExecTaskInfo* pTaskInfo); SSDataBlock* pResultBlock, const SNodeListNode* pValNode, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createMergeJoinOperatorInfo(SOperatorInfo** pDownstream, int32_t numOfDownstream, SExprInfo* pExprInfo, int32_t numOfCols, SSDataBlock* pResBlock, SNode* pOnCondition, SExecTaskInfo* pTaskInfo); SOperatorInfo* createMergeJoinOperatorInfo(SOperatorInfo** pDownstream, int32_t numOfDownstream, SJoinPhysiNode* pJoinNode,
SExecTaskInfo* pTaskInfo);
SOperatorInfo* createStreamSessionAggOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, SOperatorInfo* createStreamSessionAggOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols,
SSDataBlock* pResBlock, int64_t gap, int32_t tsSlotId, STimeWindowAggSupp* pTwAggSupp, SExecTaskInfo* pTaskInfo); SSDataBlock* pResBlock, int64_t gap, int32_t tsSlotId, STimeWindowAggSupp* pTwAggSupp, SExecTaskInfo* pTaskInfo);
...@@ -861,8 +846,8 @@ void setInputDataBlock(SOperatorInfo* pOperator, SqlFunctionCtx* pCtx, SSDataBlo ...@@ -861,8 +846,8 @@ void setInputDataBlock(SOperatorInfo* pOperator, SqlFunctionCtx* pCtx, SSDataBlo
bool isTaskKilled(SExecTaskInfo* pTaskInfo); bool isTaskKilled(SExecTaskInfo* pTaskInfo);
int32_t checkForQueryBuf(size_t numOfTables); int32_t checkForQueryBuf(size_t numOfTables);
void setTaskKilled(SExecTaskInfo* pTaskInfo); void setTaskKilled(SExecTaskInfo* pTaskInfo);
void queryCostStatis(SExecTaskInfo* pTaskInfo); void queryCostStatis(SExecTaskInfo* pTaskInfo);
void doDestroyTask(SExecTaskInfo* pTaskInfo); void doDestroyTask(SExecTaskInfo* pTaskInfo);
int32_t getMaximumIdleDurationSec(); int32_t getMaximumIdleDurationSec();
...@@ -911,8 +896,6 @@ int32_t finalizeResultRowIntoResultDataBlock(SDiskbasedBuf* pBuf, SResultRowPosi ...@@ -911,8 +896,6 @@ int32_t finalizeResultRowIntoResultDataBlock(SDiskbasedBuf* pBuf, SResultRowPosi
SqlFunctionCtx* pCtx, SExprInfo* pExprInfo, int32_t numOfExprs, const int32_t* rowCellOffset, SqlFunctionCtx* pCtx, SExprInfo* pExprInfo, int32_t numOfExprs, const int32_t* rowCellOffset,
SSDataBlock* pBlock, SExecTaskInfo* pTaskInfo); SSDataBlock* pBlock, SExecTaskInfo* pTaskInfo);
int32_t getTableList(void* metaHandle, int32_t tableType, uint64_t tableUid, STableListInfo* pListInfo,
SNode* pTagCond);
int32_t createMultipleDataReaders(STableScanPhysiNode* pTableScanNode, SReadHandle* pHandle, int32_t createMultipleDataReaders(STableScanPhysiNode* pTableScanNode, SReadHandle* pHandle,
STableListInfo* pTableListInfo, SArray* arrayReader, uint64_t queryId, STableListInfo* pTableListInfo, SArray* arrayReader, uint64_t queryId,
uint64_t taskId, SNode* pTagCond); uint64_t taskId, SNode* pTagCond);
......
此差异已折叠。
...@@ -26,8 +26,10 @@ ...@@ -26,8 +26,10 @@
#include "ttypes.h" #include "ttypes.h"
#include "executorInt.h" #include "executorInt.h"
static void* getCurrentDataGroupInfo(const SPartitionOperatorInfo* pInfo, SDataGroupInfo** pGroupInfo, int32_t len);
static int32_t* setupColumnOffset(const SSDataBlock* pBlock, int32_t rowCapacity); static int32_t* setupColumnOffset(const SSDataBlock* pBlock, int32_t rowCapacity);
static void* getCurrentDataGroupInfo(const SPartitionOperatorInfo* pInfo, SDataGroupInfo** pGroupInfo, int32_t len); static int32_t setGroupResultOutputBuf(SOptrBasicInfo* binfo, int32_t numOfCols, char* pData, int16_t type, int16_t bytes,
int32_t groupId, SDiskbasedBuf* pBuf, SExecTaskInfo* pTaskInfo, SAggSupporter* pAggSup);
static void destroyGroupOperatorInfo(void* param, int32_t numOfOutput) { static void destroyGroupOperatorInfo(void* param, int32_t numOfOutput) {
SGroupbyOperatorInfo* pInfo = (SGroupbyOperatorInfo*)param; SGroupbyOperatorInfo* pInfo = (SGroupbyOperatorInfo*)param;
...@@ -291,7 +293,7 @@ static SSDataBlock* hashGroupbyAggregate(SOperatorInfo* pOperator) { ...@@ -291,7 +293,7 @@ static SSDataBlock* hashGroupbyAggregate(SOperatorInfo* pOperator) {
doBuildResultDatablock(pOperator, &pInfo->binfo, &pInfo->groupResInfo, pInfo->aggSup.pResultBuf); doBuildResultDatablock(pOperator, &pInfo->binfo, &pInfo->groupResInfo, pInfo->aggSup.pResultBuf);
size_t rows = pRes->info.rows; size_t rows = pRes->info.rows;
if (rows == 0 || !hashRemainDataInGroupInfo(&pInfo->groupResInfo)) { if (rows == 0 || !hasDataInGroupInfo(&pInfo->groupResInfo)) {
doSetOperatorCompleted(pOperator); doSetOperatorCompleted(pOperator);
} }
...@@ -355,7 +357,7 @@ static SSDataBlock* hashGroupbyAggregate(SOperatorInfo* pOperator) { ...@@ -355,7 +357,7 @@ static SSDataBlock* hashGroupbyAggregate(SOperatorInfo* pOperator) {
doBuildResultDatablock(pOperator, &pInfo->binfo, &pInfo->groupResInfo, pInfo->aggSup.pResultBuf); doBuildResultDatablock(pOperator, &pInfo->binfo, &pInfo->groupResInfo, pInfo->aggSup.pResultBuf);
doFilter(pInfo->pCondition, pRes); doFilter(pInfo->pCondition, pRes);
bool hasRemain = hashRemainDataInGroupInfo(&pInfo->groupResInfo); bool hasRemain = hasDataInGroupInfo(&pInfo->groupResInfo);
if (!hasRemain) { if (!hasRemain) {
doSetOperatorCompleted(pOperator); doSetOperatorCompleted(pOperator);
break; break;
...@@ -395,7 +397,7 @@ SOperatorInfo* createGroupOperatorInfo(SOperatorInfo* downstream, SExprInfo* pEx ...@@ -395,7 +397,7 @@ SOperatorInfo* createGroupOperatorInfo(SOperatorInfo* downstream, SExprInfo* pEx
initResultSizeInfo(pOperator, 4096); initResultSizeInfo(pOperator, 4096);
initAggInfo(&pInfo->binfo, &pInfo->aggSup, pExprInfo, numOfCols, pResultBlock, pInfo->groupKeyLen, pTaskInfo->id.str); initAggInfo(&pInfo->binfo, &pInfo->aggSup, pExprInfo, numOfCols, pResultBlock, pInfo->groupKeyLen, pTaskInfo->id.str);
initResultRowInfo(&pInfo->binfo.resultRowInfo, 8); initResultRowInfo(&pInfo->binfo.resultRowInfo);
pOperator->name = "GroupbyAggOperator"; pOperator->name = "GroupbyAggOperator";
pOperator->blocking = true; pOperator->blocking = true;
...@@ -738,4 +740,18 @@ SOperatorInfo* createPartitionOperatorInfo(SOperatorInfo* downstream, SPartition ...@@ -738,4 +740,18 @@ SOperatorInfo* createPartitionOperatorInfo(SOperatorInfo* downstream, SPartition
taosMemoryFreeClear(pInfo); taosMemoryFreeClear(pInfo);
taosMemoryFreeClear(pOperator); taosMemoryFreeClear(pOperator);
return NULL; return NULL;
}
int32_t setGroupResultOutputBuf(SOptrBasicInfo* binfo, int32_t numOfCols, char* pData, int16_t type, int16_t bytes,
int32_t groupId, SDiskbasedBuf* pBuf, SExecTaskInfo* pTaskInfo,
SAggSupporter* pAggSup) {
SResultRowInfo* pResultRowInfo = &binfo->resultRowInfo;
SqlFunctionCtx* pCtx = binfo->pCtx;
SResultRow* pResultRow =
doSetResultOutBufByKey(pBuf, pResultRowInfo, (char*)pData, bytes, true, groupId, pTaskInfo, false, pAggSup);
assert(pResultRow != NULL);
setResultRowInitCtx(pResultRow, pCtx, numOfCols, binfo->rowCellInfoOffset);
return TSDB_CODE_SUCCESS;
} }
\ No newline at end of file
...@@ -28,27 +28,32 @@ static SSDataBlock* doMergeJoin(struct SOperatorInfo* pOperator); ...@@ -28,27 +28,32 @@ static SSDataBlock* doMergeJoin(struct SOperatorInfo* pOperator);
static void destroyMergeJoinOperator(void* param, int32_t numOfOutput); static void destroyMergeJoinOperator(void* param, int32_t numOfOutput);
static void extractTimeCondition(SJoinOperatorInfo* Info, SLogicConditionNode* pLogicConditionNode); static void extractTimeCondition(SJoinOperatorInfo* Info, SLogicConditionNode* pLogicConditionNode);
SOperatorInfo* createMergeJoinOperatorInfo(SOperatorInfo** pDownstream, int32_t numOfDownstream, SExprInfo* pExprInfo, SOperatorInfo* createMergeJoinOperatorInfo(SOperatorInfo** pDownstream, int32_t numOfDownstream, SJoinPhysiNode* pJoinNode,
int32_t numOfCols, SSDataBlock* pResBlock, SNode* pOnCondition, SExecTaskInfo* pTaskInfo) {
SExecTaskInfo* pTaskInfo) {
SJoinOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SJoinOperatorInfo)); SJoinOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SJoinOperatorInfo));
SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo)); SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
if (pOperator == NULL || pInfo == NULL) { if (pOperator == NULL || pInfo == NULL) {
goto _error; goto _error;
} }
SSDataBlock* pResBlock = createResDataBlock(pJoinNode->node.pOutputDataBlockDesc);
int32_t numOfCols = 0;
SExprInfo* pExprInfo = createExprInfo(pJoinNode->pTargets, NULL, &numOfCols);
initResultSizeInfo(pOperator, 4096); initResultSizeInfo(pOperator, 4096);
pInfo->pRes = pResBlock; pInfo->pRes = pResBlock;
pOperator->name = "MergeJoinOperator"; pOperator->name = "MergeJoinOperator";
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_MERGE_JOIN; pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_MERGE_JOIN;
pOperator->blocking = false; pOperator->blocking = false;
pOperator->status = OP_NOT_OPENED; pOperator->status = OP_NOT_OPENED;
pOperator->pExpr = pExprInfo; pOperator->pExpr = pExprInfo;
pOperator->numOfExprs = numOfCols; pOperator->numOfExprs = numOfCols;
pOperator->info = pInfo; pOperator->info = pInfo;
pOperator->pTaskInfo = pTaskInfo; pOperator->pTaskInfo = pTaskInfo;
SNode* pOnCondition = pJoinNode->pOnConditions;
if (nodeType(pOnCondition) == QUERY_NODE_OPERATOR) { if (nodeType(pOnCondition) == QUERY_NODE_OPERATOR) {
SOperatorNode* pNode = (SOperatorNode*)pOnCondition; SOperatorNode* pNode = (SOperatorNode*)pOnCondition;
setJoinColumnInfo(&pInfo->leftCol, (SColumnNode*)pNode->pLeft); setJoinColumnInfo(&pInfo->leftCol, (SColumnNode*)pNode->pLeft);
......
...@@ -496,18 +496,6 @@ static SSDataBlock* doTableScan(SOperatorInfo* pOperator) { ...@@ -496,18 +496,6 @@ static SSDataBlock* doTableScan(SOperatorInfo* pOperator) {
return NULL; return NULL;
} }
SInterval extractIntervalInfo(const STableScanPhysiNode* pTableScanNode) {
SInterval interval = {
.interval = pTableScanNode->interval,
.sliding = pTableScanNode->sliding,
.intervalUnit = pTableScanNode->intervalUnit,
.slidingUnit = pTableScanNode->slidingUnit,
.offset = pTableScanNode->offset,
};
return interval;
}
static int32_t getTableScannerExecInfo(struct SOperatorInfo* pOptr, void** pOptrExplain, uint32_t* len) { static int32_t getTableScannerExecInfo(struct SOperatorInfo* pOptr, void** pOptrExplain, uint32_t* len) {
SFileBlockLoadRecorder* pRecorder = taosMemoryCalloc(1, sizeof(SFileBlockLoadRecorder)); SFileBlockLoadRecorder* pRecorder = taosMemoryCalloc(1, sizeof(SFileBlockLoadRecorder));
STableScanInfo* pTableScanInfo = pOptr->info; STableScanInfo* pTableScanInfo = pOptr->info;
...@@ -520,7 +508,7 @@ static int32_t getTableScannerExecInfo(struct SOperatorInfo* pOptr, void** pOptr ...@@ -520,7 +508,7 @@ static int32_t getTableScannerExecInfo(struct SOperatorInfo* pOptr, void** pOptr
static void destroyTableScanOperatorInfo(void* param, int32_t numOfOutput) { static void destroyTableScanOperatorInfo(void* param, int32_t numOfOutput) {
STableScanInfo* pTableScanInfo = (STableScanInfo*)param; STableScanInfo* pTableScanInfo = (STableScanInfo*)param;
blockDataDestroy(pTableScanInfo->pResBlock); blockDataDestroy(pTableScanInfo->pResBlock);
clearupQueryTableDataCond(&pTableScanInfo->cond); cleanupQueryTableDataCond(&pTableScanInfo->cond);
tsdbCleanupReadHandle(pTableScanInfo->dataReader); tsdbCleanupReadHandle(pTableScanInfo->dataReader);
...@@ -540,8 +528,7 @@ SOperatorInfo* createTableScanOperatorInfo(STableScanPhysiNode* pTableScanNode, ...@@ -540,8 +528,7 @@ SOperatorInfo* createTableScanOperatorInfo(STableScanPhysiNode* pTableScanNode,
SDataBlockDescNode* pDescNode = pTableScanNode->scan.node.pOutputDataBlockDesc; SDataBlockDescNode* pDescNode = pTableScanNode->scan.node.pOutputDataBlockDesc;
int32_t numOfCols = 0; int32_t numOfCols = 0;
SArray* pColList = SArray* pColList = extractColMatchInfo(pTableScanNode->scan.pScanCols, pDescNode, &numOfCols, COL_MATCH_FROM_COL_ID);
extractColMatchInfo(pTableScanNode->scan.pScanCols, pDescNode, &numOfCols, pTaskInfo, COL_MATCH_FROM_COL_ID);
int32_t code = initQueryTableDataCond(&pInfo->cond, pTableScanNode); int32_t code = initQueryTableDataCond(&pInfo->cond, pTableScanNode);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
...@@ -1064,8 +1051,7 @@ SOperatorInfo* createStreamScanOperatorInfo(void* pDataReader, SReadHandle* pHan ...@@ -1064,8 +1051,7 @@ SOperatorInfo* createStreamScanOperatorInfo(void* pDataReader, SReadHandle* pHan
STableScanInfo* pSTInfo = (STableScanInfo*)pTableScanDummy->info; STableScanInfo* pSTInfo = (STableScanInfo*)pTableScanDummy->info;
int32_t numOfCols = 0; int32_t numOfCols = 0;
pInfo->pColMatchInfo = pInfo->pColMatchInfo = extractColMatchInfo(pScanPhyNode->pScanCols, pDescNode, &numOfCols, COL_MATCH_FROM_COL_ID);
extractColMatchInfo(pScanPhyNode->pScanCols, pDescNode, &numOfCols, pTaskInfo, COL_MATCH_FROM_COL_ID);
int32_t numOfOutput = taosArrayGetSize(pInfo->pColMatchInfo); int32_t numOfOutput = taosArrayGetSize(pInfo->pColMatchInfo);
SArray* pColIds = taosArrayInit(numOfOutput, sizeof(int16_t)); SArray* pColIds = taosArrayInit(numOfOutput, sizeof(int16_t));
...@@ -1523,7 +1509,7 @@ static SSDataBlock* doSysTableScan(SOperatorInfo* pOperator) { ...@@ -1523,7 +1509,7 @@ static SSDataBlock* doSysTableScan(SOperatorInfo* pOperator) {
} }
} }
setDataBlockFromFetchRsp(pInfo->pRes, &pInfo->loadInfo, pRsp->numOfRows, pRsp->data, pRsp->compLen, extractDataBlockFromFetchRsp(pInfo->pRes, &pInfo->loadInfo, pRsp->numOfRows, pRsp->data, pRsp->compLen,
pOperator->numOfExprs, startTs, NULL, pInfo->scanCols); pOperator->numOfExprs, startTs, NULL, pInfo->scanCols);
// todo log the filter info // todo log the filter info
...@@ -1612,7 +1598,7 @@ SOperatorInfo* createSysTableScanOperatorInfo(void* readHandle, SSystemTableScan ...@@ -1612,7 +1598,7 @@ SOperatorInfo* createSysTableScanOperatorInfo(void* readHandle, SSystemTableScan
SSDataBlock* pResBlock = createResDataBlock(pDescNode); SSDataBlock* pResBlock = createResDataBlock(pDescNode);
int32_t num = 0; int32_t num = 0;
SArray* colList = extractColMatchInfo(pScanNode->pScanCols, pDescNode, &num, pTaskInfo, COL_MATCH_FROM_COL_ID); SArray* colList = extractColMatchInfo(pScanNode->pScanCols, pDescNode, &num, COL_MATCH_FROM_COL_ID);
pInfo->accountId = pScanPhyNode->accountId; pInfo->accountId = pScanPhyNode->accountId;
pInfo->showRewrite = pScanPhyNode->showRewrite; pInfo->showRewrite = pScanPhyNode->showRewrite;
...@@ -1818,27 +1804,26 @@ SOperatorInfo* createTagScanOperatorInfo(SReadHandle* pReadHandle, STagScanPhysi ...@@ -1818,27 +1804,26 @@ SOperatorInfo* createTagScanOperatorInfo(SReadHandle* pReadHandle, STagScanPhysi
SDataBlockDescNode* pDescNode = pPhyNode->node.pOutputDataBlockDesc; SDataBlockDescNode* pDescNode = pPhyNode->node.pOutputDataBlockDesc;
int32_t num = 0;
int32_t numOfExprs = 0; int32_t numOfExprs = 0;
SExprInfo* pExprInfo = createExprInfo(pPhyNode->pScanPseudoCols, NULL, &numOfExprs); SExprInfo* pExprInfo = createExprInfo(pPhyNode->pScanPseudoCols, NULL, &numOfExprs);
SArray* colList = extractColMatchInfo(pPhyNode->pScanPseudoCols, pDescNode, &num, COL_MATCH_FROM_COL_ID);
int32_t num = 0; pInfo->pTableList = pTableListInfo;
SArray* colList = extractColMatchInfo(pPhyNode->pScanPseudoCols, pDescNode, &num, pTaskInfo, COL_MATCH_FROM_COL_ID); pInfo->pColMatchInfo = colList;
pInfo->pRes = createResDataBlock(pDescNode);
pInfo->readHandle = *pReadHandle;
pInfo->curPos = 0;
pInfo->pFilterNode = pPhyNode->node.pConditions;
pInfo->pTableList = pTableListInfo; pOperator->name = "TagScanOperator";
pInfo->pColMatchInfo = colList;
pInfo->pRes = createResDataBlock(pDescNode);
;
pInfo->readHandle = *pReadHandle;
pInfo->curPos = 0;
pInfo->pFilterNode = pPhyNode->node.pConditions;
pOperator->name = "TagScanOperator";
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_TAG_SCAN; pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_TAG_SCAN;
pOperator->blocking = false; pOperator->blocking = false;
pOperator->status = OP_NOT_OPENED; pOperator->status = OP_NOT_OPENED;
pOperator->info = pInfo; pOperator->info = pInfo;
pOperator->pExpr = pExprInfo; pOperator->pExpr = pExprInfo;
pOperator->numOfExprs = numOfExprs; pOperator->numOfExprs = numOfExprs;
pOperator->pTaskInfo = pTaskInfo; pOperator->pTaskInfo = pTaskInfo;
initResultSizeInfo(pOperator, 4096); initResultSizeInfo(pOperator, 4096);
blockDataEnsureCapacity(pInfo->pRes, pOperator->resultInfo.capacity); blockDataEnsureCapacity(pInfo->pRes, pOperator->resultInfo.capacity);
...@@ -1908,7 +1893,7 @@ int32_t createMultipleDataReaders(STableScanPhysiNode* pTableScanNode, SReadHand ...@@ -1908,7 +1893,7 @@ int32_t createMultipleDataReaders(STableScanPhysiNode* pTableScanNode, SReadHand
STableListInfo* pTableListInfo, SArray* arrayReader, uint64_t queryId, STableListInfo* pTableListInfo, SArray* arrayReader, uint64_t queryId,
uint64_t taskId, SNode* pTagCond) { uint64_t taskId, SNode* pTagCond) {
int32_t code = int32_t code =
getTableList(pHandle->meta, pTableScanNode->scan.tableType, pTableScanNode->scan.uid, pTableListInfo, pTagCond); getTableList(pHandle->meta, &pTableScanNode->scan, pTableListInfo, pTagCond);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
goto _error; goto _error;
} }
...@@ -1935,7 +1920,7 @@ int32_t createMultipleDataReaders(STableScanPhysiNode* pTableScanNode, SReadHand ...@@ -1935,7 +1920,7 @@ int32_t createMultipleDataReaders(STableScanPhysiNode* pTableScanNode, SReadHand
taosArrayDestroy(subListInfo->pTableList); taosArrayDestroy(subListInfo->pTableList);
taosMemoryFree(subListInfo); taosMemoryFree(subListInfo);
} }
clearupQueryTableDataCond(&cond); cleanupQueryTableDataCond(&cond);
return 0; return 0;
...@@ -2211,7 +2196,7 @@ SSDataBlock* doTableMergeScan(SOperatorInfo* pOperator) { ...@@ -2211,7 +2196,7 @@ SSDataBlock* doTableMergeScan(SOperatorInfo* pOperator) {
void destroyTableMergeScanOperatorInfo(void* param, int32_t numOfOutput) { void destroyTableMergeScanOperatorInfo(void* param, int32_t numOfOutput) {
STableMergeScanInfo* pTableScanInfo = (STableMergeScanInfo*)param; STableMergeScanInfo* pTableScanInfo = (STableMergeScanInfo*)param;
clearupQueryTableDataCond(&pTableScanInfo->cond); cleanupQueryTableDataCond(&pTableScanInfo->cond);
for (int32_t i = 0; i < taosArrayGetSize(pTableScanInfo->dataReaders); ++i) { for (int32_t i = 0; i < taosArrayGetSize(pTableScanInfo->dataReaders); ++i) {
tsdbReaderT* reader = taosArrayGetP(pTableScanInfo->dataReaders, i); tsdbReaderT* reader = taosArrayGetP(pTableScanInfo->dataReaders, i);
...@@ -2261,7 +2246,7 @@ SOperatorInfo* createTableMergeScanOperatorInfo(STableScanPhysiNode* pTableScanN ...@@ -2261,7 +2246,7 @@ SOperatorInfo* createTableMergeScanOperatorInfo(STableScanPhysiNode* pTableScanN
int32_t numOfCols = 0; int32_t numOfCols = 0;
SArray* pColList = SArray* pColList =
extractColMatchInfo(pTableScanNode->scan.pScanCols, pDescNode, &numOfCols, pTaskInfo, COL_MATCH_FROM_COL_ID); extractColMatchInfo(pTableScanNode->scan.pScanCols, pDescNode, &numOfCols, COL_MATCH_FROM_COL_ID);
int32_t code = initQueryTableDataCond(&pInfo->cond, pTableScanNode); int32_t code = initQueryTableDataCond(&pInfo->cond, pTableScanNode);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
......
...@@ -22,41 +22,52 @@ static int32_t getExplainExecInfo(SOperatorInfo* pOptr, void** pOptrExplain ...@@ -22,41 +22,52 @@ static int32_t getExplainExecInfo(SOperatorInfo* pOptr, void** pOptrExplain
static void destroyOrderOperatorInfo(void* param, int32_t numOfOutput); static void destroyOrderOperatorInfo(void* param, int32_t numOfOutput);
SOperatorInfo* createSortOperatorInfo(SOperatorInfo* downstream, SSDataBlock* pResBlock, SArray* pSortInfo, SOperatorInfo* createSortOperatorInfo(SOperatorInfo* downstream, SSortPhysiNode* pSortPhyNode, SExecTaskInfo* pTaskInfo) {
SExprInfo* pExprInfo, int32_t numOfCols, SArray* pColMatchColInfo,
SExecTaskInfo* pTaskInfo) {
SSortOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SSortOperatorInfo)); SSortOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SSortOperatorInfo));
SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo)); SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
int32_t rowSize = pResBlock->info.rowSize; if (pInfo == NULL || pOperator == NULL/* || rowSize > 100 * 1024 * 1024*/) {
if (pInfo == NULL || pOperator == NULL || rowSize > 100 * 1024 * 1024) {
goto _error; goto _error;
} }
pOperator->pExpr = pExprInfo; SDataBlockDescNode* pDescNode = pSortPhyNode->node.pOutputDataBlockDesc;
pOperator->numOfExprs = numOfCols;
int32_t numOfCols = 0;
SSDataBlock* pResBlock = createResDataBlock(pDescNode);
SExprInfo* pExprInfo = createExprInfo(pSortPhyNode->pExprs, NULL, &numOfCols);
int32_t numOfOutputCols = 0;
SArray* pColMatchColInfo =
extractColMatchInfo(pSortPhyNode->pTargets, pDescNode, &numOfOutputCols, COL_MATCH_FROM_SLOT_ID);
pInfo->binfo.pCtx = createSqlFunctionCtx(pExprInfo, numOfCols, &pInfo->binfo.rowCellInfoOffset); pInfo->binfo.pCtx = createSqlFunctionCtx(pExprInfo, numOfCols, &pInfo->binfo.rowCellInfoOffset);
pInfo->binfo.pRes = pResBlock; pInfo->binfo.pRes = pResBlock;
initResultSizeInfo(pOperator, 1024); initResultSizeInfo(pOperator, 1024);
pInfo->pSortInfo = pSortInfo; pInfo->pSortInfo = createSortInfo(pSortPhyNode->pSortKeys);;
pInfo->pColMatchInfo = pColMatchColInfo; pInfo->pColMatchInfo = pColMatchColInfo;
pOperator->name = "SortOperator"; pOperator->name = "SortOperator";
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_SORT; pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_SORT;
pOperator->blocking = true; pOperator->blocking = true;
pOperator->status = OP_NOT_OPENED; pOperator->status = OP_NOT_OPENED;
pOperator->info = pInfo; pOperator->info = pInfo;
pOperator->pExpr = pExprInfo;
pOperator->numOfExprs = numOfCols;
pOperator->pTaskInfo = pTaskInfo;
// lazy evaluation for the following parameter since the input datablock is not known till now. // lazy evaluation for the following parameter since the input datablock is not known till now.
// pInfo->bufPageSize = rowSize < 1024 ? 1024 * 2 : rowSize * 2; // there are headers, so pageSize = rowSize + // pInfo->bufPageSize = rowSize < 1024 ? 1024 * 2 : rowSize * 2;
// header pInfo->sortBufSize = pInfo->bufPageSize * 16; // TODO dynamic set the available sort buffer // there are headers, so pageSize = rowSize + header pInfo->sortBufSize = pInfo->bufPageSize * 16;
// TODO dynamic set the available sort buffer
pOperator->pTaskInfo = pTaskInfo;
pOperator->fpSet = createOperatorFpSet(doOpenSortOperator, doSort, NULL, NULL, destroyOrderOperatorInfo, NULL, NULL, pOperator->fpSet = createOperatorFpSet(doOpenSortOperator, doSort, NULL, NULL, destroyOrderOperatorInfo, NULL, NULL,
getExplainExecInfo); getExplainExecInfo);
int32_t code = appendDownstream(pOperator, &downstream, 1); int32_t code = appendDownstream(pOperator, &downstream, 1);
if (code != TSDB_CODE_SUCCESS) {
goto _error;
}
return pOperator; return pOperator;
_error: _error:
......
...@@ -1090,7 +1090,7 @@ static SSDataBlock* doStateWindowAgg(SOperatorInfo* pOperator) { ...@@ -1090,7 +1090,7 @@ static SSDataBlock* doStateWindowAgg(SOperatorInfo* pOperator) {
if (pOperator->status == OP_RES_TO_RETURN) { if (pOperator->status == OP_RES_TO_RETURN) {
doBuildResultDatablock(pOperator, pBInfo, &pInfo->groupResInfo, pInfo->aggSup.pResultBuf); doBuildResultDatablock(pOperator, pBInfo, &pInfo->groupResInfo, pInfo->aggSup.pResultBuf);
if (pBInfo->pRes->info.rows == 0 || !hashRemainDataInGroupInfo(&pInfo->groupResInfo)) { if (pBInfo->pRes->info.rows == 0 || !hasDataInGroupInfo(&pInfo->groupResInfo)) {
doSetOperatorCompleted(pOperator); doSetOperatorCompleted(pOperator);
return NULL; return NULL;
} }
...@@ -1122,7 +1122,7 @@ static SSDataBlock* doStateWindowAgg(SOperatorInfo* pOperator) { ...@@ -1122,7 +1122,7 @@ static SSDataBlock* doStateWindowAgg(SOperatorInfo* pOperator) {
initGroupedResultInfo(&pInfo->groupResInfo, pInfo->aggSup.pResultRowHashTable, TSDB_ORDER_ASC); initGroupedResultInfo(&pInfo->groupResInfo, pInfo->aggSup.pResultRowHashTable, TSDB_ORDER_ASC);
blockDataEnsureCapacity(pBInfo->pRes, pOperator->resultInfo.capacity); blockDataEnsureCapacity(pBInfo->pRes, pOperator->resultInfo.capacity);
doBuildResultDatablock(pOperator, pBInfo, &pInfo->groupResInfo, pInfo->aggSup.pResultBuf); doBuildResultDatablock(pOperator, pBInfo, &pInfo->groupResInfo, pInfo->aggSup.pResultBuf);
if (pBInfo->pRes->info.rows == 0 || !hashRemainDataInGroupInfo(&pInfo->groupResInfo)) { if (pBInfo->pRes->info.rows == 0 || !hasDataInGroupInfo(&pInfo->groupResInfo)) {
doSetOperatorCompleted(pOperator); doSetOperatorCompleted(pOperator);
} }
...@@ -1153,7 +1153,7 @@ static SSDataBlock* doBuildIntervalResult(SOperatorInfo* pOperator) { ...@@ -1153,7 +1153,7 @@ static SSDataBlock* doBuildIntervalResult(SOperatorInfo* pOperator) {
blockDataEnsureCapacity(pBlock, pOperator->resultInfo.capacity); blockDataEnsureCapacity(pBlock, pOperator->resultInfo.capacity);
doBuildResultDatablock(pOperator, &pInfo->binfo, &pInfo->groupResInfo, pInfo->aggSup.pResultBuf); doBuildResultDatablock(pOperator, &pInfo->binfo, &pInfo->groupResInfo, pInfo->aggSup.pResultBuf);
if (pBlock->info.rows == 0 || !hashRemainDataInGroupInfo(&pInfo->groupResInfo)) { if (pBlock->info.rows == 0 || !hasDataInGroupInfo(&pInfo->groupResInfo)) {
doSetOperatorCompleted(pOperator); doSetOperatorCompleted(pOperator);
} }
...@@ -1176,7 +1176,7 @@ static void finalizeUpdatedResult(int32_t numOfOutput, SDiskbasedBuf* pBuf, SArr ...@@ -1176,7 +1176,7 @@ static void finalizeUpdatedResult(int32_t numOfOutput, SDiskbasedBuf* pBuf, SArr
SResultRow* pRow = (SResultRow*)((char*)bufPage + pPos->pos.offset); SResultRow* pRow = (SResultRow*)((char*)bufPage + pPos->pos.offset);
for (int32_t j = 0; j < numOfOutput; ++j) { for (int32_t j = 0; j < numOfOutput; ++j) {
SResultRowEntryInfo* pEntry = getResultCell(pRow, j, rowCellInfoOffset); SResultRowEntryInfo* pEntry = getResultEntryInfo(pRow, j, rowCellInfoOffset);
if (pRow->numOfRows < pEntry->numOfRes) { if (pRow->numOfRows < pEntry->numOfRes) {
pRow->numOfRows = pEntry->numOfRes; pRow->numOfRows = pEntry->numOfRes;
} }
...@@ -1199,7 +1199,7 @@ void doClearWindowImpl(SResultRowPosition* p1, SDiskbasedBuf* pResultBuf, SOptrB ...@@ -1199,7 +1199,7 @@ void doClearWindowImpl(SResultRowPosition* p1, SDiskbasedBuf* pResultBuf, SOptrB
SResultRow* pResult = getResultRowByPos(pResultBuf, p1); SResultRow* pResult = getResultRowByPos(pResultBuf, p1);
SqlFunctionCtx* pCtx = pBinfo->pCtx; SqlFunctionCtx* pCtx = pBinfo->pCtx;
for (int32_t i = 0; i < numOfOutput; ++i) { for (int32_t i = 0; i < numOfOutput; ++i) {
pCtx[i].resultInfo = getResultCell(pResult, i, pBinfo->rowCellInfoOffset); pCtx[i].resultInfo = getResultEntryInfo(pResult, i, pBinfo->rowCellInfoOffset);
struct SResultRowEntryInfo* pResInfo = pCtx[i].resultInfo; struct SResultRowEntryInfo* pResInfo = pCtx[i].resultInfo;
if (fmIsWindowPseudoColumnFunc(pCtx[i].functionId)) { if (fmIsWindowPseudoColumnFunc(pCtx[i].functionId)) {
continue; continue;
...@@ -1301,7 +1301,7 @@ static SSDataBlock* doStreamIntervalAgg(SOperatorInfo* pOperator) { ...@@ -1301,7 +1301,7 @@ static SSDataBlock* doStreamIntervalAgg(SOperatorInfo* pOperator) {
if (pOperator->status == OP_RES_TO_RETURN) { if (pOperator->status == OP_RES_TO_RETURN) {
doBuildResultDatablock(pOperator, &pInfo->binfo, &pInfo->groupResInfo, pInfo->aggSup.pResultBuf); doBuildResultDatablock(pOperator, &pInfo->binfo, &pInfo->groupResInfo, pInfo->aggSup.pResultBuf);
if (pInfo->binfo.pRes->info.rows == 0 || !hashRemainDataInGroupInfo(&pInfo->groupResInfo)) { if (pInfo->binfo.pRes->info.rows == 0 || !hasDataInGroupInfo(&pInfo->groupResInfo)) {
pOperator->status = OP_EXEC_DONE; pOperator->status = OP_EXEC_DONE;
} }
return pInfo->binfo.pRes->info.rows == 0 ? NULL : pInfo->binfo.pRes; return pInfo->binfo.pRes->info.rows == 0 ? NULL : pInfo->binfo.pRes;
...@@ -1476,7 +1476,7 @@ SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SExprInfo* ...@@ -1476,7 +1476,7 @@ SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SExprInfo*
} }
} }
initResultRowInfo(&pInfo->binfo.resultRowInfo, (int32_t)1); initResultRowInfo(&pInfo->binfo.resultRowInfo);
pOperator->name = "TimeIntervalAggOperator"; pOperator->name = "TimeIntervalAggOperator";
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_HASH_INTERVAL; pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_HASH_INTERVAL;
...@@ -1533,7 +1533,7 @@ SOperatorInfo* createStreamIntervalOperatorInfo(SOperatorInfo* downstream, SExpr ...@@ -1533,7 +1533,7 @@ SOperatorInfo* createStreamIntervalOperatorInfo(SOperatorInfo* downstream, SExpr
goto _error; goto _error;
} }
initResultRowInfo(&pInfo->binfo.resultRowInfo, (int32_t)1); initResultRowInfo(&pInfo->binfo.resultRowInfo);
pOperator->name = "StreamTimeIntervalAggOperator"; pOperator->name = "StreamTimeIntervalAggOperator";
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_HASH_INTERVAL; pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_HASH_INTERVAL;
...@@ -1643,7 +1643,7 @@ static SSDataBlock* doSessionWindowAgg(SOperatorInfo* pOperator) { ...@@ -1643,7 +1643,7 @@ static SSDataBlock* doSessionWindowAgg(SOperatorInfo* pOperator) {
if (pOperator->status == OP_RES_TO_RETURN) { if (pOperator->status == OP_RES_TO_RETURN) {
doBuildResultDatablock(pOperator, pBInfo, &pInfo->groupResInfo, pInfo->aggSup.pResultBuf); doBuildResultDatablock(pOperator, pBInfo, &pInfo->groupResInfo, pInfo->aggSup.pResultBuf);
if (pBInfo->pRes->info.rows == 0 || !hashRemainDataInGroupInfo(&pInfo->groupResInfo)) { if (pBInfo->pRes->info.rows == 0 || !hasDataInGroupInfo(&pInfo->groupResInfo)) {
doSetOperatorCompleted(pOperator); doSetOperatorCompleted(pOperator);
return NULL; return NULL;
} }
...@@ -1678,7 +1678,7 @@ static SSDataBlock* doSessionWindowAgg(SOperatorInfo* pOperator) { ...@@ -1678,7 +1678,7 @@ static SSDataBlock* doSessionWindowAgg(SOperatorInfo* pOperator) {
initGroupedResultInfo(&pInfo->groupResInfo, pInfo->aggSup.pResultRowHashTable, TSDB_ORDER_ASC); initGroupedResultInfo(&pInfo->groupResInfo, pInfo->aggSup.pResultRowHashTable, TSDB_ORDER_ASC);
blockDataEnsureCapacity(pBInfo->pRes, pOperator->resultInfo.capacity); blockDataEnsureCapacity(pBInfo->pRes, pOperator->resultInfo.capacity);
doBuildResultDatablock(pOperator, pBInfo, &pInfo->groupResInfo, pInfo->aggSup.pResultBuf); doBuildResultDatablock(pOperator, pBInfo, &pInfo->groupResInfo, pInfo->aggSup.pResultBuf);
if (pBInfo->pRes->info.rows == 0 || !hashRemainDataInGroupInfo(&pInfo->groupResInfo)) { if (pBInfo->pRes->info.rows == 0 || !hasDataInGroupInfo(&pInfo->groupResInfo)) {
doSetOperatorCompleted(pOperator); doSetOperatorCompleted(pOperator);
} }
...@@ -1714,7 +1714,7 @@ static SSDataBlock* doTimeslice(SOperatorInfo* pOperator) { ...@@ -1714,7 +1714,7 @@ static SSDataBlock* doTimeslice(SOperatorInfo* pOperator) {
// if (pOperator->status == OP_RES_TO_RETURN) { // if (pOperator->status == OP_RES_TO_RETURN) {
// // doBuildResultDatablock(&pRuntimeEnv->groupResInfo, pRuntimeEnv, pIntervalInfo->pRes); // // doBuildResultDatablock(&pRuntimeEnv->groupResInfo, pRuntimeEnv, pIntervalInfo->pRes);
// if (pResBlock->info.rows == 0 || !hashRemainDataInGroupInfo(&pSliceInfo->groupResInfo)) { // if (pResBlock->info.rows == 0 || !hasDataInGroupInfo(&pSliceInfo->groupResInfo)) {
// doSetOperatorCompleted(pOperator); // doSetOperatorCompleted(pOperator);
// } // }
// //
...@@ -1908,7 +1908,7 @@ SOperatorInfo* createTimeSliceOperatorInfo(SOperatorInfo* downstream, SExprInfo* ...@@ -1908,7 +1908,7 @@ SOperatorInfo* createTimeSliceOperatorInfo(SOperatorInfo* downstream, SExprInfo*
goto _error; goto _error;
} }
initResultRowInfo(&pInfo->binfo.resultRowInfo, 8); initResultRowInfo(&pInfo->binfo.resultRowInfo);
pInfo->pFillColInfo = createFillColInfo(pExprInfo, numOfCols, pValNode); pInfo->pFillColInfo = createFillColInfo(pExprInfo, numOfCols, pValNode);
pInfo->binfo.pRes = pResultBlock; pInfo->binfo.pRes = pResultBlock;
...@@ -1956,7 +1956,7 @@ SOperatorInfo* createStatewindowOperatorInfo(SOperatorInfo* downstream, SExprInf ...@@ -1956,7 +1956,7 @@ SOperatorInfo* createStatewindowOperatorInfo(SOperatorInfo* downstream, SExprInf
initResultSizeInfo(pOperator, 4096); initResultSizeInfo(pOperator, 4096);
initAggInfo(&pInfo->binfo, &pInfo->aggSup, pExpr, numOfCols, pResBlock, keyBufSize, pTaskInfo->id.str); initAggInfo(&pInfo->binfo, &pInfo->aggSup, pExpr, numOfCols, pResBlock, keyBufSize, pTaskInfo->id.str);
initResultRowInfo(&pInfo->binfo.resultRowInfo, 8); initResultRowInfo(&pInfo->binfo.resultRowInfo);
pInfo->twAggSup = *pTwAggSup; pInfo->twAggSup = *pTwAggSup;
initExecTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pTaskInfo->window); initExecTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pTaskInfo->window);
...@@ -2006,7 +2006,7 @@ SOperatorInfo* createSessionAggOperatorInfo(SOperatorInfo* downstream, SExprInfo ...@@ -2006,7 +2006,7 @@ SOperatorInfo* createSessionAggOperatorInfo(SOperatorInfo* downstream, SExprInfo
} }
pInfo->twAggSup = *pTwAggSupp; pInfo->twAggSup = *pTwAggSupp;
initResultRowInfo(&pInfo->binfo.resultRowInfo, 8); initResultRowInfo(&pInfo->binfo.resultRowInfo);
initExecTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pTaskInfo->window); initExecTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pTaskInfo->window);
pInfo->tsSlotId = tsSlotId; pInfo->tsSlotId = tsSlotId;
...@@ -2153,7 +2153,7 @@ static void clearStreamIntervalOperator(SStreamFinalIntervalOperatorInfo* pInfo) ...@@ -2153,7 +2153,7 @@ static void clearStreamIntervalOperator(SStreamFinalIntervalOperatorInfo* pInfo)
taosHashClear(pInfo->aggSup.pResultRowHashTable); taosHashClear(pInfo->aggSup.pResultRowHashTable);
clearDiskbasedBuf(pInfo->aggSup.pResultBuf); clearDiskbasedBuf(pInfo->aggSup.pResultBuf);
cleanupResultRowInfo(&pInfo->binfo.resultRowInfo); cleanupResultRowInfo(&pInfo->binfo.resultRowInfo);
initResultRowInfo(&pInfo->binfo.resultRowInfo, 1); initResultRowInfo(&pInfo->binfo.resultRowInfo);
} }
static void clearUpdateDataBlock(SSDataBlock* pBlock) { static void clearUpdateDataBlock(SSDataBlock* pBlock) {
...@@ -2319,7 +2319,7 @@ SOperatorInfo* createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream, ...@@ -2319,7 +2319,7 @@ SOperatorInfo* createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream,
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
goto _error; goto _error;
} }
initResultRowInfo(&pInfo->binfo.resultRowInfo, (int32_t)1); initResultRowInfo(&pInfo->binfo.resultRowInfo);
pInfo->pChildren = NULL; pInfo->pChildren = NULL;
if (numOfChild > 0) { if (numOfChild > 0) {
pInfo->pChildren = taosArrayInit(numOfChild, sizeof(SOperatorInfo)); pInfo->pChildren = taosArrayInit(numOfChild, sizeof(SOperatorInfo));
...@@ -2454,7 +2454,7 @@ SOperatorInfo* createStreamSessionAggOperatorInfo(SOperatorInfo* downstream, SEx ...@@ -2454,7 +2454,7 @@ SOperatorInfo* createStreamSessionAggOperatorInfo(SOperatorInfo* downstream, SEx
initDummyFunction(pInfo->pDummyCtx, pInfo->binfo.pCtx, numOfCols); initDummyFunction(pInfo->pDummyCtx, pInfo->binfo.pCtx, numOfCols);
pInfo->twAggSup = *pTwAggSupp; pInfo->twAggSup = *pTwAggSupp;
initResultRowInfo(&pInfo->binfo.resultRowInfo, 8); initResultRowInfo(&pInfo->binfo.resultRowInfo);
initExecTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pTaskInfo->window); initExecTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pTaskInfo->window);
pInfo->primaryTsIndex = tsSlotId; pInfo->primaryTsIndex = tsSlotId;
...@@ -2896,7 +2896,7 @@ static SSDataBlock* doStreamSessionAgg(SOperatorInfo* pOperator) { ...@@ -2896,7 +2896,7 @@ static SSDataBlock* doStreamSessionAgg(SOperatorInfo* pOperator) {
return pInfo->pDelRes; return pInfo->pDelRes;
} }
doBuildResultDatablock(pOperator, pBInfo, &pInfo->groupResInfo, pInfo->streamAggSup.pResultBuf); doBuildResultDatablock(pOperator, pBInfo, &pInfo->groupResInfo, pInfo->streamAggSup.pResultBuf);
if (pBInfo->pRes->info.rows == 0 || !hashRemainDataInGroupInfo(&pInfo->groupResInfo)) { if (pBInfo->pRes->info.rows == 0 || !hasDataInGroupInfo(&pInfo->groupResInfo)) {
doSetOperatorCompleted(pOperator); doSetOperatorCompleted(pOperator);
} }
return pBInfo->pRes->info.rows == 0 ? NULL : pBInfo->pRes; return pBInfo->pRes->info.rows == 0 ? NULL : pBInfo->pRes;
...@@ -3269,7 +3269,7 @@ static SSDataBlock* doStreamStateAgg(SOperatorInfo* pOperator) { ...@@ -3269,7 +3269,7 @@ static SSDataBlock* doStreamStateAgg(SOperatorInfo* pOperator) {
return pInfo->pDelRes; return pInfo->pDelRes;
} }
doBuildResultDatablock(pOperator, pBInfo, &pInfo->groupResInfo, pInfo->streamAggSup.pResultBuf); doBuildResultDatablock(pOperator, pBInfo, &pInfo->groupResInfo, pInfo->streamAggSup.pResultBuf);
if (pBInfo->pRes->info.rows == 0 || !hashRemainDataInGroupInfo(&pInfo->groupResInfo)) { if (pBInfo->pRes->info.rows == 0 || !hasDataInGroupInfo(&pInfo->groupResInfo)) {
doSetOperatorCompleted(pOperator); doSetOperatorCompleted(pOperator);
} }
return pBInfo->pRes->info.rows == 0 ? NULL : pBInfo->pRes; return pBInfo->pRes->info.rows == 0 ? NULL : pBInfo->pRes;
...@@ -3342,7 +3342,7 @@ SOperatorInfo* createStreamStateAggOperatorInfo(SOperatorInfo* downstream, SPhys ...@@ -3342,7 +3342,7 @@ SOperatorInfo* createStreamStateAggOperatorInfo(SOperatorInfo* downstream, SPhys
pInfo->stateCol = extractColumnFromColumnNode(pColNode); pInfo->stateCol = extractColumnFromColumnNode(pColNode);
initResultSizeInfo(pOperator, 4096); initResultSizeInfo(pOperator, 4096);
initResultRowInfo(&pInfo->binfo.resultRowInfo, 8); initResultRowInfo(&pInfo->binfo.resultRowInfo);
pInfo->twAggSup = (STimeWindowAggSupp){ pInfo->twAggSup = (STimeWindowAggSupp){
.waterMark = pStateNode->window.watermark, .waterMark = pStateNode->window.watermark,
.calTrigger = pStateNode->window.triggerType, .calTrigger = pStateNode->window.triggerType,
...@@ -3590,7 +3590,7 @@ SOperatorInfo* createMergeIntervalOperatorInfo(SOperatorInfo* downstream, SExprI ...@@ -3590,7 +3590,7 @@ SOperatorInfo* createMergeIntervalOperatorInfo(SOperatorInfo* downstream, SExprI
goto _error; goto _error;
} }
initResultRowInfo(&iaInfo->binfo.resultRowInfo, (int32_t)1); initResultRowInfo(&iaInfo->binfo.resultRowInfo);
pOperator->name = "TimeMergeIntervalAggOperator"; pOperator->name = "TimeMergeIntervalAggOperator";
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_MERGE_INTERVAL; pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_MERGE_INTERVAL;
......
...@@ -209,7 +209,7 @@ TEST(testCase, inMem_sort_Test) { ...@@ -209,7 +209,7 @@ TEST(testCase, inMem_sort_Test) {
SArray* orderInfo = taosArrayInit(1, sizeof(SBlockOrderInfo)); SArray* orderInfo = taosArrayInit(1, sizeof(SBlockOrderInfo));
taosArrayPush(orderInfo, &oi); taosArrayPush(orderInfo, &oi);
SSortHandle* phandle = tsortCreateSortHandle(orderInfo, NULL, SORT_SINGLESOURCE_SORT, 1024, 5, NULL, "test_abc"); SSortHandle* phandle = tsortCreateSortHandle(orderInfo, SORT_SINGLESOURCE_SORT, 1024, 5, NULL, "test_abc");
tsortSetFetchRawDataFp(phandle, getSingleColDummyBlock, NULL, NULL); tsortSetFetchRawDataFp(phandle, getSingleColDummyBlock, NULL, NULL);
_info* pInfo = (_info*) taosMemoryCalloc(1, sizeof(_info)); _info* pInfo = (_info*) taosMemoryCalloc(1, sizeof(_info));
...@@ -298,7 +298,7 @@ TEST(testCase, external_mem_sort_Test) { ...@@ -298,7 +298,7 @@ TEST(testCase, external_mem_sort_Test) {
SArray* orderInfo = taosArrayInit(1, sizeof(SBlockOrderInfo)); SArray* orderInfo = taosArrayInit(1, sizeof(SBlockOrderInfo));
taosArrayPush(orderInfo, &oi); taosArrayPush(orderInfo, &oi);
SSortHandle* phandle = tsortCreateSortHandle(orderInfo, NULL, SORT_SINGLESOURCE_SORT, 128, 3, NULL, "test_abc"); SSortHandle* phandle = tsortCreateSortHandle(orderInfo, SORT_SINGLESOURCE_SORT, 128, 3, NULL, "test_abc");
tsortSetFetchRawDataFp(phandle, getSingleColDummyBlock, NULL, NULL); tsortSetFetchRawDataFp(phandle, getSingleColDummyBlock, NULL, NULL);
SSortSource* ps = static_cast<SSortSource*>(taosMemoryCalloc(1, sizeof(SSortSource))); SSortSource* ps = static_cast<SSortSource*>(taosMemoryCalloc(1, sizeof(SSortSource)));
...@@ -365,7 +365,7 @@ TEST(testCase, ordered_merge_sort_Test) { ...@@ -365,7 +365,7 @@ TEST(testCase, ordered_merge_sort_Test) {
taosArrayPush(pBlock->pDataBlock, &colInfo); taosArrayPush(pBlock->pDataBlock, &colInfo);
} }
SSortHandle* phandle = tsortCreateSortHandle(orderInfo, NULL, SORT_MULTISOURCE_MERGE, 1024, 5, pBlock,"test_abc"); SSortHandle* phandle = tsortCreateSortHandle(orderInfo, SORT_MULTISOURCE_MERGE, 1024, 5, pBlock,"test_abc");
tsortSetFetchRawDataFp(phandle, getSingleColDummyBlock, NULL, NULL); tsortSetFetchRawDataFp(phandle, getSingleColDummyBlock, NULL, NULL);
tsortSetComparFp(phandle, docomp); tsortSetComparFp(phandle, docomp);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册