提交 b0ee829d 编写于 作者: H Haojun Liao

refactor: do some internal refactor.

上级 e6aed519
...@@ -140,7 +140,7 @@ bool hasRemainResults(SGroupResInfo* pGroupResInfo); ...@@ -140,7 +140,7 @@ bool hasRemainResults(SGroupResInfo* pGroupResInfo);
int32_t getNumOfTotalRes(SGroupResInfo* pGroupResInfo); int32_t getNumOfTotalRes(SGroupResInfo* pGroupResInfo);
SSDataBlock* createResDataBlock(SDataBlockDescNode* pNode); SSDataBlock* createDataBlockFromDescNode(SDataBlockDescNode* pNode);
EDealRes doTranslateTagExpr(SNode** pNode, void* pContext); EDealRes doTranslateTagExpr(SNode** pNode, void* pContext);
int32_t getGroupIdFromTagsVal(void* pMeta, uint64_t uid, SNodeList* pGroupNode, char* keyBuf, uint64_t* pGroupId); int32_t getGroupIdFromTagsVal(void* pMeta, uint64_t uid, SNodeList* pGroupNode, char* keyBuf, uint64_t* pGroupId);
......
...@@ -537,15 +537,6 @@ typedef struct SStreamIntervalOperatorInfo { ...@@ -537,15 +537,6 @@ typedef struct SStreamIntervalOperatorInfo {
SWinKey delKey; SWinKey delKey;
} SStreamIntervalOperatorInfo; } SStreamIntervalOperatorInfo;
typedef struct SAggOperatorInfo {
SOptrBasicInfo binfo;
SAggSupporter aggSup;
STableQueryInfo* current;
uint64_t groupId;
SGroupResInfo groupResInfo;
SExprSupp scalarExprSup;
} SAggOperatorInfo;
typedef struct SFillOperatorInfo { typedef struct SFillOperatorInfo {
struct SFillInfo* pFillInfo; struct SFillInfo* pFillInfo;
SSDataBlock* pRes; SSDataBlock* pRes;
...@@ -577,18 +568,6 @@ typedef struct SWindowRowsSup { ...@@ -577,18 +568,6 @@ typedef struct SWindowRowsSup {
uint64_t groupId; uint64_t groupId;
} SWindowRowsSup; } SWindowRowsSup;
typedef struct SSessionAggOperatorInfo {
SOptrBasicInfo binfo;
SAggSupporter aggSup;
SGroupResInfo groupResInfo;
SWindowRowsSup winSup;
bool reptScan; // next round scan
int64_t gap; // session window gap
int32_t tsSlotId; // primary timestamp slot id
STimeWindowAggSupp twAggSup;
} SSessionAggOperatorInfo;
typedef struct SResultWindowInfo { typedef struct SResultWindowInfo {
void* pOutputBuf; void* pOutputBuf;
SSessionKey sessionWin; SSessionKey sessionWin;
...@@ -681,37 +660,6 @@ typedef struct SStreamFillOperatorInfo { ...@@ -681,37 +660,6 @@ typedef struct SStreamFillOperatorInfo {
SStreamFillInfo* pFillInfo; SStreamFillInfo* pFillInfo;
} SStreamFillOperatorInfo; } SStreamFillOperatorInfo;
typedef struct STimeSliceOperatorInfo {
SSDataBlock* pRes;
STimeWindow win;
SInterval interval;
int64_t current;
SArray* pPrevRow; // SArray<SGroupValue>
SArray* pNextRow; // SArray<SGroupValue>
SArray* pLinearInfo; // SArray<SFillLinearInfo>
bool isPrevRowSet;
bool isNextRowSet;
int32_t fillType; // fill type
SColumn tsCol; // primary timestamp column
SExprSupp scalarSup; // scalar calculation
struct SFillColInfo* pFillColInfo; // fill column info
} STimeSliceOperatorInfo;
typedef struct SStateWindowOperatorInfo {
// SOptrBasicInfo should be first, SAggSupporter should be second for stream encode
SOptrBasicInfo binfo;
SAggSupporter aggSup;
SExprSupp scalarSup;
SGroupResInfo groupResInfo;
SWindowRowsSup winSup;
SColumn stateCol; // start row index
bool hasKey;
SStateKeys stateKey;
int32_t tsSlotId; // primary timestamp column slot id
STimeWindowAggSupp twAggSup;
} SStateWindowOperatorInfo;
#define OPTR_IS_OPENED(_optr) (((_optr)->status & OP_OPENED) == OP_OPENED) #define OPTR_IS_OPENED(_optr) (((_optr)->status & OP_OPENED) == OP_OPENED)
#define OPTR_SET_OPENED(_optr) ((_optr)->status |= OP_OPENED) #define OPTR_SET_OPENED(_optr) ((_optr)->status |= OP_OPENED)
...@@ -726,6 +674,7 @@ void cleanupBasicInfo(SOptrBasicInfo* pInfo); ...@@ -726,6 +674,7 @@ void cleanupBasicInfo(SOptrBasicInfo* pInfo);
int32_t initExprSupp(SExprSupp* pSup, SExprInfo* pExprInfo, int32_t numOfExpr); int32_t initExprSupp(SExprSupp* pSup, SExprInfo* pExprInfo, int32_t numOfExpr);
void cleanupExprSupp(SExprSupp* pSup); void cleanupExprSupp(SExprSupp* pSup);
void destroyExprInfo(SExprInfo* pExpr, int32_t numOfExprs); void destroyExprInfo(SExprInfo* pExpr, int32_t numOfExprs);
int32_t initAggInfo(SExprSupp* pSup, SAggSupporter* pAggSup, SExprInfo* pExprInfo, int32_t numOfCols, size_t keyBufSize, int32_t initAggInfo(SExprSupp* pSup, SAggSupporter* pAggSup, SExprInfo* pExprInfo, int32_t numOfCols, size_t keyBufSize,
const char* pkey); const char* pkey);
void initResultSizeInfo(SResultInfo* pResultInfo, int32_t numOfRows); void initResultSizeInfo(SResultInfo* pResultInfo, int32_t numOfRows);
...@@ -735,12 +684,12 @@ void doBuildStreamResBlock(SOperatorInfo* pOperator, SOptrBasicInfo* pbInfo, SGr ...@@ -735,12 +684,12 @@ void doBuildStreamResBlock(SOperatorInfo* pOperator, SOptrBasicInfo* pbInfo, SGr
void doBuildResultDatablock(SOperatorInfo* pOperator, SOptrBasicInfo* pbInfo, SGroupResInfo* pGroupResInfo, void doBuildResultDatablock(SOperatorInfo* pOperator, SOptrBasicInfo* pbInfo, SGroupResInfo* pGroupResInfo,
SDiskbasedBuf* pBuf); SDiskbasedBuf* pBuf);
bool hasLimitOffsetInfo(SLimitInfo* pLimitInfo); bool hasLimitOffsetInfo(SLimitInfo* pLimitInfo);
void initLimitInfo(const SNode* pLimit, const SNode* pSLimit, SLimitInfo* pLimitInfo); void initLimitInfo(const SNode* pLimit, const SNode* pSLimit, SLimitInfo* pLimitInfo);
void applyLimitOffset(SLimitInfo* pLimitInfo, SSDataBlock* pBlock, SExecTaskInfo* pTaskInfo, SOperatorInfo* pOperator); void applyLimitOffset(SLimitInfo* pLimitInfo, SSDataBlock* pBlock, SExecTaskInfo* pTaskInfo, SOperatorInfo* pOperator);
void doApplyFunctions(SExecTaskInfo* taskInfo, SqlFunctionCtx* pCtx, SColumnInfoData* pTimeWindowData, int32_t offset, void applyAggFunctionOnPartialTuples(SExecTaskInfo* taskInfo, SqlFunctionCtx* pCtx, SColumnInfoData* pTimeWindowData,
int32_t forwardStep, int32_t numOfTotal, int32_t numOfOutput); int32_t offset, int32_t forwardStep, int32_t numOfTotal, int32_t numOfOutput);
int32_t extractDataBlockFromFetchRsp(SSDataBlock* pRes, char* pData, SArray* pColList, char** pNextStart); int32_t extractDataBlockFromFetchRsp(SSDataBlock* pRes, char* pData, SArray* pColList, char** pNextStart);
void updateLoadRemoteInfo(SLoadRemoteDataInfo* pInfo, int32_t numOfRows, int32_t dataLen, int64_t startTs, void updateLoadRemoteInfo(SLoadRemoteDataInfo* pInfo, int32_t numOfRows, int32_t dataLen, int64_t startTs,
...@@ -751,7 +700,7 @@ STimeWindow getFirstQualifiedTimeWindow(int64_t ts, STimeWindow* pWindow, SInter ...@@ -751,7 +700,7 @@ STimeWindow getFirstQualifiedTimeWindow(int64_t ts, STimeWindow* pWindow, SInter
int32_t getTableScanInfo(SOperatorInfo* pOperator, int32_t* order, int32_t* scanFlag); int32_t getTableScanInfo(SOperatorInfo* pOperator, int32_t* order, int32_t* scanFlag);
int32_t getBufferPgSize(int32_t rowSize, uint32_t* defaultPgsz, uint32_t* defaultBufsz); int32_t getBufferPgSize(int32_t rowSize, uint32_t* defaultPgsz, uint32_t* defaultBufsz);
void doDestroyExchangeOperatorInfo(void* param); extern void doDestroyExchangeOperatorInfo(void* param);
void setOperatorCompleted(SOperatorInfo* pOperator); void setOperatorCompleted(SOperatorInfo* pOperator);
void setOperatorInfo(SOperatorInfo* pOperator, const char* name, int32_t type, bool blocking, int32_t status, void setOperatorInfo(SOperatorInfo* pOperator, const char* name, int32_t type, bool blocking, int32_t status,
...@@ -764,79 +713,73 @@ void cleanupAggSup(SAggSupporter* pAggSup); ...@@ -764,79 +713,73 @@ void cleanupAggSup(SAggSupporter* pAggSup);
void appendOneRowToDataBlock(SSDataBlock* pBlock, STupleHandle* pTupleHandle); void appendOneRowToDataBlock(SSDataBlock* pBlock, STupleHandle* pTupleHandle);
void setTbNameColData(const SSDataBlock* pBlock, SColumnInfoData* pColInfoData, int32_t functionId, const char* name); void setTbNameColData(const SSDataBlock* pBlock, SColumnInfoData* pColInfoData, int32_t functionId, const char* name);
SSDataBlock* loadNextDataBlock(void* param);
void setResultRowInitCtx(SResultRow* pResult, SqlFunctionCtx* pCtx, int32_t numOfOutput, int32_t* rowEntryInfoOffset); void setResultRowInitCtx(SResultRow* pResult, SqlFunctionCtx* pCtx, int32_t numOfOutput, int32_t* rowEntryInfoOffset);
SResultRow* doSetResultOutBufByKey(SDiskbasedBuf* pResultBuf, SResultRowInfo* pResultRowInfo, char* pData, SResultRow* doSetResultOutBufByKey(SDiskbasedBuf* pResultBuf, SResultRowInfo* pResultRowInfo, char* pData,
int16_t bytes, bool masterscan, uint64_t groupId, SExecTaskInfo* pTaskInfo, int16_t bytes, bool masterscan, uint64_t groupId, SExecTaskInfo* pTaskInfo,
bool isIntervalQuery, SAggSupporter* pSup); bool isIntervalQuery, SAggSupporter* pSup);
// operator creater functions
// clang-format off
SOperatorInfo* createExchangeOperatorInfo(void* pTransporter, SExchangePhysiNode* pExNode, SExecTaskInfo* pTaskInfo); SOperatorInfo* createExchangeOperatorInfo(void* pTransporter, SExchangePhysiNode* pExNode, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createTableScanOperatorInfo(STableScanPhysiNode* pTableScanNode, SReadHandle* pHandle, SOperatorInfo* createTableScanOperatorInfo(STableScanPhysiNode* pTableScanNode, SReadHandle* pHandle, SExecTaskInfo* pTaskInfo);
SExecTaskInfo* pTaskInfo);
SOperatorInfo* createTagScanOperatorInfo(SReadHandle* pReadHandle, STagScanPhysiNode* pPhyNode, SOperatorInfo* createTagScanOperatorInfo(SReadHandle* pReadHandle, STagScanPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo);
STableListInfo* pTableListInfo, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createSysTableScanOperatorInfo(void* readHandle, SSystemTableScanPhysiNode* pScanPhyNode, SOperatorInfo* createSysTableScanOperatorInfo(void* readHandle, SSystemTableScanPhysiNode* pScanPhyNode, const char* pUser, SExecTaskInfo* pTaskInfo);
const char* pUser, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createAggregateOperatorInfo(SOperatorInfo* downstream, SAggPhysiNode* pNode, SExecTaskInfo* pTaskInfo); SOperatorInfo* createAggregateOperatorInfo(SOperatorInfo* downstream, SAggPhysiNode* pNode, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createIndefinitOutputOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pNode, SOperatorInfo* createIndefinitOutputOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pNode, SExecTaskInfo* pTaskInfo);
SExecTaskInfo* pTaskInfo);
SOperatorInfo* createProjectOperatorInfo(SOperatorInfo* downstream, SProjectPhysiNode* pProjPhyNode, SOperatorInfo* createProjectOperatorInfo(SOperatorInfo* downstream, SProjectPhysiNode* pProjPhyNode, SExecTaskInfo* pTaskInfo);
SExecTaskInfo* pTaskInfo);
SOperatorInfo* createSortOperatorInfo(SOperatorInfo* downstream, SSortPhysiNode* pSortNode, SExecTaskInfo* pTaskInfo); SOperatorInfo* createSortOperatorInfo(SOperatorInfo* downstream, SSortPhysiNode* pSortNode, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createMultiwayMergeOperatorInfo(SOperatorInfo** dowStreams, size_t numStreams,
SMergePhysiNode* pMergePhysiNode, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createCacherowsScanOperator(SLastRowScanPhysiNode* pTableScanNode, SReadHandle* readHandle,
SExecTaskInfo* pTaskInfo);
SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SIntervalPhysiNode* pPhyNode, SOperatorInfo* createMultiwayMergeOperatorInfo(SOperatorInfo** dowStreams, size_t numStreams, SMergePhysiNode* pMergePhysiNode, SExecTaskInfo* pTaskInfo);
SExecTaskInfo* pTaskInfo, bool isStream);
SOperatorInfo* createMergeIntervalOperatorInfo(SOperatorInfo* downstream, SMergeIntervalPhysiNode* pIntervalPhyNode, SOperatorInfo* createCacherowsScanOperator(SLastRowScanPhysiNode* pTableScanNode, SReadHandle* readHandle, SExecTaskInfo* pTaskInfo);
SExecTaskInfo* pTaskInfo);
SOperatorInfo* createMergeAlignedIntervalOperatorInfo(SOperatorInfo* downstream, SMergeAlignedIntervalPhysiNode* pNode, SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SIntervalPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo, bool isStream);
SExecTaskInfo* pTaskInfo);
SOperatorInfo* createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pPhyNode, SOperatorInfo* createMergeIntervalOperatorInfo(SOperatorInfo* downstream, SMergeIntervalPhysiNode* pIntervalPhyNode, SExecTaskInfo* pTaskInfo);
SExecTaskInfo* pTaskInfo, int32_t numOfChild);
SOperatorInfo* createSessionAggOperatorInfo(SOperatorInfo* downstream, SSessionWinodwPhysiNode* pSessionNode, SOperatorInfo* createMergeAlignedIntervalOperatorInfo(SOperatorInfo* downstream, SMergeAlignedIntervalPhysiNode* pNode, SExecTaskInfo* pTaskInfo);
SExecTaskInfo* pTaskInfo);
SOperatorInfo* createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo, int32_t numOfChild);
SOperatorInfo* createSessionAggOperatorInfo(SOperatorInfo* downstream, SSessionWinodwPhysiNode* pSessionNode, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createGroupOperatorInfo(SOperatorInfo* downstream, SAggPhysiNode* pAggNode, SExecTaskInfo* pTaskInfo); SOperatorInfo* createGroupOperatorInfo(SOperatorInfo* downstream, SAggPhysiNode* pAggNode, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createDataBlockInfoScanOperator(SReadHandle* readHandle, SBlockDistScanPhysiNode* pBlockScanNode,
SExecTaskInfo* pTaskInfo);
SOperatorInfo* createStreamScanOperatorInfo(SReadHandle* pHandle, STableScanPhysiNode* pTableScanNode, SNode* pTagCond, SOperatorInfo* createDataBlockInfoScanOperator(SReadHandle* readHandle, SBlockDistScanPhysiNode* pBlockScanNode, SExecTaskInfo* pTaskInfo);
SExecTaskInfo* pTaskInfo);
SOperatorInfo* createStreamScanOperatorInfo(SReadHandle* pHandle, STableScanPhysiNode* pTableScanNode, SNode* pTagCond, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createRawScanOperatorInfo(SReadHandle* pHandle, SExecTaskInfo* pTaskInfo); SOperatorInfo* createRawScanOperatorInfo(SReadHandle* pHandle, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createFillOperatorInfo(SOperatorInfo* downstream, SFillPhysiNode* pPhyFillNode, SOperatorInfo* createFillOperatorInfo(SOperatorInfo* downstream, SFillPhysiNode* pPhyFillNode, SExecTaskInfo* pTaskInfo);
SExecTaskInfo* pTaskInfo);
SOperatorInfo* createStatewindowOperatorInfo(SOperatorInfo* downstream, SStateWinodwPhysiNode* pStateNode, SOperatorInfo* createStatewindowOperatorInfo(SOperatorInfo* downstream, SStateWinodwPhysiNode* pStateNode, SExecTaskInfo* pTaskInfo);
SExecTaskInfo* pTaskInfo);
SOperatorInfo* createPartitionOperatorInfo(SOperatorInfo* downstream, SPartitionPhysiNode* pPartNode,
SExecTaskInfo* pTaskInfo);
SOperatorInfo* createStreamPartitionOperatorInfo(SOperatorInfo* downstream, SStreamPartitionPhysiNode* pPartNode, SOperatorInfo* createPartitionOperatorInfo(SOperatorInfo* downstream, SPartitionPhysiNode* pPartNode, SExecTaskInfo* pTaskInfo);
SExecTaskInfo* pTaskInfo);
SOperatorInfo* createStreamPartitionOperatorInfo(SOperatorInfo* downstream, SStreamPartitionPhysiNode* pPartNode, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createTimeSliceOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pNode, SExecTaskInfo* pTaskInfo); SOperatorInfo* createTimeSliceOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pNode, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createMergeJoinOperatorInfo(SOperatorInfo** pDownstream, int32_t numOfDownstream,
SSortMergeJoinPhysiNode* pJoinNode, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createStreamSessionAggOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pPhyNode,
SExecTaskInfo* pTaskInfo);
SOperatorInfo* createStreamFinalSessionAggOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pPhyNode,
SExecTaskInfo* pTaskInfo, int32_t numOfChild);
SOperatorInfo* createStreamIntervalOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pPhyNode,
SExecTaskInfo* pTaskInfo);
SOperatorInfo* createStreamStateAggOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pPhyNode, SOperatorInfo* createMergeJoinOperatorInfo(SOperatorInfo** pDownstream, int32_t numOfDownstream, SSortMergeJoinPhysiNode* pJoinNode, SExecTaskInfo* pTaskInfo);
SExecTaskInfo* pTaskInfo);
SOperatorInfo* createStreamFillOperatorInfo(SOperatorInfo* downstream, SStreamFillPhysiNode* pPhyFillNode, SOperatorInfo* createStreamSessionAggOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo);
SExecTaskInfo* pTaskInfo);
SOperatorInfo* createStreamFinalSessionAggOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo, int32_t numOfChild);
SOperatorInfo* createStreamIntervalOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createStreamStateAggOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createStreamFillOperatorInfo(SOperatorInfo* downstream, SStreamFillPhysiNode* pPhyFillNode, SExecTaskInfo* pTaskInfo);
// clang-format on
int32_t projectApplyFunctions(SExprInfo* pExpr, SSDataBlock* pResult, SSDataBlock* pSrcBlock, SqlFunctionCtx* pCtx, int32_t projectApplyFunctions(SExprInfo* pExpr, SSDataBlock* pResult, SSDataBlock* pSrcBlock, SqlFunctionCtx* pCtx,
int32_t numOfOutput, SArray* pPseudoList); int32_t numOfOutput, SArray* pPseudoList);
......
...@@ -59,7 +59,7 @@ SOperatorInfo* createCacherowsScanOperator(SLastRowScanPhysiNode* pScanNode, SRe ...@@ -59,7 +59,7 @@ SOperatorInfo* createCacherowsScanOperator(SLastRowScanPhysiNode* pScanNode, SRe
pInfo->readHandle = *readHandle; pInfo->readHandle = *readHandle;
SDataBlockDescNode* pDescNode = pScanNode->scan.node.pOutputDataBlockDesc; SDataBlockDescNode* pDescNode = pScanNode->scan.node.pOutputDataBlockDesc;
pInfo->pRes = createResDataBlock(pDescNode); pInfo->pRes = createDataBlockFromDescNode(pDescNode);
int32_t numOfCols = 0; int32_t numOfCols = 0;
code = code =
......
...@@ -303,7 +303,7 @@ SOperatorInfo* createExchangeOperatorInfo(void* pTransporter, SExchangePhysiNode ...@@ -303,7 +303,7 @@ SOperatorInfo* createExchangeOperatorInfo(void* pTransporter, SExchangePhysiNode
} }
tsem_init(&pInfo->ready, 0, 0); tsem_init(&pInfo->ready, 0, 0);
pInfo->pDummyBlock = createResDataBlock(pExNode->node.pOutputDataBlockDesc); pInfo->pDummyBlock = createDataBlockFromDescNode(pExNode->node.pOutputDataBlockDesc);
pInfo->pResultBlockList = taosArrayInit(64, POINTER_BYTES); pInfo->pResultBlockList = taosArrayInit(64, POINTER_BYTES);
pInfo->pRecycledBlocks = taosArrayInit(64, POINTER_BYTES); pInfo->pRecycledBlocks = taosArrayInit(64, POINTER_BYTES);
......
...@@ -208,7 +208,7 @@ SArray* createSortInfo(SNodeList* pNodeList) { ...@@ -208,7 +208,7 @@ SArray* createSortInfo(SNodeList* pNodeList) {
return pList; return pList;
} }
SSDataBlock* createResDataBlock(SDataBlockDescNode* pNode) { SSDataBlock* createDataBlockFromDescNode(SDataBlockDescNode* pNode) {
int32_t numOfCols = LIST_LENGTH(pNode->pSlots); int32_t numOfCols = LIST_LENGTH(pNode->pSlots);
SSDataBlock* pBlock = createDataBlock(); SSDataBlock* pBlock = createDataBlock();
......
...@@ -76,6 +76,15 @@ static UNUSED_FUNC void* u_realloc(void* p, size_t __size) { ...@@ -76,6 +76,15 @@ static UNUSED_FUNC void* u_realloc(void* p, size_t __size) {
#define CLEAR_QUERY_STATUS(q, st) ((q)->status &= (~(st))) #define CLEAR_QUERY_STATUS(q, st) ((q)->status &= (~(st)))
#define QUERY_IS_INTERVAL_QUERY(_q) ((_q)->interval.interval > 0) #define QUERY_IS_INTERVAL_QUERY(_q) ((_q)->interval.interval > 0)
typedef struct SAggOperatorInfo {
SOptrBasicInfo binfo;
SAggSupporter aggSup;
STableQueryInfo* current;
uint64_t groupId;
SGroupResInfo groupResInfo;
SExprSupp scalarExprSup;
} SAggOperatorInfo;
int32_t getMaximumIdleDurationSec() { return tsShellActivityTimer * 2; } int32_t getMaximumIdleDurationSec() { return tsShellActivityTimer * 2; }
static void setBlockSMAInfo(SqlFunctionCtx* pCtx, SExprInfo* pExpr, SSDataBlock* pBlock); static void setBlockSMAInfo(SqlFunctionCtx* pCtx, SExprInfo* pExpr, SSDataBlock* pBlock);
...@@ -316,8 +325,8 @@ static void functionCtxRestore(SqlFunctionCtx* pCtx, SFunctionCtxStatus* pStatus ...@@ -316,8 +325,8 @@ static void functionCtxRestore(SqlFunctionCtx* pCtx, SFunctionCtxStatus* pStatus
pCtx->input.startRowIndex = pStatus->startOffset; pCtx->input.startRowIndex = pStatus->startOffset;
} }
void doApplyFunctions(SExecTaskInfo* taskInfo, SqlFunctionCtx* pCtx, SColumnInfoData* pTimeWindowData, int32_t offset, void applyAggFunctionOnPartialTuples(SExecTaskInfo* taskInfo, SqlFunctionCtx* pCtx, SColumnInfoData* pTimeWindowData,
int32_t forwardStep, int32_t numOfTotal, int32_t numOfOutput) { int32_t offset, int32_t forwardStep, int32_t numOfTotal, int32_t numOfOutput) {
for (int32_t k = 0; k < numOfOutput; ++k) { for (int32_t k = 0; k < numOfOutput; ++k) {
// keep it temporarily // keep it temporarily
SFunctionCtxStatus status = {0}; SFunctionCtxStatus status = {0};
...@@ -2039,7 +2048,7 @@ SOperatorInfo* createAggregateOperatorInfo(SOperatorInfo* downstream, SAggPhysiN ...@@ -2039,7 +2048,7 @@ SOperatorInfo* createAggregateOperatorInfo(SOperatorInfo* downstream, SAggPhysiN
goto _error; goto _error;
} }
SSDataBlock* pResBlock = createResDataBlock(pAggNode->node.pOutputDataBlockDesc); SSDataBlock* pResBlock = createDataBlockFromDescNode(pAggNode->node.pOutputDataBlockDesc);
initBasicInfo(&pInfo->binfo, pResBlock); initBasicInfo(&pInfo->binfo, pResBlock);
size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES; size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES;
...@@ -2213,7 +2222,7 @@ SOperatorInfo* createFillOperatorInfo(SOperatorInfo* downstream, SFillPhysiNode* ...@@ -2213,7 +2222,7 @@ SOperatorInfo* createFillOperatorInfo(SOperatorInfo* downstream, SFillPhysiNode*
goto _error; goto _error;
} }
pInfo->pRes = createResDataBlock(pPhyFillNode->node.pOutputDataBlockDesc); pInfo->pRes = createDataBlockFromDescNode(pPhyFillNode->node.pOutputDataBlockDesc);
SExprInfo* pExprInfo = createExprInfo(pPhyFillNode->pFillExprs, NULL, &pInfo->numOfExpr); SExprInfo* pExprInfo = createExprInfo(pPhyFillNode->pFillExprs, NULL, &pInfo->numOfExpr);
pOperator->exprSupp.pExprInfo = pExprInfo; pOperator->exprSupp.pExprInfo = pExprInfo;
...@@ -2512,7 +2521,7 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo ...@@ -2512,7 +2521,7 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
return NULL; return NULL;
} }
pOperator = createTagScanOperatorInfo(pHandle, pScanPhyNode, pTableListInfo, pTaskInfo); pOperator = createTagScanOperatorInfo(pHandle, pScanPhyNode, pTaskInfo);
} else if (QUERY_NODE_PHYSICAL_PLAN_BLOCK_DIST_SCAN == type) { } else if (QUERY_NODE_PHYSICAL_PLAN_BLOCK_DIST_SCAN == type) {
SBlockDistScanPhysiNode* pBlockNode = (SBlockDistScanPhysiNode*)pPhyNode; SBlockDistScanPhysiNode* pBlockNode = (SBlockDistScanPhysiNode*)pPhyNode;
......
...@@ -314,7 +314,7 @@ static void doHashGroupbyAgg(SOperatorInfo* pOperator, SSDataBlock* pBlock) { ...@@ -314,7 +314,7 @@ static void doHashGroupbyAgg(SOperatorInfo* pOperator, SSDataBlock* pBlock) {
} }
int32_t rowIndex = j - num; int32_t rowIndex = j - num;
doApplyFunctions(pTaskInfo, pCtx, NULL, rowIndex, num, pBlock->info.rows, pOperator->exprSupp.numOfExprs); applyAggFunctionOnPartialTuples(pTaskInfo, pCtx, NULL, rowIndex, num, pBlock->info.rows, pOperator->exprSupp.numOfExprs);
// assign the group keys or user input constant values if required // assign the group keys or user input constant values if required
doAssignGroupKeys(pCtx, pOperator->exprSupp.numOfExprs, pBlock->info.rows, rowIndex); doAssignGroupKeys(pCtx, pOperator->exprSupp.numOfExprs, pBlock->info.rows, rowIndex);
...@@ -331,7 +331,7 @@ static void doHashGroupbyAgg(SOperatorInfo* pOperator, SSDataBlock* pBlock) { ...@@ -331,7 +331,7 @@ static void doHashGroupbyAgg(SOperatorInfo* pOperator, SSDataBlock* pBlock) {
} }
int32_t rowIndex = pBlock->info.rows - num; int32_t rowIndex = pBlock->info.rows - num;
doApplyFunctions(pTaskInfo, pCtx, NULL, rowIndex, num, pBlock->info.rows, pOperator->exprSupp.numOfExprs); applyAggFunctionOnPartialTuples(pTaskInfo, pCtx, NULL, rowIndex, num, pBlock->info.rows, pOperator->exprSupp.numOfExprs);
doAssignGroupKeys(pCtx, pOperator->exprSupp.numOfExprs, pBlock->info.rows, rowIndex); doAssignGroupKeys(pCtx, pOperator->exprSupp.numOfExprs, pBlock->info.rows, rowIndex);
} }
} }
...@@ -431,7 +431,7 @@ SOperatorInfo* createGroupOperatorInfo(SOperatorInfo* downstream, SAggPhysiNode* ...@@ -431,7 +431,7 @@ SOperatorInfo* createGroupOperatorInfo(SOperatorInfo* downstream, SAggPhysiNode*
goto _error; goto _error;
} }
SSDataBlock* pResBlock = createResDataBlock(pAggNode->node.pOutputDataBlockDesc); SSDataBlock* pResBlock = createDataBlockFromDescNode(pAggNode->node.pOutputDataBlockDesc);
initBasicInfo(&pInfo->binfo, pResBlock); initBasicInfo(&pInfo->binfo, pResBlock);
int32_t numOfScalarExpr = 0; int32_t numOfScalarExpr = 0;
...@@ -823,7 +823,7 @@ SOperatorInfo* createPartitionOperatorInfo(SOperatorInfo* downstream, SPartition ...@@ -823,7 +823,7 @@ SOperatorInfo* createPartitionOperatorInfo(SOperatorInfo* downstream, SPartition
uint32_t defaultPgsz = 0; uint32_t defaultPgsz = 0;
uint32_t defaultBufsz = 0; uint32_t defaultBufsz = 0;
pInfo->binfo.pRes = createResDataBlock(pPartNode->node.pOutputDataBlockDesc); pInfo->binfo.pRes = createDataBlockFromDescNode(pPartNode->node.pOutputDataBlockDesc);
getBufferPgSize(pInfo->binfo.pRes->info.rowSize, &defaultPgsz, &defaultBufsz); getBufferPgSize(pInfo->binfo.pRes->info.rowSize, &defaultPgsz, &defaultBufsz);
if (!osTempSpaceAvailable()) { if (!osTempSpaceAvailable()) {
...@@ -1119,7 +1119,7 @@ SOperatorInfo* createStreamPartitionOperatorInfo(SOperatorInfo* downstream, SStr ...@@ -1119,7 +1119,7 @@ SOperatorInfo* createStreamPartitionOperatorInfo(SOperatorInfo* downstream, SStr
} }
pInfo->partitionSup.needCalc = true; pInfo->partitionSup.needCalc = true;
pInfo->binfo.pRes = createResDataBlock(pPartNode->part.node.pOutputDataBlockDesc); pInfo->binfo.pRes = createDataBlockFromDescNode(pPartNode->part.node.pOutputDataBlockDesc);
if (pInfo->binfo.pRes == NULL) { if (pInfo->binfo.pRes == NULL) {
goto _error; goto _error;
} }
......
...@@ -87,7 +87,7 @@ SOperatorInfo* createMergeJoinOperatorInfo(SOperatorInfo** pDownstream, int32_t ...@@ -87,7 +87,7 @@ SOperatorInfo* createMergeJoinOperatorInfo(SOperatorInfo** pDownstream, int32_t
} }
int32_t numOfCols = 0; int32_t numOfCols = 0;
SSDataBlock* pResBlock = createResDataBlock(pJoinNode->node.pOutputDataBlockDesc); SSDataBlock* pResBlock = createDataBlockFromDescNode(pJoinNode->node.pOutputDataBlockDesc);
SExprInfo* pExprInfo = createExprInfo(pJoinNode->pTargets, NULL, &numOfCols); SExprInfo* pExprInfo = createExprInfo(pJoinNode->pTargets, NULL, &numOfCols);
initResultSizeInfo(&pOperator->resultInfo, 4096); initResultSizeInfo(&pOperator->resultInfo, 4096);
......
...@@ -85,7 +85,7 @@ SOperatorInfo* createProjectOperatorInfo(SOperatorInfo* downstream, SProjectPhys ...@@ -85,7 +85,7 @@ SOperatorInfo* createProjectOperatorInfo(SOperatorInfo* downstream, SProjectPhys
int32_t numOfCols = 0; int32_t numOfCols = 0;
SExprInfo* pExprInfo = createExprInfo(pProjPhyNode->pProjections, NULL, &numOfCols); SExprInfo* pExprInfo = createExprInfo(pProjPhyNode->pProjections, NULL, &numOfCols);
SSDataBlock* pResBlock = createResDataBlock(pProjPhyNode->node.pOutputDataBlockDesc); SSDataBlock* pResBlock = createDataBlockFromDescNode(pProjPhyNode->node.pOutputDataBlockDesc);
initLimitInfo(pProjPhyNode->node.pLimit, pProjPhyNode->node.pSlimit, &pInfo->limitInfo); initLimitInfo(pProjPhyNode->node.pLimit, pProjPhyNode->node.pSlimit, &pInfo->limitInfo);
pInfo->binfo.pRes = pResBlock; pInfo->binfo.pRes = pResBlock;
...@@ -385,7 +385,7 @@ SOperatorInfo* createIndefinitOutputOperatorInfo(SOperatorInfo* downstream, SPhy ...@@ -385,7 +385,7 @@ SOperatorInfo* createIndefinitOutputOperatorInfo(SOperatorInfo* downstream, SPhy
} }
} }
SSDataBlock* pResBlock = createResDataBlock(pPhyNode->node.pOutputDataBlockDesc); SSDataBlock* pResBlock = createDataBlockFromDescNode(pPhyNode->node.pOutputDataBlockDesc);
int32_t numOfRows = 4096; int32_t numOfRows = 4096;
size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES; size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES;
......
...@@ -885,7 +885,7 @@ SOperatorInfo* createTableScanOperatorInfo(STableScanPhysiNode* pTableScanNode, ...@@ -885,7 +885,7 @@ SOperatorInfo* createTableScanOperatorInfo(STableScanPhysiNode* pTableScanNode,
pInfo->base.dataBlockLoadFlag = pTableScanNode->dataRequired; pInfo->base.dataBlockLoadFlag = pTableScanNode->dataRequired;
initResultSizeInfo(&pOperator->resultInfo, 4096); initResultSizeInfo(&pOperator->resultInfo, 4096);
pInfo->pResBlock = createResDataBlock(pDescNode); pInfo->pResBlock = createDataBlockFromDescNode(pDescNode);
blockDataEnsureCapacity(pInfo->pResBlock, pOperator->resultInfo.capacity); blockDataEnsureCapacity(pInfo->pResBlock, pOperator->resultInfo.capacity);
code = filterInitFromNode((SNode*)pTableScanNode->scan.node.pConditions, &pOperator->exprSupp.pFilterInfo, 0); code = filterInitFromNode((SNode*)pTableScanNode->scan.node.pConditions, &pOperator->exprSupp.pFilterInfo, 0);
...@@ -2352,7 +2352,7 @@ SOperatorInfo* createStreamScanOperatorInfo(SReadHandle* pHandle, STableScanPhys ...@@ -2352,7 +2352,7 @@ SOperatorInfo* createStreamScanOperatorInfo(SReadHandle* pHandle, STableScanPhys
goto _error; goto _error;
} }
pInfo->pRes = createResDataBlock(pDescNode); pInfo->pRes = createDataBlockFromDescNode(pDescNode);
pInfo->pUpdateRes = createSpecialDataBlock(STREAM_CLEAR); pInfo->pUpdateRes = createSpecialDataBlock(STREAM_CLEAR);
pInfo->scanMode = STREAM_SCAN_FROM_READERHANDLE; pInfo->scanMode = STREAM_SCAN_FROM_READERHANDLE;
pInfo->windowSup = (SWindowSupporter){.pStreamAggSup = NULL, .gap = -1, .parentType = QUERY_NODE_PHYSICAL_PLAN}; pInfo->windowSup = (SWindowSupporter){.pStreamAggSup = NULL, .gap = -1, .parentType = QUERY_NODE_PHYSICAL_PLAN};
...@@ -2476,8 +2476,7 @@ static void destroyTagScanOperatorInfo(void* param) { ...@@ -2476,8 +2476,7 @@ static void destroyTagScanOperatorInfo(void* param) {
taosMemoryFreeClear(param); taosMemoryFreeClear(param);
} }
SOperatorInfo* createTagScanOperatorInfo(SReadHandle* pReadHandle, STagScanPhysiNode* pPhyNode, SOperatorInfo* createTagScanOperatorInfo(SReadHandle* pReadHandle, STagScanPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo) {
STableListInfo* pTableListInfo, SExecTaskInfo* pTaskInfo) {
STagScanInfo* pInfo = taosMemoryCalloc(1, sizeof(STagScanInfo)); STagScanInfo* pInfo = taosMemoryCalloc(1, sizeof(STagScanInfo));
SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo)); SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
if (pInfo == NULL || pOperator == NULL) { if (pInfo == NULL || pOperator == NULL) {
...@@ -2499,7 +2498,7 @@ SOperatorInfo* createTagScanOperatorInfo(SReadHandle* pReadHandle, STagScanPhysi ...@@ -2499,7 +2498,7 @@ SOperatorInfo* createTagScanOperatorInfo(SReadHandle* pReadHandle, STagScanPhysi
goto _error; goto _error;
} }
pInfo->pRes = createResDataBlock(pDescNode); pInfo->pRes = createDataBlockFromDescNode(pDescNode);
pInfo->readHandle = *pReadHandle; pInfo->readHandle = *pReadHandle;
pInfo->curPos = 0; pInfo->curPos = 0;
...@@ -2613,7 +2612,7 @@ SArray* generateSortByTsInfo(SArray* colMatchInfo, int32_t order) { ...@@ -2613,7 +2612,7 @@ SArray* generateSortByTsInfo(SArray* colMatchInfo, int32_t order) {
return pList; return pList;
} }
int32_t dumpSQueryTableCond(const SQueryTableDataCond* src, SQueryTableDataCond* dst) { int32_t dumpQueryTableCond(const SQueryTableDataCond* src, SQueryTableDataCond* dst) {
memcpy((void*)dst, (void*)src, sizeof(SQueryTableDataCond)); memcpy((void*)dst, (void*)src, sizeof(SQueryTableDataCond));
dst->colList = taosMemoryCalloc(src->numOfCols, sizeof(SColumnInfo)); dst->colList = taosMemoryCalloc(src->numOfCols, sizeof(SColumnInfo));
for (int i = 0; i < src->numOfCols; i++) { for (int i = 0; i < src->numOfCols; i++) {
...@@ -2664,7 +2663,7 @@ int32_t startGroupTableMergeScan(SOperatorInfo* pOperator) { ...@@ -2664,7 +2663,7 @@ int32_t startGroupTableMergeScan(SOperatorInfo* pOperator) {
taosArrayPush(pInfo->sortSourceParams, &param); taosArrayPush(pInfo->sortSourceParams, &param);
SQueryTableDataCond cond; SQueryTableDataCond cond;
dumpSQueryTableCond(&pInfo->base.cond, &cond); dumpQueryTableCond(&pInfo->base.cond, &cond);
taosArrayPush(pInfo->queryConds, &cond); taosArrayPush(pInfo->queryConds, &cond);
} }
...@@ -2900,7 +2899,7 @@ SOperatorInfo* createTableMergeScanOperatorInfo(STableScanPhysiNode* pTableScanN ...@@ -2900,7 +2899,7 @@ SOperatorInfo* createTableMergeScanOperatorInfo(STableScanPhysiNode* pTableScanN
} }
initResultSizeInfo(&pOperator->resultInfo, 1024); initResultSizeInfo(&pOperator->resultInfo, 1024);
pInfo->pResBlock = createResDataBlock(pDescNode); pInfo->pResBlock = createDataBlockFromDescNode(pDescNode);
blockDataEnsureCapacity(pInfo->pResBlock, pOperator->resultInfo.capacity); blockDataEnsureCapacity(pInfo->pResBlock, pOperator->resultInfo.capacity);
pInfo->sortSourceParams = taosArrayInit(64, sizeof(STableMergeScanSortSourceParam)); pInfo->sortSourceParams = taosArrayInit(64, sizeof(STableMergeScanSortSourceParam));
......
...@@ -47,7 +47,7 @@ SOperatorInfo* createSortOperatorInfo(SOperatorInfo* downstream, SSortPhysiNode* ...@@ -47,7 +47,7 @@ SOperatorInfo* createSortOperatorInfo(SOperatorInfo* downstream, SSortPhysiNode*
SDataBlockDescNode* pDescNode = pSortNode->node.pOutputDataBlockDesc; SDataBlockDescNode* pDescNode = pSortNode->node.pOutputDataBlockDesc;
int32_t numOfCols = 0; int32_t numOfCols = 0;
SSDataBlock* pResBlock = createResDataBlock(pDescNode); SSDataBlock* pResBlock = createDataBlockFromDescNode(pDescNode);
SExprInfo* pExprInfo = createExprInfo(pSortNode->pExprs, NULL, &numOfCols); SExprInfo* pExprInfo = createExprInfo(pSortNode->pExprs, NULL, &numOfCols);
int32_t numOfOutputCols = 0; int32_t numOfOutputCols = 0;
...@@ -509,7 +509,7 @@ SOperatorInfo* createGroupSortOperatorInfo(SOperatorInfo* downstream, SGroupSort ...@@ -509,7 +509,7 @@ SOperatorInfo* createGroupSortOperatorInfo(SOperatorInfo* downstream, SGroupSort
initResultSizeInfo(&pOperator->resultInfo, 1024); initResultSizeInfo(&pOperator->resultInfo, 1024);
pOperator->exprSupp.pCtx = createSqlFunctionCtx(pExprInfo, numOfCols, &pOperator->exprSupp.rowEntryInfoOffset); pOperator->exprSupp.pCtx = createSqlFunctionCtx(pExprInfo, numOfCols, &pOperator->exprSupp.rowEntryInfoOffset);
pInfo->binfo.pRes = createResDataBlock(pDescNode); pInfo->binfo.pRes = createDataBlockFromDescNode(pDescNode);
blockDataEnsureCapacity(pInfo->binfo.pRes, pOperator->resultInfo.capacity); blockDataEnsureCapacity(pInfo->binfo.pRes, pOperator->resultInfo.capacity);
int32_t numOfOutputCols = 0; int32_t numOfOutputCols = 0;
...@@ -766,7 +766,7 @@ SOperatorInfo* createMultiwayMergeOperatorInfo(SOperatorInfo** downStreams, size ...@@ -766,7 +766,7 @@ SOperatorInfo* createMultiwayMergeOperatorInfo(SOperatorInfo** downStreams, size
} }
initLimitInfo(pMergePhyNode->node.pLimit, pMergePhyNode->node.pSlimit, &pInfo->limitInfo); initLimitInfo(pMergePhyNode->node.pLimit, pMergePhyNode->node.pSlimit, &pInfo->limitInfo);
pInfo->binfo.pRes = createResDataBlock(pDescNode); pInfo->binfo.pRes = createDataBlockFromDescNode(pDescNode);
int32_t rowSize = pInfo->binfo.pRes->info.rowSize; int32_t rowSize = pInfo->binfo.pRes->info.rowSize;
ASSERT(rowSize < 100 * 1024 * 1024); ASSERT(rowSize < 100 * 1024 * 1024);
...@@ -779,7 +779,7 @@ SOperatorInfo* createMultiwayMergeOperatorInfo(SOperatorInfo** downStreams, size ...@@ -779,7 +779,7 @@ SOperatorInfo* createMultiwayMergeOperatorInfo(SOperatorInfo** downStreams, size
} }
SPhysiNode* pChildNode = (SPhysiNode*)nodesListGetNode(pPhyNode->pChildren, 0); SPhysiNode* pChildNode = (SPhysiNode*)nodesListGetNode(pPhyNode->pChildren, 0);
SSDataBlock* pInputBlock = createResDataBlock(pChildNode->pOutputDataBlockDesc); SSDataBlock* pInputBlock = createDataBlockFromDescNode(pChildNode->pOutputDataBlockDesc);
initResultSizeInfo(&pOperator->resultInfo, 4096); initResultSizeInfo(&pOperator->resultInfo, 4096);
blockDataEnsureCapacity(pInfo->binfo.pRes, pOperator->resultInfo.capacity); blockDataEnsureCapacity(pInfo->binfo.pRes, pOperator->resultInfo.capacity);
......
...@@ -1411,7 +1411,7 @@ SOperatorInfo* createSysTableScanOperatorInfo(void* readHandle, SSystemTableScan ...@@ -1411,7 +1411,7 @@ SOperatorInfo* createSysTableScanOperatorInfo(void* readHandle, SSystemTableScan
pInfo->pUser = taosMemoryStrDup((void*)pUser); pInfo->pUser = taosMemoryStrDup((void*)pUser);
pInfo->sysInfo = pScanPhyNode->sysInfo; pInfo->sysInfo = pScanPhyNode->sysInfo;
pInfo->showRewrite = pScanPhyNode->showRewrite; pInfo->showRewrite = pScanPhyNode->showRewrite;
pInfo->pRes = createResDataBlock(pDescNode); pInfo->pRes = createDataBlockFromDescNode(pDescNode);
pInfo->pCondition = pScanNode->node.pConditions; pInfo->pCondition = pScanNode->node.pConditions;
code = filterInitFromNode(pScanNode->node.pConditions, &pOperator->exprSupp.pFilterInfo, 0); code = filterInitFromNode(pScanNode->node.pConditions, &pOperator->exprSupp.pFilterInfo, 0);
...@@ -1928,7 +1928,7 @@ SOperatorInfo* createDataBlockInfoScanOperator(SReadHandle* readHandle, SBlockDi ...@@ -1928,7 +1928,7 @@ SOperatorInfo* createDataBlockInfoScanOperator(SReadHandle* readHandle, SBlockDi
pInfo->readHandle = *readHandle; pInfo->readHandle = *readHandle;
pInfo->uid = pBlockScanNode->suid; pInfo->uid = pBlockScanNode->suid;
pInfo->pResBlock = createResDataBlock(pBlockScanNode->node.pOutputDataBlockDesc); pInfo->pResBlock = createDataBlockFromDescNode(pBlockScanNode->node.pOutputDataBlockDesc);
blockDataEnsureCapacity(pInfo->pResBlock, 1); blockDataEnsureCapacity(pInfo->pResBlock, 1);
int32_t numOfCols = 0; int32_t numOfCols = 0;
......
...@@ -1651,9 +1651,9 @@ SOperatorInfo* createStreamFillOperatorInfo(SOperatorInfo* downstream, SStreamFi ...@@ -1651,9 +1651,9 @@ SOperatorInfo* createStreamFillOperatorInfo(SOperatorInfo* downstream, SStreamFi
} }
initResultSizeInfo(&pOperator->resultInfo, 4096); initResultSizeInfo(&pOperator->resultInfo, 4096);
pInfo->pRes = createResDataBlock(pPhyFillNode->node.pOutputDataBlockDesc); pInfo->pRes = createDataBlockFromDescNode(pPhyFillNode->node.pOutputDataBlockDesc);
pInfo->pSrcBlock = createResDataBlock(pPhyFillNode->node.pOutputDataBlockDesc); pInfo->pSrcBlock = createDataBlockFromDescNode(pPhyFillNode->node.pOutputDataBlockDesc);
pInfo->pPrevSrcBlock = createResDataBlock(pPhyFillNode->node.pOutputDataBlockDesc); pInfo->pPrevSrcBlock = createDataBlockFromDescNode(pPhyFillNode->node.pOutputDataBlockDesc);
blockDataEnsureCapacity(pInfo->pRes, pOperator->resultInfo.capacity); blockDataEnsureCapacity(pInfo->pRes, pOperator->resultInfo.capacity);
blockDataEnsureCapacity(pInfo->pSrcBlock, pOperator->resultInfo.capacity); blockDataEnsureCapacity(pInfo->pSrcBlock, pOperator->resultInfo.capacity);
blockDataEnsureCapacity(pInfo->pPrevSrcBlock, pOperator->resultInfo.capacity); blockDataEnsureCapacity(pInfo->pPrevSrcBlock, pOperator->resultInfo.capacity);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册