diff --git a/source/libs/executor/inc/executil.h b/source/libs/executor/inc/executil.h index 2e92f9e396eb2a0b76acfa1066c3829c22e71071..5a9f079c15ba7818aaa028f041e23fc2373936e9 100644 --- a/source/libs/executor/inc/executil.h +++ b/source/libs/executor/inc/executil.h @@ -39,8 +39,6 @@ #define GET_RES_WINDOW_KEY_LEN(_l) ((_l) + sizeof(uint64_t)) -#define GET_TASKID(_t) (((SExecTaskInfo*)(_t))->id.str) - typedef struct SGroupResInfo { int32_t index; SArray* pRows; // SArray diff --git a/source/libs/executor/inc/executorimpl.h b/source/libs/executor/inc/executorimpl.h index 85424fd7de24021b4506966413d87f9abde6f159..4d19b8bb7685a3020b697ac47ba5266c99f29d0f 100644 --- a/source/libs/executor/inc/executorimpl.h +++ b/source/libs/executor/inc/executorimpl.h @@ -85,39 +85,6 @@ typedef struct SLimit { typedef struct STableScanAnalyzeInfo SFileBlockLoadRecorder; -typedef struct STaskCostInfo { - int64_t created; - int64_t start; - uint64_t elapsedTime; - double extractListTime; - double groupIdMapTime; - SFileBlockLoadRecorder* pRecoder; -} STaskCostInfo; - -typedef struct SOperatorCostInfo { - double openCost; - double totalCost; -} SOperatorCostInfo; - -struct SOperatorInfo; - -typedef int32_t (*__optr_encode_fn_t)(struct SOperatorInfo* pOperator, char** result, int32_t* length); -typedef int32_t (*__optr_decode_fn_t)(struct SOperatorInfo* pOperator, char* result); - -typedef int32_t (*__optr_open_fn_t)(struct SOperatorInfo* pOptr); -typedef SSDataBlock* (*__optr_fn_t)(struct SOperatorInfo* pOptr); -typedef void (*__optr_close_fn_t)(void* param); -typedef int32_t (*__optr_explain_fn_t)(struct SOperatorInfo* pOptr, void** pOptrExplain, uint32_t* len); -typedef int32_t (*__optr_reqBuf_fn_t)(struct SOperatorInfo* pOptr); - -typedef struct STaskIdInfo { - uint64_t queryId; // this is also a request id - uint64_t subplanId; - uint64_t templateId; - char* str; - int32_t vgId; -} STaskIdInfo; - enum { STREAM_RECOVER_STEP__NONE = 0, STREAM_RECOVER_STEP__PREPARE1, @@ -156,51 +123,6 @@ typedef struct SExchangeOpStopInfo { int64_t refId; } SExchangeOpStopInfo; -typedef struct STaskStopInfo { - SRWLatch lock; - SArray* pStopInfo; -} STaskStopInfo; - -struct SExecTaskInfo { - STaskIdInfo id; - uint32_t status; - STimeWindow window; - STaskCostInfo cost; - int64_t owner; // if it is in execution - int32_t code; - int32_t qbufQuota; // total available buffer (in KB) during execution query - int64_t version; // used for stream to record wal version, why not move to sschemainfo - SStreamTaskInfo streamInfo; - SSchemaInfo schemaInfo; - const char* sql; // query sql string - jmp_buf env; // jump to this position when error happens. - EOPTR_EXEC_MODEL execModel; // operator execution model [batch model|stream model] - SSubplan* pSubplan; - struct SOperatorInfo* pRoot; - SLocalFetch localFetch; - SArray* pResultBlockList; // result block list - STaskStopInfo stopInfo; - SRWLatch lock; // secure the access of STableListInfo -}; - -enum { - OP_NOT_OPENED = 0x0, - OP_OPENED = 0x1, - OP_RES_TO_RETURN = 0x5, - OP_EXEC_DONE = 0x9, -}; - -typedef struct SOperatorFpSet { - __optr_open_fn_t _openFn; // DO NOT invoke this function directly - __optr_fn_t getNextFn; - __optr_fn_t cleanupFn; // call this function to release the allocated resources ASAP - __optr_close_fn_t closeFn; - __optr_reqBuf_fn_t reqBufFn; // total used buffer for blocking operator - __optr_encode_fn_t encodeResultRow; - __optr_decode_fn_t decodeResultRow; - __optr_explain_fn_t getExplainFn; -} SOperatorFpSet; - typedef struct SExprSupp { SExprInfo* pExprInfo; int32_t numOfExprs; // the number of scalar expression in group operator @@ -209,22 +131,6 @@ typedef struct SExprSupp { SFilterInfo* pFilterInfo; } SExprSupp; -typedef struct SOperatorInfo { - uint16_t operatorType; - int16_t resultDataBlockId; - bool blocking; // block operator or not - uint8_t status; // denote if current operator is completed - char* name; // name, for debug purpose - void* info; // extension attribution - SExprSupp exprSupp; - SExecTaskInfo* pTaskInfo; - SOperatorCostInfo cost; - SResultInfo resultInfo; - struct SOperatorInfo** pDownstream; // downstram pointer list - int32_t numOfDownstream; // number of downstream. The value is always ONE expect for join operator - SOperatorFpSet fpSet; -} SOperatorInfo; - typedef enum { EX_SOURCE_DATA_NOT_READY = 0x1, EX_SOURCE_DATA_READY = 0x2, @@ -449,8 +355,8 @@ typedef struct SStreamScanInfo { SUpdateInfo* pUpdateInfo; EStreamScanMode scanMode; - SOperatorInfo* pStreamScanOp; - SOperatorInfo* pTableScanOp; + struct SOperatorInfo* pStreamScanOp; + struct SOperatorInfo* pTableScanOp; SArray* childIds; SWindowSupporter windowSup; SPartitionBySupporter partitionSup; @@ -676,18 +582,8 @@ typedef struct SStreamFillOperatorInfo { #define OPTR_IS_OPENED(_optr) (((_optr)->status & OP_OPENED) == OP_OPENED) #define OPTR_SET_OPENED(_optr) ((_optr)->status |= OP_OPENED) -SExecTaskInfo* doCreateExecTaskInfo(uint64_t queryId, uint64_t taskId, int32_t vgId, EOPTR_EXEC_MODEL model, - char* dbFName); - -SOperatorFpSet createOperatorFpSet(__optr_open_fn_t openFn, __optr_fn_t nextFn, __optr_fn_t cleanup, - __optr_close_fn_t closeFn, __optr_reqBuf_fn_t reqBufFn, __optr_explain_fn_t explain); -int32_t optrDummyOpenFn(SOperatorInfo* pOperator); -int32_t appendDownstream(SOperatorInfo* p, SOperatorInfo** pDownstream, int32_t num); -void setOperatorCompleted(SOperatorInfo* pOperator); -void setOperatorInfo(SOperatorInfo* pOperator, const char* name, int32_t type, bool blocking, int32_t status, - void* pInfo, SExecTaskInfo* pTaskInfo); -void destroyOperatorInfo(SOperatorInfo* pOperator); -int32_t optrDefaultBufFn(SOperatorInfo* pOperator); +SSchemaWrapper* extractQueriedColumnSchema(SScanPhysiNode* pScanNode); +int32_t initQueriedTableSchemaInfo(SReadHandle* pHandle, SScanPhysiNode* pScanNode, const char* dbName, SExecTaskInfo* pTaskInfo); void initBasicInfo(SOptrBasicInfo* pInfo, SSDataBlock* pBlock); void cleanupBasicInfo(SOptrBasicInfo* pInfo); @@ -703,9 +599,9 @@ void cleanupAggSup(SAggSupporter* pAggSup); void initResultSizeInfo(SResultInfo* pResultInfo, int32_t numOfRows); -void doBuildStreamResBlock(SOperatorInfo* pOperator, SOptrBasicInfo* pbInfo, SGroupResInfo* pGroupResInfo, +void doBuildStreamResBlock(struct SOperatorInfo* pOperator, SOptrBasicInfo* pbInfo, SGroupResInfo* pGroupResInfo, SDiskbasedBuf* pBuf); -void doBuildResultDatablock(SOperatorInfo* pOperator, SOptrBasicInfo* pbInfo, SGroupResInfo* pGroupResInfo, +void doBuildResultDatablock(struct SOperatorInfo* pOperator, SOptrBasicInfo* pbInfo, SGroupResInfo* pGroupResInfo, SDiskbasedBuf* pBuf); bool hasLimitOffsetInfo(SLimitInfo* pLimitInfo); @@ -719,12 +615,10 @@ void applyAggFunctionOnPartialTuples(SExecTaskInfo* taskInfo, SqlFunctionCtx* pC int32_t extractDataBlockFromFetchRsp(SSDataBlock* pRes, char* pData, SArray* pColList, char** pNextStart); void updateLoadRemoteInfo(SLoadRemoteDataInfo* pInfo, int64_t numOfRows, int32_t dataLen, int64_t startTs, - SOperatorInfo* pOperator); + struct SOperatorInfo* pOperator); STimeWindow getFirstQualifiedTimeWindow(int64_t ts, STimeWindow* pWindow, SInterval* pInterval, int32_t order); -SOperatorInfo* extractOperatorInTree(SOperatorInfo* pOperator, int32_t type, const char* id); -int32_t getTableScanInfo(SOperatorInfo* pOperator, int32_t* order, int32_t* scanFlag, bool inheritUsOrder); int32_t getBufferPgSize(int32_t rowSize, uint32_t* defaultPgsz, uint32_t* defaultBufsz); extern void doDestroyExchangeOperatorInfo(void* param); @@ -742,76 +636,6 @@ void clearResultRowInitFlag(SqlFunctionCtx* pCtx, int32_t numOfOutput); SResultRow* doSetResultOutBufByKey(SDiskbasedBuf* pResultBuf, SResultRowInfo* pResultRowInfo, char* pData, int16_t bytes, bool masterscan, uint64_t groupId, SExecTaskInfo* pTaskInfo, bool isIntervalQuery, SAggSupporter* pSup, bool keepGroup); -// operator creater functions -// clang-format off -SOperatorInfo* createExchangeOperatorInfo(void* pTransporter, SExchangePhysiNode* pExNode, SExecTaskInfo* pTaskInfo); - -SOperatorInfo* createTableScanOperatorInfo(STableScanPhysiNode* pTableScanNode, SReadHandle* pHandle, STableListInfo* pTableList, SExecTaskInfo* pTaskInfo); - -SOperatorInfo* createTableMergeScanOperatorInfo(STableScanPhysiNode* pTableScanNode, SReadHandle* readHandle, STableListInfo* pTableListInfo, SExecTaskInfo* pTaskInfo); - -SOperatorInfo* createTagScanOperatorInfo(SReadHandle* pReadHandle, STagScanPhysiNode* pPhyNode, STableListInfo* pTableListInfo, SExecTaskInfo* pTaskInfo); - -SOperatorInfo* createSysTableScanOperatorInfo(void* readHandle, SSystemTableScanPhysiNode* pScanPhyNode, const char* pUser, SExecTaskInfo* pTaskInfo); - -SOperatorInfo* createTableCountScanOperatorInfo(SReadHandle* handle, STableCountScanPhysiNode* pNode, SExecTaskInfo* pTaskInfo); - -SOperatorInfo* createAggregateOperatorInfo(SOperatorInfo* downstream, SAggPhysiNode* pNode, SExecTaskInfo* pTaskInfo); - -SOperatorInfo* createIndefinitOutputOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pNode, SExecTaskInfo* pTaskInfo); - -SOperatorInfo* createProjectOperatorInfo(SOperatorInfo* downstream, SProjectPhysiNode* pProjPhyNode, SExecTaskInfo* pTaskInfo); - -SOperatorInfo* createSortOperatorInfo(SOperatorInfo* downstream, SSortPhysiNode* pSortNode, SExecTaskInfo* pTaskInfo); - -SOperatorInfo* createMultiwayMergeOperatorInfo(SOperatorInfo** dowStreams, size_t numStreams, SMergePhysiNode* pMergePhysiNode, SExecTaskInfo* pTaskInfo); - -SOperatorInfo* createCacherowsScanOperator(SLastRowScanPhysiNode* pTableScanNode, SReadHandle* readHandle, STableListInfo* pTableListInfo, SExecTaskInfo* pTaskInfo); - -SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SIntervalPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo); - -SOperatorInfo* createMergeIntervalOperatorInfo(SOperatorInfo* downstream, SMergeIntervalPhysiNode* pIntervalPhyNode, SExecTaskInfo* pTaskInfo); - -SOperatorInfo* createMergeAlignedIntervalOperatorInfo(SOperatorInfo* downstream, SMergeAlignedIntervalPhysiNode* pNode, SExecTaskInfo* pTaskInfo); - -SOperatorInfo* createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo, int32_t numOfChild); - -SOperatorInfo* createSessionAggOperatorInfo(SOperatorInfo* downstream, SSessionWinodwPhysiNode* pSessionNode, SExecTaskInfo* pTaskInfo); - -SOperatorInfo* createGroupOperatorInfo(SOperatorInfo* downstream, SAggPhysiNode* pAggNode, SExecTaskInfo* pTaskInfo); - -SOperatorInfo* createDataBlockInfoScanOperator(SReadHandle* readHandle, SBlockDistScanPhysiNode* pBlockScanNode, STableListInfo* pTableListInfo, SExecTaskInfo* pTaskInfo); - -SOperatorInfo* createStreamScanOperatorInfo(SReadHandle* pHandle, STableScanPhysiNode* pTableScanNode, SNode* pTagCond, STableListInfo* pTableListInfo, SExecTaskInfo* pTaskInfo); - -SOperatorInfo* createRawScanOperatorInfo(SReadHandle* pHandle, SExecTaskInfo* pTaskInfo); - -SOperatorInfo* createFillOperatorInfo(SOperatorInfo* downstream, SFillPhysiNode* pPhyFillNode, SExecTaskInfo* pTaskInfo); - -SOperatorInfo* createStatewindowOperatorInfo(SOperatorInfo* downstream, SStateWinodwPhysiNode* pStateNode, SExecTaskInfo* pTaskInfo); - -SOperatorInfo* createPartitionOperatorInfo(SOperatorInfo* downstream, SPartitionPhysiNode* pPartNode, SExecTaskInfo* pTaskInfo); - -SOperatorInfo* createStreamPartitionOperatorInfo(SOperatorInfo* downstream, SStreamPartitionPhysiNode* pPartNode, SExecTaskInfo* pTaskInfo); - -SOperatorInfo* createTimeSliceOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pNode, SExecTaskInfo* pTaskInfo); - -SOperatorInfo* createMergeJoinOperatorInfo(SOperatorInfo** pDownstream, int32_t numOfDownstream, SSortMergeJoinPhysiNode* pJoinNode, SExecTaskInfo* pTaskInfo); - -SOperatorInfo* createStreamSessionAggOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo); - -SOperatorInfo* createStreamFinalSessionAggOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo, int32_t numOfChild); - -SOperatorInfo* createStreamIntervalOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo); - -SOperatorInfo* createStreamStateAggOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo); - -SOperatorInfo* createStreamFillOperatorInfo(SOperatorInfo* downstream, SStreamFillPhysiNode* pPhyFillNode, SExecTaskInfo* pTaskInfo); - -SOperatorInfo* createGroupSortOperatorInfo(SOperatorInfo* downstream, SGroupSortPhysiNode* pSortPhyNode, SExecTaskInfo* pTaskInfo); - -SOperatorInfo* createEventwindowOperatorInfo(SOperatorInfo* downstream, SPhysiNode* physiNode, SExecTaskInfo* pTaskInfo); -// clang-format on int32_t projectApplyFunctions(SExprInfo* pExpr, SSDataBlock* pResult, SSDataBlock* pSrcBlock, SqlFunctionCtx* pCtx, int32_t numOfOutput, SArray* pPseudoList); @@ -820,19 +644,10 @@ void setInputDataBlock(SExprSupp* pExprSupp, SSDataBlock* pBlock, int32_t order, int32_t checkForQueryBuf(size_t numOfTables); -bool isTaskKilled(SExecTaskInfo* pTaskInfo); -void setTaskKilled(SExecTaskInfo* pTaskInfo, int32_t rspCode); -void doDestroyTask(SExecTaskInfo* pTaskInfo); -void setTaskStatus(SExecTaskInfo* pTaskInfo, int8_t status); - -void buildTaskId(uint64_t taskId, uint64_t queryId, char* dst); - SArray* getTableListInfo(const SExecTaskInfo* pTaskInfo); -int32_t createExecTaskInfo(SSubplan* pPlan, SExecTaskInfo** pTaskInfo, SReadHandle* pHandle, uint64_t taskId, - int32_t vgId, char* sql, EOPTR_EXEC_MODEL model); int32_t createDataSinkParam(SDataSinkNode* pNode, void** pParam, SExecTaskInfo* pTask, SReadHandle* readHandle); -int32_t getOperatorExplainExecInfo(SOperatorInfo* operatorInfo, SArray* pExecInfoList); +int32_t getOperatorExplainExecInfo(struct SOperatorInfo* operatorInfo, SArray* pExecInfoList); STimeWindow getActiveTimeWindow(SDiskbasedBuf* pBuf, SResultRowInfo* pResultRowInfo, int64_t ts, SInterval* pInterval, int32_t order); @@ -854,17 +669,16 @@ int32_t finalizeResultRows(SDiskbasedBuf* pBuf, SResultRowPosition* resultRowPos SSDataBlock* pBlock, SExecTaskInfo* pTaskInfo); bool groupbyTbname(SNodeList* pGroupList); -int32_t buildDataBlockFromGroupRes(SOperatorInfo* pOperator, SStreamState* pState, SSDataBlock* pBlock, SExprSupp* pSup, +int32_t buildDataBlockFromGroupRes(struct SOperatorInfo* pOperator, SStreamState* pState, SSDataBlock* pBlock, SExprSupp* pSup, SGroupResInfo* pGroupResInfo); int32_t saveSessionDiscBuf(SStreamState* pState, SSessionKey* key, void* buf, int32_t size); -int32_t buildSessionResultDataBlock(SOperatorInfo* pOperator, SStreamState* pState, SSDataBlock* pBlock, +int32_t buildSessionResultDataBlock(struct SOperatorInfo* pOperator, SStreamState* pState, SSDataBlock* pBlock, SExprSupp* pSup, SGroupResInfo* pGroupResInfo); int32_t setOutputBuf(SStreamState* pState, STimeWindow* win, SResultRow** pResult, int64_t tableGroupId, SqlFunctionCtx* pCtx, int32_t numOfOutput, int32_t* rowEntryInfoOffset, SAggSupporter* pAggSup); int32_t releaseOutputBuf(SStreamState* pState, SWinKey* pKey, SResultRow* pResult); int32_t saveOutputBuf(SStreamState* pState, SWinKey* pKey, SResultRow* pResult, int32_t resSize); void getNextIntervalWindow(SInterval* pInterval, STimeWindow* tw, int32_t order); -int32_t qAppendTaskStopInfo(SExecTaskInfo* pTaskInfo, SExchangeOpStopInfo* pInfo); int32_t getForwardStepsInBlock(int32_t numOfRows, __block_search_fn_t searchFn, TSKEY ekey, int32_t pos, int32_t order, int64_t* pData); void appendCreateTableRow(SStreamState* pState, SExprSupp* pTableSup, SExprSupp* pTagSup, uint64_t groupId, diff --git a/source/libs/executor/inc/operator.h b/source/libs/executor/inc/operator.h new file mode 100644 index 0000000000000000000000000000000000000000..92018b6e3e9c3bd2d827e9ca50fd13b6e20dee8f --- /dev/null +++ b/source/libs/executor/inc/operator.h @@ -0,0 +1,165 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#ifndef TDENGINE_OPERATOR_H +#define TDENGINE_OPERATOR_H + +#ifdef __cplusplus +extern "C" { +#endif + +typedef struct SOperatorCostInfo { + double openCost; + double totalCost; +} SOperatorCostInfo; + +struct SOperatorInfo; + +typedef int32_t (*__optr_encode_fn_t)(struct SOperatorInfo* pOperator, char** result, int32_t* length); +typedef int32_t (*__optr_decode_fn_t)(struct SOperatorInfo* pOperator, char* result); + +typedef int32_t (*__optr_open_fn_t)(struct SOperatorInfo* pOptr); +typedef SSDataBlock* (*__optr_fn_t)(struct SOperatorInfo* pOptr); +typedef void (*__optr_close_fn_t)(void* param); +typedef int32_t (*__optr_explain_fn_t)(struct SOperatorInfo* pOptr, void** pOptrExplain, uint32_t* len); +typedef int32_t (*__optr_reqBuf_fn_t)(struct SOperatorInfo* pOptr); + +typedef struct SOperatorFpSet { + __optr_open_fn_t _openFn; // DO NOT invoke this function directly + __optr_fn_t getNextFn; + __optr_fn_t cleanupFn; // call this function to release the allocated resources ASAP + __optr_close_fn_t closeFn; + __optr_reqBuf_fn_t reqBufFn; // total used buffer for blocking operator + __optr_encode_fn_t encodeResultRow; + __optr_decode_fn_t decodeResultRow; + __optr_explain_fn_t getExplainFn; +} SOperatorFpSet; + +enum { + OP_NOT_OPENED = 0x0, + OP_OPENED = 0x1, + OP_RES_TO_RETURN = 0x5, + OP_EXEC_DONE = 0x9, +}; + +typedef struct SOperatorInfo { + uint16_t operatorType; + int16_t resultDataBlockId; + bool blocking; // block operator or not + uint8_t status; // denote if current operator is completed + char* name; // name, for debug purpose + void* info; // extension attribution + SExprSupp exprSupp; + SExecTaskInfo* pTaskInfo; + SOperatorCostInfo cost; + SResultInfo resultInfo; + struct SOperatorInfo** pDownstream; // downstram pointer list + int32_t numOfDownstream; // number of downstream. The value is always ONE expect for join operator + SOperatorFpSet fpSet; +} SOperatorInfo; + +// operator creater functions +// clang-format off +SOperatorInfo* createExchangeOperatorInfo(void* pTransporter, SExchangePhysiNode* pExNode, SExecTaskInfo* pTaskInfo); + +SOperatorInfo* createTableScanOperatorInfo(STableScanPhysiNode* pTableScanNode, SReadHandle* pHandle, STableListInfo* pTableList, SExecTaskInfo* pTaskInfo); + +SOperatorInfo* createTableMergeScanOperatorInfo(STableScanPhysiNode* pTableScanNode, SReadHandle* readHandle, STableListInfo* pTableListInfo, SExecTaskInfo* pTaskInfo); + +SOperatorInfo* createTagScanOperatorInfo(SReadHandle* pReadHandle, STagScanPhysiNode* pPhyNode, STableListInfo* pTableListInfo, SExecTaskInfo* pTaskInfo); + +SOperatorInfo* createSysTableScanOperatorInfo(void* readHandle, SSystemTableScanPhysiNode* pScanPhyNode, const char* pUser, SExecTaskInfo* pTaskInfo); + +SOperatorInfo* createTableCountScanOperatorInfo(SReadHandle* handle, STableCountScanPhysiNode* pNode, SExecTaskInfo* pTaskInfo); + +SOperatorInfo* createAggregateOperatorInfo(SOperatorInfo* downstream, SAggPhysiNode* pNode, SExecTaskInfo* pTaskInfo); + +SOperatorInfo* createIndefinitOutputOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pNode, SExecTaskInfo* pTaskInfo); + +SOperatorInfo* createProjectOperatorInfo(SOperatorInfo* downstream, SProjectPhysiNode* pProjPhyNode, SExecTaskInfo* pTaskInfo); + +SOperatorInfo* createSortOperatorInfo(SOperatorInfo* downstream, SSortPhysiNode* pSortNode, SExecTaskInfo* pTaskInfo); + +SOperatorInfo* createMultiwayMergeOperatorInfo(SOperatorInfo** dowStreams, size_t numStreams, SMergePhysiNode* pMergePhysiNode, SExecTaskInfo* pTaskInfo); + +SOperatorInfo* createCacherowsScanOperator(SLastRowScanPhysiNode* pTableScanNode, SReadHandle* readHandle, STableListInfo* pTableListInfo, SExecTaskInfo* pTaskInfo); + +SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SIntervalPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo); + +SOperatorInfo* createMergeIntervalOperatorInfo(SOperatorInfo* downstream, SMergeIntervalPhysiNode* pIntervalPhyNode, SExecTaskInfo* pTaskInfo); + +SOperatorInfo* createMergeAlignedIntervalOperatorInfo(SOperatorInfo* downstream, SMergeAlignedIntervalPhysiNode* pNode, SExecTaskInfo* pTaskInfo); + +SOperatorInfo* createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo, int32_t numOfChild); + +SOperatorInfo* createSessionAggOperatorInfo(SOperatorInfo* downstream, SSessionWinodwPhysiNode* pSessionNode, SExecTaskInfo* pTaskInfo); + +SOperatorInfo* createGroupOperatorInfo(SOperatorInfo* downstream, SAggPhysiNode* pAggNode, SExecTaskInfo* pTaskInfo); + +SOperatorInfo* createDataBlockInfoScanOperator(SReadHandle* readHandle, SBlockDistScanPhysiNode* pBlockScanNode, STableListInfo* pTableListInfo, SExecTaskInfo* pTaskInfo); + +SOperatorInfo* createStreamScanOperatorInfo(SReadHandle* pHandle, STableScanPhysiNode* pTableScanNode, SNode* pTagCond, STableListInfo* pTableListInfo, SExecTaskInfo* pTaskInfo); + +SOperatorInfo* createRawScanOperatorInfo(SReadHandle* pHandle, SExecTaskInfo* pTaskInfo); + +SOperatorInfo* createFillOperatorInfo(SOperatorInfo* downstream, SFillPhysiNode* pPhyFillNode, SExecTaskInfo* pTaskInfo); + +SOperatorInfo* createStatewindowOperatorInfo(SOperatorInfo* downstream, SStateWinodwPhysiNode* pStateNode, SExecTaskInfo* pTaskInfo); + +SOperatorInfo* createPartitionOperatorInfo(SOperatorInfo* downstream, SPartitionPhysiNode* pPartNode, SExecTaskInfo* pTaskInfo); + +SOperatorInfo* createStreamPartitionOperatorInfo(SOperatorInfo* downstream, SStreamPartitionPhysiNode* pPartNode, SExecTaskInfo* pTaskInfo); + +SOperatorInfo* createTimeSliceOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pNode, SExecTaskInfo* pTaskInfo); + +SOperatorInfo* createMergeJoinOperatorInfo(SOperatorInfo** pDownstream, int32_t numOfDownstream, SSortMergeJoinPhysiNode* pJoinNode, SExecTaskInfo* pTaskInfo); + +SOperatorInfo* createStreamSessionAggOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo); + +SOperatorInfo* createStreamFinalSessionAggOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo, int32_t numOfChild); + +SOperatorInfo* createStreamIntervalOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo); + +SOperatorInfo* createStreamStateAggOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo); + +SOperatorInfo* createStreamFillOperatorInfo(SOperatorInfo* downstream, SStreamFillPhysiNode* pPhyFillNode, SExecTaskInfo* pTaskInfo); + +SOperatorInfo* createGroupSortOperatorInfo(SOperatorInfo* downstream, SGroupSortPhysiNode* pSortPhyNode, SExecTaskInfo* pTaskInfo); + +SOperatorInfo* createEventwindowOperatorInfo(SOperatorInfo* downstream, SPhysiNode* physiNode, SExecTaskInfo* pTaskInfo); +// clang-format on + +SOperatorFpSet createOperatorFpSet(__optr_open_fn_t openFn, __optr_fn_t nextFn, __optr_fn_t cleanup, + __optr_close_fn_t closeFn, __optr_reqBuf_fn_t reqBufFn, __optr_explain_fn_t explain); +int32_t optrDummyOpenFn(SOperatorInfo* pOperator); +int32_t appendDownstream(SOperatorInfo* p, SOperatorInfo** pDownstream, int32_t num); +void setOperatorCompleted(SOperatorInfo* pOperator); +void setOperatorInfo(SOperatorInfo* pOperator, const char* name, int32_t type, bool blocking, int32_t status, + void* pInfo, SExecTaskInfo* pTaskInfo); +int32_t optrDefaultBufFn(SOperatorInfo* pOperator); + +SOperatorInfo* createOperator(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo, SReadHandle* pHandle, SNode* pTagCond, + SNode* pTagIndexCond, const char* pUser, const char* dbname); +void destroyOperator(SOperatorInfo* pOperator); + +SOperatorInfo* extractOperatorInTree(SOperatorInfo* pOperator, int32_t type, const char* id); +int32_t getTableScanInfo(SOperatorInfo* pOperator, int32_t* order, int32_t* scanFlag, bool inheritUsOrder); +int32_t stopTableScanOperator(SOperatorInfo* pOperator, const char* pIdStr); + +#ifdef __cplusplus +} +#endif + +#endif // TDENGINE_OPERATOR_H \ No newline at end of file diff --git a/source/libs/executor/inc/querytask.h b/source/libs/executor/inc/querytask.h new file mode 100644 index 0000000000000000000000000000000000000000..aa42cd2c14ccdb76ded582dd47786a871988f3e2 --- /dev/null +++ b/source/libs/executor/inc/querytask.h @@ -0,0 +1,83 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#ifndef TDENGINE_QUERYTASK_H +#define TDENGINE_QUERYTASK_H + +#ifdef __cplusplus +extern "C" { +#endif + +#define GET_TASKID(_t) (((SExecTaskInfo*)(_t))->id.str) + +typedef struct STaskIdInfo { + uint64_t queryId; // this is also a request id + uint64_t subplanId; + uint64_t templateId; + char* str; + int32_t vgId; +} STaskIdInfo; + +typedef struct STaskCostInfo { + int64_t created; + int64_t start; + uint64_t elapsedTime; + double extractListTime; + double groupIdMapTime; + SFileBlockLoadRecorder* pRecoder; +} STaskCostInfo; + +typedef struct STaskStopInfo { + SRWLatch lock; + SArray* pStopInfo; +} STaskStopInfo; + +struct SExecTaskInfo { + STaskIdInfo id; + uint32_t status; + STimeWindow window; + STaskCostInfo cost; + int64_t owner; // if it is in execution + int32_t code; + int32_t qbufQuota; // total available buffer (in KB) during execution query + int64_t version; // used for stream to record wal version, why not move to sschemainfo + SStreamTaskInfo streamInfo; + SSchemaInfo schemaInfo; + const char* sql; // query sql string + jmp_buf env; // jump to this position when error happens. + EOPTR_EXEC_MODEL execModel; // operator execution model [batch model|stream model] + SSubplan* pSubplan; + struct SOperatorInfo* pRoot; + SLocalFetch localFetch; + SArray* pResultBlockList; // result block list + STaskStopInfo stopInfo; + SRWLatch lock; // secure the access of STableListInfo +}; + +void buildTaskId(uint64_t taskId, uint64_t queryId, char* dst); +SExecTaskInfo* doCreateTask(uint64_t queryId, uint64_t taskId, int32_t vgId, EOPTR_EXEC_MODEL model); +void doDestroyTask(SExecTaskInfo* pTaskInfo); +bool isTaskKilled(SExecTaskInfo* pTaskInfo); +void setTaskKilled(SExecTaskInfo* pTaskInfo, int32_t rspCode); +void setTaskStatus(SExecTaskInfo* pTaskInfo, int8_t status); +int32_t createExecTaskInfo(SSubplan* pPlan, SExecTaskInfo** pTaskInfo, SReadHandle* pHandle, uint64_t taskId, + int32_t vgId, char* sql, EOPTR_EXEC_MODEL model); +int32_t qAppendTaskStopInfo(SExecTaskInfo* pTaskInfo, SExchangeOpStopInfo* pInfo); + +#ifdef __cplusplus +} +#endif + +#endif // TDENGINE_QUERYTASK_H diff --git a/source/libs/executor/src/aggregateoperator.c b/source/libs/executor/src/aggregateoperator.c index ec8060348d4b5dd465be4e6356c73994aa54df7c..c00c167d8e03b5db61f31ede8e8bab5f95ccfd76 100644 --- a/source/libs/executor/src/aggregateoperator.c +++ b/source/libs/executor/src/aggregateoperator.c @@ -15,7 +15,6 @@ #include "filter.h" #include "function.h" -#include "functionMgt.h" #include "os.h" #include "querynodes.h" #include "tfill.h" @@ -23,16 +22,14 @@ #include "tdatablock.h" #include "tglobal.h" -#include "tmsg.h" -#include "ttime.h" - #include "executorimpl.h" #include "index.h" #include "query.h" #include "tcompare.h" #include "thash.h" #include "ttypes.h" -#include "vnode.h" +#include "operator.h" +#include "querytask.h" typedef struct { bool hasAgg; diff --git a/source/libs/executor/src/cachescanoperator.c b/source/libs/executor/src/cachescanoperator.c index f6fc332b3793ee25830cacf585c4d7d6b698f959..6fee0899ced11687ea70684beae95bf7deee3b39 100644 --- a/source/libs/executor/src/cachescanoperator.c +++ b/source/libs/executor/src/cachescanoperator.c @@ -24,6 +24,8 @@ #include "tcompare.h" #include "thash.h" #include "ttypes.h" +#include "operator.h" +#include "querytask.h" typedef struct SCacheRowsScanInfo { SSDataBlock* pRes; diff --git a/source/libs/executor/src/eventwindowoperator.c b/source/libs/executor/src/eventwindowoperator.c index 559fce52249facc32213789eee9bf2655b36ee16..8ec7ed91a38dfdd801a50ee8dddce525e329921f 100644 --- a/source/libs/executor/src/eventwindowoperator.c +++ b/source/libs/executor/src/eventwindowoperator.c @@ -21,6 +21,8 @@ #include "tcompare.h" #include "tdatablock.h" #include "ttime.h" +#include "operator.h" +#include "querytask.h" typedef struct SEventWindowOperatorInfo { SOptrBasicInfo binfo; diff --git a/source/libs/executor/src/exchangeoperator.c b/source/libs/executor/src/exchangeoperator.c index c855a104b2cccdb7ed28fe36e5e9c740ec6f5bd9..31445dd5683fe28624c546abab21a5095827c273 100644 --- a/source/libs/executor/src/exchangeoperator.c +++ b/source/libs/executor/src/exchangeoperator.c @@ -24,6 +24,8 @@ #include "index.h" #include "query.h" #include "thash.h" +#include "operator.h" +#include "querytask.h" typedef struct SFetchRspHandleWrapper { uint32_t exchangeId; diff --git a/source/libs/executor/src/executil.c b/source/libs/executor/src/executil.c index d79ed0176db5c74c0f8d11b2e513eb1ac8097128..75e1d374e40f5470d7229d46761daa6ff2a52bf4 100644 --- a/source/libs/executor/src/executil.c +++ b/source/libs/executor/src/executil.c @@ -26,6 +26,7 @@ #include "executil.h" #include "executorimpl.h" #include "tcompression.h" +#include "querytask.h" typedef struct STableListIdInfo { uint64_t suid; diff --git a/source/libs/executor/src/executor.c b/source/libs/executor/src/executor.c index 546cd18cdae64aa8725593417b8d685c5d5af1d9..2b9ab6c09a270ad30b81aaf643b84e24ab0faa37 100644 --- a/source/libs/executor/src/executor.c +++ b/source/libs/executor/src/executor.c @@ -14,13 +14,14 @@ */ #include "executor.h" -#include #include "executorimpl.h" #include "planner.h" #include "tdatablock.h" #include "tref.h" #include "tudf.h" #include "vnode.h" +#include "operator.h" +#include "querytask.h" static TdThreadOnce initPoolOnce = PTHREAD_ONCE_INIT; int32_t exchangeObjRefPool = -1; @@ -249,7 +250,7 @@ int32_t qSetSMAInput(qTaskInfo_t tinfo, const void* pBlocks, size_t numOfBlocks, qTaskInfo_t qCreateQueueExecTaskInfo(void* msg, SReadHandle* pReaderHandle, int32_t vgId, int32_t* numOfCols, uint64_t id) { if (msg == NULL) { // create raw scan - SExecTaskInfo* pTaskInfo = doCreateExecTaskInfo(0, id, vgId, OPTR_EXEC_MODEL_QUEUE, ""); + SExecTaskInfo* pTaskInfo = doCreateTask(0, id, vgId, OPTR_EXEC_MODEL_QUEUE); if (NULL == pTaskInfo) { terrno = TSDB_CODE_OUT_OF_MEMORY; return NULL; diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index 7594079cfb03fbe7b8a3a06b21aa6b8b625deb23..3ab929cad0816202162433827f257a80b76b0fb7 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -22,7 +22,6 @@ #include "tname.h" #include "tdatablock.h" -#include "tglobal.h" #include "tmsg.h" #include "ttime.h" @@ -33,6 +32,8 @@ #include "thash.h" #include "ttypes.h" #include "vnode.h" +#include "operator.h" +#include "querytask.h" #define SET_REVERSE_SCAN_FLAG(runtime) ((runtime)->scanFlag = REVERSE_SCAN) #define GET_FORWARD_DIRECTION_FACTOR(ord) (((ord) == TSDB_ORDER_ASC) ? QUERY_ASC_FORWARD_STEP : QUERY_DESC_FORWARD_STEP) @@ -71,12 +72,8 @@ static UNUSED_FUNC void* u_realloc(void* p, size_t __size) { #define realloc u_realloc #endif -#define CLEAR_QUERY_STATUS(q, st) ((q)->status &= (~(st))) - static void setBlockSMAInfo(SqlFunctionCtx* pCtx, SExprInfo* pExpr, SSDataBlock* pBlock); -static void releaseQueryBuf(size_t numOfTables); - static void initCtxOutputBuffer(SqlFunctionCtx* pCtx, int32_t size); static void doApplyScalarCalculation(SOperatorInfo* pOperator, SSDataBlock* pBlock, int32_t order, int32_t scanFlag); @@ -86,44 +83,6 @@ static int32_t doSetInputDataBlock(SExprSupp* pExprSup, SSDataBlock* pBlock, int bool createDummyCol); static int32_t doCopyToSDataBlock(SExecTaskInfo* pTaskInfo, SSDataBlock* pBlock, SExprSupp* pSup, SDiskbasedBuf* pBuf, SGroupResInfo* pGroupResInfo); -static SSchemaWrapper* extractQueriedColumnSchema(SScanPhysiNode* pScanNode); - -void setOperatorCompleted(SOperatorInfo* pOperator) { - pOperator->status = OP_EXEC_DONE; - pOperator->cost.totalCost = (taosGetTimestampUs() - pOperator->pTaskInfo->cost.start) / 1000.0; - setTaskStatus(pOperator->pTaskInfo, TASK_COMPLETED); -} - -void setOperatorInfo(SOperatorInfo* pOperator, const char* name, int32_t type, bool blocking, int32_t status, - void* pInfo, SExecTaskInfo* pTaskInfo) { - pOperator->name = (char*)name; - pOperator->operatorType = type; - pOperator->blocking = blocking; - pOperator->status = status; - pOperator->info = pInfo; - pOperator->pTaskInfo = pTaskInfo; -} - -int32_t optrDummyOpenFn(SOperatorInfo* pOperator) { - OPTR_SET_OPENED(pOperator); - pOperator->cost.openCost = 0; - return TSDB_CODE_SUCCESS; -} - -SOperatorFpSet createOperatorFpSet(__optr_open_fn_t openFn, __optr_fn_t nextFn, __optr_fn_t cleanup, - __optr_close_fn_t closeFn, __optr_reqBuf_fn_t reqBufFn, - __optr_explain_fn_t explain) { - SOperatorFpSet fpSet = { - ._openFn = openFn, - .getNextFn = nextFn, - .cleanupFn = cleanup, - .closeFn = closeFn, - .reqBufFn = reqBufFn, - .getExplainFn = explain, - }; - - return fpSet; -} SResultRow* getNewResultRow(SDiskbasedBuf* pResultBuf, int32_t* currentPageId, int32_t interBufSize) { SFilePage* pData = NULL; @@ -482,10 +441,6 @@ void setBlockSMAInfo(SqlFunctionCtx* pCtx, SExprInfo* pExprInfo, SSDataBlock* pB } } -bool isTaskKilled(SExecTaskInfo* pTaskInfo) { return (0 != pTaskInfo->code); } - -void setTaskKilled(SExecTaskInfo* pTaskInfo, int32_t rspCode) { pTaskInfo->code = rspCode; } - ///////////////////////////////////////////////////////////////////////////////////////////// STimeWindow getAlignQueryTimeWindow(SInterval* pInterval, int32_t precision, int64_t key) { STimeWindow win = {0}; @@ -503,16 +458,6 @@ STimeWindow getAlignQueryTimeWindow(SInterval* pInterval, int32_t precision, int return win; } -void setTaskStatus(SExecTaskInfo* pTaskInfo, int8_t status) { - if (status == TASK_NOT_COMPLETED) { - pTaskInfo->status = status; - } else { - // QUERY_NOT_COMPLETED is not compatible with any other status, so clear its position first - CLEAR_QUERY_STATUS(pTaskInfo, TASK_NOT_COMPLETED); - pTaskInfo->status |= status; - } -} - void setResultRowInitCtx(SResultRow* pResult, SqlFunctionCtx* pCtx, int32_t numOfOutput, int32_t* rowEntryInfoOffset) { bool init = false; for (int32_t i = 0; i < numOfOutput; ++i) { @@ -949,72 +894,6 @@ void doBuildResultDatablock(SOperatorInfo* pOperator, SOptrBasicInfo* pbInfo, SG } } -int32_t appendDownstream(SOperatorInfo* p, SOperatorInfo** pDownstream, int32_t num) { - p->pDownstream = taosMemoryCalloc(1, num * POINTER_BYTES); - if (p->pDownstream == NULL) { - return TSDB_CODE_OUT_OF_MEMORY; - } - - memcpy(p->pDownstream, pDownstream, num * POINTER_BYTES); - p->numOfDownstream = num; - return TSDB_CODE_SUCCESS; -} - -int32_t getTableScanInfo(SOperatorInfo* pOperator, int32_t* order, int32_t* scanFlag, bool inheritUsOrder) { - // todo add more information about exchange operation - int32_t type = pOperator->operatorType; - if (type == QUERY_NODE_PHYSICAL_PLAN_SYSTABLE_SCAN || type == QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN || - type == QUERY_NODE_PHYSICAL_PLAN_TAG_SCAN || type == QUERY_NODE_PHYSICAL_PLAN_BLOCK_DIST_SCAN || - type == QUERY_NODE_PHYSICAL_PLAN_LAST_ROW_SCAN || type == QUERY_NODE_PHYSICAL_PLAN_TABLE_COUNT_SCAN) { - *order = TSDB_ORDER_ASC; - *scanFlag = MAIN_SCAN; - return TSDB_CODE_SUCCESS; - } else if (type == QUERY_NODE_PHYSICAL_PLAN_EXCHANGE) { - if (!inheritUsOrder) { - *order = TSDB_ORDER_ASC; - } - *scanFlag = MAIN_SCAN; - return TSDB_CODE_SUCCESS; - } else if (type == QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN) { - STableScanInfo* pTableScanInfo = pOperator->info; - *order = pTableScanInfo->base.cond.order; - *scanFlag = pTableScanInfo->base.scanFlag; - return TSDB_CODE_SUCCESS; - } else if (type == QUERY_NODE_PHYSICAL_PLAN_TABLE_MERGE_SCAN) { - STableMergeScanInfo* pTableScanInfo = pOperator->info; - *order = pTableScanInfo->base.cond.order; - *scanFlag = pTableScanInfo->base.scanFlag; - return TSDB_CODE_SUCCESS; - } else { - if (pOperator->pDownstream == NULL || pOperator->pDownstream[0] == NULL) { - return TSDB_CODE_INVALID_PARA; - } else { - return getTableScanInfo(pOperator->pDownstream[0], order, scanFlag, inheritUsOrder); - } - } -} - -// QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN -SOperatorInfo* extractOperatorInTree(SOperatorInfo* pOperator, int32_t type, const char* id) { - if (pOperator == NULL) { - qError("invalid operator, failed to find tableScanOperator %s", id); - terrno = TSDB_CODE_PAR_INTERNAL_ERROR; - return NULL; - } - - if (pOperator->operatorType == type) { - return pOperator; - } else { - if (pOperator->pDownstream == NULL || pOperator->pDownstream[0] == NULL) { - qError("invalid operator, failed to find tableScanOperator %s", id); - terrno = TSDB_CODE_PAR_INTERNAL_ERROR; - return NULL; - } - - return extractOperatorInTree(pOperator->pDownstream[0], type, id); - } -} - void destroyExprInfo(SExprInfo* pExpr, int32_t numOfExprs) { for (int32_t i = 0; i < numOfExprs; ++i) { SExprInfo* pExprInfo = &pExpr[i]; @@ -1031,37 +910,6 @@ void destroyExprInfo(SExprInfo* pExpr, int32_t numOfExprs) { } } -void destroyOperatorInfo(SOperatorInfo* pOperator) { - if (pOperator == NULL) { - return; - } - - if (pOperator->fpSet.closeFn != NULL) { - pOperator->fpSet.closeFn(pOperator->info); - } - - if (pOperator->pDownstream != NULL) { - for (int32_t i = 0; i < pOperator->numOfDownstream; ++i) { - destroyOperatorInfo(pOperator->pDownstream[i]); - } - - taosMemoryFreeClear(pOperator->pDownstream); - pOperator->numOfDownstream = 0; - } - - cleanupExprSupp(&pOperator->exprSupp); - taosMemoryFreeClear(pOperator); -} - -// each operator should be set their own function to return total cost buffer -int32_t optrDefaultBufFn(SOperatorInfo* pOperator) { - if (pOperator->blocking) { - return -1; - } else { - return 0; - } -} - int32_t getBufferPgSize(int32_t rowSize, uint32_t* defaultPgsz, uint32_t* defaultBufsz) { *defaultPgsz = 4096; while (*defaultPgsz < rowSize * 4) { @@ -1151,136 +999,6 @@ void cleanupExprSupp(SExprSupp* pSupp) { void cleanupBasicInfo(SOptrBasicInfo* pInfo) { pInfo->pRes = blockDataDestroy(pInfo->pRes); } -void buildTaskId(uint64_t taskId, uint64_t queryId, char* dst) { - char* p = dst; - - int32_t offset = 6; - memcpy(p, "TID:0x", offset); - offset += tintToHex(taskId, &p[offset]); - - memcpy(&p[offset], " QID:0x", 7); - offset += 7; - offset += tintToHex(queryId, &p[offset]); - - p[offset] = 0; -} - -SExecTaskInfo* doCreateExecTaskInfo(uint64_t queryId, uint64_t taskId, int32_t vgId, EOPTR_EXEC_MODEL model, - char* dbFName) { - SExecTaskInfo* pTaskInfo = taosMemoryCalloc(1, sizeof(SExecTaskInfo)); - if (pTaskInfo == NULL) { - terrno = TSDB_CODE_OUT_OF_MEMORY; - return NULL; - } - - setTaskStatus(pTaskInfo, TASK_NOT_COMPLETED); - pTaskInfo->cost.created = taosGetTimestampUs(); - - pTaskInfo->schemaInfo.dbname = taosStrdup(dbFName); - pTaskInfo->execModel = model; - pTaskInfo->stopInfo.pStopInfo = taosArrayInit(4, sizeof(SExchangeOpStopInfo)); - pTaskInfo->pResultBlockList = taosArrayInit(128, POINTER_BYTES); - - taosInitRWLatch(&pTaskInfo->lock); - pTaskInfo->id.vgId = vgId; - pTaskInfo->id.queryId = queryId; - - pTaskInfo->id.str = taosMemoryMalloc(64); - buildTaskId(taskId, queryId, pTaskInfo->id.str); - return pTaskInfo; -} - -int32_t extractTableSchemaInfo(SReadHandle* pHandle, SScanPhysiNode* pScanNode, SExecTaskInfo* pTaskInfo) { - SMetaReader mr = {0}; - if (pHandle == NULL) { - terrno = TSDB_CODE_INVALID_PARA; - return terrno; - } - - metaReaderInit(&mr, pHandle->meta, 0); - int32_t code = metaGetTableEntryByUidCache(&mr, pScanNode->uid); - if (code != TSDB_CODE_SUCCESS) { - qError("failed to get the table meta, uid:0x%" PRIx64 ", suid:0x%" PRIx64 ", %s", pScanNode->uid, pScanNode->suid, - GET_TASKID(pTaskInfo)); - - metaReaderClear(&mr); - return terrno; - } - - SSchemaInfo* pSchemaInfo = &pTaskInfo->schemaInfo; - pSchemaInfo->tablename = taosStrdup(mr.me.name); - - if (mr.me.type == TSDB_SUPER_TABLE) { - pSchemaInfo->sw = tCloneSSchemaWrapper(&mr.me.stbEntry.schemaRow); - pSchemaInfo->tversion = mr.me.stbEntry.schemaTag.version; - } else if (mr.me.type == TSDB_CHILD_TABLE) { - tDecoderClear(&mr.coder); - - tb_uid_t suid = mr.me.ctbEntry.suid; - code = metaGetTableEntryByUidCache(&mr, suid); - if (code != TSDB_CODE_SUCCESS) { - metaReaderClear(&mr); - return terrno; - } - - pSchemaInfo->sw = tCloneSSchemaWrapper(&mr.me.stbEntry.schemaRow); - pSchemaInfo->tversion = mr.me.stbEntry.schemaTag.version; - } else { - pSchemaInfo->sw = tCloneSSchemaWrapper(&mr.me.ntbEntry.schemaRow); - } - - metaReaderClear(&mr); - - pSchemaInfo->qsw = extractQueriedColumnSchema(pScanNode); - return TSDB_CODE_SUCCESS; -} - -SSchemaWrapper* extractQueriedColumnSchema(SScanPhysiNode* pScanNode) { - int32_t numOfCols = LIST_LENGTH(pScanNode->pScanCols); - int32_t numOfTags = LIST_LENGTH(pScanNode->pScanPseudoCols); - - SSchemaWrapper* pqSw = taosMemoryCalloc(1, sizeof(SSchemaWrapper)); - pqSw->pSchema = taosMemoryCalloc(numOfCols + numOfTags, sizeof(SSchema)); - - for (int32_t i = 0; i < numOfCols; ++i) { - STargetNode* pNode = (STargetNode*)nodesListGetNode(pScanNode->pScanCols, i); - SColumnNode* pColNode = (SColumnNode*)pNode->pExpr; - - SSchema* pSchema = &pqSw->pSchema[pqSw->nCols++]; - pSchema->colId = pColNode->colId; - pSchema->type = pColNode->node.resType.type; - pSchema->bytes = pColNode->node.resType.bytes; - tstrncpy(pSchema->name, pColNode->colName, tListLen(pSchema->name)); - } - - // this the tags and pseudo function columns, we only keep the tag columns - for (int32_t i = 0; i < numOfTags; ++i) { - STargetNode* pNode = (STargetNode*)nodesListGetNode(pScanNode->pScanPseudoCols, i); - - int32_t type = nodeType(pNode->pExpr); - if (type == QUERY_NODE_COLUMN) { - SColumnNode* pColNode = (SColumnNode*)pNode->pExpr; - - SSchema* pSchema = &pqSw->pSchema[pqSw->nCols++]; - pSchema->colId = pColNode->colId; - pSchema->type = pColNode->node.resType.type; - pSchema->bytes = pColNode->node.resType.bytes; - tstrncpy(pSchema->name, pColNode->colName, tListLen(pSchema->name)); - } - } - - return pqSw; -} - -static void cleanupTableSchemaInfo(SSchemaInfo* pSchemaInfo) { - taosMemoryFreeClear(pSchemaInfo->dbname); - taosMemoryFreeClear(pSchemaInfo->tablename); - tDeleteSSchemaWrapper(pSchemaInfo->sw); - tDeleteSSchemaWrapper(pSchemaInfo->qsw); -} - -static void cleanupStreamInfo(SStreamTaskInfo* pStreamInfo) { tDeleteSSchemaWrapper(pStreamInfo->schema); } - bool groupbyTbname(SNodeList* pGroupList) { bool bytbname = false; if (LIST_LENGTH(pGroupList) == 1) { @@ -1294,306 +1012,6 @@ bool groupbyTbname(SNodeList* pGroupList) { return bytbname; } -SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo, SReadHandle* pHandle, SNode* pTagCond, - SNode* pTagIndexCond, const char* pUser) { - int32_t type = nodeType(pPhyNode); - const char* idstr = GET_TASKID(pTaskInfo); - - if (pPhyNode->pChildren == NULL || LIST_LENGTH(pPhyNode->pChildren) == 0) { - SOperatorInfo* pOperator = NULL; - if (QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN == type) { - STableScanPhysiNode* pTableScanNode = (STableScanPhysiNode*)pPhyNode; - - // NOTE: this is an patch to fix the physical plan - // TODO remove it later - if (pTableScanNode->scan.node.pLimit != NULL) { - pTableScanNode->groupSort = true; - } - - STableListInfo* pTableListInfo = tableListCreate(); - int32_t code = - createScanTableListInfo(&pTableScanNode->scan, pTableScanNode->pGroupTags, pTableScanNode->groupSort, pHandle, - pTableListInfo, pTagCond, pTagIndexCond, pTaskInfo); - if (code) { - pTaskInfo->code = code; - tableListDestroy(pTableListInfo); - qError("failed to createScanTableListInfo, code:%s, %s", tstrerror(code), idstr); - return NULL; - } - - code = extractTableSchemaInfo(pHandle, &pTableScanNode->scan, pTaskInfo); - if (code) { - pTaskInfo->code = terrno; - tableListDestroy(pTableListInfo); - return NULL; - } - - pOperator = createTableScanOperatorInfo(pTableScanNode, pHandle, pTableListInfo, pTaskInfo); - if (NULL == pOperator) { - pTaskInfo->code = terrno; - return NULL; - } - - STableScanInfo* pScanInfo = pOperator->info; - pTaskInfo->cost.pRecoder = &pScanInfo->base.readRecorder; - } else if (QUERY_NODE_PHYSICAL_PLAN_TABLE_MERGE_SCAN == type) { - STableMergeScanPhysiNode* pTableScanNode = (STableMergeScanPhysiNode*)pPhyNode; - STableListInfo* pTableListInfo = tableListCreate(); - - int32_t code = createScanTableListInfo(&pTableScanNode->scan, pTableScanNode->pGroupTags, true, pHandle, - pTableListInfo, pTagCond, pTagIndexCond, pTaskInfo); - if (code) { - pTaskInfo->code = code; - tableListDestroy(pTableListInfo); - qError("failed to createScanTableListInfo, code: %s", tstrerror(code)); - return NULL; - } - - code = extractTableSchemaInfo(pHandle, &pTableScanNode->scan, pTaskInfo); - if (code) { - pTaskInfo->code = terrno; - tableListDestroy(pTableListInfo); - return NULL; - } - - pOperator = createTableMergeScanOperatorInfo(pTableScanNode, pHandle, pTableListInfo, pTaskInfo); - if (NULL == pOperator) { - pTaskInfo->code = terrno; - tableListDestroy(pTableListInfo); - return NULL; - } - - STableScanInfo* pScanInfo = pOperator->info; - pTaskInfo->cost.pRecoder = &pScanInfo->base.readRecorder; - } else if (QUERY_NODE_PHYSICAL_PLAN_EXCHANGE == type) { - pOperator = createExchangeOperatorInfo(pHandle ? pHandle->pMsgCb->clientRpc : NULL, (SExchangePhysiNode*)pPhyNode, - pTaskInfo); - } else if (QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN == type) { - STableScanPhysiNode* pTableScanNode = (STableScanPhysiNode*)pPhyNode; - STableListInfo* pTableListInfo = tableListCreate(); - - if (pHandle->vnode) { - int32_t code = - createScanTableListInfo(&pTableScanNode->scan, pTableScanNode->pGroupTags, pTableScanNode->groupSort, - pHandle, pTableListInfo, pTagCond, pTagIndexCond, pTaskInfo); - if (code) { - pTaskInfo->code = code; - tableListDestroy(pTableListInfo); - qError("failed to createScanTableListInfo, code: %s", tstrerror(code)); - return NULL; - } - } - - pTaskInfo->schemaInfo.qsw = extractQueriedColumnSchema(&pTableScanNode->scan); - pOperator = createStreamScanOperatorInfo(pHandle, pTableScanNode, pTagCond, pTableListInfo, pTaskInfo); - } else if (QUERY_NODE_PHYSICAL_PLAN_SYSTABLE_SCAN == type) { - SSystemTableScanPhysiNode* pSysScanPhyNode = (SSystemTableScanPhysiNode*)pPhyNode; - pOperator = createSysTableScanOperatorInfo(pHandle, pSysScanPhyNode, pUser, pTaskInfo); - } else if (QUERY_NODE_PHYSICAL_PLAN_TABLE_COUNT_SCAN == type) { - STableCountScanPhysiNode* pTblCountScanNode = (STableCountScanPhysiNode*)pPhyNode; - pOperator = createTableCountScanOperatorInfo(pHandle, pTblCountScanNode, pTaskInfo); - } else if (QUERY_NODE_PHYSICAL_PLAN_TAG_SCAN == type) { - STagScanPhysiNode* pScanPhyNode = (STagScanPhysiNode*)pPhyNode; - STableListInfo* pTableListInfo = tableListCreate(); - int32_t code = createScanTableListInfo(pScanPhyNode, NULL, false, pHandle, pTableListInfo, pTagCond, - pTagIndexCond, pTaskInfo); - if (code != TSDB_CODE_SUCCESS) { - pTaskInfo->code = code; - qError("failed to getTableList, code: %s", tstrerror(code)); - return NULL; - } - - pOperator = createTagScanOperatorInfo(pHandle, pScanPhyNode, pTableListInfo, pTaskInfo); - } else if (QUERY_NODE_PHYSICAL_PLAN_BLOCK_DIST_SCAN == type) { - SBlockDistScanPhysiNode* pBlockNode = (SBlockDistScanPhysiNode*)pPhyNode; - STableListInfo* pTableListInfo = tableListCreate(); - - if (pBlockNode->tableType == TSDB_SUPER_TABLE) { - SArray* pList = taosArrayInit(4, sizeof(STableKeyInfo)); - int32_t code = vnodeGetAllTableList(pHandle->vnode, pBlockNode->uid, pList); - if (code != TSDB_CODE_SUCCESS) { - pTaskInfo->code = terrno; - return NULL; - } - - size_t num = taosArrayGetSize(pList); - for (int32_t i = 0; i < num; ++i) { - STableKeyInfo* p = taosArrayGet(pList, i); - tableListAddTableInfo(pTableListInfo, p->uid, 0); - } - - taosArrayDestroy(pList); - } else { // Create group with only one table - tableListAddTableInfo(pTableListInfo, pBlockNode->uid, 0); - } - - pOperator = createDataBlockInfoScanOperator(pHandle, pBlockNode, pTableListInfo, pTaskInfo); - } else if (QUERY_NODE_PHYSICAL_PLAN_LAST_ROW_SCAN == type) { - SLastRowScanPhysiNode* pScanNode = (SLastRowScanPhysiNode*)pPhyNode; - STableListInfo* pTableListInfo = tableListCreate(); - - int32_t code = createScanTableListInfo(&pScanNode->scan, pScanNode->pGroupTags, true, pHandle, pTableListInfo, - pTagCond, pTagIndexCond, pTaskInfo); - if (code != TSDB_CODE_SUCCESS) { - pTaskInfo->code = code; - return NULL; - } - - code = extractTableSchemaInfo(pHandle, &pScanNode->scan, pTaskInfo); - if (code != TSDB_CODE_SUCCESS) { - pTaskInfo->code = code; - return NULL; - } - - pOperator = createCacherowsScanOperator(pScanNode, pHandle, pTableListInfo, pTaskInfo); - } else if (QUERY_NODE_PHYSICAL_PLAN_PROJECT == type) { - pOperator = createProjectOperatorInfo(NULL, (SProjectPhysiNode*)pPhyNode, pTaskInfo); - } else { - terrno = TSDB_CODE_INVALID_PARA; - return NULL; - } - - if (pOperator != NULL) { // todo moved away - pOperator->resultDataBlockId = pPhyNode->pOutputDataBlockDesc->dataBlockId; - } - - return pOperator; - } - - size_t size = LIST_LENGTH(pPhyNode->pChildren); - SOperatorInfo** ops = taosMemoryCalloc(size, POINTER_BYTES); - if (ops == NULL) { - return NULL; - } - - for (int32_t i = 0; i < size; ++i) { - SPhysiNode* pChildNode = (SPhysiNode*)nodesListGetNode(pPhyNode->pChildren, i); - ops[i] = createOperatorTree(pChildNode, pTaskInfo, pHandle, pTagCond, pTagIndexCond, pUser); - if (ops[i] == NULL) { - taosMemoryFree(ops); - return NULL; - } - } - - SOperatorInfo* pOptr = NULL; - if (QUERY_NODE_PHYSICAL_PLAN_PROJECT == type) { - pOptr = createProjectOperatorInfo(ops[0], (SProjectPhysiNode*)pPhyNode, pTaskInfo); - } else if (QUERY_NODE_PHYSICAL_PLAN_HASH_AGG == type) { - SAggPhysiNode* pAggNode = (SAggPhysiNode*)pPhyNode; - if (pAggNode->pGroupKeys != NULL) { - pOptr = createGroupOperatorInfo(ops[0], pAggNode, pTaskInfo); - } else { - pOptr = createAggregateOperatorInfo(ops[0], pAggNode, pTaskInfo); - } - } else if (QUERY_NODE_PHYSICAL_PLAN_HASH_INTERVAL == type) { - SIntervalPhysiNode* pIntervalPhyNode = (SIntervalPhysiNode*)pPhyNode; - pOptr = createIntervalOperatorInfo(ops[0], pIntervalPhyNode, pTaskInfo); - } else if (QUERY_NODE_PHYSICAL_PLAN_STREAM_INTERVAL == type) { - pOptr = createStreamIntervalOperatorInfo(ops[0], pPhyNode, pTaskInfo); - } else if (QUERY_NODE_PHYSICAL_PLAN_MERGE_ALIGNED_INTERVAL == type) { - SMergeAlignedIntervalPhysiNode* pIntervalPhyNode = (SMergeAlignedIntervalPhysiNode*)pPhyNode; - pOptr = createMergeAlignedIntervalOperatorInfo(ops[0], pIntervalPhyNode, pTaskInfo); - } else if (QUERY_NODE_PHYSICAL_PLAN_MERGE_INTERVAL == type) { - SMergeIntervalPhysiNode* pIntervalPhyNode = (SMergeIntervalPhysiNode*)pPhyNode; - pOptr = createMergeIntervalOperatorInfo(ops[0], pIntervalPhyNode, pTaskInfo); - } else if (QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_INTERVAL == type) { - int32_t children = 0; - pOptr = createStreamFinalIntervalOperatorInfo(ops[0], pPhyNode, pTaskInfo, children); - } else if (QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_INTERVAL == type) { - int32_t children = pHandle->numOfVgroups; - pOptr = createStreamFinalIntervalOperatorInfo(ops[0], pPhyNode, pTaskInfo, children); - } else if (QUERY_NODE_PHYSICAL_PLAN_SORT == type) { - pOptr = createSortOperatorInfo(ops[0], (SSortPhysiNode*)pPhyNode, pTaskInfo); - } else if (QUERY_NODE_PHYSICAL_PLAN_GROUP_SORT == type) { - pOptr = createGroupSortOperatorInfo(ops[0], (SGroupSortPhysiNode*)pPhyNode, pTaskInfo); - } else if (QUERY_NODE_PHYSICAL_PLAN_MERGE == type) { - SMergePhysiNode* pMergePhyNode = (SMergePhysiNode*)pPhyNode; - pOptr = createMultiwayMergeOperatorInfo(ops, size, pMergePhyNode, pTaskInfo); - } else if (QUERY_NODE_PHYSICAL_PLAN_MERGE_SESSION == type) { - SSessionWinodwPhysiNode* pSessionNode = (SSessionWinodwPhysiNode*)pPhyNode; - pOptr = createSessionAggOperatorInfo(ops[0], pSessionNode, pTaskInfo); - } else if (QUERY_NODE_PHYSICAL_PLAN_STREAM_SESSION == type) { - pOptr = createStreamSessionAggOperatorInfo(ops[0], pPhyNode, pTaskInfo); - } else if (QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_SESSION == type) { - int32_t children = 0; - pOptr = createStreamFinalSessionAggOperatorInfo(ops[0], pPhyNode, pTaskInfo, children); - } else if (QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_SESSION == type) { - int32_t children = pHandle->numOfVgroups; - pOptr = createStreamFinalSessionAggOperatorInfo(ops[0], pPhyNode, pTaskInfo, children); - } else if (QUERY_NODE_PHYSICAL_PLAN_PARTITION == type) { - pOptr = createPartitionOperatorInfo(ops[0], (SPartitionPhysiNode*)pPhyNode, pTaskInfo); - } else if (QUERY_NODE_PHYSICAL_PLAN_STREAM_PARTITION == type) { - pOptr = createStreamPartitionOperatorInfo(ops[0], (SStreamPartitionPhysiNode*)pPhyNode, pTaskInfo); - } else if (QUERY_NODE_PHYSICAL_PLAN_MERGE_STATE == type) { - SStateWinodwPhysiNode* pStateNode = (SStateWinodwPhysiNode*)pPhyNode; - pOptr = createStatewindowOperatorInfo(ops[0], pStateNode, pTaskInfo); - } else if (QUERY_NODE_PHYSICAL_PLAN_STREAM_STATE == type) { - pOptr = createStreamStateAggOperatorInfo(ops[0], pPhyNode, pTaskInfo); - } else if (QUERY_NODE_PHYSICAL_PLAN_MERGE_JOIN == type) { - pOptr = createMergeJoinOperatorInfo(ops, size, (SSortMergeJoinPhysiNode*)pPhyNode, pTaskInfo); - } else if (QUERY_NODE_PHYSICAL_PLAN_FILL == type) { - pOptr = createFillOperatorInfo(ops[0], (SFillPhysiNode*)pPhyNode, pTaskInfo); - } else if (QUERY_NODE_PHYSICAL_PLAN_STREAM_FILL == type) { - pOptr = createStreamFillOperatorInfo(ops[0], (SStreamFillPhysiNode*)pPhyNode, pTaskInfo); - } else if (QUERY_NODE_PHYSICAL_PLAN_INDEF_ROWS_FUNC == type) { - pOptr = createIndefinitOutputOperatorInfo(ops[0], pPhyNode, pTaskInfo); - } else if (QUERY_NODE_PHYSICAL_PLAN_INTERP_FUNC == type) { - pOptr = createTimeSliceOperatorInfo(ops[0], pPhyNode, pTaskInfo); - } else if (QUERY_NODE_PHYSICAL_PLAN_MERGE_EVENT == type) { - pOptr = createEventwindowOperatorInfo(ops[0], pPhyNode, pTaskInfo); - } else { - terrno = TSDB_CODE_INVALID_PARA; - taosMemoryFree(ops); - return NULL; - } - - taosMemoryFree(ops); - if (pOptr) { - pOptr->resultDataBlockId = pPhyNode->pOutputDataBlockDesc->dataBlockId; - } - - return pOptr; -} - -static int32_t extractTbscanInStreamOpTree(SOperatorInfo* pOperator, STableScanInfo** ppInfo) { - if (pOperator->operatorType != QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN) { - if (pOperator->numOfDownstream == 0) { - qError("failed to find stream scan operator"); - return TSDB_CODE_APP_ERROR; - } - - if (pOperator->numOfDownstream > 1) { - qError("join not supported for stream block scan"); - return TSDB_CODE_APP_ERROR; - } - return extractTbscanInStreamOpTree(pOperator->pDownstream[0], ppInfo); - } else { - SStreamScanInfo* pInfo = pOperator->info; - *ppInfo = pInfo->pTableScanOp->info; - return 0; - } -} - -int32_t extractTableScanNode(SPhysiNode* pNode, STableScanPhysiNode** ppNode) { - if (pNode->pChildren == NULL || LIST_LENGTH(pNode->pChildren) == 0) { - if (QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN == pNode->type) { - *ppNode = (STableScanPhysiNode*)pNode; - return 0; - } else { - terrno = TSDB_CODE_APP_ERROR; - return -1; - } - } else { - if (LIST_LENGTH(pNode->pChildren) != 1) { - terrno = TSDB_CODE_APP_ERROR; - return -1; - } - SPhysiNode* pChildNode = (SPhysiNode*)nodesListGetNode(pNode->pChildren, 0); - return extractTableScanNode(pChildNode, ppNode); - } - return -1; -} - int32_t createDataSinkParam(SDataSinkNode* pNode, void** pParam, SExecTaskInfo* pTask, SReadHandle* readHandle) { switch (pNode->type) { case QUERY_NODE_PHYSICAL_PLAN_QUERY_INSERT: { @@ -1641,100 +1059,6 @@ int32_t createDataSinkParam(SDataSinkNode* pNode, void** pParam, SExecTaskInfo* return TSDB_CODE_SUCCESS; } -int32_t createExecTaskInfo(SSubplan* pPlan, SExecTaskInfo** pTaskInfo, SReadHandle* pHandle, uint64_t taskId, - int32_t vgId, char* sql, EOPTR_EXEC_MODEL model) { - *pTaskInfo = doCreateExecTaskInfo(pPlan->id.queryId, taskId, vgId, model, pPlan->dbFName); - if (*pTaskInfo == NULL) { - goto _complete; - } - - if (pHandle) { - if (pHandle->pStateBackend) { - (*pTaskInfo)->streamInfo.pState = pHandle->pStateBackend; - } - } - - (*pTaskInfo)->sql = sql; - sql = NULL; - - (*pTaskInfo)->pSubplan = pPlan; - (*pTaskInfo)->pRoot = - createOperatorTree(pPlan->pNode, *pTaskInfo, pHandle, pPlan->pTagCond, pPlan->pTagIndexCond, pPlan->user); - - if (NULL == (*pTaskInfo)->pRoot) { - terrno = (*pTaskInfo)->code; - goto _complete; - } - - return TSDB_CODE_SUCCESS; - -_complete: - taosMemoryFree(sql); - doDestroyTask(*pTaskInfo); - return terrno; -} - -static void freeBlock(void* pParam) { - SSDataBlock* pBlock = *(SSDataBlock**)pParam; - blockDataDestroy(pBlock); -} - -void doDestroyTask(SExecTaskInfo* pTaskInfo) { - qDebug("%s execTask is freed", GET_TASKID(pTaskInfo)); - destroyOperatorInfo(pTaskInfo->pRoot); - cleanupTableSchemaInfo(&pTaskInfo->schemaInfo); - cleanupStreamInfo(&pTaskInfo->streamInfo); - - if (!pTaskInfo->localFetch.localExec) { - nodesDestroyNode((SNode*)pTaskInfo->pSubplan); - } - - taosArrayDestroyEx(pTaskInfo->pResultBlockList, freeBlock); - taosArrayDestroy(pTaskInfo->stopInfo.pStopInfo); - taosMemoryFreeClear(pTaskInfo->sql); - taosMemoryFreeClear(pTaskInfo->id.str); - taosMemoryFreeClear(pTaskInfo); -} - -static int64_t getQuerySupportBufSize(size_t numOfTables) { - size_t s1 = sizeof(STableQueryInfo); - // size_t s3 = sizeof(STableCheckInfo); buffer consumption in tsdb - return (int64_t)(s1 * 1.5 * numOfTables); -} - -int32_t checkForQueryBuf(size_t numOfTables) { - int64_t t = getQuerySupportBufSize(numOfTables); - if (tsQueryBufferSizeBytes < 0) { - return TSDB_CODE_SUCCESS; - } else if (tsQueryBufferSizeBytes > 0) { - while (1) { - int64_t s = tsQueryBufferSizeBytes; - int64_t remain = s - t; - if (remain >= 0) { - if (atomic_val_compare_exchange_64(&tsQueryBufferSizeBytes, s, remain) == s) { - return TSDB_CODE_SUCCESS; - } - } else { - return TSDB_CODE_QRY_NOT_ENOUGH_BUFFER; - } - } - } - - // disable query processing if the value of tsQueryBufferSize is zero. - return TSDB_CODE_QRY_NOT_ENOUGH_BUFFER; -} - -void releaseQueryBuf(size_t numOfTables) { - if (tsQueryBufferSizeBytes < 0) { - return; - } - - int64_t t = getQuerySupportBufSize(numOfTables); - - // restore value is not enough buffer available - atomic_add_fetch_64(&tsQueryBufferSizeBytes, t); -} - int32_t getOperatorExplainExecInfo(SOperatorInfo* operatorInfo, SArray* pExecInfoList) { SExplainExecInfo execInfo = {0}; SExplainExecInfo* pExplainInfo = taosArrayPush(pExecInfoList, &execInfo); diff --git a/source/libs/executor/src/filloperator.c b/source/libs/executor/src/filloperator.c index 234f1a666ce58bdd006f9768f1b076b29e5c1bf7..e465e6a52518dc0688ed8cded6b1ebde9b556859 100644 --- a/source/libs/executor/src/filloperator.c +++ b/source/libs/executor/src/filloperator.c @@ -25,11 +25,13 @@ #include "thash.h" #include "ttime.h" -#include "executorInt.h" #include "function.h" #include "querynodes.h" #include "tdatablock.h" #include "tfill.h" +#include "operator.h" +#include "querytask.h" + #define FILL_POS_INVALID 0 #define FILL_POS_START 1 diff --git a/source/libs/executor/src/groupoperator.c b/source/libs/executor/src/groupoperator.c index aaab95e36186bb3fc73da0d4e49909a35c3b219c..821db4244ed396bebff1fc45234fa075efe25ecc 100644 --- a/source/libs/executor/src/groupoperator.c +++ b/source/libs/executor/src/groupoperator.c @@ -26,6 +26,8 @@ #include "tcompare.h" #include "thash.h" #include "ttypes.h" +#include "operator.h" +#include "querytask.h" typedef struct SGroupbyOperatorInfo { SOptrBasicInfo binfo; diff --git a/source/libs/executor/src/joinoperator.c b/source/libs/executor/src/joinoperator.c index 31ff11eec587c70e05d219be3214c26980fdb38e..b4862a490fb540a143b2c780bd6283b01bbe0b6f 100644 --- a/source/libs/executor/src/joinoperator.c +++ b/source/libs/executor/src/joinoperator.c @@ -23,6 +23,8 @@ #include "thash.h" #include "tmsg.h" #include "ttypes.h" +#include "operator.h" +#include "querytask.h" typedef struct SJoinRowCtx { bool rowRemains; diff --git a/source/libs/executor/src/operator.c b/source/libs/executor/src/operator.c new file mode 100644 index 0000000000000000000000000000000000000000..5e384ca6dd13f6b9092853139d33c65ec77d51e1 --- /dev/null +++ b/source/libs/executor/src/operator.c @@ -0,0 +1,580 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#include "filter.h" +#include "function.h" +#include "os.h" +#include "tname.h" + +#include "tglobal.h" + +#include "executorimpl.h" +#include "index.h" +#include "query.h" +#include "vnode.h" +#include "operator.h" +#include "querytask.h" + +SOperatorFpSet createOperatorFpSet(__optr_open_fn_t openFn, __optr_fn_t nextFn, __optr_fn_t cleanup, + __optr_close_fn_t closeFn, __optr_reqBuf_fn_t reqBufFn, + __optr_explain_fn_t explain) { + SOperatorFpSet fpSet = { + ._openFn = openFn, + .getNextFn = nextFn, + .cleanupFn = cleanup, + .closeFn = closeFn, + .reqBufFn = reqBufFn, + .getExplainFn = explain, + }; + + return fpSet; +} + +int32_t optrDummyOpenFn(SOperatorInfo* pOperator) { + OPTR_SET_OPENED(pOperator); + pOperator->cost.openCost = 0; + return TSDB_CODE_SUCCESS; +} + +int32_t appendDownstream(SOperatorInfo* p, SOperatorInfo** pDownstream, int32_t num) { + p->pDownstream = taosMemoryCalloc(1, num * POINTER_BYTES); + if (p->pDownstream == NULL) { + return TSDB_CODE_OUT_OF_MEMORY; + } + + memcpy(p->pDownstream, pDownstream, num * POINTER_BYTES); + p->numOfDownstream = num; + return TSDB_CODE_SUCCESS; +} + +void setOperatorCompleted(SOperatorInfo* pOperator) { + pOperator->status = OP_EXEC_DONE; + pOperator->cost.totalCost = (taosGetTimestampUs() - pOperator->pTaskInfo->cost.start) / 1000.0; + setTaskStatus(pOperator->pTaskInfo, TASK_COMPLETED); +} + +void setOperatorInfo(SOperatorInfo* pOperator, const char* name, int32_t type, bool blocking, int32_t status, + void* pInfo, SExecTaskInfo* pTaskInfo) { + pOperator->name = (char*)name; + pOperator->operatorType = type; + pOperator->blocking = blocking; + pOperator->status = status; + pOperator->info = pInfo; + pOperator->pTaskInfo = pTaskInfo; +} + +void destroyOperator(SOperatorInfo* pOperator) { + if (pOperator == NULL) { + return; + } + + if (pOperator->fpSet.closeFn != NULL) { + pOperator->fpSet.closeFn(pOperator->info); + } + + if (pOperator->pDownstream != NULL) { + for (int32_t i = 0; i < pOperator->numOfDownstream; ++i) { + destroyOperator(pOperator->pDownstream[i]); + } + + taosMemoryFreeClear(pOperator->pDownstream); + pOperator->numOfDownstream = 0; + } + + cleanupExprSupp(&pOperator->exprSupp); + taosMemoryFreeClear(pOperator); +} + +// each operator should be set their own function to return total cost buffer +int32_t optrDefaultBufFn(SOperatorInfo* pOperator) { + if (pOperator->blocking) { + return -1; + } else { + return 0; + } +} + +//int32_t getTableScanInfo(SOperatorInfo* pOperator, int32_t* order, int32_t* scanFlag, bool inheritUsOrder) { +// // todo add more information about exchange operation +// int32_t type = pOperator->operatorType; +// if (type == QUERY_NODE_PHYSICAL_PLAN_SYSTABLE_SCAN || type == QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN || +// type == QUERY_NODE_PHYSICAL_PLAN_TAG_SCAN || type == QUERY_NODE_PHYSICAL_PLAN_BLOCK_DIST_SCAN || +// type == QUERY_NODE_PHYSICAL_PLAN_LAST_ROW_SCAN || type == QUERY_NODE_PHYSICAL_PLAN_TABLE_COUNT_SCAN) { +// *order = TSDB_ORDER_ASC; +// *scanFlag = MAIN_SCAN; +// return TSDB_CODE_SUCCESS; +// } else if (type == QUERY_NODE_PHYSICAL_PLAN_EXCHANGE) { +// if (!inheritUsOrder) { +// *order = TSDB_ORDER_ASC; +// } +// *scanFlag = MAIN_SCAN; +// return TSDB_CODE_SUCCESS; +// } else if (type == QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN) { +// STableScanInfo* pTableScanInfo = pOperator->info; +// *order = pTableScanInfo->base.cond.order; +// *scanFlag = pTableScanInfo->base.scanFlag; +// return TSDB_CODE_SUCCESS; +// } else if (type == QUERY_NODE_PHYSICAL_PLAN_TABLE_MERGE_SCAN) { +// STableMergeScanInfo* pTableScanInfo = pOperator->info; +// *order = pTableScanInfo->base.cond.order; +// *scanFlag = pTableScanInfo->base.scanFlag; +// return TSDB_CODE_SUCCESS; +// } else { +// if (pOperator->pDownstream == NULL || pOperator->pDownstream[0] == NULL) { +// return TSDB_CODE_INVALID_PARA; +// } else { +// return getTableScanInfo(pOperator->pDownstream[0], order, scanFlag, inheritUsOrder); +// } +// } +//} + +static int64_t getQuerySupportBufSize(size_t numOfTables) { + size_t s1 = sizeof(STableQueryInfo); + // size_t s3 = sizeof(STableCheckInfo); buffer consumption in tsdb + return (int64_t)(s1 * 1.5 * numOfTables); +} + +int32_t checkForQueryBuf(size_t numOfTables) { + int64_t t = getQuerySupportBufSize(numOfTables); + if (tsQueryBufferSizeBytes < 0) { + return TSDB_CODE_SUCCESS; + } else if (tsQueryBufferSizeBytes > 0) { + while (1) { + int64_t s = tsQueryBufferSizeBytes; + int64_t remain = s - t; + if (remain >= 0) { + if (atomic_val_compare_exchange_64(&tsQueryBufferSizeBytes, s, remain) == s) { + return TSDB_CODE_SUCCESS; + } + } else { + return TSDB_CODE_QRY_NOT_ENOUGH_BUFFER; + } + } + } + + // disable query processing if the value of tsQueryBufferSize is zero. + return TSDB_CODE_QRY_NOT_ENOUGH_BUFFER; +} + +void releaseQueryBuf(size_t numOfTables) { + if (tsQueryBufferSizeBytes < 0) { + return; + } + + int64_t t = getQuerySupportBufSize(numOfTables); + + // restore value is not enough buffer available + atomic_add_fetch_64(&tsQueryBufferSizeBytes, t); +} + +typedef enum { + OPTR_FN_RET_CONTINUE = 0x1, + OPTR_FN_RET_ABORT = 0x2, +} ERetType; + +typedef struct STraverParam { + void* pRet; + int32_t code; + void* pParam; +} STraverParam; + +// iterate the operator tree helper +typedef ERetType (*optr_fn_t)(SOperatorInfo *pOperator, STraverParam *pParam, const char* pIdstr); + +void traverseOperatorTree(SOperatorInfo* pOperator, optr_fn_t fn, STraverParam* pParam, const char* id) { + if (pOperator == NULL) { + return; + } + + ERetType ret = fn(pOperator, pParam, id); + if (ret == OPTR_FN_RET_ABORT || pParam->code != TSDB_CODE_SUCCESS) { + return; + } + + for (int32_t i = 0; i < pOperator->numOfDownstream; ++i) { + traverseOperatorTree(pOperator->pDownstream[i], fn, pParam, id); + if (pParam->code != 0) { + break; + } + } +} + +ERetType extractOperatorInfo(SOperatorInfo* pOperator, STraverParam* pParam, const char* pIdStr) { + STraverParam* p = pParam; + if (pOperator->operatorType == *(int32_t*)p->pParam) { + p->pRet = pOperator; + return OPTR_FN_RET_ABORT; + } else { + return OPTR_FN_RET_CONTINUE; + } +} + +// QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN +SOperatorInfo* extractOperatorInTree(SOperatorInfo* pOperator, int32_t type, const char* id) { + if (pOperator == NULL) { + qError("invalid operator, failed to find tableScanOperator %s", id); + terrno = TSDB_CODE_PAR_INTERNAL_ERROR; + return NULL; + } + + STraverParam p = {.pParam = &type, .pRet = NULL}; + traverseOperatorTree(pOperator, extractOperatorInfo, &p, id); + if (p.code != 0) { + terrno = p.code; + return NULL; + } else { + return p.pRet; + } +} + +typedef struct SExtScanInfo { + int32_t order; + int32_t scanFlag; + int32_t inheritUsOrder; +} SExtScanInfo; + +static ERetType extractScanInfo(SOperatorInfo* pOperator, STraverParam* pParam, const char* pIdStr) { + int32_t type = pOperator->operatorType; + SExtScanInfo* pInfo = pParam->pParam; + + if (type == QUERY_NODE_PHYSICAL_PLAN_SYSTABLE_SCAN || type == QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN || + type == QUERY_NODE_PHYSICAL_PLAN_TAG_SCAN || type == QUERY_NODE_PHYSICAL_PLAN_BLOCK_DIST_SCAN || + type == QUERY_NODE_PHYSICAL_PLAN_LAST_ROW_SCAN || type == QUERY_NODE_PHYSICAL_PLAN_TABLE_COUNT_SCAN) { + pInfo->order = TSDB_ORDER_ASC; + pInfo->scanFlag= MAIN_SCAN; + return OPTR_FN_RET_ABORT; + } else if (type == QUERY_NODE_PHYSICAL_PLAN_EXCHANGE) { + if (!pInfo->inheritUsOrder) { + pInfo->order = TSDB_ORDER_ASC; + } + pInfo->scanFlag= MAIN_SCAN; + return OPTR_FN_RET_ABORT; + } else if (type == QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN) { + STableScanInfo* pTableScanInfo = pOperator->info; + pInfo->order = pTableScanInfo->base.cond.order; + pInfo->scanFlag= pTableScanInfo->base.scanFlag; + return OPTR_FN_RET_ABORT; + } else if (type == QUERY_NODE_PHYSICAL_PLAN_TABLE_MERGE_SCAN) { + STableMergeScanInfo* pTableScanInfo = pOperator->info; + pInfo->order = pTableScanInfo->base.cond.order; + pInfo->scanFlag= pTableScanInfo->base.scanFlag; + return OPTR_FN_RET_ABORT; + } else { + return OPTR_FN_RET_CONTINUE; + } +} + +int32_t getTableScanInfo(SOperatorInfo* pOperator, int32_t* order, int32_t* scanFlag, bool inheritUsOrder) { + SExtScanInfo info = {.inheritUsOrder = inheritUsOrder, .order = *order}; + STraverParam p = {.pParam = &info}; + + traverseOperatorTree(pOperator, extractScanInfo, &p, NULL); + *order = info.order; + *scanFlag = info.scanFlag; + + ASSERT(*order == TSDB_ORDER_ASC || *order == TSDB_ORDER_DESC); + return p.code; +} + +static ERetType doStopDataReader(SOperatorInfo* pOperator, STraverParam* pParam, const char* pIdStr) { + if (pOperator->operatorType == QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN) { + STableScanInfo* pInfo = pOperator->info; + + if (pInfo->base.dataReader != NULL) { + tsdbReaderSetCloseFlag(pInfo->base.dataReader); + } + return OPTR_FN_RET_ABORT; + } else if (pOperator->operatorType == QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN) { + SStreamScanInfo* pInfo = pOperator->info; + + if (pInfo->pTableScanOp != NULL) { + STableScanInfo* pTableScanInfo = pInfo->pTableScanOp->info; + if (pTableScanInfo != NULL && pTableScanInfo->base.dataReader != NULL) { + tsdbReaderSetCloseFlag(pTableScanInfo->base.dataReader); + } + } + + return OPTR_FN_RET_ABORT; + } + + return OPTR_FN_RET_CONTINUE; +} + +int32_t stopTableScanOperator(SOperatorInfo* pOperator, const char* pIdStr) { + STraverParam p = {0}; + traverseOperatorTree(pOperator, doStopDataReader, &p, pIdStr); + return p.code; +} + +SOperatorInfo* createOperator(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo, SReadHandle* pHandle, SNode* pTagCond, + SNode* pTagIndexCond, const char* pUser, const char* dbname) { + int32_t type = nodeType(pPhyNode); + const char* idstr = GET_TASKID(pTaskInfo); + + if (pPhyNode->pChildren == NULL || LIST_LENGTH(pPhyNode->pChildren) == 0) { + SOperatorInfo* pOperator = NULL; + if (QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN == type) { + STableScanPhysiNode* pTableScanNode = (STableScanPhysiNode*)pPhyNode; + + // NOTE: this is an patch to fix the physical plan + // TODO remove it later + if (pTableScanNode->scan.node.pLimit != NULL) { + pTableScanNode->groupSort = true; + } + + STableListInfo* pTableListInfo = tableListCreate(); + int32_t code = + createScanTableListInfo(&pTableScanNode->scan, pTableScanNode->pGroupTags, pTableScanNode->groupSort, pHandle, + pTableListInfo, pTagCond, pTagIndexCond, pTaskInfo); + if (code) { + pTaskInfo->code = code; + tableListDestroy(pTableListInfo); + qError("failed to createScanTableListInfo, code:%s, %s", tstrerror(code), idstr); + return NULL; + } + + code = initQueriedTableSchemaInfo(pHandle, &pTableScanNode->scan, dbname, pTaskInfo); + if (code) { + pTaskInfo->code = terrno; + tableListDestroy(pTableListInfo); + return NULL; + } + + pOperator = createTableScanOperatorInfo(pTableScanNode, pHandle, pTableListInfo, pTaskInfo); + if (NULL == pOperator) { + pTaskInfo->code = terrno; + return NULL; + } + + STableScanInfo* pScanInfo = pOperator->info; + pTaskInfo->cost.pRecoder = &pScanInfo->base.readRecorder; + } else if (QUERY_NODE_PHYSICAL_PLAN_TABLE_MERGE_SCAN == type) { + STableMergeScanPhysiNode* pTableScanNode = (STableMergeScanPhysiNode*)pPhyNode; + STableListInfo* pTableListInfo = tableListCreate(); + + int32_t code = createScanTableListInfo(&pTableScanNode->scan, pTableScanNode->pGroupTags, true, pHandle, + pTableListInfo, pTagCond, pTagIndexCond, pTaskInfo); + if (code) { + pTaskInfo->code = code; + tableListDestroy(pTableListInfo); + qError("failed to createScanTableListInfo, code: %s", tstrerror(code)); + return NULL; + } + + code = initQueriedTableSchemaInfo(pHandle, &pTableScanNode->scan, dbname, pTaskInfo); + if (code) { + pTaskInfo->code = terrno; + tableListDestroy(pTableListInfo); + return NULL; + } + + pOperator = createTableMergeScanOperatorInfo(pTableScanNode, pHandle, pTableListInfo, pTaskInfo); + if (NULL == pOperator) { + pTaskInfo->code = terrno; + tableListDestroy(pTableListInfo); + return NULL; + } + + STableScanInfo* pScanInfo = pOperator->info; + pTaskInfo->cost.pRecoder = &pScanInfo->base.readRecorder; + } else if (QUERY_NODE_PHYSICAL_PLAN_EXCHANGE == type) { + pOperator = createExchangeOperatorInfo(pHandle ? pHandle->pMsgCb->clientRpc : NULL, (SExchangePhysiNode*)pPhyNode, + pTaskInfo); + } else if (QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN == type) { + STableScanPhysiNode* pTableScanNode = (STableScanPhysiNode*)pPhyNode; + STableListInfo* pTableListInfo = tableListCreate(); + + if (pHandle->vnode) { + int32_t code = + createScanTableListInfo(&pTableScanNode->scan, pTableScanNode->pGroupTags, pTableScanNode->groupSort, + pHandle, pTableListInfo, pTagCond, pTagIndexCond, pTaskInfo); + if (code) { + pTaskInfo->code = code; + tableListDestroy(pTableListInfo); + qError("failed to createScanTableListInfo, code: %s", tstrerror(code)); + return NULL; + } + } + + pTaskInfo->schemaInfo.qsw = extractQueriedColumnSchema(&pTableScanNode->scan); + pOperator = createStreamScanOperatorInfo(pHandle, pTableScanNode, pTagCond, pTableListInfo, pTaskInfo); + } else if (QUERY_NODE_PHYSICAL_PLAN_SYSTABLE_SCAN == type) { + SSystemTableScanPhysiNode* pSysScanPhyNode = (SSystemTableScanPhysiNode*)pPhyNode; + pOperator = createSysTableScanOperatorInfo(pHandle, pSysScanPhyNode, pUser, pTaskInfo); + } else if (QUERY_NODE_PHYSICAL_PLAN_TABLE_COUNT_SCAN == type) { + STableCountScanPhysiNode* pTblCountScanNode = (STableCountScanPhysiNode*)pPhyNode; + pOperator = createTableCountScanOperatorInfo(pHandle, pTblCountScanNode, pTaskInfo); + } else if (QUERY_NODE_PHYSICAL_PLAN_TAG_SCAN == type) { + STagScanPhysiNode* pScanPhyNode = (STagScanPhysiNode*)pPhyNode; + STableListInfo* pTableListInfo = tableListCreate(); + int32_t code = createScanTableListInfo(pScanPhyNode, NULL, false, pHandle, pTableListInfo, pTagCond, + pTagIndexCond, pTaskInfo); + if (code != TSDB_CODE_SUCCESS) { + pTaskInfo->code = code; + qError("failed to getTableList, code: %s", tstrerror(code)); + return NULL; + } + + pOperator = createTagScanOperatorInfo(pHandle, pScanPhyNode, pTableListInfo, pTaskInfo); + } else if (QUERY_NODE_PHYSICAL_PLAN_BLOCK_DIST_SCAN == type) { + SBlockDistScanPhysiNode* pBlockNode = (SBlockDistScanPhysiNode*)pPhyNode; + STableListInfo* pTableListInfo = tableListCreate(); + + if (pBlockNode->tableType == TSDB_SUPER_TABLE) { + SArray* pList = taosArrayInit(4, sizeof(STableKeyInfo)); + int32_t code = vnodeGetAllTableList(pHandle->vnode, pBlockNode->uid, pList); + if (code != TSDB_CODE_SUCCESS) { + pTaskInfo->code = terrno; + return NULL; + } + + size_t num = taosArrayGetSize(pList); + for (int32_t i = 0; i < num; ++i) { + STableKeyInfo* p = taosArrayGet(pList, i); + tableListAddTableInfo(pTableListInfo, p->uid, 0); + } + + taosArrayDestroy(pList); + } else { // Create group with only one table + tableListAddTableInfo(pTableListInfo, pBlockNode->uid, 0); + } + + pOperator = createDataBlockInfoScanOperator(pHandle, pBlockNode, pTableListInfo, pTaskInfo); + } else if (QUERY_NODE_PHYSICAL_PLAN_LAST_ROW_SCAN == type) { + SLastRowScanPhysiNode* pScanNode = (SLastRowScanPhysiNode*)pPhyNode; + STableListInfo* pTableListInfo = tableListCreate(); + + int32_t code = createScanTableListInfo(&pScanNode->scan, pScanNode->pGroupTags, true, pHandle, pTableListInfo, + pTagCond, pTagIndexCond, pTaskInfo); + if (code != TSDB_CODE_SUCCESS) { + pTaskInfo->code = code; + return NULL; + } + + code = initQueriedTableSchemaInfo(pHandle, &pScanNode->scan, dbname, pTaskInfo); + if (code != TSDB_CODE_SUCCESS) { + pTaskInfo->code = code; + return NULL; + } + + pOperator = createCacherowsScanOperator(pScanNode, pHandle, pTableListInfo, pTaskInfo); + } else if (QUERY_NODE_PHYSICAL_PLAN_PROJECT == type) { + pOperator = createProjectOperatorInfo(NULL, (SProjectPhysiNode*)pPhyNode, pTaskInfo); + } else { + terrno = TSDB_CODE_INVALID_PARA; + return NULL; + } + + if (pOperator != NULL) { // todo moved away + pOperator->resultDataBlockId = pPhyNode->pOutputDataBlockDesc->dataBlockId; + } + + return pOperator; + } + + size_t size = LIST_LENGTH(pPhyNode->pChildren); + SOperatorInfo** ops = taosMemoryCalloc(size, POINTER_BYTES); + if (ops == NULL) { + return NULL; + } + + for (int32_t i = 0; i < size; ++i) { + SPhysiNode* pChildNode = (SPhysiNode*)nodesListGetNode(pPhyNode->pChildren, i); + ops[i] = createOperator(pChildNode, pTaskInfo, pHandle, pTagCond, pTagIndexCond, pUser, dbname); + if (ops[i] == NULL) { + taosMemoryFree(ops); + return NULL; + } + } + + SOperatorInfo* pOptr = NULL; + if (QUERY_NODE_PHYSICAL_PLAN_PROJECT == type) { + pOptr = createProjectOperatorInfo(ops[0], (SProjectPhysiNode*)pPhyNode, pTaskInfo); + } else if (QUERY_NODE_PHYSICAL_PLAN_HASH_AGG == type) { + SAggPhysiNode* pAggNode = (SAggPhysiNode*)pPhyNode; + if (pAggNode->pGroupKeys != NULL) { + pOptr = createGroupOperatorInfo(ops[0], pAggNode, pTaskInfo); + } else { + pOptr = createAggregateOperatorInfo(ops[0], pAggNode, pTaskInfo); + } + } else if (QUERY_NODE_PHYSICAL_PLAN_HASH_INTERVAL == type) { + SIntervalPhysiNode* pIntervalPhyNode = (SIntervalPhysiNode*)pPhyNode; + pOptr = createIntervalOperatorInfo(ops[0], pIntervalPhyNode, pTaskInfo); + } else if (QUERY_NODE_PHYSICAL_PLAN_STREAM_INTERVAL == type) { + pOptr = createStreamIntervalOperatorInfo(ops[0], pPhyNode, pTaskInfo); + } else if (QUERY_NODE_PHYSICAL_PLAN_MERGE_ALIGNED_INTERVAL == type) { + SMergeAlignedIntervalPhysiNode* pIntervalPhyNode = (SMergeAlignedIntervalPhysiNode*)pPhyNode; + pOptr = createMergeAlignedIntervalOperatorInfo(ops[0], pIntervalPhyNode, pTaskInfo); + } else if (QUERY_NODE_PHYSICAL_PLAN_MERGE_INTERVAL == type) { + SMergeIntervalPhysiNode* pIntervalPhyNode = (SMergeIntervalPhysiNode*)pPhyNode; + pOptr = createMergeIntervalOperatorInfo(ops[0], pIntervalPhyNode, pTaskInfo); + } else if (QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_INTERVAL == type) { + int32_t children = 0; + pOptr = createStreamFinalIntervalOperatorInfo(ops[0], pPhyNode, pTaskInfo, children); + } else if (QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_INTERVAL == type) { + int32_t children = pHandle->numOfVgroups; + pOptr = createStreamFinalIntervalOperatorInfo(ops[0], pPhyNode, pTaskInfo, children); + } else if (QUERY_NODE_PHYSICAL_PLAN_SORT == type) { + pOptr = createSortOperatorInfo(ops[0], (SSortPhysiNode*)pPhyNode, pTaskInfo); + } else if (QUERY_NODE_PHYSICAL_PLAN_GROUP_SORT == type) { + pOptr = createGroupSortOperatorInfo(ops[0], (SGroupSortPhysiNode*)pPhyNode, pTaskInfo); + } else if (QUERY_NODE_PHYSICAL_PLAN_MERGE == type) { + SMergePhysiNode* pMergePhyNode = (SMergePhysiNode*)pPhyNode; + pOptr = createMultiwayMergeOperatorInfo(ops, size, pMergePhyNode, pTaskInfo); + } else if (QUERY_NODE_PHYSICAL_PLAN_MERGE_SESSION == type) { + SSessionWinodwPhysiNode* pSessionNode = (SSessionWinodwPhysiNode*)pPhyNode; + pOptr = createSessionAggOperatorInfo(ops[0], pSessionNode, pTaskInfo); + } else if (QUERY_NODE_PHYSICAL_PLAN_STREAM_SESSION == type) { + pOptr = createStreamSessionAggOperatorInfo(ops[0], pPhyNode, pTaskInfo); + } else if (QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_SESSION == type) { + int32_t children = 0; + pOptr = createStreamFinalSessionAggOperatorInfo(ops[0], pPhyNode, pTaskInfo, children); + } else if (QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_SESSION == type) { + int32_t children = pHandle->numOfVgroups; + pOptr = createStreamFinalSessionAggOperatorInfo(ops[0], pPhyNode, pTaskInfo, children); + } else if (QUERY_NODE_PHYSICAL_PLAN_PARTITION == type) { + pOptr = createPartitionOperatorInfo(ops[0], (SPartitionPhysiNode*)pPhyNode, pTaskInfo); + } else if (QUERY_NODE_PHYSICAL_PLAN_STREAM_PARTITION == type) { + pOptr = createStreamPartitionOperatorInfo(ops[0], (SStreamPartitionPhysiNode*)pPhyNode, pTaskInfo); + } else if (QUERY_NODE_PHYSICAL_PLAN_MERGE_STATE == type) { + SStateWinodwPhysiNode* pStateNode = (SStateWinodwPhysiNode*)pPhyNode; + pOptr = createStatewindowOperatorInfo(ops[0], pStateNode, pTaskInfo); + } else if (QUERY_NODE_PHYSICAL_PLAN_STREAM_STATE == type) { + pOptr = createStreamStateAggOperatorInfo(ops[0], pPhyNode, pTaskInfo); + } else if (QUERY_NODE_PHYSICAL_PLAN_MERGE_JOIN == type) { + pOptr = createMergeJoinOperatorInfo(ops, size, (SSortMergeJoinPhysiNode*)pPhyNode, pTaskInfo); + } else if (QUERY_NODE_PHYSICAL_PLAN_FILL == type) { + pOptr = createFillOperatorInfo(ops[0], (SFillPhysiNode*)pPhyNode, pTaskInfo); + } else if (QUERY_NODE_PHYSICAL_PLAN_STREAM_FILL == type) { + pOptr = createStreamFillOperatorInfo(ops[0], (SStreamFillPhysiNode*)pPhyNode, pTaskInfo); + } else if (QUERY_NODE_PHYSICAL_PLAN_INDEF_ROWS_FUNC == type) { + pOptr = createIndefinitOutputOperatorInfo(ops[0], pPhyNode, pTaskInfo); + } else if (QUERY_NODE_PHYSICAL_PLAN_INTERP_FUNC == type) { + pOptr = createTimeSliceOperatorInfo(ops[0], pPhyNode, pTaskInfo); + } else if (QUERY_NODE_PHYSICAL_PLAN_MERGE_EVENT == type) { + pOptr = createEventwindowOperatorInfo(ops[0], pPhyNode, pTaskInfo); + } else { + terrno = TSDB_CODE_INVALID_PARA; + taosMemoryFree(ops); + return NULL; + } + + taosMemoryFree(ops); + if (pOptr) { + pOptr->resultDataBlockId = pPhyNode->pOutputDataBlockDesc->dataBlockId; + } + + return pOptr; +} diff --git a/source/libs/executor/src/projectoperator.c b/source/libs/executor/src/projectoperator.c index 86c49e0fc82b76a595480e61b53167aab4b81f18..01ee18b41f63631c150699faeb4b364e6f808a96 100644 --- a/source/libs/executor/src/projectoperator.c +++ b/source/libs/executor/src/projectoperator.c @@ -16,6 +16,8 @@ #include "executorimpl.h" #include "filter.h" #include "functionMgt.h" +#include "operator.h" +#include "querytask.h" typedef struct SProjectOperatorInfo { SOptrBasicInfo binfo; diff --git a/source/libs/executor/src/querytask.c b/source/libs/executor/src/querytask.c new file mode 100644 index 0000000000000000000000000000000000000000..ed209efed0a2412b68aa67d83130d9f103d45778 --- /dev/null +++ b/source/libs/executor/src/querytask.c @@ -0,0 +1,235 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#include "filter.h" +#include "function.h" +#include "functionMgt.h" +#include "os.h" +#include "querynodes.h" +#include "tfill.h" +#include "tname.h" + +#include "tdatablock.h" +#include "tmsg.h" + +#include "executorimpl.h" +#include "index.h" +#include "query.h" +#include "thash.h" +#include "ttypes.h" +#include "vnode.h" +#include "operator.h" +#include "querytask.h" + +#define CLEAR_QUERY_STATUS(q, st) ((q)->status &= (~(st))) + +SExecTaskInfo* doCreateTask(uint64_t queryId, uint64_t taskId, int32_t vgId, EOPTR_EXEC_MODEL model) { + SExecTaskInfo* pTaskInfo = taosMemoryCalloc(1, sizeof(SExecTaskInfo)); + if (pTaskInfo == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + return NULL; + } + + setTaskStatus(pTaskInfo, TASK_NOT_COMPLETED); + pTaskInfo->cost.created = taosGetTimestampUs(); + + pTaskInfo->execModel = model; + pTaskInfo->stopInfo.pStopInfo = taosArrayInit(4, sizeof(SExchangeOpStopInfo)); + pTaskInfo->pResultBlockList = taosArrayInit(128, POINTER_BYTES); + + taosInitRWLatch(&pTaskInfo->lock); + + pTaskInfo->id.vgId = vgId; + pTaskInfo->id.queryId = queryId; + pTaskInfo->id.str = taosMemoryMalloc(64); + buildTaskId(taskId, queryId, pTaskInfo->id.str); + + return pTaskInfo; +} + +bool isTaskKilled(SExecTaskInfo* pTaskInfo) { return (0 != pTaskInfo->code); } + +void setTaskKilled(SExecTaskInfo* pTaskInfo, int32_t rspCode) { + pTaskInfo->code = rspCode; + stopTableScanOperator(pTaskInfo->pRoot, pTaskInfo->id.str); +} + +void setTaskStatus(SExecTaskInfo* pTaskInfo, int8_t status) { + if (status == TASK_NOT_COMPLETED) { + pTaskInfo->status = status; + } else { + // QUERY_NOT_COMPLETED is not compatible with any other status, so clear its position first + CLEAR_QUERY_STATUS(pTaskInfo, TASK_NOT_COMPLETED); + pTaskInfo->status |= status; + } +} + +int32_t createExecTaskInfo(SSubplan* pPlan, SExecTaskInfo** pTaskInfo, SReadHandle* pHandle, uint64_t taskId, + int32_t vgId, char* sql, EOPTR_EXEC_MODEL model) { + *pTaskInfo = doCreateTask(pPlan->id.queryId, taskId, vgId, model); + if (*pTaskInfo == NULL) { + taosMemoryFree(sql); + return terrno; + } + + if (pHandle) { + if (pHandle->pStateBackend) { + (*pTaskInfo)->streamInfo.pState = pHandle->pStateBackend; + } + } + + TSWAP((*pTaskInfo)->sql, sql); + + (*pTaskInfo)->pSubplan = pPlan; + (*pTaskInfo)->pRoot = createOperator(pPlan->pNode, *pTaskInfo, pHandle, pPlan->pTagCond, pPlan->pTagIndexCond, + pPlan->user, pPlan->dbFName); + + if (NULL == (*pTaskInfo)->pRoot) { + int32_t code = (*pTaskInfo)->code; + doDestroyTask(*pTaskInfo); + return code; + } else { + return TSDB_CODE_SUCCESS; + } +} + +void cleanupQueriedTableScanInfo(SSchemaInfo* pSchemaInfo) { + taosMemoryFreeClear(pSchemaInfo->dbname); + taosMemoryFreeClear(pSchemaInfo->tablename); + tDeleteSSchemaWrapper(pSchemaInfo->sw); + tDeleteSSchemaWrapper(pSchemaInfo->qsw); +} + +int32_t initQueriedTableSchemaInfo(SReadHandle* pHandle, SScanPhysiNode* pScanNode, const char* dbName, SExecTaskInfo* pTaskInfo) { + SMetaReader mr = {0}; + if (pHandle == NULL) { + terrno = TSDB_CODE_INVALID_PARA; + return terrno; + } + + metaReaderInit(&mr, pHandle->meta, 0); + int32_t code = metaGetTableEntryByUidCache(&mr, pScanNode->uid); + if (code != TSDB_CODE_SUCCESS) { + qError("failed to get the table meta, uid:0x%" PRIx64 ", suid:0x%" PRIx64 ", %s", pScanNode->uid, pScanNode->suid, + GET_TASKID(pTaskInfo)); + + metaReaderClear(&mr); + return terrno; + } + + SSchemaInfo* pSchemaInfo = &pTaskInfo->schemaInfo; + + pSchemaInfo->tablename = taosStrdup(mr.me.name); + pSchemaInfo->dbname = taosStrdup(dbName); + + if (mr.me.type == TSDB_SUPER_TABLE) { + pSchemaInfo->sw = tCloneSSchemaWrapper(&mr.me.stbEntry.schemaRow); + pSchemaInfo->tversion = mr.me.stbEntry.schemaTag.version; + } else if (mr.me.type == TSDB_CHILD_TABLE) { + tDecoderClear(&mr.coder); + + tb_uid_t suid = mr.me.ctbEntry.suid; + code = metaGetTableEntryByUidCache(&mr, suid); + if (code != TSDB_CODE_SUCCESS) { + metaReaderClear(&mr); + return terrno; + } + + pSchemaInfo->sw = tCloneSSchemaWrapper(&mr.me.stbEntry.schemaRow); + pSchemaInfo->tversion = mr.me.stbEntry.schemaTag.version; + } else { + pSchemaInfo->sw = tCloneSSchemaWrapper(&mr.me.ntbEntry.schemaRow); + } + + metaReaderClear(&mr); + + pSchemaInfo->qsw = extractQueriedColumnSchema(pScanNode); + return TSDB_CODE_SUCCESS; +} + +SSchemaWrapper* extractQueriedColumnSchema(SScanPhysiNode* pScanNode) { + int32_t numOfCols = LIST_LENGTH(pScanNode->pScanCols); + int32_t numOfTags = LIST_LENGTH(pScanNode->pScanPseudoCols); + + SSchemaWrapper* pqSw = taosMemoryCalloc(1, sizeof(SSchemaWrapper)); + pqSw->pSchema = taosMemoryCalloc(numOfCols + numOfTags, sizeof(SSchema)); + + for (int32_t i = 0; i < numOfCols; ++i) { + STargetNode* pNode = (STargetNode*)nodesListGetNode(pScanNode->pScanCols, i); + SColumnNode* pColNode = (SColumnNode*)pNode->pExpr; + + SSchema* pSchema = &pqSw->pSchema[pqSw->nCols++]; + pSchema->colId = pColNode->colId; + pSchema->type = pColNode->node.resType.type; + pSchema->bytes = pColNode->node.resType.bytes; + tstrncpy(pSchema->name, pColNode->colName, tListLen(pSchema->name)); + } + + // this the tags and pseudo function columns, we only keep the tag columns + for (int32_t i = 0; i < numOfTags; ++i) { + STargetNode* pNode = (STargetNode*)nodesListGetNode(pScanNode->pScanPseudoCols, i); + + int32_t type = nodeType(pNode->pExpr); + if (type == QUERY_NODE_COLUMN) { + SColumnNode* pColNode = (SColumnNode*)pNode->pExpr; + + SSchema* pSchema = &pqSw->pSchema[pqSw->nCols++]; + pSchema->colId = pColNode->colId; + pSchema->type = pColNode->node.resType.type; + pSchema->bytes = pColNode->node.resType.bytes; + tstrncpy(pSchema->name, pColNode->colName, tListLen(pSchema->name)); + } + } + + return pqSw; +} + +static void cleanupStreamInfo(SStreamTaskInfo* pStreamInfo) { tDeleteSSchemaWrapper(pStreamInfo->schema); } + +static void freeBlock(void* pParam) { + SSDataBlock* pBlock = *(SSDataBlock**)pParam; + blockDataDestroy(pBlock); +} + +void doDestroyTask(SExecTaskInfo* pTaskInfo) { + qDebug("%s execTask is freed", GET_TASKID(pTaskInfo)); + destroyOperator(pTaskInfo->pRoot); + cleanupQueriedTableScanInfo(&pTaskInfo->schemaInfo); + cleanupStreamInfo(&pTaskInfo->streamInfo); + + if (!pTaskInfo->localFetch.localExec) { + nodesDestroyNode((SNode*)pTaskInfo->pSubplan); + } + + taosArrayDestroyEx(pTaskInfo->pResultBlockList, freeBlock); + taosArrayDestroy(pTaskInfo->stopInfo.pStopInfo); + taosMemoryFreeClear(pTaskInfo->sql); + taosMemoryFreeClear(pTaskInfo->id.str); + taosMemoryFreeClear(pTaskInfo); +} + +void buildTaskId(uint64_t taskId, uint64_t queryId, char* dst) { + char* p = dst; + + int32_t offset = 6; + memcpy(p, "TID:0x", offset); + offset += tintToHex(taskId, &p[offset]); + + memcpy(&p[offset], " QID:0x", 7); + offset += 7; + offset += tintToHex(queryId, &p[offset]); + + p[offset] = 0; +} diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index 0f583c021ded63c604c9691e82333486c41ad52b..a717c8b7c06c550e8d7e74164760b9df96b24d38 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -30,6 +30,8 @@ #include "tcompare.h" #include "thash.h" #include "ttypes.h" +#include "operator.h" +#include "querytask.h" int32_t scanDebug = 0; @@ -2304,7 +2306,7 @@ static void destroyStreamScanOperatorInfo(void* param) { SStreamScanInfo* pStreamScan = (SStreamScanInfo*)param; if (pStreamScan->pTableScanOp && pStreamScan->pTableScanOp->info) { - destroyOperatorInfo(pStreamScan->pTableScanOp); + destroyOperator(pStreamScan->pTableScanOp); } if (pStreamScan->tqReader) { diff --git a/source/libs/executor/src/sortoperator.c b/source/libs/executor/src/sortoperator.c index cb0f1aa06845575e40d69ff4dca2ccaf96809b0a..0b23a803c6348f417d6c91a204112c6d8d852e0e 100644 --- a/source/libs/executor/src/sortoperator.c +++ b/source/libs/executor/src/sortoperator.c @@ -16,6 +16,8 @@ #include "filter.h" #include "executorimpl.h" #include "tdatablock.h" +#include "operator.h" +#include "querytask.h" typedef struct SSortOperatorInfo { SOptrBasicInfo binfo; diff --git a/source/libs/executor/src/sysscanoperator.c b/source/libs/executor/src/sysscanoperator.c index c78e6002cde589173e111644d5c071ed7d458efe..bc850b5333a789d98d76bc8de2f5d0ce5a22e084 100644 --- a/source/libs/executor/src/sysscanoperator.c +++ b/source/libs/executor/src/sysscanoperator.c @@ -31,6 +31,9 @@ #include "thash.h" #include "ttypes.h" #include "vnode.h" +#include "operator.h" +#include "querytask.h" + typedef int (*__optSysFilter)(void* a, void* b, int16_t dtype); typedef int32_t (*__sys_filte)(void* pMeta, SNode* cond, SArray* result); diff --git a/source/libs/executor/src/timesliceoperator.c b/source/libs/executor/src/timesliceoperator.c index f0e25d8cc5a3786a66720de1e314c406361af11a..a688920a22bb34f7d93a40a7e3e42c52b1aaf875 100644 --- a/source/libs/executor/src/timesliceoperator.c +++ b/source/libs/executor/src/timesliceoperator.c @@ -21,6 +21,8 @@ #include "tdatablock.h" #include "tfill.h" #include "ttime.h" +#include "operator.h" +#include "querytask.h" typedef struct STimeSliceOperatorInfo { SSDataBlock* pRes; diff --git a/source/libs/executor/src/timewindowoperator.c b/source/libs/executor/src/timewindowoperator.c index ce2d4b27dd2099cca3d88d8973c37784dcdc16df..ab9b0987fae795b6e484899af8b815d0173f15fa 100644 --- a/source/libs/executor/src/timewindowoperator.c +++ b/source/libs/executor/src/timewindowoperator.c @@ -21,6 +21,8 @@ #include "tdatablock.h" #include "tfill.h" #include "ttime.h" +#include "operator.h" +#include "querytask.h" #define IS_FINAL_OP(op) ((op)->isFinal) #define DEAULT_DELETE_MARK (1000LL * 60LL * 60LL * 24LL * 365LL * 10LL); @@ -1606,7 +1608,7 @@ void destroyStreamFinalIntervalOperatorInfo(void* param) { int32_t size = taosArrayGetSize(pInfo->pChildren); for (int32_t i = 0; i < size; i++) { SOperatorInfo* pChildOp = taosArrayGetP(pInfo->pChildren, i); - destroyOperatorInfo(pChildOp); + destroyOperator(pChildOp); } taosArrayDestroy(pInfo->pChildren); } @@ -2835,7 +2837,7 @@ void destroyStreamSessionAggOperatorInfo(void* param) { int32_t size = taosArrayGetSize(pInfo->pChildren); for (int32_t i = 0; i < size; i++) { SOperatorInfo* pChild = taosArrayGetP(pInfo->pChildren, i); - destroyOperatorInfo(pChild); + destroyOperator(pChild); } taosArrayDestroy(pInfo->pChildren); } @@ -3812,7 +3814,7 @@ void destroyStreamStateOperatorInfo(void* param) { int32_t size = taosArrayGetSize(pInfo->pChildren); for (int32_t i = 0; i < size; i++) { SOperatorInfo* pChild = taosArrayGetP(pInfo->pChildren, i); - destroyOperatorInfo(pChild); + destroyOperator(pChild); } taosArrayDestroy(pInfo->pChildren); } diff --git a/source/libs/executor/test/executorTests.cpp b/source/libs/executor/test/executorTests.cpp index b9a696170ae3a1fb4b0c6442bdedef3a3fdd9817..db0678f2142dad234c844a15452af928c2d4b51c 100644 --- a/source/libs/executor/test/executorTests.cpp +++ b/source/libs/executor/test/executorTests.cpp @@ -29,11 +29,8 @@ #include "taos.h" #include "tdatablock.h" #include "tdef.h" -#include "tglobal.h" -#include "tmsg.h" -#include "tname.h" -#include "trpc.h" #include "tvariant.h" +#include "operator.h" namespace {