Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
e033ad14
TDengine
项目概览
taosdata
/
TDengine
1 年多 前同步成功
通知
1187
Star
22018
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
未验证
提交
e033ad14
编写于
6月 06, 2022
作者:
X
Xiaoyu Wang
提交者:
GitHub
6月 06, 2022
浏览文件
操作
浏览文件
下载
差异文件
Merge pull request #13520 from taosdata/feature/3.0_wxy
feat: order by distributed split
上级
55ea9d5f
2a1ec3c3
变更
12
隐藏空白更改
内联
并排
Showing
12 changed file
with
234 addition
and
101 deletion
+234
-101
include/libs/nodes/nodes.h
include/libs/nodes/nodes.h
+1
-1
include/libs/nodes/plannodes.h
include/libs/nodes/plannodes.h
+8
-3
source/dnode/mnode/impl/src/mndTopic.c
source/dnode/mnode/impl/src/mndTopic.c
+1
-1
source/libs/executor/src/executorimpl.c
source/libs/executor/src/executorimpl.c
+19
-20
source/libs/nodes/src/nodesCloneFuncs.c
source/libs/nodes/src/nodesCloneFuncs.c
+3
-1
source/libs/nodes/src/nodesCodeFuncs.c
source/libs/nodes/src/nodesCodeFuncs.c
+28
-3
source/libs/nodes/src/nodesUtilFuncs.c
source/libs/nodes/src/nodesUtilFuncs.c
+2
-2
source/libs/parser/test/parInitialDTest.cpp
source/libs/parser/test/parInitialDTest.cpp
+1
-1
source/libs/planner/src/planLogicCreater.c
source/libs/planner/src/planLogicCreater.c
+3
-3
source/libs/planner/src/planPhysiCreater.c
source/libs/planner/src/planPhysiCreater.c
+32
-11
source/libs/planner/src/planSpliter.c
source/libs/planner/src/planSpliter.c
+133
-55
source/libs/planner/test/planOrderByTest.cpp
source/libs/planner/test/planOrderByTest.cpp
+3
-0
未找到文件。
include/libs/nodes/nodes.h
浏览文件 @
e033ad14
...
...
@@ -211,7 +211,7 @@ typedef enum ENodeType {
QUERY_NODE_PHYSICAL_PLAN_MERGE
,
QUERY_NODE_PHYSICAL_PLAN_SORT
,
QUERY_NODE_PHYSICAL_PLAN_HASH_INTERVAL
,
QUERY_NODE_PHYSICAL_PLAN_
SORT_
MERGE_INTERVAL
,
QUERY_NODE_PHYSICAL_PLAN_MERGE_INTERVAL
,
QUERY_NODE_PHYSICAL_PLAN_STREAM_INTERVAL
,
QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_INTERVAL
,
QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_INTERVAL
,
...
...
include/libs/nodes/plannodes.h
浏览文件 @
e033ad14
...
...
@@ -42,6 +42,7 @@ typedef struct SScanLogicNode {
SNodeList
*
pScanPseudoCols
;
int8_t
tableType
;
uint64_t
tableId
;
uint64_t
stableId
;
SVgroupsInfo
*
pVgroupList
;
EScanType
scanType
;
uint8_t
scanSeq
[
2
];
// first is scan count, and second is reverse scan count
...
...
@@ -94,7 +95,7 @@ typedef struct SVnodeModifyLogicNode {
int32_t
msgType
;
SArray
*
pDataBlocks
;
SVgDataBlocks
*
pVgDataBlocks
;
SNode
*
p
Modify
Rows
;
// SColumnNode
SNode
*
p
Affected
Rows
;
// SColumnNode
uint64_t
tableId
;
int8_t
tableType
;
// table type
char
tableFName
[
TSDB_TABLE_FNAME_LEN
];
...
...
@@ -109,6 +110,7 @@ typedef struct SExchangeLogicNode {
typedef
struct
SMergeLogicNode
{
SLogicNode
node
;
SNodeList
*
pMergeKeys
;
SNodeList
*
pInputs
;
int32_t
numOfChannels
;
int32_t
srcGroupId
;
}
SMergeLogicNode
;
...
...
@@ -117,7 +119,7 @@ typedef enum EWindowType { WINDOW_TYPE_INTERVAL = 1, WINDOW_TYPE_SESSION, WINDOW
typedef
enum
EIntervalAlgorithm
{
INTERVAL_ALGO_HASH
=
1
,
INTERVAL_ALGO_
SORT_
MERGE
,
INTERVAL_ALGO_MERGE
,
INTERVAL_ALGO_STREAM_FINAL
,
INTERVAL_ALGO_STREAM_SEMI
,
INTERVAL_ALGO_STREAM_SINGLE
,
...
...
@@ -220,6 +222,7 @@ typedef struct SScanPhysiNode {
SNodeList
*
pScanCols
;
SNodeList
*
pScanPseudoCols
;
uint64_t
uid
;
// unique id of the table
uint64_t
suid
;
int8_t
tableType
;
SName
tableName
;
}
SScanPhysiNode
;
...
...
@@ -296,6 +299,7 @@ typedef struct SExchangePhysiNode {
typedef
struct
SMergePhysiNode
{
SPhysiNode
node
;
SNodeList
*
pMergeKeys
;
SNodeList
*
pTargets
;
int32_t
numOfChannels
;
int32_t
srcGroupId
;
}
SMergePhysiNode
;
...
...
@@ -319,7 +323,7 @@ typedef struct SIntervalPhysiNode {
int8_t
slidingUnit
;
}
SIntervalPhysiNode
;
typedef
SIntervalPhysiNode
S
Sort
MergeIntervalPhysiNode
;
typedef
SIntervalPhysiNode
SMergeIntervalPhysiNode
;
typedef
SIntervalPhysiNode
SStreamIntervalPhysiNode
;
typedef
SIntervalPhysiNode
SStreamFinalIntervalPhysiNode
;
typedef
SIntervalPhysiNode
SStreamSemiIntervalPhysiNode
;
...
...
@@ -388,6 +392,7 @@ typedef struct SDataDeleterNode {
int8_t
tableType
;
// table type
char
tableFName
[
TSDB_TABLE_FNAME_LEN
];
STimeWindow
deleteTimeRange
;
SNode
*
pAffectedRows
;
}
SDataDeleterNode
;
typedef
struct
SSubplan
{
...
...
source/dnode/mnode/impl/src/mndTopic.c
浏览文件 @
e033ad14
...
...
@@ -91,7 +91,7 @@ int32_t mndCheckColAndTagModifiable(SMnode *pMnode, int64_t suid, col_id_t colId
}
SHashObj
*
pColHash
=
NULL
;
SNodeList
*
pNodeList
;
SNodeList
*
pNodeList
=
NULL
;
nodesCollectColumns
((
SSelectStmt
*
)
pAst
,
SQL_CLAUSE_FROM
,
NULL
,
COLLECT_COL_TYPE_ALL
,
&
pNodeList
);
SNode
*
pNode
=
NULL
;
FOREACH
(
pNode
,
pNodeList
)
{
...
...
source/libs/executor/src/executorimpl.c
浏览文件 @
e033ad14
...
...
@@ -2522,8 +2522,8 @@ void relocateColumnData(SSDataBlock* pBlock, const SArray* pColMatchInfo, SArray
}
int32_t
setDataBlockFromFetchRsp
(
SSDataBlock
*
pRes
,
SLoadRemoteDataInfo
*
pLoadInfo
,
int32_t
numOfRows
,
char
*
pData
,
int32_t
compLen
,
int32_t
numOfOutput
,
int64_t
startTs
,
uint64_t
*
total
,
SArray
*
pColList
)
{
int32_t
compLen
,
int32_t
numOfOutput
,
int64_t
startTs
,
uint64_t
*
total
,
SArray
*
pColList
)
{
if
(
pColList
==
NULL
)
{
// data from other sources
blockCompressDecode
(
pRes
,
numOfOutput
,
numOfRows
,
pData
);
}
else
{
// extract data according to pColList
...
...
@@ -2677,7 +2677,7 @@ static SSDataBlock* concurrentlyLoadRemoteDataImpl(SOperatorInfo* pOperator, SEx
SRetrieveTableRsp
*
pTableRsp
=
pDataInfo
->
pRsp
;
code
=
setDataBlockFromFetchRsp
(
pExchangeInfo
->
pResult
,
pLoadInfo
,
pTableRsp
->
numOfRows
,
pTableRsp
->
data
,
pTableRsp
->
compLen
,
pTableRsp
->
numOfCols
,
startTs
,
&
pDataInfo
->
totalRows
,
NULL
);
pTableRsp
->
compLen
,
pTableRsp
->
numOfCols
,
startTs
,
&
pDataInfo
->
totalRows
,
NULL
);
if
(
code
!=
0
)
{
taosMemoryFreeClear
(
pDataInfo
->
pRsp
);
goto
_error
;
...
...
@@ -2790,7 +2790,7 @@ static SSDataBlock* seqLoadRemoteData(SOperatorInfo* pOperator) {
SRetrieveTableRsp
*
pTableRsp
=
pDataInfo
->
pRsp
;
int32_t
code
=
setDataBlockFromFetchRsp
(
pExchangeInfo
->
pResult
,
pLoadInfo
,
pTableRsp
->
numOfRows
,
pTableRsp
->
data
,
pTableRsp
->
compLen
,
pTableRsp
->
numOfCols
,
startTs
,
&
pDataInfo
->
totalRows
,
NULL
);
pTableRsp
->
compLen
,
pTableRsp
->
numOfCols
,
startTs
,
&
pDataInfo
->
totalRows
,
NULL
);
if
(
pRsp
->
completed
==
1
)
{
qDebug
(
"%s fetch msg rsp from vgId:%d, taskId:0x%"
PRIx64
" numOfRows:%d, rowsOfSource:%"
PRIu64
...
...
@@ -2856,7 +2856,7 @@ static SSDataBlock* doLoadRemoteData(SOperatorInfo* pOperator) {
return
seqLoadRemoteData
(
pOperator
);
}
else
{
return
concurrentlyLoadRemoteDataImpl
(
pOperator
,
pExchangeInfo
,
pTaskInfo
);
// return concurrentlyLoadRemoteData(pOperator);
// return concurrentlyLoadRemoteData(pOperator);
}
}
...
...
@@ -2911,18 +2911,18 @@ SOperatorInfo* createExchangeOperatorInfo(void* pTransporter, SExchangePhysiNode
goto
_error
;
}
pInfo
->
seqLoadData
=
false
;
pInfo
->
seqLoadData
=
false
;
pInfo
->
pTransporter
=
pTransporter
;
pInfo
->
pResult
=
createResDataBlock
(
pExNode
->
node
.
pOutputDataBlockDesc
);
pInfo
->
pResult
=
createResDataBlock
(
pExNode
->
node
.
pOutputDataBlockDesc
);
tsem_init
(
&
pInfo
->
ready
,
0
,
0
);
pOperator
->
name
=
"ExchangeOperator"
;
pOperator
->
name
=
"ExchangeOperator"
;
pOperator
->
operatorType
=
QUERY_NODE_PHYSICAL_PLAN_EXCHANGE
;
pOperator
->
blocking
=
false
;
pOperator
->
status
=
OP_NOT_OPENED
;
pOperator
->
info
=
pInfo
;
pOperator
->
numOfExprs
=
pInfo
->
pResult
->
info
.
numOfCols
;
pOperator
->
pTaskInfo
=
pTaskInfo
;
pOperator
->
blocking
=
false
;
pOperator
->
status
=
OP_NOT_OPENED
;
pOperator
->
info
=
pInfo
;
pOperator
->
numOfExprs
=
pInfo
->
pResult
->
info
.
numOfCols
;
pOperator
->
pTaskInfo
=
pTaskInfo
;
pOperator
->
fpSet
=
createOperatorFpSet
(
prepareLoadRemoteData
,
doLoadRemoteData
,
NULL
,
NULL
,
destroyExchangeOperatorInfo
,
NULL
,
NULL
,
NULL
);
...
...
@@ -4389,7 +4389,7 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
SScanPhysiNode
*
pScanPhyNode
=
(
SScanPhysiNode
*
)
pPhyNode
;
// simple child table.
STableScanPhysiNode
*
pTableScanNode
=
(
STableScanPhysiNode
*
)
pPhyNode
;
STimeWindowAggSupp
twSup
=
{
.
waterMark
=
pTableScanNode
->
watermark
,
.
calTrigger
=
pTableScanNode
->
triggerType
,
.
maxTs
=
INT64_MIN
};
.
waterMark
=
pTableScanNode
->
watermark
,
.
calTrigger
=
pTableScanNode
->
triggerType
,
.
maxTs
=
INT64_MIN
};
tsdbReaderT
pDataReader
=
NULL
;
if
(
pHandle
->
vnode
)
{
pDataReader
=
doCreateDataReader
(
pTableScanNode
,
pHandle
,
pTableListInfo
,
(
uint64_t
)
queryId
,
taskId
,
pTagCond
);
...
...
@@ -4517,17 +4517,16 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
extractColMatchInfo
(
pSortPhyNode
->
pTargets
,
pDescNode
,
&
numOfOutputCols
,
pTaskInfo
,
COL_MATCH_FROM_SLOT_ID
);
pOptr
=
createSortOperatorInfo
(
ops
[
0
],
pResBlock
,
info
,
pExprInfo
,
numOfCols
,
pColList
,
pTaskInfo
);
}
else
if
(
QUERY_NODE_PHYSICAL_PLAN_MERGE
==
type
)
{
}
else
if
(
QUERY_NODE_PHYSICAL_PLAN_MERGE
==
type
)
{
SMergePhysiNode
*
pMergePhyNode
=
(
SMergePhysiNode
*
)
pPhyNode
;
SDataBlockDescNode
*
pDescNode
=
pPhyNode
->
pOutputDataBlockDesc
;
SSDataBlock
*
pResBlock
=
createResDataBlock
(
pDescNode
);
SSDataBlock
*
pResBlock
=
createResDataBlock
(
pDescNode
);
SArray
*
sortInfo
=
createSortInfo
(
pMergePhyNode
->
pMergeKeys
);
SArray
*
sortInfo
=
createSortInfo
(
pMergePhyNode
->
pMergeKeys
);
int32_t
numOfOutputCols
=
0
;
SArray
*
pColList
=
NULL
;
//extractColMatchInfo(pMergePhyNode->pTargets, pDescNode, &numOfOutputCols, pTaskInfo, COL_MATCH_FROM_SLOT_ID);
SArray
*
pColList
=
extractColMatchInfo
(
pMergePhyNode
->
pTargets
,
pDescNode
,
&
numOfOutputCols
,
pTaskInfo
,
COL_MATCH_FROM_SLOT_ID
);
pOptr
=
createMultiwaySortMergeOperatorInfo
(
ops
,
size
,
pResBlock
,
sortInfo
,
pColList
,
pTaskInfo
);
}
else
if
(
QUERY_NODE_PHYSICAL_PLAN_SESSION_WINDOW
==
type
)
{
...
...
source/libs/nodes/src/nodesCloneFuncs.c
浏览文件 @
e033ad14
...
...
@@ -318,6 +318,7 @@ static SNode* logicScanCopy(const SScanLogicNode* pSrc, SScanLogicNode* pDst) {
CLONE_NODE_LIST_FIELD
(
pScanPseudoCols
);
COPY_SCALAR_FIELD
(
tableType
);
COPY_SCALAR_FIELD
(
tableId
);
COPY_SCALAR_FIELD
(
stableId
);
CLONE_OBJECT_FIELD
(
pVgroupList
,
vgroupsInfoClone
);
COPY_SCALAR_FIELD
(
scanType
);
COPY_OBJECT_FIELD
(
scanSeq
[
0
],
sizeof
(
uint8_t
)
*
2
);
...
...
@@ -370,7 +371,7 @@ static SNode* logicVnodeModifCopy(const SVnodeModifyLogicNode* pSrc, SVnodeModif
COPY_BASE_OBJECT_FIELD
(
node
,
logicNodeCopy
);
COPY_SCALAR_FIELD
(
modifyType
);
COPY_SCALAR_FIELD
(
msgType
);
CLONE_NODE_FIELD
(
p
Modify
Rows
);
CLONE_NODE_FIELD
(
p
Affected
Rows
);
COPY_SCALAR_FIELD
(
tableId
);
COPY_SCALAR_FIELD
(
tableType
);
COPY_CHAR_ARRAY_FIELD
(
tableFName
);
...
...
@@ -387,6 +388,7 @@ static SNode* logicExchangeCopy(const SExchangeLogicNode* pSrc, SExchangeLogicNo
static
SNode
*
logicMergeCopy
(
const
SMergeLogicNode
*
pSrc
,
SMergeLogicNode
*
pDst
)
{
COPY_BASE_OBJECT_FIELD
(
node
,
logicNodeCopy
);
CLONE_NODE_LIST_FIELD
(
pMergeKeys
);
CLONE_NODE_LIST_FIELD
(
pInputs
);
COPY_SCALAR_FIELD
(
numOfChannels
);
COPY_SCALAR_FIELD
(
srcGroupId
);
return
(
SNode
*
)
pDst
;
...
...
source/libs/nodes/src/nodesCodeFuncs.c
浏览文件 @
e033ad14
...
...
@@ -230,6 +230,8 @@ const char* nodesNodeName(ENodeType type) {
return
"PhysiSort"
;
case
QUERY_NODE_PHYSICAL_PLAN_HASH_INTERVAL
:
return
"PhysiHashInterval"
;
case
QUERY_NODE_PHYSICAL_PLAN_MERGE_INTERVAL
:
return
"PhysiMergeInterval"
;
case
QUERY_NODE_PHYSICAL_PLAN_STREAM_INTERVAL
:
return
"PhysiStreamInterval"
;
case
QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_INTERVAL
:
...
...
@@ -611,7 +613,7 @@ static int32_t jsonToLogicProjectNode(const SJson* pJson, void* pObj) {
static
const
char
*
jkVnodeModifyLogicPlanModifyType
=
"ModifyType"
;
static
const
char
*
jkVnodeModifyLogicPlanMsgType
=
"MsgType"
;
static
const
char
*
jkVnodeModifyLogicPlan
ModifyRows
=
"Modify
Rows"
;
static
const
char
*
jkVnodeModifyLogicPlan
AffectedRows
=
"Affected
Rows"
;
static
int32_t
logicVnodeModifyNodeToJson
(
const
void
*
pObj
,
SJson
*
pJson
)
{
const
SVnodeModifyLogicNode
*
pNode
=
(
const
SVnodeModifyLogicNode
*
)
pObj
;
...
...
@@ -624,7 +626,7 @@ static int32_t logicVnodeModifyNodeToJson(const void* pObj, SJson* pJson) {
code
=
tjsonAddIntegerToObject
(
pJson
,
jkVnodeModifyLogicPlanMsgType
,
pNode
->
msgType
);
}
if
(
TSDB_CODE_SUCCESS
==
code
)
{
code
=
tjsonAddObject
(
pJson
,
jkVnodeModifyLogicPlan
ModifyRows
,
nodeToJson
,
pNode
->
pModify
Rows
);
code
=
tjsonAddObject
(
pJson
,
jkVnodeModifyLogicPlan
AffectedRows
,
nodeToJson
,
pNode
->
pAffected
Rows
);
}
return
code
;
...
...
@@ -641,7 +643,7 @@ static int32_t jsonToLogicVnodeModifyNode(const SJson* pJson, void* pObj) {
code
=
tjsonGetIntValue
(
pJson
,
jkVnodeModifyLogicPlanMsgType
,
&
pNode
->
msgType
);
}
if
(
TSDB_CODE_SUCCESS
==
code
)
{
code
=
jsonToNodeObject
(
pJson
,
jkVnodeModifyLogicPlan
ModifyRows
,
&
pNode
->
pModify
Rows
);
code
=
jsonToNodeObject
(
pJson
,
jkVnodeModifyLogicPlan
AffectedRows
,
&
pNode
->
pAffected
Rows
);
}
return
code
;
...
...
@@ -1249,6 +1251,7 @@ static int32_t jsonToName(const SJson* pJson, void* pObj) {
static
const
char
*
jkScanPhysiPlanScanCols
=
"ScanCols"
;
static
const
char
*
jkScanPhysiPlanScanPseudoCols
=
"ScanPseudoCols"
;
static
const
char
*
jkScanPhysiPlanTableId
=
"TableId"
;
static
const
char
*
jkScanPhysiPlanSTableId
=
"STableId"
;
static
const
char
*
jkScanPhysiPlanTableType
=
"TableType"
;
static
const
char
*
jkScanPhysiPlanTableName
=
"TableName"
;
...
...
@@ -1265,6 +1268,9 @@ static int32_t physiScanNodeToJson(const void* pObj, SJson* pJson) {
if
(
TSDB_CODE_SUCCESS
==
code
)
{
code
=
tjsonAddIntegerToObject
(
pJson
,
jkScanPhysiPlanTableId
,
pNode
->
uid
);
}
if
(
TSDB_CODE_SUCCESS
==
code
)
{
code
=
tjsonAddIntegerToObject
(
pJson
,
jkScanPhysiPlanSTableId
,
pNode
->
suid
);
}
if
(
TSDB_CODE_SUCCESS
==
code
)
{
code
=
tjsonAddIntegerToObject
(
pJson
,
jkScanPhysiPlanTableType
,
pNode
->
tableType
);
}
...
...
@@ -1288,6 +1294,9 @@ static int32_t jsonToPhysiScanNode(const SJson* pJson, void* pObj) {
if
(
TSDB_CODE_SUCCESS
==
code
)
{
code
=
tjsonGetUBigIntValue
(
pJson
,
jkScanPhysiPlanTableId
,
&
pNode
->
uid
);
}
if
(
TSDB_CODE_SUCCESS
==
code
)
{
code
=
tjsonGetUBigIntValue
(
pJson
,
jkScanPhysiPlanSTableId
,
&
pNode
->
suid
);
}
if
(
TSDB_CODE_SUCCESS
==
code
)
{
code
=
tjsonGetTinyIntValue
(
pJson
,
jkScanPhysiPlanTableType
,
&
pNode
->
tableType
);
}
...
...
@@ -1644,6 +1653,7 @@ static int32_t jsonToPhysiExchangeNode(const SJson* pJson, void* pObj) {
}
static
const
char
*
jkMergePhysiPlanMergeKeys
=
"MergeKeys"
;
static
const
char
*
jkMergePhysiPlanTargets
=
"Targets"
;
static
const
char
*
jkMergePhysiPlanNumOfChannels
=
"NumOfChannels"
;
static
const
char
*
jkMergePhysiPlanSrcGroupId
=
"SrcGroupId"
;
...
...
@@ -1654,6 +1664,9 @@ static int32_t physiMergeNodeToJson(const void* pObj, SJson* pJson) {
if
(
TSDB_CODE_SUCCESS
==
code
)
{
code
=
nodeListToJson
(
pJson
,
jkMergePhysiPlanMergeKeys
,
pNode
->
pMergeKeys
);
}
if
(
TSDB_CODE_SUCCESS
==
code
)
{
code
=
nodeListToJson
(
pJson
,
jkMergePhysiPlanTargets
,
pNode
->
pTargets
);
}
if
(
TSDB_CODE_SUCCESS
==
code
)
{
code
=
tjsonAddIntegerToObject
(
pJson
,
jkMergePhysiPlanNumOfChannels
,
pNode
->
numOfChannels
);
}
...
...
@@ -1671,6 +1684,9 @@ static int32_t jsonToPhysiMergeNode(const SJson* pJson, void* pObj) {
if
(
TSDB_CODE_SUCCESS
==
code
)
{
code
=
jsonToNodeList
(
pJson
,
jkMergePhysiPlanMergeKeys
,
&
pNode
->
pMergeKeys
);
}
if
(
TSDB_CODE_SUCCESS
==
code
)
{
code
=
jsonToNodeList
(
pJson
,
jkMergePhysiPlanTargets
,
&
pNode
->
pTargets
);
}
if
(
TSDB_CODE_SUCCESS
==
code
)
{
code
=
tjsonGetIntValue
(
pJson
,
jkMergePhysiPlanNumOfChannels
,
&
pNode
->
numOfChannels
);
}
...
...
@@ -2000,6 +2016,7 @@ static const char* jkDeletePhysiPlanTableType = "TableType";
static
const
char
*
jkDeletePhysiPlanTableFName
=
"TableFName"
;
static
const
char
*
jkDeletePhysiPlanDeleteTimeRangeStartKey
=
"DeleteTimeRangeStartKey"
;
static
const
char
*
jkDeletePhysiPlanDeleteTimeRangeEndKey
=
"DeleteTimeRangeEndKey"
;
static
const
char
*
jkDeletePhysiPlanAffectedRows
=
"AffectedRows"
;
static
int32_t
physiDeleteNodeToJson
(
const
void
*
pObj
,
SJson
*
pJson
)
{
const
SDataDeleterNode
*
pNode
=
(
const
SDataDeleterNode
*
)
pObj
;
...
...
@@ -2020,6 +2037,9 @@ static int32_t physiDeleteNodeToJson(const void* pObj, SJson* pJson) {
if
(
TSDB_CODE_SUCCESS
==
code
)
{
code
=
tjsonAddIntegerToObject
(
pJson
,
jkDeletePhysiPlanDeleteTimeRangeEndKey
,
pNode
->
deleteTimeRange
.
ekey
);
}
if
(
TSDB_CODE_SUCCESS
==
code
)
{
code
=
tjsonAddObject
(
pJson
,
jkDeletePhysiPlanAffectedRows
,
nodeToJson
,
pNode
->
pAffectedRows
);
}
return
code
;
}
...
...
@@ -2043,6 +2063,9 @@ static int32_t jsonToPhysiDeleteNode(const SJson* pJson, void* pObj) {
if
(
TSDB_CODE_SUCCESS
==
code
)
{
code
=
tjsonGetBigIntValue
(
pJson
,
jkDeletePhysiPlanDeleteTimeRangeEndKey
,
&
pNode
->
deleteTimeRange
.
ekey
);
}
if
(
TSDB_CODE_SUCCESS
==
code
)
{
code
=
jsonToNodeObject
(
pJson
,
jkDeletePhysiPlanAffectedRows
,
&
pNode
->
pAffectedRows
);
}
return
code
;
}
...
...
@@ -3802,6 +3825,7 @@ static int32_t specificNodeToJson(const void* pObj, SJson* pJson) {
case
QUERY_NODE_PHYSICAL_PLAN_SORT
:
return
physiSortNodeToJson
(
pObj
,
pJson
);
case
QUERY_NODE_PHYSICAL_PLAN_HASH_INTERVAL
:
case
QUERY_NODE_PHYSICAL_PLAN_MERGE_INTERVAL
:
case
QUERY_NODE_PHYSICAL_PLAN_STREAM_INTERVAL
:
case
QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_INTERVAL
:
case
QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_INTERVAL
:
...
...
@@ -3930,6 +3954,7 @@ static int32_t jsonToSpecificNode(const SJson* pJson, void* pObj) {
case
QUERY_NODE_PHYSICAL_PLAN_SORT
:
return
jsonToPhysiSortNode
(
pJson
,
pObj
);
case
QUERY_NODE_PHYSICAL_PLAN_HASH_INTERVAL
:
case
QUERY_NODE_PHYSICAL_PLAN_MERGE_INTERVAL
:
case
QUERY_NODE_PHYSICAL_PLAN_STREAM_INTERVAL
:
case
QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_INTERVAL
:
case
QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_INTERVAL
:
...
...
source/libs/nodes/src/nodesUtilFuncs.c
浏览文件 @
e033ad14
...
...
@@ -260,8 +260,8 @@ SNodeptr nodesMakeNode(ENodeType type) {
return
makeNode
(
type
,
sizeof
(
SSortPhysiNode
));
case
QUERY_NODE_PHYSICAL_PLAN_HASH_INTERVAL
:
return
makeNode
(
type
,
sizeof
(
SIntervalPhysiNode
));
case
QUERY_NODE_PHYSICAL_PLAN_
SORT_
MERGE_INTERVAL
:
return
makeNode
(
type
,
sizeof
(
S
Sort
MergeIntervalPhysiNode
));
case
QUERY_NODE_PHYSICAL_PLAN_MERGE_INTERVAL
:
return
makeNode
(
type
,
sizeof
(
SMergeIntervalPhysiNode
));
case
QUERY_NODE_PHYSICAL_PLAN_STREAM_INTERVAL
:
return
makeNode
(
type
,
sizeof
(
SStreamIntervalPhysiNode
));
case
QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_INTERVAL
:
...
...
source/libs/parser/test/parInitialDTest.cpp
浏览文件 @
e033ad14
...
...
@@ -51,7 +51,7 @@ TEST_F(ParserInitialDTest, dropBnode) {
}
// DROP CONSUMER GROUP [ IF EXISTS ] cgroup_name ON topic_name
TEST_F
(
ParserInitialDTest
,
dropCGroup
)
{
TEST_F
(
ParserInitialDTest
,
dropC
onsumer
Group
)
{
useDb
(
"root"
,
"test"
);
SMDropCgroupReq
expect
=
{
0
};
...
...
source/libs/planner/src/planLogicCreater.c
浏览文件 @
e033ad14
...
...
@@ -219,9 +219,9 @@ static int32_t makeScanLogicNode(SLogicPlanContext* pCxt, SRealTableNode* pRealT
return
TSDB_CODE_OUT_OF_MEMORY
;
}
// TSWAP(pScan->pMeta, pRealTable->pMeta);
TSWAP
(
pScan
->
pVgroupList
,
pRealTable
->
pVgroupList
);
pScan
->
tableId
=
pRealTable
->
pMeta
->
uid
;
pScan
->
stableId
=
pRealTable
->
pMeta
->
suid
;
pScan
->
tableType
=
pRealTable
->
pMeta
->
tableType
;
pScan
->
scanSeq
[
0
]
=
hasRepeatScanFuncs
?
2
:
1
;
pScan
->
scanSeq
[
1
]
=
0
;
...
...
@@ -1057,8 +1057,8 @@ static int32_t createVnodeModifLogicNodeByDelete(SLogicPlanContext* pCxt, SDelet
snprintf
(
pModify
->
tableFName
,
sizeof
(
pModify
->
tableFName
),
"%d.%s.%s"
,
pCxt
->
pPlanCxt
->
acctId
,
pRealTable
->
table
.
dbName
,
pRealTable
->
table
.
tableName
);
pModify
->
deleteTimeRange
=
pDelete
->
timeRange
;
pModify
->
p
Modify
Rows
=
nodesCloneNode
(
pDelete
->
pCountFunc
);
if
(
NULL
==
pModify
->
p
Modify
Rows
)
{
pModify
->
p
Affected
Rows
=
nodesCloneNode
(
pDelete
->
pCountFunc
);
if
(
NULL
==
pModify
->
p
Affected
Rows
)
{
nodesDestroyNode
(
pModify
);
return
TSDB_CODE_OUT_OF_MEMORY
;
}
...
...
source/libs/planner/src/planPhysiCreater.c
浏览文件 @
e033ad14
...
...
@@ -425,6 +425,7 @@ static int32_t createScanPhysiNodeFinalize(SPhysiPlanContext* pCxt, SSubplan* pS
if
(
TSDB_CODE_SUCCESS
==
code
)
{
pScanPhysiNode
->
uid
=
pScanLogicNode
->
tableId
;
pScanPhysiNode
->
suid
=
pScanLogicNode
->
stableId
;
pScanPhysiNode
->
tableType
=
pScanLogicNode
->
tableType
;
memcpy
(
&
pScanPhysiNode
->
tableName
,
&
pScanLogicNode
->
tableName
,
sizeof
(
SName
));
if
(
NULL
!=
pScanLogicNode
->
pTagCond
)
{
...
...
@@ -923,8 +924,8 @@ static ENodeType getIntervalOperatorType(EIntervalAlgorithm intervalAlgo) {
switch
(
intervalAlgo
)
{
case
INTERVAL_ALGO_HASH
:
return
QUERY_NODE_PHYSICAL_PLAN_HASH_INTERVAL
;
case
INTERVAL_ALGO_
SORT_
MERGE
:
return
QUERY_NODE_PHYSICAL_PLAN_
SORT_
MERGE_INTERVAL
;
case
INTERVAL_ALGO_MERGE
:
return
QUERY_NODE_PHYSICAL_PLAN_MERGE_INTERVAL
;
case
INTERVAL_ALGO_STREAM_FINAL
:
return
QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_INTERVAL
;
case
INTERVAL_ALGO_STREAM_SEMI
:
...
...
@@ -1155,6 +1156,8 @@ static int32_t createExchangePhysiNodeByMerge(SMergePhysiNode* pMerge) {
nodesDestroyNode
(
pExchange
);
return
TSDB_CODE_OUT_OF_MEMORY
;
}
SNode
*
pSlot
=
NULL
;
FOREACH
(
pSlot
,
pExchange
->
node
.
pOutputDataBlockDesc
->
pSlots
)
{
((
SSlotDescNode
*
)
pSlot
)
->
output
=
true
;
}
return
nodesListMakeStrictAppend
(
&
pMerge
->
node
.
pChildren
,
pExchange
);
}
...
...
@@ -1168,12 +1171,14 @@ static int32_t createMergePhysiNode(SPhysiPlanContext* pCxt, SMergeLogicNode* pM
pMerge
->
numOfChannels
=
pMergeLogicNode
->
numOfChannels
;
pMerge
->
srcGroupId
=
pMergeLogicNode
->
srcGroupId
;
int32_t
code
=
TSDB_CODE_SUCCESS
;
int32_t
code
=
addDataBlockSlots
(
pCxt
,
pMergeLogicNode
->
pInputs
,
pMerge
->
node
.
pOutputDataBlockDesc
)
;
for
(
int32_t
i
=
0
;
i
<
pMerge
->
numOfChannels
;
++
i
)
{
code
=
createExchangePhysiNodeByMerge
(
pMerge
);
if
(
TSDB_CODE_SUCCESS
!=
code
)
{
break
;
if
(
TSDB_CODE_SUCCESS
==
code
)
{
for
(
int32_t
i
=
0
;
i
<
pMerge
->
numOfChannels
;
++
i
)
{
code
=
createExchangePhysiNodeByMerge
(
pMerge
);
if
(
TSDB_CODE_SUCCESS
!=
code
)
{
break
;
}
}
}
...
...
@@ -1182,6 +1187,14 @@ static int32_t createMergePhysiNode(SPhysiPlanContext* pCxt, SMergeLogicNode* pM
&
pMerge
->
pMergeKeys
);
}
if
(
TSDB_CODE_SUCCESS
==
code
)
{
code
=
setListSlotId
(
pCxt
,
pMerge
->
node
.
pOutputDataBlockDesc
->
dataBlockId
,
-
1
,
pMergeLogicNode
->
node
.
pTargets
,
&
pMerge
->
pTargets
);
}
if
(
TSDB_CODE_SUCCESS
==
code
)
{
code
=
addDataBlockSlots
(
pCxt
,
pMerge
->
pTargets
,
pMerge
->
node
.
pOutputDataBlockDesc
);
}
if
(
TSDB_CODE_SUCCESS
==
code
)
{
*
pPhyNode
=
(
SPhysiNode
*
)
pMerge
;
}
else
{
...
...
@@ -1319,13 +1332,21 @@ static int32_t createDataDeleter(SPhysiPlanContext* pCxt, SVnodeModifyLogicNode*
strcpy
(
pDeleter
->
tableFName
,
pModify
->
tableFName
);
pDeleter
->
deleteTimeRange
=
pModify
->
deleteTimeRange
;
pDeleter
->
sink
.
pInputDataBlockDesc
=
nodesCloneNode
(
pRoot
->
pOutputDataBlockDesc
);
if
(
NULL
==
pDeleter
->
sink
.
pInputDataBlockDesc
)
{
int32_t
code
=
setNodeSlotId
(
pCxt
,
pRoot
->
pOutputDataBlockDesc
->
dataBlockId
,
-
1
,
pModify
->
pAffectedRows
,
&
pDeleter
->
pAffectedRows
);
if
(
TSDB_CODE_SUCCESS
==
code
)
{
pDeleter
->
sink
.
pInputDataBlockDesc
=
nodesCloneNode
(
pRoot
->
pOutputDataBlockDesc
);
if
(
NULL
==
pDeleter
->
sink
.
pInputDataBlockDesc
)
{
code
=
TSDB_CODE_OUT_OF_MEMORY
;
}
}
if
(
TSDB_CODE_SUCCESS
==
code
)
{
*
pSink
=
(
SDataSinkNode
*
)
pDeleter
;
}
else
{
nodesDestroyNode
(
pDeleter
);
return
TSDB_CODE_OUT_OF_MEMORY
;
}
*
pSink
=
(
SDataSinkNode
*
)
pDeleter
;
return
TSDB_CODE_SUCCESS
;
}
...
...
source/libs/planner/src/planSpliter.c
浏览文件 @
e033ad14
...
...
@@ -80,30 +80,36 @@ static int32_t splCreateExchangeNode(SSplitContext* pCxt, SLogicNode* pChild, SE
return
TSDB_CODE_SUCCESS
;
}
static
int32_t
splCreateExchangeNodeForSubplan
(
SSplitContext
*
pCxt
,
SLogicSubplan
*
pSubplan
,
SLogicNode
*
pSplitNode
,
ESubplanType
subplanType
)
{
SExchangeLogicNode
*
pExchange
=
NULL
;
if
(
TSDB_CODE_SUCCESS
!=
splCreateExchangeNode
(
pCxt
,
pSplitNode
,
&
pExchange
))
{
return
TSDB_CODE_OUT_OF_MEMORY
;
}
pSubplan
->
subplanType
=
subplanType
;
if
(
NULL
==
pSplitNode
->
pParent
)
{
pSubplan
->
pNode
=
(
SLogicNode
*
)
pExchange
;
static
int32_t
splReplaceLogicNode
(
SLogicSubplan
*
pSubplan
,
SLogicNode
*
pOld
,
SLogicNode
*
pNew
)
{
if
(
NULL
==
pOld
->
pParent
)
{
pSubplan
->
pNode
=
(
SLogicNode
*
)
pNew
;
return
TSDB_CODE_SUCCESS
;
}
SNode
*
pNode
;
FOREACH
(
pNode
,
p
SplitNode
->
pParent
->
pChildren
)
{
if
(
nodesEqualNode
(
pNode
,
p
SplitNode
))
{
REPLACE_NODE
(
p
Exchange
);
p
Exchange
->
node
.
pParent
=
pSplitNode
->
pParent
;
FOREACH
(
pNode
,
p
Old
->
pParent
->
pChildren
)
{
if
(
nodesEqualNode
(
pNode
,
p
Old
))
{
REPLACE_NODE
(
p
New
);
p
New
->
pParent
=
pOld
->
pParent
;
return
TSDB_CODE_SUCCESS
;
}
}
nodesDestroyNode
(
pExchange
);
return
TSDB_CODE_FAILED
;
return
TSDB_CODE_PLAN_INTERNAL_ERROR
;
}
static
int32_t
splCreateExchangeNodeForSubplan
(
SSplitContext
*
pCxt
,
SLogicSubplan
*
pSubplan
,
SLogicNode
*
pSplitNode
,
ESubplanType
subplanType
)
{
SExchangeLogicNode
*
pExchange
=
NULL
;
int32_t
code
=
splCreateExchangeNode
(
pCxt
,
pSplitNode
,
&
pExchange
);
if
(
TSDB_CODE_SUCCESS
==
code
)
{
code
=
splReplaceLogicNode
(
pSubplan
,
pSplitNode
,
(
SLogicNode
*
)
pExchange
);
}
if
(
TSDB_CODE_SUCCESS
==
code
)
{
pSubplan
->
subplanType
=
subplanType
;
}
else
{
nodesDestroyNode
(
pExchange
);
}
return
code
;
}
static
bool
splMatch
(
SSplitContext
*
pCxt
,
SLogicSubplan
*
pSubplan
,
int32_t
flag
,
FSplFindSplitNode
func
,
void
*
pInfo
)
{
...
...
@@ -295,24 +301,34 @@ static int32_t stbSplCreatePartWindowNode(SWindowLogicNode* pMergeWindow, SLogic
return
code
;
}
static
int32_t
stbSplCreateMergeNode
(
SSplitContext
*
pCxt
,
SLogic
Node
*
pParent
,
SNodeList
*
pMergeKeys
,
SLogicNode
*
pPartChild
)
{
static
int32_t
stbSplCreateMergeNode
(
SSplitContext
*
pCxt
,
SLogic
Subplan
*
pSubplan
,
SLogicNode
*
pSplitNode
,
S
NodeList
*
pMergeKeys
,
S
LogicNode
*
pPartChild
)
{
SMergeLogicNode
*
pMerge
=
nodesMakeNode
(
QUERY_NODE_LOGIC_PLAN_MERGE
);
if
(
NULL
==
pMerge
)
{
return
TSDB_CODE_OUT_OF_MEMORY
;
}
pMerge
->
numOfChannels
=
((
SScanLogicNode
*
)
nodesListGetNode
(
pPartChild
->
pChildren
,
0
))
->
pVgroupList
->
numOfVgroups
;
pMerge
->
srcGroupId
=
pCxt
->
groupId
;
pMerge
->
node
.
pParent
=
pParent
;
pMerge
->
node
.
precision
=
pPartChild
->
precision
;
pMerge
->
pMergeKeys
=
pMergeKeys
;
pMerge
->
node
.
pTargets
=
nodesCloneList
(
pPartChild
->
pTargets
);
if
(
NULL
==
pMerge
->
node
.
pTargets
)
{
int32_t
code
=
TSDB_CODE_SUCCESS
;
pMerge
->
pInputs
=
nodesCloneList
(
pPartChild
->
pTargets
);
pMerge
->
node
.
pTargets
=
nodesCloneList
(
pSplitNode
->
pTargets
);
if
(
NULL
==
pMerge
->
node
.
pTargets
||
NULL
==
pMerge
->
pInputs
)
{
code
=
TSDB_CODE_OUT_OF_MEMORY
;
}
if
(
TSDB_CODE_SUCCESS
==
code
)
{
if
(
NULL
==
pSubplan
)
{
code
=
nodesListMakeAppend
(
&
pSplitNode
->
pChildren
,
pMerge
);
}
else
{
code
=
splReplaceLogicNode
(
pSubplan
,
pSplitNode
,
(
SLogicNode
*
)
pMerge
);
}
}
if
(
TSDB_CODE_SUCCESS
!=
code
)
{
nodesDestroyNode
(
pMerge
);
return
TSDB_CODE_OUT_OF_MEMORY
;
}
return
nodesListMakeAppend
(
&
pParent
->
pChildren
,
pMerge
);
return
code
;
}
static
int32_t
stbSplCreateExchangeNode
(
SSplitContext
*
pCxt
,
SLogicNode
*
pParent
,
SLogicNode
*
pPartChild
)
{
...
...
@@ -329,8 +345,15 @@ static int32_t stbSplSplitWindowNodeForBatch(SSplitContext* pCxt, SStableSplitIn
int32_t
code
=
stbSplCreatePartWindowNode
((
SWindowLogicNode
*
)
pInfo
->
pSplitNode
,
&
pPartWindow
);
if
(
TSDB_CODE_SUCCESS
==
code
)
{
((
SWindowLogicNode
*
)
pPartWindow
)
->
intervalAlgo
=
INTERVAL_ALGO_HASH
;
((
SWindowLogicNode
*
)
pInfo
->
pSplitNode
)
->
intervalAlgo
=
INTERVAL_ALGO_SORT_MERGE
;
code
=
stbSplCreateExchangeNode
(
pCxt
,
pInfo
->
pSplitNode
,
pPartWindow
);
((
SWindowLogicNode
*
)
pInfo
->
pSplitNode
)
->
intervalAlgo
=
INTERVAL_ALGO_MERGE
;
SNodeList
*
pMergeKeys
=
NULL
;
code
=
nodesListMakeStrictAppend
(
&
pMergeKeys
,
nodesCloneNode
(((
SWindowLogicNode
*
)
pInfo
->
pSplitNode
)
->
pTspk
));
if
(
TSDB_CODE_SUCCESS
==
code
)
{
code
=
stbSplCreateMergeNode
(
pCxt
,
NULL
,
pInfo
->
pSplitNode
,
pMergeKeys
,
pPartWindow
);
}
if
(
TSDB_CODE_SUCCESS
!=
code
)
{
nodesDestroyList
(
pMergeKeys
);
}
}
if
(
TSDB_CODE_SUCCESS
==
code
)
{
code
=
nodesListMakeStrictAppend
(
&
pInfo
->
pSubplan
->
pChildren
,
...
...
@@ -424,37 +447,99 @@ static int32_t stbSplSplitAggNode(SSplitContext* pCxt, SStableSplitInfo* pInfo)
return
code
;
}
static
int32_t
stbSplCreatePartSortNode
(
SSortLogicNode
*
pMergeSort
,
SLogicNode
**
pOutput
)
{
SNodeList
*
pSortKeys
=
pMergeSort
->
pSortKeys
;
pMergeSort
->
pSortKeys
=
NULL
;
SNodeList
*
pTargets
=
pMergeSort
->
node
.
pTargets
;
pMergeSort
->
node
.
pTargets
=
NULL
;
SNodeList
*
pChildren
=
pMergeSort
->
node
.
pChildren
;
pMergeSort
->
node
.
pChildren
=
NULL
;
static
SNode
*
stbSplCreateColumnNode
(
SExprNode
*
pExpr
)
{
SColumnNode
*
pCol
=
nodesMakeNode
(
QUERY_NODE_COLUMN
);
if
(
NULL
==
pCol
)
{
return
NULL
;
}
if
(
QUERY_NODE_COLUMN
==
nodeType
(
pExpr
))
{
strcpy
(
pCol
->
tableAlias
,
((
SColumnNode
*
)
pExpr
)
->
tableAlias
);
}
strcpy
(
pCol
->
colName
,
pExpr
->
aliasName
);
strcpy
(
pCol
->
node
.
aliasName
,
pExpr
->
aliasName
);
pCol
->
node
.
resType
=
pExpr
->
resType
;
return
(
SNode
*
)
pCol
;
}
static
SNode
*
stbSplCreateOrderByExpr
(
SOrderByExprNode
*
pSortKey
,
SNode
*
pCol
)
{
SOrderByExprNode
*
pOutput
=
nodesMakeNode
(
QUERY_NODE_ORDER_BY_EXPR
);
if
(
NULL
==
pOutput
)
{
return
NULL
;
}
pOutput
->
pExpr
=
nodesCloneNode
(
pCol
);
if
(
NULL
==
pOutput
->
pExpr
)
{
nodesDestroyNode
(
pOutput
);
return
NULL
;
}
pOutput
->
order
=
pSortKey
->
order
;
pOutput
->
nullOrder
=
pSortKey
->
nullOrder
;
return
(
SNode
*
)
pOutput
;
}
static
int32_t
stbSplCreateMergeKeys
(
SNodeList
*
pSortKeys
,
SNodeList
*
pTargets
,
SNodeList
**
pOutput
)
{
int32_t
code
=
TSDB_CODE_SUCCESS
;
SNodeList
*
pMergeKeys
=
NULL
;
SNode
*
pNode
=
NULL
;
FOREACH
(
pNode
,
pSortKeys
)
{
SOrderByExprNode
*
pSortKey
=
(
SOrderByExprNode
*
)
pNode
;
SNode
*
pTarget
=
NULL
;
bool
found
=
false
;
FOREACH
(
pTarget
,
pTargets
)
{
if
(
0
==
strcmp
(((
SExprNode
*
)
pSortKey
->
pExpr
)
->
aliasName
,
((
SColumnNode
*
)
pTarget
)
->
colName
))
{
code
=
nodesListMakeStrictAppend
(
&
pMergeKeys
,
stbSplCreateOrderByExpr
(
pSortKey
,
pTarget
));
if
(
TSDB_CODE_SUCCESS
!=
code
)
{
break
;
}
found
=
true
;
}
}
if
(
TSDB_CODE_SUCCESS
==
code
&&
!
found
)
{
SNode
*
pCol
=
stbSplCreateColumnNode
((
SExprNode
*
)
pSortKey
->
pExpr
);
code
=
nodesListMakeStrictAppend
(
&
pMergeKeys
,
stbSplCreateOrderByExpr
(
pSortKey
,
pCol
));
if
(
TSDB_CODE_SUCCESS
==
code
)
{
code
=
nodesListStrictAppend
(
pTargets
,
pCol
);
}
else
{
nodesDestroyNode
(
pCol
);
}
}
if
(
TSDB_CODE_SUCCESS
!=
code
)
{
break
;
}
}
if
(
TSDB_CODE_SUCCESS
==
code
)
{
*
pOutput
=
pMergeKeys
;
}
else
{
nodesDestroyList
(
pMergeKeys
);
}
return
code
;
}
static
int32_t
stbSplCreatePartSortNode
(
SSortLogicNode
*
pSort
,
SLogicNode
**
pOutputPartSort
,
SNodeList
**
pOutputMergeKeys
)
{
SNodeList
*
pSortKeys
=
pSort
->
pSortKeys
;
pSort
->
pSortKeys
=
NULL
;
SNodeList
*
pChildren
=
pSort
->
node
.
pChildren
;
pSort
->
node
.
pChildren
=
NULL
;
int32_t
code
=
TSDB_CODE_SUCCESS
;
SSortLogicNode
*
pPartSort
=
nodesCloneNode
(
p
Merge
Sort
);
SSortLogicNode
*
pPartSort
=
nodesCloneNode
(
pSort
);
if
(
NULL
==
pPartSort
)
{
code
=
TSDB_CODE_OUT_OF_MEMORY
;
}
pMergeSort
->
node
.
pTargets
=
pTargets
;
pPartSort
->
node
.
pChildren
=
pChildren
;
SNodeList
*
pMergeKeys
=
NULL
;
if
(
TSDB_CODE_SUCCESS
==
code
)
{
pPartSort
->
node
.
pChildren
=
pChildren
;
pPartSort
->
pSortKeys
=
pSortKeys
;
code
=
createColumnByRewriteExps
(
pPartSort
->
pSortKeys
,
&
pPartSort
->
node
.
pTargets
);
}
if
(
TSDB_CODE_SUCCESS
==
code
)
{
pMergeSort
->
pSortKeys
=
nodesCloneList
(
pPartSort
->
node
.
pTargets
);
if
(
NULL
==
pMergeSort
->
pSortKeys
)
{
code
=
TSDB_CODE_OUT_OF_MEMORY
;
}
code
=
stbSplCreateMergeKeys
(
pPartSort
->
pSortKeys
,
pPartSort
->
node
.
pTargets
,
&
pMergeKeys
);
}
if
(
TSDB_CODE_SUCCESS
==
code
)
{
*
pOutput
=
(
SLogicNode
*
)
pPartSort
;
*
pOutputPartSort
=
(
SLogicNode
*
)
pPartSort
;
*
pOutputMergeKeys
=
pMergeKeys
;
}
else
{
nodesDestroyNode
(
pPartSort
);
nodesDestroyList
(
pMergeKeys
);
}
return
code
;
...
...
@@ -462,17 +547,10 @@ static int32_t stbSplCreatePartSortNode(SSortLogicNode* pMergeSort, SLogicNode**
static
int32_t
stbSplSplitSortNode
(
SSplitContext
*
pCxt
,
SStableSplitInfo
*
pInfo
)
{
SLogicNode
*
pPartSort
=
NULL
;
int32_t
code
=
stbSplCreatePartSortNode
((
SSortLogicNode
*
)
pInfo
->
pSplitNode
,
&
pPartSort
);
SNodeList
*
pMergeKeys
=
NULL
;
int32_t
code
=
stbSplCreatePartSortNode
((
SSortLogicNode
*
)
pInfo
->
pSplitNode
,
&
pPartSort
,
&
pMergeKeys
);
if
(
TSDB_CODE_SUCCESS
==
code
)
{
SNodeList
*
pMergeKeys
=
nodesCloneList
(((
SSortLogicNode
*
)
pInfo
->
pSplitNode
)
->
pSortKeys
);
if
(
NULL
!=
pMergeKeys
)
{
code
=
stbSplCreateMergeNode
(
pCxt
,
pInfo
->
pSplitNode
,
pMergeKeys
,
pPartSort
);
if
(
TSDB_CODE_SUCCESS
!=
code
)
{
nodesDestroyList
(
pMergeKeys
);
}
}
else
{
code
=
TSDB_CODE_OUT_OF_MEMORY
;
}
code
=
stbSplCreateMergeNode
(
pCxt
,
pInfo
->
pSubplan
,
pInfo
->
pSplitNode
,
pMergeKeys
,
pPartSort
);
}
if
(
TSDB_CODE_SUCCESS
==
code
)
{
code
=
nodesListMakeStrictAppend
(
&
pInfo
->
pSubplan
->
pChildren
,
...
...
source/libs/planner/test/planOrderByTest.cpp
浏览文件 @
e033ad14
...
...
@@ -46,4 +46,7 @@ TEST_F(PlanOrderByTest, stable) {
// ORDER BY key is in the projection list
run
(
"SELECT c1 FROM st1 ORDER BY c1"
);
// ORDER BY key is not in the projection list
run
(
"SELECT c2 FROM st1 ORDER BY c1"
);
}
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录