提交 a0f3c328 编写于 作者: H Hongze Cheng

Merge branch '3.0' of https://github.com/taosdata/TDengine into feat/row_refact

...@@ -207,8 +207,8 @@ typedef enum ENodeType { ...@@ -207,8 +207,8 @@ typedef enum ENodeType {
QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN, QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN,
QUERY_NODE_PHYSICAL_PLAN_SYSTABLE_SCAN, QUERY_NODE_PHYSICAL_PLAN_SYSTABLE_SCAN,
QUERY_NODE_PHYSICAL_PLAN_PROJECT, QUERY_NODE_PHYSICAL_PLAN_PROJECT,
QUERY_NODE_PHYSICAL_PLAN_JOIN, QUERY_NODE_PHYSICAL_PLAN_MERGE_JOIN,
QUERY_NODE_PHYSICAL_PLAN_AGG, QUERY_NODE_PHYSICAL_PLAN_HASH_AGG,
QUERY_NODE_PHYSICAL_PLAN_EXCHANGE, QUERY_NODE_PHYSICAL_PLAN_EXCHANGE,
QUERY_NODE_PHYSICAL_PLAN_MERGE, QUERY_NODE_PHYSICAL_PLAN_MERGE,
QUERY_NODE_PHYSICAL_PLAN_SORT, QUERY_NODE_PHYSICAL_PLAN_SORT,
...@@ -218,11 +218,11 @@ typedef enum ENodeType { ...@@ -218,11 +218,11 @@ typedef enum ENodeType {
QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_INTERVAL, QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_INTERVAL,
QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_INTERVAL, QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_INTERVAL,
QUERY_NODE_PHYSICAL_PLAN_FILL, QUERY_NODE_PHYSICAL_PLAN_FILL,
QUERY_NODE_PHYSICAL_PLAN_SESSION_WINDOW, QUERY_NODE_PHYSICAL_PLAN_MERGE_SESSION,
QUERY_NODE_PHYSICAL_PLAN_STREAM_SESSION_WINDOW, QUERY_NODE_PHYSICAL_PLAN_STREAM_SESSION,
QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_SESSION_WINDOW, QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_SESSION,
QUERY_NODE_PHYSICAL_PLAN_STATE_WINDOW, QUERY_NODE_PHYSICAL_PLAN_MERGE_STATE,
QUERY_NODE_PHYSICAL_PLAN_STREAM_STATE_WINDOW, QUERY_NODE_PHYSICAL_PLAN_STREAM_STATE,
QUERY_NODE_PHYSICAL_PLAN_PARTITION, QUERY_NODE_PHYSICAL_PLAN_PARTITION,
QUERY_NODE_PHYSICAL_PLAN_INDEF_ROWS_FUNC, QUERY_NODE_PHYSICAL_PLAN_INDEF_ROWS_FUNC,
QUERY_NODE_PHYSICAL_PLAN_DISPATCH, QUERY_NODE_PHYSICAL_PLAN_DISPATCH,
......
...@@ -133,12 +133,12 @@ int32_t qExplainGenerateResChildren(SPhysiNode *pNode, SExplainGroup *group, SNo ...@@ -133,12 +133,12 @@ int32_t qExplainGenerateResChildren(SPhysiNode *pNode, SExplainGroup *group, SNo
pPhysiChildren = pPrjNode->node.pChildren; pPhysiChildren = pPrjNode->node.pChildren;
break; break;
} }
case QUERY_NODE_PHYSICAL_PLAN_JOIN: { case QUERY_NODE_PHYSICAL_PLAN_MERGE_JOIN: {
SJoinPhysiNode *pJoinNode = (SJoinPhysiNode *)pNode; SJoinPhysiNode *pJoinNode = (SJoinPhysiNode *)pNode;
pPhysiChildren = pJoinNode->node.pChildren; pPhysiChildren = pJoinNode->node.pChildren;
break; break;
} }
case QUERY_NODE_PHYSICAL_PLAN_AGG: { case QUERY_NODE_PHYSICAL_PLAN_HASH_AGG: {
SAggPhysiNode *pAggNode = (SAggPhysiNode *)pNode; SAggPhysiNode *pAggNode = (SAggPhysiNode *)pNode;
pPhysiChildren = pAggNode->node.pChildren; pPhysiChildren = pAggNode->node.pChildren;
break; break;
...@@ -158,12 +158,12 @@ int32_t qExplainGenerateResChildren(SPhysiNode *pNode, SExplainGroup *group, SNo ...@@ -158,12 +158,12 @@ int32_t qExplainGenerateResChildren(SPhysiNode *pNode, SExplainGroup *group, SNo
pPhysiChildren = pIntNode->window.node.pChildren; pPhysiChildren = pIntNode->window.node.pChildren;
break; break;
} }
case QUERY_NODE_PHYSICAL_PLAN_SESSION_WINDOW: { case QUERY_NODE_PHYSICAL_PLAN_MERGE_SESSION: {
SSessionWinodwPhysiNode *pSessNode = (SSessionWinodwPhysiNode *)pNode; SSessionWinodwPhysiNode *pSessNode = (SSessionWinodwPhysiNode *)pNode;
pPhysiChildren = pSessNode->window.node.pChildren; pPhysiChildren = pSessNode->window.node.pChildren;
break; break;
} }
case QUERY_NODE_PHYSICAL_PLAN_STATE_WINDOW: { case QUERY_NODE_PHYSICAL_PLAN_MERGE_STATE: {
SStateWinodwPhysiNode *pStateNode = (SStateWinodwPhysiNode *)pNode; SStateWinodwPhysiNode *pStateNode = (SStateWinodwPhysiNode *)pNode;
pPhysiChildren = pStateNode->window.node.pChildren; pPhysiChildren = pStateNode->window.node.pChildren;
break; break;
...@@ -513,7 +513,7 @@ int32_t qExplainResNodeToRowsImpl(SExplainResNode *pResNode, SExplainCtx *ctx, i ...@@ -513,7 +513,7 @@ int32_t qExplainResNodeToRowsImpl(SExplainResNode *pResNode, SExplainCtx *ctx, i
} }
break; break;
} }
case QUERY_NODE_PHYSICAL_PLAN_JOIN: { case QUERY_NODE_PHYSICAL_PLAN_MERGE_JOIN: {
SJoinPhysiNode *pJoinNode = (SJoinPhysiNode *)pNode; SJoinPhysiNode *pJoinNode = (SJoinPhysiNode *)pNode;
EXPLAIN_ROW_NEW(level, EXPLAIN_JOIN_FORMAT, EXPLAIN_JOIN_STRING(pJoinNode->joinType)); EXPLAIN_ROW_NEW(level, EXPLAIN_JOIN_FORMAT, EXPLAIN_JOIN_STRING(pJoinNode->joinType));
EXPLAIN_ROW_APPEND(EXPLAIN_LEFT_PARENTHESIS_FORMAT); EXPLAIN_ROW_APPEND(EXPLAIN_LEFT_PARENTHESIS_FORMAT);
...@@ -553,7 +553,7 @@ int32_t qExplainResNodeToRowsImpl(SExplainResNode *pResNode, SExplainCtx *ctx, i ...@@ -553,7 +553,7 @@ int32_t qExplainResNodeToRowsImpl(SExplainResNode *pResNode, SExplainCtx *ctx, i
} }
break; break;
} }
case QUERY_NODE_PHYSICAL_PLAN_AGG: { case QUERY_NODE_PHYSICAL_PLAN_HASH_AGG: {
SAggPhysiNode *pAggNode = (SAggPhysiNode *)pNode; SAggPhysiNode *pAggNode = (SAggPhysiNode *)pNode;
EXPLAIN_ROW_NEW(level, EXPLAIN_AGG_FORMAT); EXPLAIN_ROW_NEW(level, EXPLAIN_AGG_FORMAT);
EXPLAIN_ROW_APPEND(EXPLAIN_LEFT_PARENTHESIS_FORMAT); EXPLAIN_ROW_APPEND(EXPLAIN_LEFT_PARENTHESIS_FORMAT);
...@@ -744,7 +744,7 @@ int32_t qExplainResNodeToRowsImpl(SExplainResNode *pResNode, SExplainCtx *ctx, i ...@@ -744,7 +744,7 @@ int32_t qExplainResNodeToRowsImpl(SExplainResNode *pResNode, SExplainCtx *ctx, i
} }
break; break;
} }
case QUERY_NODE_PHYSICAL_PLAN_SESSION_WINDOW: { case QUERY_NODE_PHYSICAL_PLAN_MERGE_SESSION: {
SSessionWinodwPhysiNode *pSessNode = (SSessionWinodwPhysiNode *)pNode; SSessionWinodwPhysiNode *pSessNode = (SSessionWinodwPhysiNode *)pNode;
EXPLAIN_ROW_NEW(level, EXPLAIN_SESSION_FORMAT); EXPLAIN_ROW_NEW(level, EXPLAIN_SESSION_FORMAT);
EXPLAIN_ROW_APPEND(EXPLAIN_LEFT_PARENTHESIS_FORMAT); EXPLAIN_ROW_APPEND(EXPLAIN_LEFT_PARENTHESIS_FORMAT);
...@@ -782,7 +782,7 @@ int32_t qExplainResNodeToRowsImpl(SExplainResNode *pResNode, SExplainCtx *ctx, i ...@@ -782,7 +782,7 @@ int32_t qExplainResNodeToRowsImpl(SExplainResNode *pResNode, SExplainCtx *ctx, i
} }
break; break;
} }
case QUERY_NODE_PHYSICAL_PLAN_STATE_WINDOW: { case QUERY_NODE_PHYSICAL_PLAN_MERGE_STATE: {
SStateWinodwPhysiNode *pStateNode = (SStateWinodwPhysiNode *)pNode; SStateWinodwPhysiNode *pStateNode = (SStateWinodwPhysiNode *)pNode;
EXPLAIN_ROW_NEW(level, EXPLAIN_STATE_WINDOW_FORMAT, EXPLAIN_ROW_NEW(level, EXPLAIN_STATE_WINDOW_FORMAT,
......
...@@ -3936,7 +3936,7 @@ SOperatorInfo* createAggregateOperatorInfo(SOperatorInfo* downstream, SExprInfo* ...@@ -3936,7 +3936,7 @@ SOperatorInfo* createAggregateOperatorInfo(SOperatorInfo* downstream, SExprInfo*
} }
pOperator->name = "TableAggregate"; pOperator->name = "TableAggregate";
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_AGG; pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_HASH_AGG;
pOperator->blocking = true; pOperator->blocking = true;
pOperator->status = OP_NOT_OPENED; pOperator->status = OP_NOT_OPENED;
pOperator->info = pInfo; pOperator->info = pInfo;
...@@ -4582,7 +4582,7 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo ...@@ -4582,7 +4582,7 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
SLimit slimit = {.limit = pProjPhyNode->slimit, .offset = pProjPhyNode->soffset}; SLimit slimit = {.limit = pProjPhyNode->slimit, .offset = pProjPhyNode->soffset};
pOptr = createProjectOperatorInfo(ops[0], pExprInfo, num, pResBlock, &limit, &slimit, pOptr = createProjectOperatorInfo(ops[0], pExprInfo, num, pResBlock, &limit, &slimit,
pProjPhyNode->node.pConditions, pTaskInfo); pProjPhyNode->node.pConditions, pTaskInfo);
} else if (QUERY_NODE_PHYSICAL_PLAN_AGG == type) { } else if (QUERY_NODE_PHYSICAL_PLAN_HASH_AGG == type) {
SAggPhysiNode* pAggNode = (SAggPhysiNode*)pPhyNode; SAggPhysiNode* pAggNode = (SAggPhysiNode*)pPhyNode;
SExprInfo* pExprInfo = createExprInfo(pAggNode->pAggFuncs, pAggNode->pGroupKeys, &num); SExprInfo* pExprInfo = createExprInfo(pAggNode->pAggFuncs, pAggNode->pGroupKeys, &num);
SSDataBlock* pResBlock = createResDataBlock(pPhyNode->pOutputDataBlockDesc); SSDataBlock* pResBlock = createResDataBlock(pPhyNode->pOutputDataBlockDesc);
...@@ -4668,7 +4668,7 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo ...@@ -4668,7 +4668,7 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
extractColMatchInfo(pMergePhyNode->pTargets, pDescNode, &numOfOutputCols, pTaskInfo, COL_MATCH_FROM_SLOT_ID); extractColMatchInfo(pMergePhyNode->pTargets, pDescNode, &numOfOutputCols, pTaskInfo, COL_MATCH_FROM_SLOT_ID);
pOptr = createMultiwaySortMergeOperatorInfo(ops, size, pResBlock, sortInfo, pColList, pTaskInfo); pOptr = createMultiwaySortMergeOperatorInfo(ops, size, pResBlock, sortInfo, pColList, pTaskInfo);
} else if (QUERY_NODE_PHYSICAL_PLAN_SESSION_WINDOW == type) { } else if (QUERY_NODE_PHYSICAL_PLAN_MERGE_SESSION == type) {
SSessionWinodwPhysiNode* pSessionNode = (SSessionWinodwPhysiNode*)pPhyNode; SSessionWinodwPhysiNode* pSessionNode = (SSessionWinodwPhysiNode*)pPhyNode;
STimeWindowAggSupp as = {.waterMark = pSessionNode->window.watermark, STimeWindowAggSupp as = {.waterMark = pSessionNode->window.watermark,
...@@ -4680,7 +4680,7 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo ...@@ -4680,7 +4680,7 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
pOptr = pOptr =
createSessionAggOperatorInfo(ops[0], pExprInfo, num, pResBlock, pSessionNode->gap, tsSlotId, &as, pTaskInfo); createSessionAggOperatorInfo(ops[0], pExprInfo, num, pResBlock, pSessionNode->gap, tsSlotId, &as, pTaskInfo);
} else if (QUERY_NODE_PHYSICAL_PLAN_STREAM_SESSION_WINDOW == type) { } else if (QUERY_NODE_PHYSICAL_PLAN_STREAM_SESSION == type) {
SSessionWinodwPhysiNode* pSessionNode = (SSessionWinodwPhysiNode*)pPhyNode; SSessionWinodwPhysiNode* pSessionNode = (SSessionWinodwPhysiNode*)pPhyNode;
STimeWindowAggSupp as = {.waterMark = pSessionNode->window.watermark, STimeWindowAggSupp as = {.waterMark = pSessionNode->window.watermark,
...@@ -4700,7 +4700,7 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo ...@@ -4700,7 +4700,7 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
SExprInfo* pExprInfo = createExprInfo(pPartNode->pTargets, NULL, &num); SExprInfo* pExprInfo = createExprInfo(pPartNode->pTargets, NULL, &num);
pOptr = createPartitionOperatorInfo(ops[0], pExprInfo, num, pResBlock, pColList, pTaskInfo); pOptr = createPartitionOperatorInfo(ops[0], pExprInfo, num, pResBlock, pColList, pTaskInfo);
} else if (QUERY_NODE_PHYSICAL_PLAN_STATE_WINDOW == type) { } else if (QUERY_NODE_PHYSICAL_PLAN_MERGE_STATE == type) {
SStateWinodwPhysiNode* pStateNode = (SStateWinodwPhysiNode*)pPhyNode; SStateWinodwPhysiNode* pStateNode = (SStateWinodwPhysiNode*)pPhyNode;
STimeWindowAggSupp as = {.waterMark = pStateNode->window.watermark, .calTrigger = pStateNode->window.triggerType}; STimeWindowAggSupp as = {.waterMark = pStateNode->window.watermark, .calTrigger = pStateNode->window.triggerType};
...@@ -4712,9 +4712,9 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo ...@@ -4712,9 +4712,9 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
SColumnNode* pColNode = (SColumnNode*)((STargetNode*)pStateNode->pStateKey)->pExpr; SColumnNode* pColNode = (SColumnNode*)((STargetNode*)pStateNode->pStateKey)->pExpr;
SColumn col = extractColumnFromColumnNode(pColNode); SColumn col = extractColumnFromColumnNode(pColNode);
pOptr = createStatewindowOperatorInfo(ops[0], pExprInfo, num, pResBlock, &as, tsSlotId, &col, pTaskInfo); pOptr = createStatewindowOperatorInfo(ops[0], pExprInfo, num, pResBlock, &as, tsSlotId, &col, pTaskInfo);
} else if (QUERY_NODE_PHYSICAL_PLAN_STREAM_STATE_WINDOW == type) { } else if (QUERY_NODE_PHYSICAL_PLAN_STREAM_STATE == type) {
pOptr = createStreamStateAggOperatorInfo(ops[0], pPhyNode, pTaskInfo); pOptr = createStreamStateAggOperatorInfo(ops[0], pPhyNode, pTaskInfo);
} else if (QUERY_NODE_PHYSICAL_PLAN_JOIN == type) { } else if (QUERY_NODE_PHYSICAL_PLAN_MERGE_JOIN == type) {
SJoinPhysiNode* pJoinNode = (SJoinPhysiNode*)pPhyNode; SJoinPhysiNode* pJoinNode = (SJoinPhysiNode*)pPhyNode;
SSDataBlock* pResBlock = createResDataBlock(pPhyNode->pOutputDataBlockDesc); SSDataBlock* pResBlock = createResDataBlock(pPhyNode->pOutputDataBlockDesc);
...@@ -5106,12 +5106,12 @@ int32_t decodeOperator(SOperatorInfo* ops, char* result, int32_t length) { ...@@ -5106,12 +5106,12 @@ int32_t decodeOperator(SOperatorInfo* ops, char* result, int32_t length) {
return TDB_CODE_SUCCESS; return TDB_CODE_SUCCESS;
} }
int32_t createDataSinkParam(SDataSinkNode *pNode, void **pParam, qTaskInfo_t* pTaskInfo) { int32_t createDataSinkParam(SDataSinkNode* pNode, void** pParam, qTaskInfo_t* pTaskInfo) {
SExecTaskInfo* pTask = *(SExecTaskInfo**)pTaskInfo; SExecTaskInfo* pTask = *(SExecTaskInfo**)pTaskInfo;
switch (pNode->type) { switch (pNode->type) {
case QUERY_NODE_PHYSICAL_PLAN_DELETE: { case QUERY_NODE_PHYSICAL_PLAN_DELETE: {
SDeleterParam *pDeleterParam = taosMemoryCalloc(1, sizeof(SDeleterParam)); SDeleterParam* pDeleterParam = taosMemoryCalloc(1, sizeof(SDeleterParam));
if (NULL == pDeleterParam) { if (NULL == pDeleterParam) {
return TSDB_CODE_OUT_OF_MEMORY; return TSDB_CODE_OUT_OF_MEMORY;
} }
...@@ -5122,7 +5122,7 @@ int32_t createDataSinkParam(SDataSinkNode *pNode, void **pParam, qTaskInfo_t* pT ...@@ -5122,7 +5122,7 @@ int32_t createDataSinkParam(SDataSinkNode *pNode, void **pParam, qTaskInfo_t* pT
return TSDB_CODE_OUT_OF_MEMORY; return TSDB_CODE_OUT_OF_MEMORY;
} }
for (int32_t i = 0; i < tbNum; ++i) { for (int32_t i = 0; i < tbNum; ++i) {
STableKeyInfo *pTable = taosArrayGet(pTask->tableqinfoList.pTableList, i); STableKeyInfo* pTable = taosArrayGet(pTask->tableqinfoList.pTableList, i);
taosArrayPush(pDeleterParam->pUidList, &pTable->uid); taosArrayPush(pDeleterParam->pUidList, &pTable->uid);
} }
...@@ -5136,7 +5136,6 @@ int32_t createDataSinkParam(SDataSinkNode *pNode, void **pParam, qTaskInfo_t* pT ...@@ -5136,7 +5136,6 @@ int32_t createDataSinkParam(SDataSinkNode *pNode, void **pParam, qTaskInfo_t* pT
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int32_t createExecTaskInfoImpl(SSubplan* pPlan, SExecTaskInfo** pTaskInfo, SReadHandle* pHandle, uint64_t taskId, int32_t createExecTaskInfoImpl(SSubplan* pPlan, SExecTaskInfo** pTaskInfo, SReadHandle* pHandle, uint64_t taskId,
EOPTR_EXEC_MODEL model) { EOPTR_EXEC_MODEL model) {
uint64_t queryId = pPlan->id.queryId; uint64_t queryId = pPlan->id.queryId;
......
...@@ -13,20 +13,20 @@ ...@@ -13,20 +13,20 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>. * along with this program. If not, see <http://www.gnu.org/licenses/>.
*/ */
#include "executorimpl.h"
#include "function.h" #include "function.h"
#include "os.h" #include "os.h"
#include "querynodes.h" #include "querynodes.h"
#include "tdatablock.h"
#include "tmsg.h"
#include "executorimpl.h"
#include "tcompare.h" #include "tcompare.h"
#include "tdatablock.h"
#include "thash.h" #include "thash.h"
#include "tmsg.h"
#include "ttypes.h" #include "ttypes.h"
static void setJoinColumnInfo(SColumnInfo* pColumn, const SColumnNode* pColumnNode); static void setJoinColumnInfo(SColumnInfo* pColumn, const SColumnNode* pColumnNode);
static SSDataBlock* doMergeJoin(struct SOperatorInfo* pOperator); static SSDataBlock* doMergeJoin(struct SOperatorInfo* pOperator);
static void destroyMergeJoinOperator(void* param, int32_t numOfOutput); static void destroyMergeJoinOperator(void* param, int32_t numOfOutput);
static void extractTimeCondition(SJoinOperatorInfo *Info, SLogicConditionNode* pLogicConditionNode); static void extractTimeCondition(SJoinOperatorInfo* Info, SLogicConditionNode* pLogicConditionNode);
SOperatorInfo* createMergeJoinOperatorInfo(SOperatorInfo** pDownstream, int32_t numOfDownstream, SExprInfo* pExprInfo, SOperatorInfo* createMergeJoinOperatorInfo(SOperatorInfo** pDownstream, int32_t numOfDownstream, SExprInfo* pExprInfo,
int32_t numOfCols, SSDataBlock* pResBlock, SNode* pOnCondition, int32_t numOfCols, SSDataBlock* pResBlock, SNode* pOnCondition,
...@@ -39,22 +39,22 @@ SOperatorInfo* createMergeJoinOperatorInfo(SOperatorInfo** pDownstream, int32_t ...@@ -39,22 +39,22 @@ SOperatorInfo* createMergeJoinOperatorInfo(SOperatorInfo** pDownstream, int32_t
initResultSizeInfo(pOperator, 4096); initResultSizeInfo(pOperator, 4096);
pInfo->pRes = pResBlock; pInfo->pRes = pResBlock;
pOperator->name = "MergeJoinOperator"; pOperator->name = "MergeJoinOperator";
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_JOIN; pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_MERGE_JOIN;
pOperator->blocking = false; pOperator->blocking = false;
pOperator->status = OP_NOT_OPENED; pOperator->status = OP_NOT_OPENED;
pOperator->pExpr = pExprInfo; pOperator->pExpr = pExprInfo;
pOperator->numOfExprs = numOfCols; pOperator->numOfExprs = numOfCols;
pOperator->info = pInfo; pOperator->info = pInfo;
pOperator->pTaskInfo = pTaskInfo; pOperator->pTaskInfo = pTaskInfo;
if (nodeType(pOnCondition) == QUERY_NODE_OPERATOR) { if (nodeType(pOnCondition) == QUERY_NODE_OPERATOR) {
SOperatorNode* pNode = (SOperatorNode*)pOnCondition; SOperatorNode* pNode = (SOperatorNode*)pOnCondition;
setJoinColumnInfo(&pInfo->leftCol, (SColumnNode*)pNode->pLeft); setJoinColumnInfo(&pInfo->leftCol, (SColumnNode*)pNode->pLeft);
setJoinColumnInfo(&pInfo->rightCol, (SColumnNode*)pNode->pRight); setJoinColumnInfo(&pInfo->rightCol, (SColumnNode*)pNode->pRight);
} else if (nodeType(pOnCondition) == QUERY_NODE_LOGIC_CONDITION) { } else if (nodeType(pOnCondition) == QUERY_NODE_LOGIC_CONDITION) {
extractTimeCondition(pInfo, (SLogicConditionNode*) pOnCondition); extractTimeCondition(pInfo, (SLogicConditionNode*)pOnCondition);
} }
pOperator->fpSet = pOperator->fpSet =
...@@ -66,7 +66,7 @@ SOperatorInfo* createMergeJoinOperatorInfo(SOperatorInfo** pDownstream, int32_t ...@@ -66,7 +66,7 @@ SOperatorInfo* createMergeJoinOperatorInfo(SOperatorInfo** pDownstream, int32_t
return pOperator; return pOperator;
_error: _error:
taosMemoryFree(pInfo); taosMemoryFree(pInfo);
taosMemoryFree(pOperator); taosMemoryFree(pOperator);
pTaskInfo->code = TSDB_CODE_OUT_OF_MEMORY; pTaskInfo->code = TSDB_CODE_OUT_OF_MEMORY;
...@@ -180,10 +180,10 @@ SSDataBlock* doMergeJoin(struct SOperatorInfo* pOperator) { ...@@ -180,10 +180,10 @@ SSDataBlock* doMergeJoin(struct SOperatorInfo* pOperator) {
return (pRes->info.rows > 0) ? pRes : NULL; return (pRes->info.rows > 0) ? pRes : NULL;
} }
static void extractTimeCondition(SJoinOperatorInfo *pInfo, SLogicConditionNode* pLogicConditionNode) { static void extractTimeCondition(SJoinOperatorInfo* pInfo, SLogicConditionNode* pLogicConditionNode) {
int32_t len = LIST_LENGTH(pLogicConditionNode->pParameterList); int32_t len = LIST_LENGTH(pLogicConditionNode->pParameterList);
for(int32_t i = 0; i < len; ++i) { for (int32_t i = 0; i < len; ++i) {
SNode* pNode = nodesListGetNode(pLogicConditionNode->pParameterList, i); SNode* pNode = nodesListGetNode(pLogicConditionNode->pParameterList, i);
if (nodeType(pNode) == QUERY_NODE_OPERATOR) { if (nodeType(pNode) == QUERY_NODE_OPERATOR) {
SOperatorNode* pn1 = (SOperatorNode*)pNode; SOperatorNode* pn1 = (SOperatorNode*)pNode;
......
...@@ -42,10 +42,11 @@ static int32_t buildSysDbTableInfo(const SSysTableScanInfo* pInfo, int32_t capac ...@@ -42,10 +42,11 @@ static int32_t buildSysDbTableInfo(const SSysTableScanInfo* pInfo, int32_t capac
static int32_t buildDbTableInfoBlock(const SSDataBlock* p, const SSysTableMeta* pSysDbTableMeta, size_t size, static int32_t buildDbTableInfoBlock(const SSDataBlock* p, const SSysTableMeta* pSysDbTableMeta, size_t size,
const char* dbName); const char* dbName);
static void addTagPseudoColumnData(SReadHandle *pHandle, SExprInfo* pPseudoExpr, int32_t numOfPseudoExpr, SSDataBlock* pBlock); static void addTagPseudoColumnData(SReadHandle* pHandle, SExprInfo* pPseudoExpr, int32_t numOfPseudoExpr,
static bool processBlockWithProbability(const SSampleExecInfo *pInfo); SSDataBlock* pBlock);
static bool processBlockWithProbability(const SSampleExecInfo* pInfo);
bool processBlockWithProbability(const SSampleExecInfo *pInfo) { bool processBlockWithProbability(const SSampleExecInfo* pInfo) {
#if 0 #if 0
if (pInfo->sampleRatio == 1) { if (pInfo->sampleRatio == 1) {
return true; return true;
...@@ -261,7 +262,8 @@ static int32_t loadDataBlock(SOperatorInfo* pOperator, STableScanInfo* pTableSca ...@@ -261,7 +262,8 @@ static int32_t loadDataBlock(SOperatorInfo* pOperator, STableScanInfo* pTableSca
// currently only the tbname pseudo column // currently only the tbname pseudo column
if (pTableScanInfo->numOfPseudoExpr > 0) { if (pTableScanInfo->numOfPseudoExpr > 0) {
addTagPseudoColumnData(&pTableScanInfo->readHandle, pTableScanInfo->pPseudoExpr, pTableScanInfo->numOfPseudoExpr, pBlock); addTagPseudoColumnData(&pTableScanInfo->readHandle, pTableScanInfo->pPseudoExpr, pTableScanInfo->numOfPseudoExpr,
pBlock);
} }
int64_t st = taosGetTimestampMs(); int64_t st = taosGetTimestampMs();
...@@ -295,7 +297,8 @@ static void prepareForDescendingScan(STableScanInfo* pTableScanInfo, SqlFunction ...@@ -295,7 +297,8 @@ static void prepareForDescendingScan(STableScanInfo* pTableScanInfo, SqlFunction
taosqsort(pCond->twindows, pCond->numOfTWindows, sizeof(STimeWindow), pCond, compareTimeWindow); taosqsort(pCond->twindows, pCond->numOfTWindows, sizeof(STimeWindow), pCond, compareTimeWindow);
} }
void addTagPseudoColumnData(SReadHandle *pHandle, SExprInfo* pPseudoExpr, int32_t numOfPseudoExpr, SSDataBlock* pBlock) { void addTagPseudoColumnData(SReadHandle* pHandle, SExprInfo* pPseudoExpr, int32_t numOfPseudoExpr,
SSDataBlock* pBlock) {
// currently only the tbname pseudo column // currently only the tbname pseudo column
if (numOfPseudoExpr == 0) { if (numOfPseudoExpr == 0) {
return; return;
...@@ -311,7 +314,7 @@ void addTagPseudoColumnData(SReadHandle *pHandle, SExprInfo* pPseudoExpr, int32_ ...@@ -311,7 +314,7 @@ void addTagPseudoColumnData(SReadHandle *pHandle, SExprInfo* pPseudoExpr, int32_
int32_t dstSlotId = pExpr->base.resSchema.slotId; int32_t dstSlotId = pExpr->base.resSchema.slotId;
SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, dstSlotId); SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, dstSlotId);
colInfoDataEnsureCapacity(pColInfoData, 0, pBlock->info.rows); colInfoDataEnsureCapacity(pColInfoData, 0, pBlock->info.rows);
colInfoDataCleanup(pColInfoData, pBlock->info.rows); colInfoDataCleanup(pColInfoData, pBlock->info.rows);
...@@ -391,10 +394,10 @@ static SSDataBlock* doTableScanImpl(SOperatorInfo* pOperator) { ...@@ -391,10 +394,10 @@ static SSDataBlock* doTableScanImpl(SOperatorInfo* pOperator) {
recordNewGroupKeys(pTableScanInfo->pGroupCols, pTableScanInfo->pGroupColVals, pBlock, 0); recordNewGroupKeys(pTableScanInfo->pGroupCols, pTableScanInfo->pGroupColVals, pBlock, 0);
int32_t len = buildGroupKeys(pTableScanInfo->keyBuf, pTableScanInfo->pGroupColVals); int32_t len = buildGroupKeys(pTableScanInfo->keyBuf, pTableScanInfo->pGroupColVals);
uint64_t *groupId = taosHashGet(pTableScanInfo->pGroupSet, pTableScanInfo->keyBuf, len); uint64_t* groupId = taosHashGet(pTableScanInfo->pGroupSet, pTableScanInfo->keyBuf, len);
if (groupId) { if (groupId) {
pBlock->info.groupId = *groupId; pBlock->info.groupId = *groupId;
}else if(len != 0){ } else if (len != 0) {
pBlock->info.groupId = calcGroupId(pTableScanInfo->keyBuf, len); pBlock->info.groupId = calcGroupId(pTableScanInfo->keyBuf, len);
taosHashPut(pTableScanInfo->pGroupSet, pTableScanInfo->keyBuf, len, &pBlock->info.groupId, sizeof(uint64_t)); taosHashPut(pTableScanInfo->pGroupSet, pTableScanInfo->keyBuf, len, &pBlock->info.groupId, sizeof(uint64_t));
} }
...@@ -483,7 +486,8 @@ static SSDataBlock* doTableScan(SOperatorInfo* pOperator) { ...@@ -483,7 +486,8 @@ static SSDataBlock* doTableScan(SOperatorInfo* pOperator) {
setTaskStatus(pTaskInfo, TASK_NOT_COMPLETED); setTaskStatus(pTaskInfo, TASK_NOT_COMPLETED);
pTableScanInfo->scanFlag = REPEAT_SCAN; pTableScanInfo->scanFlag = REPEAT_SCAN;
qDebug("%s start to repeat descending order scan data blocks due to query func required", GET_TASKID(pTaskInfo)); qDebug("%s start to repeat descending order scan data blocks due to query func required",
GET_TASKID(pTaskInfo));
for (int32_t i = 0; i < pTableScanInfo->cond.numOfTWindows; ++i) { for (int32_t i = 0; i < pTableScanInfo->cond.numOfTWindows; ++i) {
STimeWindow* pWin = &pTableScanInfo->cond.twindows[i]; STimeWindow* pWin = &pTableScanInfo->cond.twindows[i];
qDebug("%s\t qrange:%" PRId64 "-%" PRId64, GET_TASKID(pTaskInfo), pWin->skey, pWin->ekey); qDebug("%s\t qrange:%" PRId64 "-%" PRId64, GET_TASKID(pTaskInfo), pWin->skey, pWin->ekey);
...@@ -525,7 +529,7 @@ static void destroyTableScanOperatorInfo(void* param, int32_t numOfOutput) { ...@@ -525,7 +529,7 @@ static void destroyTableScanOperatorInfo(void* param, int32_t numOfOutput) {
tsdbCleanupReadHandle(pTableScanInfo->dataReader); tsdbCleanupReadHandle(pTableScanInfo->dataReader);
taosArrayDestroy(pTableScanInfo->pGroupCols); taosArrayDestroy(pTableScanInfo->pGroupCols);
for(int i = 0; i < taosArrayGetSize(pTableScanInfo->pGroupColVals); i++){ for (int i = 0; i < taosArrayGetSize(pTableScanInfo->pGroupColVals); i++) {
SGroupKeys key = *(SGroupKeys*)taosArrayGet(pTableScanInfo->pGroupColVals, i); SGroupKeys key = *(SGroupKeys*)taosArrayGet(pTableScanInfo->pGroupColVals, i);
taosMemoryFree(key.pData); taosMemoryFree(key.pData);
} }
...@@ -562,28 +566,28 @@ SOperatorInfo* createTableScanOperatorInfo(STableScanPhysiNode* pTableScanNode, ...@@ -562,28 +566,28 @@ SOperatorInfo* createTableScanOperatorInfo(STableScanPhysiNode* pTableScanNode,
} }
pInfo->scanInfo = (SScanInfo){.numOfAsc = pTableScanNode->scanSeq[0], .numOfDesc = pTableScanNode->scanSeq[1]}; pInfo->scanInfo = (SScanInfo){.numOfAsc = pTableScanNode->scanSeq[0], .numOfDesc = pTableScanNode->scanSeq[1]};
// pInfo->scanInfo = (SScanInfo){.numOfAsc = 0, .numOfDesc = 1}; // for debug purpose // pInfo->scanInfo = (SScanInfo){.numOfAsc = 0, .numOfDesc = 1}; // for debug purpose
pInfo->readHandle = *readHandle; pInfo->readHandle = *readHandle;
pInfo->interval = extractIntervalInfo(pTableScanNode); pInfo->interval = extractIntervalInfo(pTableScanNode);
pInfo->sample.sampleRatio= pTableScanNode->ratio; pInfo->sample.sampleRatio = pTableScanNode->ratio;
pInfo->sample.seed = taosGetTimestampSec(); pInfo->sample.seed = taosGetTimestampSec();
pInfo->dataBlockLoadFlag = pTableScanNode->dataRequired; pInfo->dataBlockLoadFlag = pTableScanNode->dataRequired;
pInfo->pResBlock = createResDataBlock(pDescNode); pInfo->pResBlock = createResDataBlock(pDescNode);
pInfo->pFilterNode = pTableScanNode->scan.node.pConditions; pInfo->pFilterNode = pTableScanNode->scan.node.pConditions;
pInfo->dataReader = pDataReader; pInfo->dataReader = pDataReader;
pInfo->scanFlag = MAIN_SCAN; pInfo->scanFlag = MAIN_SCAN;
pInfo->pColMatchInfo = pColList; pInfo->pColMatchInfo = pColList;
pInfo->curTWinIdx = 0; pInfo->curTWinIdx = 0;
pOperator->name = "TableScanOperator"; // for debug purpose pOperator->name = "TableScanOperator"; // for debug purpose
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN; pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN;
pOperator->blocking = false; pOperator->blocking = false;
pOperator->status = OP_NOT_OPENED; pOperator->status = OP_NOT_OPENED;
pOperator->info = pInfo; pOperator->info = pInfo;
pOperator->numOfExprs = numOfCols; pOperator->numOfExprs = numOfCols;
pOperator->pTaskInfo = pTaskInfo; pOperator->pTaskInfo = pTaskInfo;
// for table group // for table group
pInfo->pGroupCols = groupKyes; pInfo->pGroupCols = groupKyes;
...@@ -604,7 +608,7 @@ SOperatorInfo* createTableScanOperatorInfo(STableScanPhysiNode* pTableScanNode, ...@@ -604,7 +608,7 @@ SOperatorInfo* createTableScanOperatorInfo(STableScanPhysiNode* pTableScanNode,
pOperator->cost.openCost = 0; pOperator->cost.openCost = 0;
return pOperator; return pOperator;
_error: _error:
taosMemoryFreeClear(pInfo); taosMemoryFreeClear(pInfo);
taosMemoryFreeClear(pOperator); taosMemoryFreeClear(pOperator);
...@@ -723,16 +727,19 @@ static void doClearBufferedBlocks(SStreamBlockScanInfo* pInfo) { ...@@ -723,16 +727,19 @@ static void doClearBufferedBlocks(SStreamBlockScanInfo* pInfo) {
} }
static bool isSessionWindow(SStreamBlockScanInfo* pInfo) { static bool isSessionWindow(SStreamBlockScanInfo* pInfo) {
return pInfo->sessionSup.parentType == QUERY_NODE_PHYSICAL_PLAN_STREAM_SESSION_WINDOW; return pInfo->sessionSup.parentType == QUERY_NODE_PHYSICAL_PLAN_STREAM_SESSION;
} }
static bool isStateWindow(SStreamBlockScanInfo* pInfo) { static bool isStateWindow(SStreamBlockScanInfo* pInfo) {
return pInfo->sessionSup.parentType == QUERY_NODE_PHYSICAL_PLAN_STREAM_STATE_WINDOW; return pInfo->sessionSup.parentType == QUERY_NODE_PHYSICAL_PLAN_STREAM_STATE;
} }
static bool prepareDataScan(SStreamBlockScanInfo* pInfo) { static bool prepareDataScan(SStreamBlockScanInfo* pInfo) {
SSDataBlock* pSDB = pInfo->pUpdateRes; SSDataBlock* pSDB = pInfo->pUpdateRes;
STimeWindow win = {.skey = INT64_MIN, .ekey = INT64_MAX,}; STimeWindow win = {
.skey = INT64_MIN,
.ekey = INT64_MAX,
};
bool needRead = false; bool needRead = false;
if (!isStateWindow(pInfo) && pInfo->updateResIndex < pSDB->info.rows) { if (!isStateWindow(pInfo) && pInfo->updateResIndex < pSDB->info.rows) {
SColumnInfoData* pColDataInfo = taosArrayGet(pSDB->pDataBlock, pInfo->primaryTsIndex); SColumnInfoData* pColDataInfo = taosArrayGet(pSDB->pDataBlock, pInfo->primaryTsIndex);
...@@ -759,7 +766,7 @@ static bool prepareDataScan(SStreamBlockScanInfo* pInfo) { ...@@ -759,7 +766,7 @@ static bool prepareDataScan(SStreamBlockScanInfo* pInfo) {
SArray* pWins = pInfo->sessionSup.pStreamAggSup->pScanWindow; SArray* pWins = pInfo->sessionSup.pStreamAggSup->pScanWindow;
int32_t size = taosArrayGetSize(pWins); int32_t size = taosArrayGetSize(pWins);
if (pInfo->scanWinIndex < size) { if (pInfo->scanWinIndex < size) {
win = *(STimeWindow *)taosArrayGet(pWins, pInfo->scanWinIndex); win = *(STimeWindow*)taosArrayGet(pWins, pInfo->scanWinIndex);
pInfo->scanWinIndex++; pInfo->scanWinIndex++;
needRead = true; needRead = true;
} else { } else {
...@@ -790,11 +797,11 @@ static SSDataBlock* doDataScan(SStreamBlockScanInfo* pInfo) { ...@@ -790,11 +797,11 @@ static SSDataBlock* doDataScan(SStreamBlockScanInfo* pInfo) {
return pResult; return pResult;
} }
static void getUpdateDataBlock(SStreamBlockScanInfo* pInfo, bool invertible, static void getUpdateDataBlock(SStreamBlockScanInfo* pInfo, bool invertible, SSDataBlock* pBlock,
SSDataBlock* pBlock, SSDataBlock* pUpdateBlock) { SSDataBlock* pUpdateBlock) {
SColumnInfoData* pColDataInfo = taosArrayGet(pBlock->pDataBlock, pInfo->primaryTsIndex); SColumnInfoData* pColDataInfo = taosArrayGet(pBlock->pDataBlock, pInfo->primaryTsIndex);
ASSERT(pColDataInfo->info.type == TSDB_DATA_TYPE_TIMESTAMP); ASSERT(pColDataInfo->info.type == TSDB_DATA_TYPE_TIMESTAMP);
TSKEY* ts = (TSKEY*)pColDataInfo->pData; TSKEY* ts = (TSKEY*)pColDataInfo->pData;
for (int32_t i = 0; i < pBlock->info.rows; i++) { for (int32_t i = 0; i < pBlock->info.rows; i++) {
if (updateInfoIsUpdated(pInfo->pUpdateInfo, pBlock->info.uid, ts[i])) { if (updateInfoIsUpdated(pInfo->pUpdateInfo, pBlock->info.uid, ts[i])) {
taosArrayPush(pInfo->tsArray, ts + i); taosArrayPush(pInfo->tsArray, ts + i);
...@@ -859,8 +866,7 @@ static SSDataBlock* doStreamBlockScan(SOperatorInfo* pOperator) { ...@@ -859,8 +866,7 @@ static SSDataBlock* doStreamBlockScan(SOperatorInfo* pOperator) {
} }
return pInfo->pUpdateRes; return pInfo->pUpdateRes;
} else { } else {
if (isStateWindow(pInfo) && if (isStateWindow(pInfo) && taosArrayGetSize(pInfo->sessionSup.pStreamAggSup->pScanWindow) > 0) {
taosArrayGetSize(pInfo->sessionSup.pStreamAggSup->pScanWindow) > 0) {
pInfo->scanMode = STREAM_SCAN_FROM_DATAREADER; pInfo->scanMode = STREAM_SCAN_FROM_DATAREADER;
pInfo->updateResIndex = pInfo->pUpdateRes->info.rows; pInfo->updateResIndex = pInfo->pUpdateRes->info.rows;
prepareDataScan(pInfo); prepareDataScan(pInfo);
...@@ -973,9 +979,9 @@ static SSDataBlock* doStreamBlockScan(SOperatorInfo* pOperator) { ...@@ -973,9 +979,9 @@ static SSDataBlock* doStreamBlockScan(SOperatorInfo* pOperator) {
} }
} }
SOperatorInfo* createStreamScanOperatorInfo(void* pDataReader, SReadHandle* pHandle, SOperatorInfo* createStreamScanOperatorInfo(void* pDataReader, SReadHandle* pHandle, SArray* pTableIdList,
SArray* pTableIdList, STableScanPhysiNode* pTableScanNode, SExecTaskInfo* pTaskInfo, STableScanPhysiNode* pTableScanNode, SExecTaskInfo* pTaskInfo,
STimeWindowAggSupp* pTwSup) { STimeWindowAggSupp* pTwSup) {
SStreamBlockScanInfo* pInfo = taosMemoryCalloc(1, sizeof(SStreamBlockScanInfo)); SStreamBlockScanInfo* pInfo = taosMemoryCalloc(1, sizeof(SStreamBlockScanInfo));
SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo)); SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
if (pInfo == NULL || pOperator == NULL) { if (pInfo == NULL || pOperator == NULL) {
...@@ -986,12 +992,13 @@ SOperatorInfo* createStreamScanOperatorInfo(void* pDataReader, SReadHandle* pHan ...@@ -986,12 +992,13 @@ SOperatorInfo* createStreamScanOperatorInfo(void* pDataReader, SReadHandle* pHan
SScanPhysiNode* pScanPhyNode = &pTableScanNode->scan; SScanPhysiNode* pScanPhyNode = &pTableScanNode->scan;
SDataBlockDescNode* pDescNode = pScanPhyNode->node.pOutputDataBlockDesc; SDataBlockDescNode* pDescNode = pScanPhyNode->node.pOutputDataBlockDesc;
SOperatorInfo* pTableScanDummy = createTableScanOperatorInfo(pTableScanNode, pDataReader, pHandle, NULL, pTaskInfo); SOperatorInfo* pTableScanDummy = createTableScanOperatorInfo(pTableScanNode, pDataReader, pHandle, NULL, pTaskInfo);
STableScanInfo* pSTInfo = (STableScanInfo*)pTableScanDummy->info; STableScanInfo* pSTInfo = (STableScanInfo*)pTableScanDummy->info;
int32_t numOfCols = 0; int32_t numOfCols = 0;
pInfo->pColMatchInfo = extractColMatchInfo(pScanPhyNode->pScanCols, pDescNode, &numOfCols, pTaskInfo, COL_MATCH_FROM_COL_ID); pInfo->pColMatchInfo =
extractColMatchInfo(pScanPhyNode->pScanCols, pDescNode, &numOfCols, pTaskInfo, COL_MATCH_FROM_COL_ID);
int32_t numOfOutput = taosArrayGetSize(pInfo->pColMatchInfo); int32_t numOfOutput = taosArrayGetSize(pInfo->pColMatchInfo);
SArray* pColIds = taosArrayInit(numOfOutput, sizeof(int16_t)); SArray* pColIds = taosArrayInit(numOfOutput, sizeof(int16_t));
...@@ -1024,8 +1031,7 @@ SOperatorInfo* createStreamScanOperatorInfo(void* pDataReader, SReadHandle* pHan ...@@ -1024,8 +1031,7 @@ SOperatorInfo* createStreamScanOperatorInfo(void* pDataReader, SReadHandle* pHan
} }
if (isSmaStream(pTableScanNode->triggerType)) { if (isSmaStream(pTableScanNode->triggerType)) {
pTwSup->waterMark = getSmaWaterMark(pSTInfo->interval.interval, pTwSup->waterMark = getSmaWaterMark(pSTInfo->interval.interval, pTableScanNode->filesFactor);
pTableScanNode->filesFactor);
} }
if (pSTInfo->interval.interval > 0 && pDataReader) { if (pSTInfo->interval.interval > 0 && pDataReader) {
...@@ -1039,28 +1045,27 @@ SOperatorInfo* createStreamScanOperatorInfo(void* pDataReader, SReadHandle* pHan ...@@ -1039,28 +1045,27 @@ SOperatorInfo* createStreamScanOperatorInfo(void* pDataReader, SReadHandle* pHan
pInfo->pPseudoExpr = createExprInfo(pTableScanNode->scan.pScanPseudoCols, NULL, &pInfo->numOfPseudoExpr); pInfo->pPseudoExpr = createExprInfo(pTableScanNode->scan.pScanPseudoCols, NULL, &pInfo->numOfPseudoExpr);
} }
pInfo->readHandle = *pHandle; pInfo->readHandle = *pHandle;
pInfo->tableUid = pScanPhyNode->uid; pInfo->tableUid = pScanPhyNode->uid;
pInfo->streamBlockReader = pHandle->reader; pInfo->streamBlockReader = pHandle->reader;
pInfo->pRes = createResDataBlock(pDescNode); pInfo->pRes = createResDataBlock(pDescNode);
pInfo->pUpdateRes = createResDataBlock(pDescNode); pInfo->pUpdateRes = createResDataBlock(pDescNode);
pInfo->pCondition = pScanPhyNode->node.pConditions; pInfo->pCondition = pScanPhyNode->node.pConditions;
pInfo->pDataReader = pDataReader; pInfo->pDataReader = pDataReader;
pInfo->scanMode = STREAM_SCAN_FROM_READERHANDLE; pInfo->scanMode = STREAM_SCAN_FROM_READERHANDLE;
pInfo->pOperatorDumy = pTableScanDummy; pInfo->pOperatorDumy = pTableScanDummy;
pInfo->interval = pSTInfo->interval; pInfo->interval = pSTInfo->interval;
pInfo->sessionSup = (SessionWindowSupporter){.pStreamAggSup = NULL, .gap = -1}; pInfo->sessionSup = (SessionWindowSupporter){.pStreamAggSup = NULL, .gap = -1};
pOperator->name = "StreamBlockScanOperator";
pOperator->name = "StreamBlockScanOperator";
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN; pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN;
pOperator->blocking = false; pOperator->blocking = false;
pOperator->status = OP_NOT_OPENED; pOperator->status = OP_NOT_OPENED;
pOperator->info = pInfo; pOperator->info = pInfo;
pOperator->numOfExprs = pInfo->pRes->info.numOfCols; pOperator->numOfExprs = pInfo->pRes->info.numOfCols;
pOperator->pTaskInfo = pTaskInfo; pOperator->pTaskInfo = pTaskInfo;
pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doStreamBlockScan, NULL, pOperator->fpSet =
NULL, operatorDummyCloseFn, NULL, NULL, NULL); createOperatorFpSet(operatorDummyOpenFn, doStreamBlockScan, NULL, NULL, operatorDummyCloseFn, NULL, NULL, NULL);
return pOperator; return pOperator;
...@@ -1445,8 +1450,8 @@ static SSDataBlock* doSysTableScan(SOperatorInfo* pOperator) { ...@@ -1445,8 +1450,8 @@ static SSDataBlock* doSysTableScan(SOperatorInfo* pOperator) {
} }
SRetrieveMetaTableRsp* pTableRsp = pInfo->pRsp; SRetrieveMetaTableRsp* pTableRsp = pInfo->pRsp;
setDataBlockFromFetchRsp(pInfo->pRes, &pInfo->loadInfo, pTableRsp->numOfRows, pTableRsp->data, setDataBlockFromFetchRsp(pInfo->pRes, &pInfo->loadInfo, pTableRsp->numOfRows, pTableRsp->data, pTableRsp->compLen,
pTableRsp->compLen, pOperator->numOfExprs, startTs, NULL, pInfo->scanCols); pOperator->numOfExprs, startTs, NULL, pInfo->scanCols);
// todo log the filter info // todo log the filter info
doFilterResult(pInfo); doFilterResult(pInfo);
...@@ -1519,7 +1524,8 @@ int32_t buildDbTableInfoBlock(const SSDataBlock* p, const SSysTableMeta* pSysDbT ...@@ -1519,7 +1524,8 @@ int32_t buildDbTableInfoBlock(const SSDataBlock* p, const SSysTableMeta* pSysDbT
return numOfRows; return numOfRows;
} }
SOperatorInfo* createSysTableScanOperatorInfo(void* readHandle, SSystemTableScanPhysiNode *pScanPhyNode, SExecTaskInfo* pTaskInfo) { SOperatorInfo* createSysTableScanOperatorInfo(void* readHandle, SSystemTableScanPhysiNode* pScanPhyNode,
SExecTaskInfo* pTaskInfo) {
SSysTableScanInfo* pInfo = taosMemoryCalloc(1, sizeof(SSysTableScanInfo)); SSysTableScanInfo* pInfo = taosMemoryCalloc(1, sizeof(SSysTableScanInfo));
SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo)); SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
if (pInfo == NULL || pOperator == NULL) { if (pInfo == NULL || pOperator == NULL) {
...@@ -1529,16 +1535,16 @@ SOperatorInfo* createSysTableScanOperatorInfo(void* readHandle, SSystemTableScan ...@@ -1529,16 +1535,16 @@ SOperatorInfo* createSysTableScanOperatorInfo(void* readHandle, SSystemTableScan
SScanPhysiNode* pScanNode = &pScanPhyNode->scan; SScanPhysiNode* pScanNode = &pScanPhyNode->scan;
SDataBlockDescNode* pDescNode = pScanNode->node.pOutputDataBlockDesc; SDataBlockDescNode* pDescNode = pScanNode->node.pOutputDataBlockDesc;
SSDataBlock* pResBlock = createResDataBlock(pDescNode); SSDataBlock* pResBlock = createResDataBlock(pDescNode);
int32_t num = 0; int32_t num = 0;
SArray* colList = extractColMatchInfo(pScanNode->pScanCols, pDescNode, &num, pTaskInfo, COL_MATCH_FROM_COL_ID); SArray* colList = extractColMatchInfo(pScanNode->pScanCols, pDescNode, &num, pTaskInfo, COL_MATCH_FROM_COL_ID);
pInfo->accountId = pScanPhyNode->accountId; pInfo->accountId = pScanPhyNode->accountId;
pInfo->showRewrite = pScanPhyNode->showRewrite; pInfo->showRewrite = pScanPhyNode->showRewrite;
pInfo->pRes = pResBlock; pInfo->pRes = pResBlock;
pInfo->pCondition = pScanNode->node.pConditions; pInfo->pCondition = pScanNode->node.pConditions;
pInfo->scanCols = colList; pInfo->scanCols = colList;
initResultSizeInfo(pOperator, 4096); initResultSizeInfo(pOperator, 4096);
...@@ -1554,20 +1560,20 @@ SOperatorInfo* createSysTableScanOperatorInfo(void* readHandle, SSystemTableScan ...@@ -1554,20 +1560,20 @@ SOperatorInfo* createSysTableScanOperatorInfo(void* readHandle, SSystemTableScan
pInfo->readHandle = *(SReadHandle*)readHandle; pInfo->readHandle = *(SReadHandle*)readHandle;
} }
pOperator->name = "SysTableScanOperator"; pOperator->name = "SysTableScanOperator";
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_SYSTABLE_SCAN; pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_SYSTABLE_SCAN;
pOperator->blocking = false; pOperator->blocking = false;
pOperator->status = OP_NOT_OPENED; pOperator->status = OP_NOT_OPENED;
pOperator->info = pInfo; pOperator->info = pInfo;
pOperator->numOfExprs = pResBlock->info.numOfCols; pOperator->numOfExprs = pResBlock->info.numOfCols;
pOperator->pTaskInfo = pTaskInfo; pOperator->pTaskInfo = pTaskInfo;
pOperator->fpSet = pOperator->fpSet =
createOperatorFpSet(operatorDummyOpenFn, doSysTableScan, NULL, NULL, destroySysScanOperator, NULL, NULL, NULL); createOperatorFpSet(operatorDummyOpenFn, doSysTableScan, NULL, NULL, destroySysScanOperator, NULL, NULL, NULL);
return pOperator; return pOperator;
_error: _error:
taosMemoryFreeClear(pInfo); taosMemoryFreeClear(pInfo);
taosMemoryFreeClear(pOperator); taosMemoryFreeClear(pOperator);
terrno = TSDB_CODE_QRY_OUT_OF_MEMORY; terrno = TSDB_CODE_QRY_OUT_OF_MEMORY;
...@@ -1687,16 +1693,16 @@ static SSDataBlock* doTagScan(SOperatorInfo* pOperator) { ...@@ -1687,16 +1693,16 @@ static SSDataBlock* doTagScan(SOperatorInfo* pOperator) {
val.cid = pExprInfo[j].base.pParam[0].pCol->colId; val.cid = pExprInfo[j].base.pParam[0].pCol->colId;
const char* p = metaGetTableTagVal(&mr.me, pDst->info.type, &val); const char* p = metaGetTableTagVal(&mr.me, pDst->info.type, &val);
char *data = NULL; char* data = NULL;
if(pDst->info.type != TSDB_DATA_TYPE_JSON && p != NULL){ if (pDst->info.type != TSDB_DATA_TYPE_JSON && p != NULL) {
data = tTagValToData((const STagVal *)p, false); data = tTagValToData((const STagVal*)p, false);
}else { } else {
data = (char*)p; data = (char*)p;
} }
colDataAppend(pDst, count, data, (data == NULL)); colDataAppend(pDst, count, data, (data == NULL));
if(pDst->info.type != TSDB_DATA_TYPE_JSON && p != NULL if (pDst->info.type != TSDB_DATA_TYPE_JSON && p != NULL && IS_VAR_DATA_TYPE(((const STagVal*)p)->type) &&
&& IS_VAR_DATA_TYPE(((const STagVal *)p)->type) && data != NULL){ data != NULL) {
taosMemoryFree(data); taosMemoryFree(data);
} }
} }
...@@ -1726,7 +1732,8 @@ static void destroyTagScanOperatorInfo(void* param, int32_t numOfOutput) { ...@@ -1726,7 +1732,8 @@ static void destroyTagScanOperatorInfo(void* param, int32_t numOfOutput) {
pInfo->pRes = blockDataDestroy(pInfo->pRes); pInfo->pRes = blockDataDestroy(pInfo->pRes);
} }
SOperatorInfo* createTagScanOperatorInfo(SReadHandle* pReadHandle, STagScanPhysiNode* pPhyNode, STableListInfo* pTableListInfo, SExecTaskInfo* pTaskInfo) { SOperatorInfo* createTagScanOperatorInfo(SReadHandle* pReadHandle, STagScanPhysiNode* pPhyNode,
STableListInfo* pTableListInfo, SExecTaskInfo* pTaskInfo) {
STagScanInfo* pInfo = taosMemoryCalloc(1, sizeof(STagScanInfo)); STagScanInfo* pInfo = taosMemoryCalloc(1, sizeof(STagScanInfo));
SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo)); SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
if (pInfo == NULL || pOperator == NULL) { if (pInfo == NULL || pOperator == NULL) {
...@@ -1741,19 +1748,20 @@ SOperatorInfo* createTagScanOperatorInfo(SReadHandle* pReadHandle, STagScanPhysi ...@@ -1741,19 +1748,20 @@ SOperatorInfo* createTagScanOperatorInfo(SReadHandle* pReadHandle, STagScanPhysi
int32_t num = 0; int32_t num = 0;
SArray* colList = extractColMatchInfo(pPhyNode->pScanPseudoCols, pDescNode, &num, pTaskInfo, COL_MATCH_FROM_COL_ID); SArray* colList = extractColMatchInfo(pPhyNode->pScanPseudoCols, pDescNode, &num, pTaskInfo, COL_MATCH_FROM_COL_ID);
pInfo->pTableList = pTableListInfo; pInfo->pTableList = pTableListInfo;
pInfo->pColMatchInfo = colList; pInfo->pColMatchInfo = colList;
pInfo->pRes = createResDataBlock(pDescNode);; pInfo->pRes = createResDataBlock(pDescNode);
pInfo->readHandle = *pReadHandle; ;
pInfo->curPos = 0; pInfo->readHandle = *pReadHandle;
pOperator->name = "TagScanOperator"; pInfo->curPos = 0;
pOperator->name = "TagScanOperator";
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_TAG_SCAN; pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_TAG_SCAN;
pOperator->blocking = false; pOperator->blocking = false;
pOperator->status = OP_NOT_OPENED; pOperator->status = OP_NOT_OPENED;
pOperator->info = pInfo; pOperator->info = pInfo;
pOperator->pExpr = pExprInfo; pOperator->pExpr = pExprInfo;
pOperator->numOfExprs = numOfExprs; pOperator->numOfExprs = numOfExprs;
pOperator->pTaskInfo = pTaskInfo; pOperator->pTaskInfo = pTaskInfo;
initResultSizeInfo(pOperator, 4096); initResultSizeInfo(pOperator, 4096);
blockDataEnsureCapacity(pInfo->pRes, pOperator->resultInfo.capacity); blockDataEnsureCapacity(pInfo->pRes, pOperator->resultInfo.capacity);
......
...@@ -1790,7 +1790,7 @@ SOperatorInfo* createStatewindowOperatorInfo(SOperatorInfo* downstream, SExprInf ...@@ -1790,7 +1790,7 @@ SOperatorInfo* createStatewindowOperatorInfo(SOperatorInfo* downstream, SExprInf
pInfo->tsSlotId = tsSlotId; pInfo->tsSlotId = tsSlotId;
pOperator->name = "StateWindowOperator"; pOperator->name = "StateWindowOperator";
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_STATE_WINDOW; pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_MERGE_STATE;
pOperator->blocking = true; pOperator->blocking = true;
pOperator->status = OP_NOT_OPENED; pOperator->status = OP_NOT_OPENED;
pOperator->pExpr = pExpr; pOperator->pExpr = pExpr;
...@@ -1842,7 +1842,7 @@ SOperatorInfo* createSessionAggOperatorInfo(SOperatorInfo* downstream, SExprInfo ...@@ -1842,7 +1842,7 @@ SOperatorInfo* createSessionAggOperatorInfo(SOperatorInfo* downstream, SExprInfo
pInfo->winSup.prevTs = INT64_MIN; pInfo->winSup.prevTs = INT64_MIN;
pInfo->reptScan = false; pInfo->reptScan = false;
pOperator->name = "SessionWindowAggOperator"; pOperator->name = "SessionWindowAggOperator";
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_SESSION_WINDOW; pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_MERGE_SESSION;
pOperator->blocking = true; pOperator->blocking = true;
pOperator->status = OP_NOT_OPENED; pOperator->status = OP_NOT_OPENED;
pOperator->pExpr = pExprInfo; pOperator->pExpr = pExprInfo;
...@@ -2270,7 +2270,7 @@ SOperatorInfo* createStreamSessionAggOperatorInfo(SOperatorInfo* downstream, SEx ...@@ -2270,7 +2270,7 @@ SOperatorInfo* createStreamSessionAggOperatorInfo(SOperatorInfo* downstream, SEx
pInfo->pChildren = NULL; pInfo->pChildren = NULL;
pOperator->name = "StreamSessionWindowAggOperator"; pOperator->name = "StreamSessionWindowAggOperator";
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_STREAM_SESSION_WINDOW; pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_STREAM_SESSION;
pOperator->blocking = true; pOperator->blocking = true;
pOperator->status = OP_NOT_OPENED; pOperator->status = OP_NOT_OPENED;
pOperator->pExpr = pExprInfo; pOperator->pExpr = pExprInfo;
...@@ -2757,7 +2757,7 @@ SOperatorInfo* createStreamFinalSessionAggOperatorInfo(SOperatorInfo* downstream ...@@ -2757,7 +2757,7 @@ SOperatorInfo* createStreamFinalSessionAggOperatorInfo(SOperatorInfo* downstream
goto _error; goto _error;
} }
pOperator->name = "StreamFinalSessionWindowAggOperator"; pOperator->name = "StreamFinalSessionWindowAggOperator";
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_SESSION_WINDOW; pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_SESSION;
int32_t numOfChild = 1; // Todo(liuyao) get it from phy plan int32_t numOfChild = 1; // Todo(liuyao) get it from phy plan
pInfo = pOperator->info; pInfo = pOperator->info;
pInfo->pChildren = taosArrayInit(8, sizeof(void*)); pInfo->pChildren = taosArrayInit(8, sizeof(void*));
...@@ -3148,7 +3148,7 @@ SOperatorInfo* createStreamStateAggOperatorInfo(SOperatorInfo* downstream, SPhys ...@@ -3148,7 +3148,7 @@ SOperatorInfo* createStreamStateAggOperatorInfo(SOperatorInfo* downstream, SPhys
pInfo->pChildren = NULL; pInfo->pChildren = NULL;
pOperator->name = "StreamStateAggOperator"; pOperator->name = "StreamStateAggOperator";
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_STREAM_STATE_WINDOW; pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_STREAM_STATE;
pOperator->blocking = true; pOperator->blocking = true;
pOperator->status = OP_NOT_OPENED; pOperator->status = OP_NOT_OPENED;
pOperator->numOfExprs = numOfCols; pOperator->numOfExprs = numOfCols;
......
...@@ -456,6 +456,49 @@ static SNode* physiNodeCopy(const SPhysiNode* pSrc, SPhysiNode* pDst) { ...@@ -456,6 +456,49 @@ static SNode* physiNodeCopy(const SPhysiNode* pSrc, SPhysiNode* pDst) {
return (SNode*)pDst; return (SNode*)pDst;
} }
static SNode* physiScanCopy(const SScanPhysiNode* pSrc, SScanPhysiNode* pDst) {
COPY_BASE_OBJECT_FIELD(node, physiNodeCopy);
CLONE_NODE_LIST_FIELD(pScanCols);
CLONE_NODE_LIST_FIELD(pScanPseudoCols);
COPY_SCALAR_FIELD(uid);
COPY_SCALAR_FIELD(suid);
COPY_SCALAR_FIELD(tableType);
COPY_OBJECT_FIELD(tableName, sizeof(SName));
return (SNode*)pDst;
}
static SNode* physiTagScanCopy(const STagScanPhysiNode* pSrc, STagScanPhysiNode* pDst) {
return physiScanCopy(pSrc, pDst);
}
static SNode* physiTableScanCopy(const STableScanPhysiNode* pSrc, STableScanPhysiNode* pDst) {
COPY_BASE_OBJECT_FIELD(scan, physiScanCopy);
COPY_OBJECT_FIELD(scanSeq[0], sizeof(uint8_t) * 2);
COPY_OBJECT_FIELD(scanRange, sizeof(STimeWindow));
COPY_SCALAR_FIELD(ratio);
COPY_SCALAR_FIELD(dataRequired);
CLONE_NODE_LIST_FIELD(pDynamicScanFuncs);
CLONE_NODE_LIST_FIELD(pPartitionKeys);
COPY_SCALAR_FIELD(interval);
COPY_SCALAR_FIELD(offset);
COPY_SCALAR_FIELD(sliding);
COPY_SCALAR_FIELD(intervalUnit);
COPY_SCALAR_FIELD(slidingUnit);
COPY_SCALAR_FIELD(triggerType);
COPY_SCALAR_FIELD(watermark);
COPY_SCALAR_FIELD(tsColId);
COPY_SCALAR_FIELD(filesFactor);
return (SNode*)pDst;
}
static SNode* physiSysTableScanCopy(const SSystemTableScanPhysiNode* pSrc, SSystemTableScanPhysiNode* pDst) {
COPY_BASE_OBJECT_FIELD(scan, physiScanCopy);
COPY_OBJECT_FIELD(mgmtEpSet, sizeof(SEpSet));
COPY_SCALAR_FIELD(showRewrite);
COPY_SCALAR_FIELD(accountId);
return (SNode*)pDst;
}
static SNode* physiWindowCopy(const SWinodwPhysiNode* pSrc, SWinodwPhysiNode* pDst) { static SNode* physiWindowCopy(const SWinodwPhysiNode* pSrc, SWinodwPhysiNode* pDst) {
COPY_BASE_OBJECT_FIELD(node, physiNodeCopy); COPY_BASE_OBJECT_FIELD(node, physiNodeCopy);
CLONE_NODE_LIST_FIELD(pExprs); CLONE_NODE_LIST_FIELD(pExprs);
...@@ -603,6 +646,14 @@ SNodeptr nodesCloneNode(const SNodeptr pNode) { ...@@ -603,6 +646,14 @@ SNodeptr nodesCloneNode(const SNodeptr pNode) {
return logicIndefRowsFuncCopy((const SIndefRowsFuncLogicNode*)pNode, (SIndefRowsFuncLogicNode*)pDst); return logicIndefRowsFuncCopy((const SIndefRowsFuncLogicNode*)pNode, (SIndefRowsFuncLogicNode*)pDst);
case QUERY_NODE_LOGIC_SUBPLAN: case QUERY_NODE_LOGIC_SUBPLAN:
return logicSubplanCopy((const SLogicSubplan*)pNode, (SLogicSubplan*)pDst); return logicSubplanCopy((const SLogicSubplan*)pNode, (SLogicSubplan*)pDst);
case QUERY_NODE_PHYSICAL_PLAN_TAG_SCAN:
return physiTagScanCopy((const STagScanPhysiNode*)pNode, (STagScanPhysiNode*)pDst);
case QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN:
case QUERY_NODE_PHYSICAL_PLAN_TABLE_SEQ_SCAN:
case QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN:
return physiTableScanCopy((const STableScanPhysiNode*)pNode, (STableScanPhysiNode*)pDst);
case QUERY_NODE_PHYSICAL_PLAN_SYSTABLE_SCAN:
return physiSysTableScanCopy((const SSystemTableScanPhysiNode*)pNode, (SSystemTableScanPhysiNode*)pDst);
case QUERY_NODE_PHYSICAL_PLAN_HASH_INTERVAL: case QUERY_NODE_PHYSICAL_PLAN_HASH_INTERVAL:
case QUERY_NODE_PHYSICAL_PLAN_MERGE_INTERVAL: case QUERY_NODE_PHYSICAL_PLAN_MERGE_INTERVAL:
case QUERY_NODE_PHYSICAL_PLAN_STREAM_INTERVAL: case QUERY_NODE_PHYSICAL_PLAN_STREAM_INTERVAL:
......
...@@ -220,9 +220,9 @@ const char* nodesNodeName(ENodeType type) { ...@@ -220,9 +220,9 @@ const char* nodesNodeName(ENodeType type) {
return "PhysiSystemTableScan"; return "PhysiSystemTableScan";
case QUERY_NODE_PHYSICAL_PLAN_PROJECT: case QUERY_NODE_PHYSICAL_PLAN_PROJECT:
return "PhysiProject"; return "PhysiProject";
case QUERY_NODE_PHYSICAL_PLAN_JOIN: case QUERY_NODE_PHYSICAL_PLAN_MERGE_JOIN:
return "PhysiJoin"; return "PhysiJoin";
case QUERY_NODE_PHYSICAL_PLAN_AGG: case QUERY_NODE_PHYSICAL_PLAN_HASH_AGG:
return "PhysiAgg"; return "PhysiAgg";
case QUERY_NODE_PHYSICAL_PLAN_EXCHANGE: case QUERY_NODE_PHYSICAL_PLAN_EXCHANGE:
return "PhysiExchange"; return "PhysiExchange";
...@@ -242,13 +242,13 @@ const char* nodesNodeName(ENodeType type) { ...@@ -242,13 +242,13 @@ const char* nodesNodeName(ENodeType type) {
return "PhysiStreamSemiInterval"; return "PhysiStreamSemiInterval";
case QUERY_NODE_PHYSICAL_PLAN_FILL: case QUERY_NODE_PHYSICAL_PLAN_FILL:
return "PhysiFill"; return "PhysiFill";
case QUERY_NODE_PHYSICAL_PLAN_SESSION_WINDOW: case QUERY_NODE_PHYSICAL_PLAN_MERGE_SESSION:
return "PhysiSessionWindow"; return "PhysiSessionWindow";
case QUERY_NODE_PHYSICAL_PLAN_STREAM_SESSION_WINDOW: case QUERY_NODE_PHYSICAL_PLAN_STREAM_SESSION:
return "PhysiStreamSessionWindow"; return "PhysiStreamSessionWindow";
case QUERY_NODE_PHYSICAL_PLAN_STATE_WINDOW: case QUERY_NODE_PHYSICAL_PLAN_MERGE_STATE:
return "PhysiStateWindow"; return "PhysiStateWindow";
case QUERY_NODE_PHYSICAL_PLAN_STREAM_STATE_WINDOW: case QUERY_NODE_PHYSICAL_PLAN_STREAM_STATE:
return "PhysiStreamStateWindow"; return "PhysiStreamStateWindow";
case QUERY_NODE_PHYSICAL_PLAN_PARTITION: case QUERY_NODE_PHYSICAL_PLAN_PARTITION:
return "PhysiPartition"; return "PhysiPartition";
...@@ -3882,9 +3882,9 @@ static int32_t specificNodeToJson(const void* pObj, SJson* pJson) { ...@@ -3882,9 +3882,9 @@ static int32_t specificNodeToJson(const void* pObj, SJson* pJson) {
return physiSysTableScanNodeToJson(pObj, pJson); return physiSysTableScanNodeToJson(pObj, pJson);
case QUERY_NODE_PHYSICAL_PLAN_PROJECT: case QUERY_NODE_PHYSICAL_PLAN_PROJECT:
return physiProjectNodeToJson(pObj, pJson); return physiProjectNodeToJson(pObj, pJson);
case QUERY_NODE_PHYSICAL_PLAN_JOIN: case QUERY_NODE_PHYSICAL_PLAN_MERGE_JOIN:
return physiJoinNodeToJson(pObj, pJson); return physiJoinNodeToJson(pObj, pJson);
case QUERY_NODE_PHYSICAL_PLAN_AGG: case QUERY_NODE_PHYSICAL_PLAN_HASH_AGG:
return physiAggNodeToJson(pObj, pJson); return physiAggNodeToJson(pObj, pJson);
case QUERY_NODE_PHYSICAL_PLAN_EXCHANGE: case QUERY_NODE_PHYSICAL_PLAN_EXCHANGE:
return physiExchangeNodeToJson(pObj, pJson); return physiExchangeNodeToJson(pObj, pJson);
...@@ -3900,11 +3900,11 @@ static int32_t specificNodeToJson(const void* pObj, SJson* pJson) { ...@@ -3900,11 +3900,11 @@ static int32_t specificNodeToJson(const void* pObj, SJson* pJson) {
return physiIntervalNodeToJson(pObj, pJson); return physiIntervalNodeToJson(pObj, pJson);
case QUERY_NODE_PHYSICAL_PLAN_FILL: case QUERY_NODE_PHYSICAL_PLAN_FILL:
return physiFillNodeToJson(pObj, pJson); return physiFillNodeToJson(pObj, pJson);
case QUERY_NODE_PHYSICAL_PLAN_SESSION_WINDOW: case QUERY_NODE_PHYSICAL_PLAN_MERGE_SESSION:
case QUERY_NODE_PHYSICAL_PLAN_STREAM_SESSION_WINDOW: case QUERY_NODE_PHYSICAL_PLAN_STREAM_SESSION:
return physiSessionWindowNodeToJson(pObj, pJson); return physiSessionWindowNodeToJson(pObj, pJson);
case QUERY_NODE_PHYSICAL_PLAN_STATE_WINDOW: case QUERY_NODE_PHYSICAL_PLAN_MERGE_STATE:
case QUERY_NODE_PHYSICAL_PLAN_STREAM_STATE_WINDOW: case QUERY_NODE_PHYSICAL_PLAN_STREAM_STATE:
return physiStateWindowNodeToJson(pObj, pJson); return physiStateWindowNodeToJson(pObj, pJson);
case QUERY_NODE_PHYSICAL_PLAN_PARTITION: case QUERY_NODE_PHYSICAL_PLAN_PARTITION:
return physiPartitionNodeToJson(pObj, pJson); return physiPartitionNodeToJson(pObj, pJson);
...@@ -4015,9 +4015,9 @@ static int32_t jsonToSpecificNode(const SJson* pJson, void* pObj) { ...@@ -4015,9 +4015,9 @@ static int32_t jsonToSpecificNode(const SJson* pJson, void* pObj) {
return jsonToPhysiSysTableScanNode(pJson, pObj); return jsonToPhysiSysTableScanNode(pJson, pObj);
case QUERY_NODE_PHYSICAL_PLAN_PROJECT: case QUERY_NODE_PHYSICAL_PLAN_PROJECT:
return jsonToPhysiProjectNode(pJson, pObj); return jsonToPhysiProjectNode(pJson, pObj);
case QUERY_NODE_PHYSICAL_PLAN_JOIN: case QUERY_NODE_PHYSICAL_PLAN_MERGE_JOIN:
return jsonToPhysiJoinNode(pJson, pObj); return jsonToPhysiJoinNode(pJson, pObj);
case QUERY_NODE_PHYSICAL_PLAN_AGG: case QUERY_NODE_PHYSICAL_PLAN_HASH_AGG:
return jsonToPhysiAggNode(pJson, pObj); return jsonToPhysiAggNode(pJson, pObj);
case QUERY_NODE_PHYSICAL_PLAN_EXCHANGE: case QUERY_NODE_PHYSICAL_PLAN_EXCHANGE:
return jsonToPhysiExchangeNode(pJson, pObj); return jsonToPhysiExchangeNode(pJson, pObj);
...@@ -4033,11 +4033,11 @@ static int32_t jsonToSpecificNode(const SJson* pJson, void* pObj) { ...@@ -4033,11 +4033,11 @@ static int32_t jsonToSpecificNode(const SJson* pJson, void* pObj) {
return jsonToPhysiIntervalNode(pJson, pObj); return jsonToPhysiIntervalNode(pJson, pObj);
case QUERY_NODE_PHYSICAL_PLAN_FILL: case QUERY_NODE_PHYSICAL_PLAN_FILL:
return jsonToPhysiFillNode(pJson, pObj); return jsonToPhysiFillNode(pJson, pObj);
case QUERY_NODE_PHYSICAL_PLAN_SESSION_WINDOW: case QUERY_NODE_PHYSICAL_PLAN_MERGE_SESSION:
case QUERY_NODE_PHYSICAL_PLAN_STREAM_SESSION_WINDOW: case QUERY_NODE_PHYSICAL_PLAN_STREAM_SESSION:
return jsonToPhysiSessionWindowNode(pJson, pObj); return jsonToPhysiSessionWindowNode(pJson, pObj);
case QUERY_NODE_PHYSICAL_PLAN_STATE_WINDOW: case QUERY_NODE_PHYSICAL_PLAN_MERGE_STATE:
case QUERY_NODE_PHYSICAL_PLAN_STREAM_STATE_WINDOW: case QUERY_NODE_PHYSICAL_PLAN_STREAM_STATE:
return jsonToPhysiStateWindowNode(pJson, pObj); return jsonToPhysiStateWindowNode(pJson, pObj);
case QUERY_NODE_PHYSICAL_PLAN_PARTITION: case QUERY_NODE_PHYSICAL_PLAN_PARTITION:
return jsonToPhysiPartitionNode(pJson, pObj); return jsonToPhysiPartitionNode(pJson, pObj);
......
...@@ -467,7 +467,7 @@ static EDealRes dispatchPhysiPlan(SNode* pNode, ETraversalOrder order, FNodeWalk ...@@ -467,7 +467,7 @@ static EDealRes dispatchPhysiPlan(SNode* pNode, ETraversalOrder order, FNodeWalk
} }
break; break;
} }
case QUERY_NODE_PHYSICAL_PLAN_JOIN: { case QUERY_NODE_PHYSICAL_PLAN_MERGE_JOIN: {
SJoinPhysiNode* pJoin = (SJoinPhysiNode*)pNode; SJoinPhysiNode* pJoin = (SJoinPhysiNode*)pNode;
res = walkPhysiNode((SPhysiNode*)pNode, order, walker, pContext); res = walkPhysiNode((SPhysiNode*)pNode, order, walker, pContext);
if (DEAL_RES_ERROR != res && DEAL_RES_END != res) { if (DEAL_RES_ERROR != res && DEAL_RES_END != res) {
...@@ -478,7 +478,7 @@ static EDealRes dispatchPhysiPlan(SNode* pNode, ETraversalOrder order, FNodeWalk ...@@ -478,7 +478,7 @@ static EDealRes dispatchPhysiPlan(SNode* pNode, ETraversalOrder order, FNodeWalk
} }
break; break;
} }
case QUERY_NODE_PHYSICAL_PLAN_AGG: { case QUERY_NODE_PHYSICAL_PLAN_HASH_AGG: {
SAggPhysiNode* pAgg = (SAggPhysiNode*)pNode; SAggPhysiNode* pAgg = (SAggPhysiNode*)pNode;
res = walkPhysiNode((SPhysiNode*)pNode, order, walker, pContext); res = walkPhysiNode((SPhysiNode*)pNode, order, walker, pContext);
if (DEAL_RES_ERROR != res && DEAL_RES_END != res) { if (DEAL_RES_ERROR != res && DEAL_RES_END != res) {
...@@ -518,12 +518,12 @@ static EDealRes dispatchPhysiPlan(SNode* pNode, ETraversalOrder order, FNodeWalk ...@@ -518,12 +518,12 @@ static EDealRes dispatchPhysiPlan(SNode* pNode, ETraversalOrder order, FNodeWalk
case QUERY_NODE_PHYSICAL_PLAN_STREAM_INTERVAL: case QUERY_NODE_PHYSICAL_PLAN_STREAM_INTERVAL:
res = walkWindowPhysi((SWinodwPhysiNode*)pNode, order, walker, pContext); res = walkWindowPhysi((SWinodwPhysiNode*)pNode, order, walker, pContext);
break; break;
case QUERY_NODE_PHYSICAL_PLAN_SESSION_WINDOW: case QUERY_NODE_PHYSICAL_PLAN_MERGE_SESSION:
case QUERY_NODE_PHYSICAL_PLAN_STREAM_SESSION_WINDOW: case QUERY_NODE_PHYSICAL_PLAN_STREAM_SESSION:
res = walkWindowPhysi((SWinodwPhysiNode*)pNode, order, walker, pContext); res = walkWindowPhysi((SWinodwPhysiNode*)pNode, order, walker, pContext);
break; break;
case QUERY_NODE_PHYSICAL_PLAN_STATE_WINDOW: case QUERY_NODE_PHYSICAL_PLAN_MERGE_STATE:
case QUERY_NODE_PHYSICAL_PLAN_STREAM_STATE_WINDOW: { case QUERY_NODE_PHYSICAL_PLAN_STREAM_STATE: {
SStateWinodwPhysiNode* pState = (SStateWinodwPhysiNode*)pNode; SStateWinodwPhysiNode* pState = (SStateWinodwPhysiNode*)pNode;
res = walkWindowPhysi((SWinodwPhysiNode*)pNode, order, walker, pContext); res = walkWindowPhysi((SWinodwPhysiNode*)pNode, order, walker, pContext);
if (DEAL_RES_ERROR != res && DEAL_RES_END != res) { if (DEAL_RES_ERROR != res && DEAL_RES_END != res) {
......
...@@ -255,9 +255,9 @@ SNodeptr nodesMakeNode(ENodeType type) { ...@@ -255,9 +255,9 @@ SNodeptr nodesMakeNode(ENodeType type) {
return makeNode(type, sizeof(SSystemTableScanPhysiNode)); return makeNode(type, sizeof(SSystemTableScanPhysiNode));
case QUERY_NODE_PHYSICAL_PLAN_PROJECT: case QUERY_NODE_PHYSICAL_PLAN_PROJECT:
return makeNode(type, sizeof(SProjectPhysiNode)); return makeNode(type, sizeof(SProjectPhysiNode));
case QUERY_NODE_PHYSICAL_PLAN_JOIN: case QUERY_NODE_PHYSICAL_PLAN_MERGE_JOIN:
return makeNode(type, sizeof(SJoinPhysiNode)); return makeNode(type, sizeof(SJoinPhysiNode));
case QUERY_NODE_PHYSICAL_PLAN_AGG: case QUERY_NODE_PHYSICAL_PLAN_HASH_AGG:
return makeNode(type, sizeof(SAggPhysiNode)); return makeNode(type, sizeof(SAggPhysiNode));
case QUERY_NODE_PHYSICAL_PLAN_EXCHANGE: case QUERY_NODE_PHYSICAL_PLAN_EXCHANGE:
return makeNode(type, sizeof(SExchangePhysiNode)); return makeNode(type, sizeof(SExchangePhysiNode));
...@@ -277,13 +277,13 @@ SNodeptr nodesMakeNode(ENodeType type) { ...@@ -277,13 +277,13 @@ SNodeptr nodesMakeNode(ENodeType type) {
return makeNode(type, sizeof(SStreamSemiIntervalPhysiNode)); return makeNode(type, sizeof(SStreamSemiIntervalPhysiNode));
case QUERY_NODE_PHYSICAL_PLAN_FILL: case QUERY_NODE_PHYSICAL_PLAN_FILL:
return makeNode(type, sizeof(SFillPhysiNode)); return makeNode(type, sizeof(SFillPhysiNode));
case QUERY_NODE_PHYSICAL_PLAN_SESSION_WINDOW: case QUERY_NODE_PHYSICAL_PLAN_MERGE_SESSION:
return makeNode(type, sizeof(SSessionWinodwPhysiNode)); return makeNode(type, sizeof(SSessionWinodwPhysiNode));
case QUERY_NODE_PHYSICAL_PLAN_STREAM_SESSION_WINDOW: case QUERY_NODE_PHYSICAL_PLAN_STREAM_SESSION:
return makeNode(type, sizeof(SStreamSessionWinodwPhysiNode)); return makeNode(type, sizeof(SStreamSessionWinodwPhysiNode));
case QUERY_NODE_PHYSICAL_PLAN_STATE_WINDOW: case QUERY_NODE_PHYSICAL_PLAN_MERGE_STATE:
return makeNode(type, sizeof(SStateWinodwPhysiNode)); return makeNode(type, sizeof(SStateWinodwPhysiNode));
case QUERY_NODE_PHYSICAL_PLAN_STREAM_STATE_WINDOW: case QUERY_NODE_PHYSICAL_PLAN_STREAM_STATE:
return makeNode(type, sizeof(SStreamStateWinodwPhysiNode)); return makeNode(type, sizeof(SStreamStateWinodwPhysiNode));
case QUERY_NODE_PHYSICAL_PLAN_PARTITION: case QUERY_NODE_PHYSICAL_PLAN_PARTITION:
return makeNode(type, sizeof(SPartitionPhysiNode)); return makeNode(type, sizeof(SPartitionPhysiNode));
...@@ -657,14 +657,14 @@ void nodesDestroyNode(SNodeptr pNode) { ...@@ -657,14 +657,14 @@ void nodesDestroyNode(SNodeptr pNode) {
nodesDestroyList(pPhyNode->pProjections); nodesDestroyList(pPhyNode->pProjections);
break; break;
} }
case QUERY_NODE_PHYSICAL_PLAN_JOIN: { case QUERY_NODE_PHYSICAL_PLAN_MERGE_JOIN: {
SJoinPhysiNode* pPhyNode = (SJoinPhysiNode*)pNode; SJoinPhysiNode* pPhyNode = (SJoinPhysiNode*)pNode;
destroyPhysiNode((SPhysiNode*)pPhyNode); destroyPhysiNode((SPhysiNode*)pPhyNode);
nodesDestroyNode(pPhyNode->pOnConditions); nodesDestroyNode(pPhyNode->pOnConditions);
nodesDestroyList(pPhyNode->pTargets); nodesDestroyList(pPhyNode->pTargets);
break; break;
} }
case QUERY_NODE_PHYSICAL_PLAN_AGG: { case QUERY_NODE_PHYSICAL_PLAN_HASH_AGG: {
SAggPhysiNode* pPhyNode = (SAggPhysiNode*)pNode; SAggPhysiNode* pPhyNode = (SAggPhysiNode*)pNode;
destroyPhysiNode((SPhysiNode*)pPhyNode); destroyPhysiNode((SPhysiNode*)pPhyNode);
nodesDestroyList(pPhyNode->pExprs); nodesDestroyList(pPhyNode->pExprs);
...@@ -689,8 +689,8 @@ void nodesDestroyNode(SNodeptr pNode) { ...@@ -689,8 +689,8 @@ void nodesDestroyNode(SNodeptr pNode) {
case QUERY_NODE_PHYSICAL_PLAN_STREAM_INTERVAL: case QUERY_NODE_PHYSICAL_PLAN_STREAM_INTERVAL:
destroyWinodwPhysiNode((SWinodwPhysiNode*)pNode); destroyWinodwPhysiNode((SWinodwPhysiNode*)pNode);
break; break;
case QUERY_NODE_PHYSICAL_PLAN_SESSION_WINDOW: case QUERY_NODE_PHYSICAL_PLAN_MERGE_SESSION:
case QUERY_NODE_PHYSICAL_PLAN_STREAM_SESSION_WINDOW: case QUERY_NODE_PHYSICAL_PLAN_STREAM_SESSION:
destroyWinodwPhysiNode((SWinodwPhysiNode*)pNode); destroyWinodwPhysiNode((SWinodwPhysiNode*)pNode);
break; break;
case QUERY_NODE_PHYSICAL_PLAN_DISPATCH: case QUERY_NODE_PHYSICAL_PLAN_DISPATCH:
......
...@@ -556,7 +556,7 @@ static int32_t createScanPhysiNode(SPhysiPlanContext* pCxt, SSubplan* pSubplan, ...@@ -556,7 +556,7 @@ static int32_t createScanPhysiNode(SPhysiPlanContext* pCxt, SSubplan* pSubplan,
static int32_t createJoinPhysiNode(SPhysiPlanContext* pCxt, SNodeList* pChildren, SJoinLogicNode* pJoinLogicNode, static int32_t createJoinPhysiNode(SPhysiPlanContext* pCxt, SNodeList* pChildren, SJoinLogicNode* pJoinLogicNode,
SPhysiNode** pPhyNode) { SPhysiNode** pPhyNode) {
SJoinPhysiNode* pJoin = SJoinPhysiNode* pJoin =
(SJoinPhysiNode*)makePhysiNode(pCxt, (SLogicNode*)pJoinLogicNode, QUERY_NODE_PHYSICAL_PLAN_JOIN); (SJoinPhysiNode*)makePhysiNode(pCxt, (SLogicNode*)pJoinLogicNode, QUERY_NODE_PHYSICAL_PLAN_MERGE_JOIN);
if (NULL == pJoin) { if (NULL == pJoin) {
return TSDB_CODE_OUT_OF_MEMORY; return TSDB_CODE_OUT_OF_MEMORY;
} }
...@@ -738,7 +738,8 @@ static int32_t rewritePrecalcExpr(SPhysiPlanContext* pCxt, SNode* pNode, SNodeLi ...@@ -738,7 +738,8 @@ static int32_t rewritePrecalcExpr(SPhysiPlanContext* pCxt, SNode* pNode, SNodeLi
static int32_t createAggPhysiNode(SPhysiPlanContext* pCxt, SNodeList* pChildren, SAggLogicNode* pAggLogicNode, static int32_t createAggPhysiNode(SPhysiPlanContext* pCxt, SNodeList* pChildren, SAggLogicNode* pAggLogicNode,
SPhysiNode** pPhyNode) { SPhysiNode** pPhyNode) {
SAggPhysiNode* pAgg = (SAggPhysiNode*)makePhysiNode(pCxt, (SLogicNode*)pAggLogicNode, QUERY_NODE_PHYSICAL_PLAN_AGG); SAggPhysiNode* pAgg =
(SAggPhysiNode*)makePhysiNode(pCxt, (SLogicNode*)pAggLogicNode, QUERY_NODE_PHYSICAL_PLAN_HASH_AGG);
if (NULL == pAgg) { if (NULL == pAgg) {
return TSDB_CODE_OUT_OF_MEMORY; return TSDB_CODE_OUT_OF_MEMORY;
} }
...@@ -996,8 +997,7 @@ static int32_t createSessionWindowPhysiNode(SPhysiPlanContext* pCxt, SNodeList* ...@@ -996,8 +997,7 @@ static int32_t createSessionWindowPhysiNode(SPhysiPlanContext* pCxt, SNodeList*
SWindowLogicNode* pWindowLogicNode, SPhysiNode** pPhyNode) { SWindowLogicNode* pWindowLogicNode, SPhysiNode** pPhyNode) {
SSessionWinodwPhysiNode* pSession = (SSessionWinodwPhysiNode*)makePhysiNode( SSessionWinodwPhysiNode* pSession = (SSessionWinodwPhysiNode*)makePhysiNode(
pCxt, (SLogicNode*)pWindowLogicNode, pCxt, (SLogicNode*)pWindowLogicNode,
(pCxt->pPlanCxt->streamQuery ? QUERY_NODE_PHYSICAL_PLAN_STREAM_SESSION_WINDOW (pCxt->pPlanCxt->streamQuery ? QUERY_NODE_PHYSICAL_PLAN_STREAM_SESSION : QUERY_NODE_PHYSICAL_PLAN_MERGE_SESSION));
: QUERY_NODE_PHYSICAL_PLAN_SESSION_WINDOW));
if (NULL == pSession) { if (NULL == pSession) {
return TSDB_CODE_OUT_OF_MEMORY; return TSDB_CODE_OUT_OF_MEMORY;
} }
...@@ -1009,10 +1009,9 @@ static int32_t createSessionWindowPhysiNode(SPhysiPlanContext* pCxt, SNodeList* ...@@ -1009,10 +1009,9 @@ static int32_t createSessionWindowPhysiNode(SPhysiPlanContext* pCxt, SNodeList*
static int32_t createStateWindowPhysiNode(SPhysiPlanContext* pCxt, SNodeList* pChildren, static int32_t createStateWindowPhysiNode(SPhysiPlanContext* pCxt, SNodeList* pChildren,
SWindowLogicNode* pWindowLogicNode, SPhysiNode** pPhyNode) { SWindowLogicNode* pWindowLogicNode, SPhysiNode** pPhyNode) {
SStateWinodwPhysiNode* pState = SStateWinodwPhysiNode* pState = (SStateWinodwPhysiNode*)makePhysiNode(
(SStateWinodwPhysiNode*)makePhysiNode(pCxt, (SLogicNode*)pWindowLogicNode, pCxt, (SLogicNode*)pWindowLogicNode,
(pCxt->pPlanCxt->streamQuery ? QUERY_NODE_PHYSICAL_PLAN_STREAM_STATE_WINDOW (pCxt->pPlanCxt->streamQuery ? QUERY_NODE_PHYSICAL_PLAN_STREAM_STATE : QUERY_NODE_PHYSICAL_PLAN_MERGE_STATE));
: QUERY_NODE_PHYSICAL_PLAN_STATE_WINDOW));
if (NULL == pState) { if (NULL == pState) {
return TSDB_CODE_OUT_OF_MEMORY; return TSDB_CODE_OUT_OF_MEMORY;
} }
......
...@@ -340,7 +340,7 @@ static int32_t stbSplCreateExchangeNode(SSplitContext* pCxt, SLogicNode* pParent ...@@ -340,7 +340,7 @@ static int32_t stbSplCreateExchangeNode(SSplitContext* pCxt, SLogicNode* pParent
return code; return code;
} }
static int32_t stbSplSplitWindowNodeForBatch(SSplitContext* pCxt, SStableSplitInfo* pInfo) { static int32_t stbSplSplitIntervalForBatch(SSplitContext* pCxt, SStableSplitInfo* pInfo) {
SLogicNode* pPartWindow = NULL; SLogicNode* pPartWindow = NULL;
int32_t code = stbSplCreatePartWindowNode((SWindowLogicNode*)pInfo->pSplitNode, &pPartWindow); int32_t code = stbSplCreatePartWindowNode((SWindowLogicNode*)pInfo->pSplitNode, &pPartWindow);
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
...@@ -363,7 +363,7 @@ static int32_t stbSplSplitWindowNodeForBatch(SSplitContext* pCxt, SStableSplitIn ...@@ -363,7 +363,7 @@ static int32_t stbSplSplitWindowNodeForBatch(SSplitContext* pCxt, SStableSplitIn
return code; return code;
} }
static int32_t stbSplSplitWindowNodeForStream(SSplitContext* pCxt, SStableSplitInfo* pInfo) { static int32_t stbSplSplitIntervalForStream(SSplitContext* pCxt, SStableSplitInfo* pInfo) {
SLogicNode* pPartWindow = NULL; SLogicNode* pPartWindow = NULL;
int32_t code = stbSplCreatePartWindowNode((SWindowLogicNode*)pInfo->pSplitNode, &pPartWindow); int32_t code = stbSplCreatePartWindowNode((SWindowLogicNode*)pInfo->pSplitNode, &pPartWindow);
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
...@@ -379,14 +379,30 @@ static int32_t stbSplSplitWindowNodeForStream(SSplitContext* pCxt, SStableSplitI ...@@ -379,14 +379,30 @@ static int32_t stbSplSplitWindowNodeForStream(SSplitContext* pCxt, SStableSplitI
return code; return code;
} }
static int32_t stbSplSplitWindowNode(SSplitContext* pCxt, SStableSplitInfo* pInfo) { static int32_t stbSplSplitInterval(SSplitContext* pCxt, SStableSplitInfo* pInfo) {
if (pCxt->pPlanCxt->streamQuery) { if (pCxt->pPlanCxt->streamQuery) {
return stbSplSplitWindowNodeForStream(pCxt, pInfo); return stbSplSplitIntervalForStream(pCxt, pInfo);
} else { } else {
return stbSplSplitWindowNodeForBatch(pCxt, pInfo); return stbSplSplitIntervalForBatch(pCxt, pInfo);
} }
} }
static int32_t stbSplSplitSession(SSplitContext* pCxt, SStableSplitInfo* pInfo) {
return TSDB_CODE_PLAN_INTERNAL_ERROR;
}
static int32_t stbSplSplitWindowNode(SSplitContext* pCxt, SStableSplitInfo* pInfo) {
switch (((SWindowLogicNode*)pInfo->pSplitNode)->winType) {
case WINDOW_TYPE_INTERVAL:
return stbSplSplitInterval(pCxt, pInfo);
case WINDOW_TYPE_SESSION:
return stbSplSplitSession(pCxt, pInfo);
default:
break;
}
return TSDB_CODE_PLAN_INTERNAL_ERROR;
}
static int32_t stbSplCreatePartAggNode(SAggLogicNode* pMergeAgg, SLogicNode** pOutput) { static int32_t stbSplCreatePartAggNode(SAggLogicNode* pMergeAgg, SLogicNode** pOutput) {
SNodeList* pFunc = pMergeAgg->pAggFuncs; SNodeList* pFunc = pMergeAgg->pAggFuncs;
pMergeAgg->pAggFuncs = NULL; pMergeAgg->pAggFuncs = NULL;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册