Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
0bbc35f6
T
TDengine
项目概览
taosdata
/
TDengine
1 年多 前同步成功
通知
1185
Star
22016
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
0bbc35f6
编写于
6月 15, 2022
作者:
J
jiacy-jcy
浏览文件
操作
浏览文件
下载
差异文件
Merge branch '3.0' into test/jcy
上级
612226aa
08c45a5e
变更
21
显示空白变更内容
内联
并排
Showing
21 changed file
with
398 addition
and
229 deletion
+398
-229
source/common/src/systable.c
source/common/src/systable.c
+6
-6
source/dnode/mnode/impl/src/mndDb.c
source/dnode/mnode/impl/src/mndDb.c
+8
-8
source/dnode/mnode/impl/src/mndSma.c
source/dnode/mnode/impl/src/mndSma.c
+15
-17
source/dnode/mnode/impl/src/mndTrans.c
source/dnode/mnode/impl/src/mndTrans.c
+5
-7
source/dnode/mnode/impl/src/mndVgroup.c
source/dnode/mnode/impl/src/mndVgroup.c
+1
-1
source/dnode/vnode/src/vnd/vnodeSync.c
source/dnode/vnode/src/vnd/vnodeSync.c
+39
-1
source/libs/planner/src/planSpliter.c
source/libs/planner/src/planSpliter.c
+177
-107
source/libs/planner/test/planGroupByTest.cpp
source/libs/planner/test/planGroupByTest.cpp
+2
-0
source/libs/planner/test/planIntervalTest.cpp
source/libs/planner/test/planIntervalTest.cpp
+2
-0
source/libs/planner/test/planJoinTest.cpp
source/libs/planner/test/planJoinTest.cpp
+6
-0
source/libs/planner/test/planOrderByTest.cpp
source/libs/planner/test/planOrderByTest.cpp
+2
-0
source/libs/planner/test/planOtherTest.cpp
source/libs/planner/test/planOtherTest.cpp
+7
-0
source/libs/planner/test/planTestUtil.h
source/libs/planner/test/planTestUtil.h
+4
-0
tests/script/jenkins/basic.txt
tests/script/jenkins/basic.txt
+2
-2
tests/script/tsim/dnode/redistribute_vgroup_replica3_v1_follower.sim
...t/tsim/dnode/redistribute_vgroup_replica3_v1_follower.sim
+2
-19
tests/script/tsim/sma/tsmaCreateInsertData.sim
tests/script/tsim/sma/tsmaCreateInsertData.sim
+8
-0
tests/script/tsim/trans/create_db.sim
tests/script/tsim/trans/create_db.sim
+81
-42
tests/system-test/6-cluster/5dnode3mnodeDrop.py
tests/system-test/6-cluster/5dnode3mnodeDrop.py
+4
-2
tests/system-test/6-cluster/5dnode3mnodeStop.py
tests/system-test/6-cluster/5dnode3mnodeStop.py
+12
-9
tests/system-test/6-cluster/5dnode3mnodeStopInsert.py
tests/system-test/6-cluster/5dnode3mnodeStopInsert.py
+11
-6
tests/system-test/fulltest.sh
tests/system-test/fulltest.sh
+4
-2
未找到文件。
source/common/src/systable.c
浏览文件 @
0bbc35f6
...
...
@@ -91,8 +91,8 @@ static const SSysDbTableSchema userDBSchema[] = {
{.
name
=
"precision"
,
.
bytes
=
2
+
VARSTR_HEADER_SIZE
,
.
type
=
TSDB_DATA_TYPE_VARCHAR
},
{.
name
=
"single_stable_model"
,
.
bytes
=
1
,
.
type
=
TSDB_DATA_TYPE_BOOL
},
{.
name
=
"status"
,
.
bytes
=
10
+
VARSTR_HEADER_SIZE
,
.
type
=
TSDB_DATA_TYPE_VARCHAR
},
// {.name = "schemaless", .bytes = 1, .type = TSDB_DATA_TYPE_BOOL},
{.
name
=
"reten
s
ion"
,
.
bytes
=
60
+
VARSTR_HEADER_SIZE
,
.
type
=
TSDB_DATA_TYPE_VARCHAR
},
// {.name = "schemaless", .bytes = 1, .type = TSDB_DATA_TYPE_BOOL},
{.
name
=
"reten
t
ion"
,
.
bytes
=
60
+
VARSTR_HEADER_SIZE
,
.
type
=
TSDB_DATA_TYPE_VARCHAR
},
// {.name = "update", .bytes = 1, .type = TSDB_DATA_TYPE_TINYINT}, // disable update
};
...
...
@@ -137,7 +137,7 @@ static const SSysDbTableSchema streamSchema[] = {
{.
name
=
"target_table"
,
.
bytes
=
SYSTABLE_SCH_TABLE_NAME_LEN
,
.
type
=
TSDB_DATA_TYPE_VARCHAR
},
{.
name
=
"watermark"
,
.
bytes
=
8
,
.
type
=
TSDB_DATA_TYPE_BIGINT
},
{.
name
=
"trigger"
,
.
bytes
=
4
,
.
type
=
TSDB_DATA_TYPE_INT
},
};
};
static
const
SSysDbTableSchema
userTblsSchema
[]
=
{
{.
name
=
"table_name"
,
.
bytes
=
SYSTABLE_SCH_TABLE_NAME_LEN
,
.
type
=
TSDB_DATA_TYPE_VARCHAR
},
...
...
@@ -221,7 +221,9 @@ static const SSysDbTableSchema transSchema[] = {
{.
name
=
"db"
,
.
bytes
=
SYSTABLE_SCH_DB_NAME_LEN
,
.
type
=
TSDB_DATA_TYPE_VARCHAR
},
{.
name
=
"failed_times"
,
.
bytes
=
4
,
.
type
=
TSDB_DATA_TYPE_INT
},
{.
name
=
"last_exec_time"
,
.
bytes
=
8
,
.
type
=
TSDB_DATA_TYPE_TIMESTAMP
},
{.
name
=
"last_action_info"
,
.
bytes
=
(
TSDB_TRANS_ERROR_LEN
-
1
)
+
VARSTR_HEADER_SIZE
,
.
type
=
TSDB_DATA_TYPE_VARCHAR
},
{.
name
=
"last_action_info"
,
.
bytes
=
(
TSDB_TRANS_ERROR_LEN
-
1
)
+
VARSTR_HEADER_SIZE
,
.
type
=
TSDB_DATA_TYPE_VARCHAR
},
};
static
const
SSysDbTableSchema
configSchema
[]
=
{
...
...
@@ -314,8 +316,6 @@ static const SSysDbTableSchema querySchema[] = {
{.
name
=
"sql"
,
.
bytes
=
TSDB_SHOW_SQL_LEN
+
VARSTR_HEADER_SIZE
,
.
type
=
TSDB_DATA_TYPE_VARCHAR
},
};
static
const
SSysTableMeta
perfsMeta
[]
=
{
{
TSDB_PERFS_TABLE_CONNECTIONS
,
connectionsSchema
,
tListLen
(
connectionsSchema
)},
{
TSDB_PERFS_TABLE_QUERIES
,
querySchema
,
tListLen
(
querySchema
)},
...
...
source/dnode/mnode/impl/src/mndDb.c
浏览文件 @
0bbc35f6
...
...
@@ -183,12 +183,12 @@ static SSdbRow *mndDbActionDecode(SSdbRaw *pRaw) {
pDb
->
cfg
.
pRetensions
=
taosArrayInit
(
pDb
->
cfg
.
numOfRetensions
,
sizeof
(
SRetention
));
if
(
pDb
->
cfg
.
pRetensions
==
NULL
)
goto
_OVER
;
for
(
int32_t
i
=
0
;
i
<
pDb
->
cfg
.
numOfRetensions
;
++
i
)
{
SRetention
reten
s
ion
=
{
0
};
SDB_GET_INT64
(
pRaw
,
dataPos
,
&
reten
s
ion
.
freq
,
_OVER
)
SDB_GET_INT64
(
pRaw
,
dataPos
,
&
reten
s
ion
.
keep
,
_OVER
)
SDB_GET_INT8
(
pRaw
,
dataPos
,
&
reten
s
ion
.
freqUnit
,
_OVER
)
SDB_GET_INT8
(
pRaw
,
dataPos
,
&
reten
s
ion
.
keepUnit
,
_OVER
)
if
(
taosArrayPush
(
pDb
->
cfg
.
pRetensions
,
&
reten
s
ion
)
==
NULL
)
{
SRetention
reten
t
ion
=
{
0
};
SDB_GET_INT64
(
pRaw
,
dataPos
,
&
reten
t
ion
.
freq
,
_OVER
)
SDB_GET_INT64
(
pRaw
,
dataPos
,
&
reten
t
ion
.
keep
,
_OVER
)
SDB_GET_INT8
(
pRaw
,
dataPos
,
&
reten
t
ion
.
freqUnit
,
_OVER
)
SDB_GET_INT8
(
pRaw
,
dataPos
,
&
reten
t
ion
.
keepUnit
,
_OVER
)
if
(
taosArrayPush
(
pDb
->
cfg
.
pRetensions
,
&
reten
t
ion
)
==
NULL
)
{
goto
_OVER
;
}
}
...
...
@@ -472,7 +472,7 @@ static int32_t mndCreateDb(SMnode *pMnode, SRpcMsg *pReq, SCreateDbReq *pCreate,
}
int32_t
code
=
-
1
;
STrans
*
pTrans
=
mndTransCreate
(
pMnode
,
TRN_POLICY_R
OLLBACK
,
TRN_CONFLICT_DB
,
pReq
);
STrans
*
pTrans
=
mndTransCreate
(
pMnode
,
TRN_POLICY_R
ETRY
,
TRN_CONFLICT_DB
,
pReq
);
if
(
pTrans
==
NULL
)
goto
_OVER
;
mDebug
(
"trans:%d, used to create db:%s"
,
pTrans
->
id
,
pCreate
->
db
);
...
...
source/dnode/mnode/impl/src/mndSma.c
浏览文件 @
0bbc35f6
...
...
@@ -44,6 +44,7 @@ static int32_t mndProcessGetSmaReq(SRpcMsg *pReq);
static
int32_t
mndProcessGetTbSmaReq
(
SRpcMsg
*
pReq
);
static
int32_t
mndRetrieveSma
(
SRpcMsg
*
pReq
,
SShowObj
*
pShow
,
SSDataBlock
*
pBlock
,
int32_t
rows
);
static
void
mndCancelGetNextSma
(
SMnode
*
pMnode
,
void
*
pIter
);
static
void
mndDestroySmaObj
(
SSmaObj
*
pSmaObj
);
int32_t
mndInitSma
(
SMnode
*
pMnode
)
{
SSdbTable
table
=
{
...
...
@@ -390,7 +391,9 @@ static int32_t mndSetUpdateSmaStbCommitLogs(SMnode *pMnode, STrans *pTrans, SStb
taosRLockLatch
(
&
pStb
->
lock
);
memcpy
(
&
stbObj
,
pStb
,
sizeof
(
SStbObj
));
taosRUnLockLatch
(
&
pStb
->
lock
);
stbObj
.
numOfColumns
=
0
;
stbObj
.
pColumns
=
NULL
;
stbObj
.
numOfTags
=
0
;
stbObj
.
pTags
=
NULL
;
stbObj
.
updateTime
=
taosGetTimestampMs
();
stbObj
.
lock
=
0
;
...
...
@@ -501,6 +504,13 @@ static int32_t mndSetCreateSmaVgroupRedoActions(SMnode *pMnode, STrans *pTrans,
return
0
;
}
static
void
mndDestroySmaObj
(
SSmaObj
*
pSmaObj
)
{
if
(
pSmaObj
)
{
taosMemoryFreeClear
(
pSmaObj
->
schemaRow
.
pSchema
);
taosMemoryFreeClear
(
pSmaObj
->
schemaTag
.
pSchema
);
}
}
static
int32_t
mndCreateSma
(
SMnode
*
pMnode
,
SRpcMsg
*
pReq
,
SMCreateSmaReq
*
pCreate
,
SDbObj
*
pDb
,
SStbObj
*
pStb
)
{
SSmaObj
smaObj
=
{
0
};
memcpy
(
smaObj
.
name
,
pCreate
->
name
,
TSDB_TABLE_FNAME_LEN
);
...
...
@@ -524,29 +534,17 @@ static int32_t mndCreateSma(SMnode *pMnode, SRpcMsg *pReq, SMCreateSmaReq *pCrea
smaObj
.
tagsFilterLen
=
pCreate
->
tagsFilterLen
;
smaObj
.
sqlLen
=
pCreate
->
sqlLen
;
smaObj
.
astLen
=
pCreate
->
astLen
;
if
(
smaObj
.
exprLen
>
0
)
{
smaObj
.
expr
=
taosMemoryMalloc
(
smaObj
.
exprLen
);
if
(
smaObj
.
expr
==
NULL
)
goto
_OVER
;
memcpy
(
smaObj
.
expr
,
pCreate
->
expr
,
smaObj
.
exprLen
);
smaObj
.
expr
=
pCreate
->
expr
;
}
if
(
smaObj
.
tagsFilterLen
>
0
)
{
smaObj
.
tagsFilter
=
taosMemoryMalloc
(
smaObj
.
tagsFilterLen
);
if
(
smaObj
.
tagsFilter
==
NULL
)
goto
_OVER
;
memcpy
(
smaObj
.
tagsFilter
,
pCreate
->
tagsFilter
,
smaObj
.
tagsFilterLen
);
smaObj
.
tagsFilter
=
pCreate
->
tagsFilter
;
}
if
(
smaObj
.
sqlLen
>
0
)
{
smaObj
.
sql
=
taosMemoryMalloc
(
smaObj
.
sqlLen
);
if
(
smaObj
.
sql
==
NULL
)
goto
_OVER
;
memcpy
(
smaObj
.
sql
,
pCreate
->
sql
,
smaObj
.
sqlLen
);
smaObj
.
sql
=
pCreate
->
sql
;
}
if
(
smaObj
.
astLen
>
0
)
{
smaObj
.
ast
=
taosMemoryMalloc
(
smaObj
.
astLen
);
if
(
smaObj
.
ast
==
NULL
)
goto
_OVER
;
memcpy
(
smaObj
.
ast
,
pCreate
->
ast
,
smaObj
.
astLen
);
smaObj
.
ast
=
pCreate
->
ast
;
}
SStreamObj
streamObj
=
{
0
};
...
...
@@ -589,6 +587,7 @@ static int32_t mndCreateSma(SMnode *pMnode, SRpcMsg *pReq, SMCreateSmaReq *pCrea
code
=
0
;
_OVER:
mndDestroySmaObj
(
&
smaObj
);
mndTransDrop
(
pTrans
);
return
code
;
}
...
...
@@ -1013,7 +1012,6 @@ int32_t mndGetTableSma(SMnode *pMnode, char *tbFName, STableIndexRsp *rsp, bool
rsp
->
version
=
pStb
->
smaVer
;
mndReleaseStb
(
pMnode
,
pStb
);
while
(
1
)
{
pIter
=
sdbFetch
(
pSdb
,
SDB_SMA
,
pIter
,
(
void
**
)
&
pSma
);
if
(
pIter
==
NULL
)
break
;
...
...
source/dnode/mnode/impl/src/mndTrans.c
浏览文件 @
0bbc35f6
...
...
@@ -1347,14 +1347,12 @@ int32_t mndKillTrans(SMnode *pMnode, STrans *pTrans) {
for
(
int32_t
i
=
0
;
i
<
taosArrayGetSize
(
pArray
);
++
i
)
{
STransAction
*
pAction
=
taosArrayGet
(
pArray
,
i
);
if
(
pAction
->
errCode
!=
0
)
{
mInfo
(
"trans:%d, %s:%d set processed for kill msg received, errCode from %s to success"
,
pTrans
->
id
,
mndTransStr
(
pAction
->
stage
),
i
,
tstrerror
(
pAction
->
errCode
));
pAction
->
msgSent
=
1
;
pAction
->
msgReceived
=
1
;
pAction
->
errCode
=
0
;
}
}
mndTransExecute
(
pMnode
,
pTrans
);
return
0
;
...
...
source/dnode/mnode/impl/src/mndVgroup.c
浏览文件 @
0bbc35f6
...
...
@@ -940,7 +940,7 @@ static int32_t mndAddSetVnodeStandByAction(SMnode *pMnode, STrans *pTrans, SDbOb
action
.
pCont
=
pReq
;
action
.
contLen
=
contLen
;
action
.
msgType
=
TDMT_
DND_DROP_VNODE
;
action
.
msgType
=
TDMT_
SYNC_SET_VNODE_STANDBY
;
action
.
acceptableCode
=
TSDB_CODE_NODE_NOT_DEPLOYED
;
if
(
isRedo
)
{
...
...
source/dnode/vnode/src/vnd/vnodeSync.c
浏览文件 @
0bbc35f6
...
...
@@ -66,7 +66,13 @@ static int32_t vnodeProcessAlterReplicaReq(SVnode *pVnode, SRpcMsg *pMsg) {
vInfo
(
"vgId:%d, replica:%d %s:%u"
,
TD_VID
(
pVnode
),
r
,
pNode
->
nodeFqdn
,
pNode
->
nodePort
);
}
return
syncReconfig
(
pVnode
->
sync
,
&
cfg
);
SRpcMsg
rpcMsg
=
{.
info
=
pMsg
->
info
};
if
(
syncReconfigBuild
(
pVnode
->
sync
,
&
cfg
,
&
rpcMsg
)
!=
0
)
{
vError
(
"vgId:%d, failed to build reconfig msg since %s"
,
TD_VID
(
pVnode
),
terrstr
());
return
-
1
;
}
return
syncPropose
(
pVnode
->
sync
,
&
rpcMsg
,
false
);
}
void
vnodeProposeMsg
(
SQueueInfo
*
pInfo
,
STaosQall
*
qall
,
int32_t
numOfMsgs
)
{
...
...
@@ -241,6 +247,30 @@ static void vnodeSyncRollBackMsg(SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta
syncRpcMsgLog2
(
logBuf
,
(
SRpcMsg
*
)
pMsg
);
}
int32_t
vnodeSnapshotStartRead
(
struct
SSyncFSM
*
pFsm
,
void
**
ppReader
)
{
return
0
;
}
int32_t
vnodeSnapshotStopRead
(
struct
SSyncFSM
*
pFsm
,
void
*
pReader
)
{
return
0
;
}
int32_t
vnodeSnapshotDoRead
(
struct
SSyncFSM
*
pFsm
,
void
*
pReader
,
void
**
ppBuf
,
int32_t
*
len
)
{
return
0
;
}
int32_t
vnodeSnapshotStartWrite
(
struct
SSyncFSM
*
pFsm
,
void
**
ppWriter
)
{
return
0
;
}
int32_t
vnodeSnapshotStopWrite
(
struct
SSyncFSM
*
pFsm
,
void
*
pWriter
,
bool
isApply
)
{
return
0
;
}
int32_t
vnodeSnapshotDoWrite
(
struct
SSyncFSM
*
pFsm
,
void
*
pWriter
,
void
*
pBuf
,
int32_t
len
)
{
return
0
;
}
static
SSyncFSM
*
vnodeSyncMakeFsm
(
SVnode
*
pVnode
)
{
SSyncFSM
*
pFsm
=
taosMemoryCalloc
(
1
,
sizeof
(
SSyncFSM
));
pFsm
->
data
=
pVnode
;
...
...
@@ -250,6 +280,14 @@ static SSyncFSM *vnodeSyncMakeFsm(SVnode *pVnode) {
pFsm
->
FpGetSnapshot
=
vnodeSyncGetSnapshot
;
pFsm
->
FpRestoreFinishCb
=
NULL
;
pFsm
->
FpReConfigCb
=
vnodeSyncReconfig
;
pFsm
->
FpSnapshotStartRead
=
vnodeSnapshotStartRead
;
pFsm
->
FpSnapshotStopRead
=
vnodeSnapshotStopRead
;
pFsm
->
FpSnapshotDoRead
=
vnodeSnapshotDoRead
;
pFsm
->
FpSnapshotStartWrite
=
vnodeSnapshotStartWrite
;
pFsm
->
FpSnapshotStopWrite
=
vnodeSnapshotStopWrite
;
pFsm
->
FpSnapshotDoWrite
=
vnodeSnapshotDoWrite
;
return
pFsm
;
}
...
...
source/libs/planner/src/planSpliter.c
浏览文件 @
0bbc35f6
...
...
@@ -15,6 +15,7 @@
#include "functionMgt.h"
#include "planInt.h"
#include "tglobal.h"
#define SPLIT_FLAG_MASK(n) (1 << n)
...
...
@@ -37,7 +38,8 @@ typedef struct SSplitRule {
FSplit
splitFunc
;
}
SSplitRule
;
typedef
bool
(
*
FSplFindSplitNode
)(
SSplitContext
*
pCxt
,
SLogicSubplan
*
pSubplan
,
void
*
pInfo
);
// typedef bool (*FSplFindSplitNode)(SSplitContext* pCxt, SLogicSubplan* pSubplan, void* pInfo);
typedef
bool
(
*
FSplFindSplitNode
)(
SSplitContext
*
pCxt
,
SLogicSubplan
*
pSubplan
,
SLogicNode
*
pNode
,
void
*
pInfo
);
static
void
splSetSubplanVgroups
(
SLogicSubplan
*
pSubplan
,
SLogicNode
*
pNode
)
{
if
(
QUERY_NODE_LOGIC_PLAN_SCAN
==
nodeType
(
pNode
))
{
...
...
@@ -95,9 +97,23 @@ static int32_t splCreateExchangeNodeForSubplan(SSplitContext* pCxt, SLogicSubpla
return
code
;
}
static
bool
splMatchByNode
(
SSplitContext
*
pCxt
,
SLogicSubplan
*
pSubplan
,
SLogicNode
*
pNode
,
FSplFindSplitNode
func
,
void
*
pInfo
)
{
if
(
func
(
pCxt
,
pSubplan
,
pNode
,
pInfo
))
{
return
true
;
}
SNode
*
pChild
;
FOREACH
(
pChild
,
pNode
->
pChildren
)
{
if
(
splMatchByNode
(
pCxt
,
pSubplan
,
(
SLogicNode
*
)
pChild
,
func
,
pInfo
))
{
return
true
;
}
}
return
NULL
;
}
static
bool
splMatch
(
SSplitContext
*
pCxt
,
SLogicSubplan
*
pSubplan
,
int32_t
flag
,
FSplFindSplitNode
func
,
void
*
pInfo
)
{
if
(
!
SPLIT_FLAG_TEST_MASK
(
pSubplan
->
splitFlag
,
flag
))
{
if
(
func
(
pCxt
,
pSubplan
,
pInfo
))
{
if
(
splMatchByNode
(
pCxt
,
pSubplan
,
pSubplan
->
pNode
,
func
,
pInfo
))
{
return
true
;
}
}
...
...
@@ -110,6 +126,11 @@ static bool splMatch(SSplitContext* pCxt, SLogicSubplan* pSubplan, int32_t flag,
return
false
;
}
static
void
splSetParent
(
SLogicNode
*
pNode
)
{
SNode
*
pChild
=
NULL
;
FOREACH
(
pChild
,
pNode
->
pChildren
)
{
((
SLogicNode
*
)
pChild
)
->
pParent
=
pNode
;
}
}
typedef
struct
SStableSplitInfo
{
SLogicNode
*
pSplitNode
;
SLogicSubplan
*
pSubplan
;
...
...
@@ -136,11 +157,21 @@ static bool stbSplHasMultiTbScan(bool streamQuery, SLogicNode* pNode) {
return
false
;
}
SNode
*
pChild
=
nodesListGetNode
(
pNode
->
pChildren
,
0
);
if
(
QUERY_NODE_LOGIC_PLAN_PARTITION
==
nodeType
(
pChild
))
{
if
(
1
!=
LIST_LENGTH
(((
SLogicNode
*
)
pChild
)
->
pChildren
))
{
return
false
;
}
pChild
=
nodesListGetNode
(((
SLogicNode
*
)
pChild
)
->
pChildren
,
0
);
}
return
(
QUERY_NODE_LOGIC_PLAN_SCAN
==
nodeType
(
pChild
)
&&
stbSplIsMultiTbScan
(
streamQuery
,
(
SScanLogicNode
*
)
pChild
));
}
static
bool
stbSplNeedSplit
(
bool
streamQuery
,
SLogicNode
*
pNode
)
{
switch
(
nodeType
(
pNode
))
{
case
QUERY_NODE_LOGIC_PLAN_SCAN
:
return
stbSplIsMultiTbScan
(
streamQuery
,
(
SScanLogicNode
*
)
pNode
);
// case QUERY_NODE_LOGIC_PLAN_JOIN:
// return !(((SJoinLogicNode*)pNode)->isSingleTableJoin);
case
QUERY_NODE_LOGIC_PLAN_AGG
:
return
!
stbSplHasGatherExecFunc
(((
SAggLogicNode
*
)
pNode
)
->
pAggFuncs
)
&&
stbSplHasMultiTbScan
(
streamQuery
,
pNode
);
case
QUERY_NODE_LOGIC_PLAN_WINDOW
:
{
...
...
@@ -152,35 +183,20 @@ static bool stbSplNeedSplit(bool streamQuery, SLogicNode* pNode) {
}
case
QUERY_NODE_LOGIC_PLAN_SORT
:
return
stbSplHasMultiTbScan
(
streamQuery
,
pNode
);
case
QUERY_NODE_LOGIC_PLAN_SCAN
:
return
stbSplIsMultiTbScan
(
streamQuery
,
(
SScanLogicNode
*
)
pNode
);
default:
break
;
}
return
false
;
}
static
SLogicNode
*
stbSplMatchByNode
(
bool
streamQuery
,
SLogicNode
*
pNode
)
{
if
(
stbSplNeedSplit
(
streamQuery
,
pNode
))
{
return
pNode
;
}
SNode
*
pChild
;
FOREACH
(
pChild
,
pNode
->
pChildren
)
{
SLogicNode
*
pSplitNode
=
stbSplMatchByNode
(
streamQuery
,
(
SLogicNode
*
)
pChild
);
if
(
NULL
!=
pSplitNode
)
{
return
pSplitNode
;
}
}
return
NULL
;
}
static
bool
stbSplFindSplitNode
(
SSplitContext
*
pCxt
,
SLogicSubplan
*
pSubplan
,
SStableSplitInfo
*
pInfo
)
{
SLogicNode
*
pSplitNode
=
stbSplMatchByNode
(
pCxt
->
pPlanCxt
->
streamQuery
,
pSubplan
->
pNode
);
if
(
NULL
!=
pSplitNode
)
{
pInfo
->
pSplitNode
=
pSplitNode
;
static
bool
stbSplFindSplitNode
(
SSplitContext
*
pCxt
,
SLogicSubplan
*
pSubplan
,
SLogicNode
*
pNode
,
SStableSplitInfo
*
pInfo
)
{
if
(
stbSplNeedSplit
(
pCxt
->
pPlanCxt
->
streamQuery
,
pNode
))
{
pInfo
->
pSplitNode
=
pNode
;
pInfo
->
pSubplan
=
pSubplan
;
return
true
;
}
return
NULL
!=
pSplitNod
e
;
return
fals
e
;
}
static
int32_t
stbSplRewriteFuns
(
const
SNodeList
*
pFuncs
,
SNodeList
**
pPartialFuncs
,
SNodeList
**
pMergeFuncs
)
{
...
...
@@ -258,6 +274,7 @@ static int32_t stbSplCreatePartWindowNode(SWindowLogicNode* pMergeWindow, SLogic
if
(
TSDB_CODE_SUCCESS
==
code
)
{
pMergeWindow
->
node
.
pTargets
=
pTargets
;
pPartWin
->
node
.
pChildren
=
pChildren
;
splSetParent
((
SLogicNode
*
)
pPartWin
);
code
=
stbSplRewriteFuns
(
pFunc
,
&
pPartWin
->
pFuncs
,
&
pMergeWindow
->
pFuncs
);
}
int32_t
index
=
0
;
...
...
@@ -285,13 +302,24 @@ static int32_t stbSplCreatePartWindowNode(SWindowLogicNode* pMergeWindow, SLogic
return
code
;
}
static
int32_t
stbSplGetNumOfVgroups
(
SLogicNode
*
pNode
)
{
if
(
QUERY_NODE_LOGIC_PLAN_SCAN
==
nodeType
(
pNode
))
{
return
((
SScanLogicNode
*
)
pNode
)
->
pVgroupList
->
numOfVgroups
;
}
else
{
if
(
1
==
LIST_LENGTH
(
pNode
->
pChildren
))
{
return
stbSplGetNumOfVgroups
((
SLogicNode
*
)
nodesListGetNode
(
pNode
->
pChildren
,
0
));
}
}
return
0
;
}
static
int32_t
stbSplCreateMergeNode
(
SSplitContext
*
pCxt
,
SLogicSubplan
*
pSubplan
,
SLogicNode
*
pSplitNode
,
SNodeList
*
pMergeKeys
,
SLogicNode
*
pPartChild
)
{
SMergeLogicNode
*
pMerge
=
(
SMergeLogicNode
*
)
nodesMakeNode
(
QUERY_NODE_LOGIC_PLAN_MERGE
);
if
(
NULL
==
pMerge
)
{
return
TSDB_CODE_OUT_OF_MEMORY
;
}
pMerge
->
numOfChannels
=
((
SScanLogicNode
*
)
nodesListGetNode
(
pPartChild
->
pChildren
,
0
))
->
pVgroupList
->
numOfVgroups
;
pMerge
->
numOfChannels
=
stbSplGetNumOfVgroups
(
pPartChild
)
;
pMerge
->
srcGroupId
=
pCxt
->
groupId
;
pMerge
->
node
.
precision
=
pPartChild
->
precision
;
pMerge
->
pMergeKeys
=
pMergeKeys
;
...
...
@@ -329,12 +357,12 @@ static int32_t stbSplCreateExchangeNode(SSplitContext* pCxt, SLogicNode* pParent
return
code
;
}
static
int32_t
stbSplCreateMergeKeys
ForInterval
(
SNode
*
pWStartTs
,
SNodeList
**
pMergeKeys
)
{
static
int32_t
stbSplCreateMergeKeys
ByPrimaryKey
(
SNode
*
pPrimaryKey
,
SNodeList
**
pMergeKeys
)
{
SOrderByExprNode
*
pMergeKey
=
(
SOrderByExprNode
*
)
nodesMakeNode
(
QUERY_NODE_ORDER_BY_EXPR
);
if
(
NULL
==
pMergeKey
)
{
return
TSDB_CODE_OUT_OF_MEMORY
;
}
pMergeKey
->
pExpr
=
nodesCloneNode
(
p
WStartTs
);
pMergeKey
->
pExpr
=
nodesCloneNode
(
p
PrimaryKey
);
if
(
NULL
==
pMergeKey
->
pExpr
)
{
nodesDestroyNode
((
SNode
*
)
pMergeKey
);
return
TSDB_CODE_OUT_OF_MEMORY
;
...
...
@@ -351,7 +379,7 @@ static int32_t stbSplSplitIntervalForBatch(SSplitContext* pCxt, SStableSplitInfo
((
SWindowLogicNode
*
)
pPartWindow
)
->
intervalAlgo
=
INTERVAL_ALGO_HASH
;
((
SWindowLogicNode
*
)
pInfo
->
pSplitNode
)
->
intervalAlgo
=
INTERVAL_ALGO_MERGE
;
SNodeList
*
pMergeKeys
=
NULL
;
code
=
stbSplCreateMergeKeys
ForInterval
(((
SWindowLogicNode
*
)
pInfo
->
pSplitNode
)
->
pTspk
,
&
pMergeKeys
);
code
=
stbSplCreateMergeKeys
ByPrimaryKey
(((
SWindowLogicNode
*
)
pInfo
->
pSplitNode
)
->
pTspk
,
&
pMergeKeys
);
if
(
TSDB_CODE_SUCCESS
==
code
)
{
code
=
stbSplCreateMergeNode
(
pCxt
,
NULL
,
pInfo
->
pSplitNode
,
pMergeKeys
,
pPartWindow
);
}
...
...
@@ -439,6 +467,7 @@ static int32_t stbSplCreatePartAggNode(SAggLogicNode* pMergeAgg, SLogicNode** pO
pMergeAgg
->
node
.
pConditions
=
pConditions
;
pMergeAgg
->
node
.
pTargets
=
pTargets
;
pPartAgg
->
node
.
pChildren
=
pChildren
;
splSetParent
((
SLogicNode
*
)
pPartAgg
);
code
=
stbSplRewriteFuns
(
pFunc
,
&
pPartAgg
->
pAggFuncs
,
&
pMergeAgg
->
pAggFuncs
);
}
...
...
@@ -553,6 +582,7 @@ static int32_t stbSplCreatePartSortNode(SSortLogicNode* pSort, SLogicNode** pOut
SNodeList
*
pMergeKeys
=
NULL
;
if
(
TSDB_CODE_SUCCESS
==
code
)
{
pPartSort
->
node
.
pChildren
=
pChildren
;
splSetParent
((
SLogicNode
*
)
pPartSort
);
pPartSort
->
pSortKeys
=
pSortKeys
;
code
=
stbSplCreateMergeKeys
(
pPartSort
->
pSortKeys
,
pPartSort
->
node
.
pTargets
,
&
pMergeKeys
);
}
...
...
@@ -592,6 +622,56 @@ static int32_t stbSplSplitScanNode(SSplitContext* pCxt, SStableSplitInfo* pInfo)
return
code
;
}
static
SNode
*
stbSplFindPrimaryKeyFromScan
(
SScanLogicNode
*
pScan
)
{
SNode
*
pCol
=
NULL
;
FOREACH
(
pCol
,
pScan
->
pScanCols
)
{
if
(
PRIMARYKEY_TIMESTAMP_COL_ID
==
((
SColumnNode
*
)
pCol
)
->
colId
)
{
return
pCol
;
}
}
return
NULL
;
}
static
int32_t
stbSplSplitScanNodeForJoin
(
SSplitContext
*
pCxt
,
SLogicSubplan
*
pSubplan
,
SScanLogicNode
*
pScan
)
{
SNodeList
*
pMergeKeys
=
NULL
;
int32_t
code
=
stbSplCreateMergeKeysByPrimaryKey
(
stbSplFindPrimaryKeyFromScan
(
pScan
),
&
pMergeKeys
);
if
(
TSDB_CODE_SUCCESS
==
code
)
{
code
=
stbSplCreateMergeNode
(
pCxt
,
pSubplan
,
(
SLogicNode
*
)
pScan
,
pMergeKeys
,
(
SLogicNode
*
)
pScan
);
}
if
(
TSDB_CODE_SUCCESS
==
code
)
{
code
=
nodesListMakeStrictAppend
(
&
pSubplan
->
pChildren
,
(
SNode
*
)
splCreateScanSubplan
(
pCxt
,
(
SLogicNode
*
)
pScan
,
SPLIT_FLAG_STABLE_SPLIT
));
}
return
code
;
}
static
int32_t
stbSplSplitJoinNodeImpl
(
SSplitContext
*
pCxt
,
SLogicSubplan
*
pSubplan
,
SJoinLogicNode
*
pJoin
)
{
int32_t
code
=
TSDB_CODE_SUCCESS
;
SNode
*
pChild
=
NULL
;
FOREACH
(
pChild
,
pJoin
->
node
.
pChildren
)
{
if
(
QUERY_NODE_LOGIC_PLAN_SCAN
==
nodeType
(
pChild
))
{
code
=
stbSplSplitScanNodeForJoin
(
pCxt
,
pSubplan
,
(
SScanLogicNode
*
)
pChild
);
}
else
if
(
QUERY_NODE_LOGIC_PLAN_JOIN
==
nodeType
(
pChild
))
{
code
=
stbSplSplitJoinNodeImpl
(
pCxt
,
pSubplan
,
(
SJoinLogicNode
*
)
pChild
);
}
else
{
code
=
TSDB_CODE_PLAN_INTERNAL_ERROR
;
}
if
(
TSDB_CODE_SUCCESS
!=
code
)
{
break
;
}
}
return
code
;
}
static
int32_t
stbSplSplitJoinNode
(
SSplitContext
*
pCxt
,
SStableSplitInfo
*
pInfo
)
{
int32_t
code
=
stbSplSplitJoinNodeImpl
(
pCxt
,
pInfo
->
pSubplan
,
(
SJoinLogicNode
*
)
pInfo
->
pSplitNode
);
if
(
TSDB_CODE_SUCCESS
==
code
)
{
pInfo
->
pSubplan
->
subplanType
=
SUBPLAN_TYPE_MERGE
;
SPLIT_FLAG_SET_MASK
(
pInfo
->
pSubplan
->
splitFlag
,
SPLIT_FLAG_STABLE_SPLIT
);
}
return
code
;
}
static
int32_t
stableSplit
(
SSplitContext
*
pCxt
,
SLogicSubplan
*
pSubplan
)
{
if
(
pCxt
->
pPlanCxt
->
rSmaQuery
)
{
return
TSDB_CODE_SUCCESS
;
...
...
@@ -604,6 +684,12 @@ static int32_t stableSplit(SSplitContext* pCxt, SLogicSubplan* pSubplan) {
int32_t
code
=
TSDB_CODE_SUCCESS
;
switch
(
nodeType
(
info
.
pSplitNode
))
{
case
QUERY_NODE_LOGIC_PLAN_SCAN
:
code
=
stbSplSplitScanNode
(
pCxt
,
&
info
);
break
;
case
QUERY_NODE_LOGIC_PLAN_JOIN
:
code
=
stbSplSplitJoinNode
(
pCxt
,
&
info
);
break
;
case
QUERY_NODE_LOGIC_PLAN_AGG
:
code
=
stbSplSplitAggNode
(
pCxt
,
&
info
);
break
;
...
...
@@ -613,9 +699,6 @@ static int32_t stableSplit(SSplitContext* pCxt, SLogicSubplan* pSubplan) {
case
QUERY_NODE_LOGIC_PLAN_SORT
:
code
=
stbSplSplitSortNode
(
pCxt
,
&
info
);
break
;
case
QUERY_NODE_LOGIC_PLAN_SCAN
:
code
=
stbSplSplitScanNode
(
pCxt
,
&
info
);
break
;
default:
break
;
}
...
...
@@ -631,7 +714,12 @@ typedef struct SSigTbJoinSplitInfo {
SLogicSubplan
*
pSubplan
;
}
SSigTbJoinSplitInfo
;
static
bool
sigTbJoinSplNeedSplit
(
SJoinLogicNode
*
pJoin
)
{
static
bool
sigTbJoinSplNeedSplit
(
SLogicNode
*
pNode
)
{
if
(
QUERY_NODE_LOGIC_PLAN_JOIN
!=
nodeType
(
pNode
))
{
return
false
;
}
SJoinLogicNode
*
pJoin
=
(
SJoinLogicNode
*
)
pNode
;
if
(
!
pJoin
->
isSingleTableJoin
)
{
return
false
;
}
...
...
@@ -639,28 +727,15 @@ static bool sigTbJoinSplNeedSplit(SJoinLogicNode* pJoin) {
QUERY_NODE_LOGIC_PLAN_EXCHANGE
!=
nodeType
(
nodesListGetNode
(
pJoin
->
node
.
pChildren
,
1
));
}
static
SJoinLogicNode
*
sigTbJoinSplMatchByNode
(
SLogicNode
*
pNode
)
{
if
(
QUERY_NODE_LOGIC_PLAN_JOIN
==
nodeType
(
pNode
)
&&
sigTbJoinSplNeedSplit
((
SJoinLogicNode
*
)
pNode
))
{
return
(
SJoinLogicNode
*
)
pNode
;
}
SNode
*
pChild
;
FOREACH
(
pChild
,
pNode
->
pChildren
)
{
SJoinLogicNode
*
pSplitNode
=
sigTbJoinSplMatchByNode
((
SLogicNode
*
)
pChild
);
if
(
NULL
!=
pSplitNode
)
{
return
pSplitNode
;
}
}
return
NULL
;
}
static
bool
sigTbJoinSplFindSplitNode
(
SSplitContext
*
pCxt
,
SLogicSubplan
*
pSubplan
,
SSigTbJoinSplitInfo
*
pInfo
)
{
SJoinLogicNode
*
pJoin
=
sigTbJoinSplMatchByNode
(
pSubplan
->
pNode
);
if
(
NULL
!=
pJoin
)
{
pInfo
->
pJoin
=
pJoin
;
pInfo
->
pSplitNode
=
(
SLogicNode
*
)
nodesListGetNode
(
pJoin
->
node
.
pChildren
,
1
);
static
bool
sigTbJoinSplFindSplitNode
(
SSplitContext
*
pCxt
,
SLogicSubplan
*
pSubplan
,
SLogicNode
*
pNode
,
SSigTbJoinSplitInfo
*
pInfo
)
{
if
(
sigTbJoinSplNeedSplit
(
pNode
))
{
pInfo
->
pJoin
=
(
SJoinLogicNode
*
)
pNode
;
pInfo
->
pSplitNode
=
(
SLogicNode
*
)
nodesListGetNode
(
pNode
->
pChildren
,
1
);
pInfo
->
pSubplan
=
pSubplan
;
return
true
;
}
return
NULL
!=
pJoin
;
return
false
;
}
static
int32_t
singleTableJoinSplit
(
SSplitContext
*
pCxt
,
SLogicSubplan
*
pSubplan
)
{
...
...
@@ -753,27 +828,14 @@ typedef struct SUnionAllSplitInfo {
SLogicSubplan
*
pSubplan
;
}
SUnionAllSplitInfo
;
static
SLogicNode
*
unAllSplMatchByNode
(
SLogicNode
*
pNode
)
{
static
bool
unAllSplFindSplitNode
(
SSplitContext
*
pCxt
,
SLogicSubplan
*
pSubplan
,
SLogicNode
*
pNode
,
SUnionAllSplitInfo
*
pInfo
)
{
if
(
QUERY_NODE_LOGIC_PLAN_PROJECT
==
nodeType
(
pNode
)
&&
LIST_LENGTH
(
pNode
->
pChildren
)
>
1
)
{
return
pNode
;
}
SNode
*
pChild
;
FOREACH
(
pChild
,
pNode
->
pChildren
)
{
SLogicNode
*
pSplitNode
=
unAllSplMatchByNode
((
SLogicNode
*
)
pChild
);
if
(
NULL
!=
pSplitNode
)
{
return
pSplitNode
;
}
}
return
NULL
;
}
static
bool
unAllSplFindSplitNode
(
SSplitContext
*
pCxt
,
SLogicSubplan
*
pSubplan
,
SUnionAllSplitInfo
*
pInfo
)
{
SLogicNode
*
pSplitNode
=
unAllSplMatchByNode
(
pSubplan
->
pNode
);
if
(
NULL
!=
pSplitNode
)
{
pInfo
->
pProject
=
(
SProjectLogicNode
*
)
pSplitNode
;
pInfo
->
pProject
=
(
SProjectLogicNode
*
)
pNode
;
pInfo
->
pSubplan
=
pSubplan
;
return
true
;
}
return
NULL
!=
pSplitNod
e
;
return
fals
e
;
}
static
int32_t
unAllSplCreateExchangeNode
(
SSplitContext
*
pCxt
,
SLogicSubplan
*
pSubplan
,
SProjectLogicNode
*
pProject
)
{
...
...
@@ -828,20 +890,6 @@ typedef struct SUnionDistinctSplitInfo {
SLogicSubplan
*
pSubplan
;
}
SUnionDistinctSplitInfo
;
static
SLogicNode
*
unDistSplMatchByNode
(
SLogicNode
*
pNode
)
{
if
(
QUERY_NODE_LOGIC_PLAN_AGG
==
nodeType
(
pNode
)
&&
LIST_LENGTH
(
pNode
->
pChildren
)
>
1
)
{
return
pNode
;
}
SNode
*
pChild
;
FOREACH
(
pChild
,
pNode
->
pChildren
)
{
SLogicNode
*
pSplitNode
=
unDistSplMatchByNode
((
SLogicNode
*
)
pChild
);
if
(
NULL
!=
pSplitNode
)
{
return
pSplitNode
;
}
}
return
NULL
;
}
static
int32_t
unDistSplCreateExchangeNode
(
SSplitContext
*
pCxt
,
SLogicSubplan
*
pSubplan
,
SAggLogicNode
*
pAgg
)
{
SExchangeLogicNode
*
pExchange
=
(
SExchangeLogicNode
*
)
nodesMakeNode
(
QUERY_NODE_LOGIC_PLAN_EXCHANGE
);
if
(
NULL
==
pExchange
)
{
...
...
@@ -859,13 +907,14 @@ static int32_t unDistSplCreateExchangeNode(SSplitContext* pCxt, SLogicSubplan* p
return
nodesListMakeAppend
(
&
pAgg
->
node
.
pChildren
,
(
SNode
*
)
pExchange
);
}
static
bool
unDistSplFindSplitNode
(
SSplitContext
*
pCxt
,
SLogicSubplan
*
pSubplan
,
S
UnionDistinctSplitInfo
*
pInfo
)
{
SLogicNode
*
pSplitNode
=
unDistSplMatchByNode
(
pSubplan
->
pNode
);
if
(
NULL
!=
pSplitNode
)
{
pInfo
->
pAgg
=
(
SAggLogicNode
*
)
p
Split
Node
;
static
bool
unDistSplFindSplitNode
(
SSplitContext
*
pCxt
,
SLogicSubplan
*
pSubplan
,
S
LogicNode
*
pNode
,
SUnionDistinctSplitInfo
*
pInfo
)
{
if
(
QUERY_NODE_LOGIC_PLAN_AGG
==
nodeType
(
pNode
)
&&
LIST_LENGTH
(
pNode
->
pChildren
)
>
1
)
{
pInfo
->
pAgg
=
(
SAggLogicNode
*
)
pNode
;
pInfo
->
pSubplan
=
pSubplan
;
return
true
;
}
return
NULL
!=
pSplitNod
e
;
return
fals
e
;
}
static
int32_t
unionDistinctSplit
(
SSplitContext
*
pCxt
,
SLogicSubplan
*
pSubplan
)
{
...
...
@@ -888,27 +937,14 @@ typedef struct SSmaIndexSplitInfo {
SLogicSubplan
*
pSubplan
;
}
SSmaIndexSplitInfo
;
static
SLogicNode
*
smaIdxSplMatchByNode
(
SLogicNode
*
pNode
)
{
static
bool
smaIdxSplFindSplitNode
(
SSplitContext
*
pCxt
,
SLogicSubplan
*
pSubplan
,
SLogicNode
*
pNode
,
SSmaIndexSplitInfo
*
pInfo
)
{
if
(
QUERY_NODE_LOGIC_PLAN_MERGE
==
nodeType
(
pNode
)
&&
LIST_LENGTH
(
pNode
->
pChildren
)
>
1
)
{
return
pNode
;
}
SNode
*
pChild
;
FOREACH
(
pChild
,
pNode
->
pChildren
)
{
SLogicNode
*
pSplitNode
=
smaIdxSplMatchByNode
((
SLogicNode
*
)
pChild
);
if
(
NULL
!=
pSplitNode
)
{
return
pSplitNode
;
}
}
return
NULL
;
}
static
bool
smaIdxSplFindSplitNode
(
SSplitContext
*
pCxt
,
SLogicSubplan
*
pSubplan
,
SSmaIndexSplitInfo
*
pInfo
)
{
SLogicNode
*
pSplitNode
=
smaIdxSplMatchByNode
(
pSubplan
->
pNode
);
if
(
NULL
!=
pSplitNode
)
{
pInfo
->
pMerge
=
(
SMergeLogicNode
*
)
pSplitNode
;
pInfo
->
pMerge
=
(
SMergeLogicNode
*
)
pNode
;
pInfo
->
pSubplan
=
pSubplan
;
return
true
;
}
return
NULL
!=
pSplitNod
e
;
return
fals
e
;
}
static
int32_t
smaIndexSplit
(
SSplitContext
*
pCxt
,
SLogicSubplan
*
pSubplan
)
{
...
...
@@ -926,13 +962,47 @@ static int32_t smaIndexSplit(SSplitContext* pCxt, SLogicSubplan* pSubplan) {
return
code
;
}
typedef
struct
SQnodeSplitInfo
{
SLogicNode
*
pSplitNode
;
SLogicSubplan
*
pSubplan
;
}
SQnodeSplitInfo
;
static
bool
qndSplFindSplitNode
(
SSplitContext
*
pCxt
,
SLogicSubplan
*
pSubplan
,
SLogicNode
*
pNode
,
SQnodeSplitInfo
*
pInfo
)
{
if
(
QUERY_NODE_LOGIC_PLAN_SCAN
==
nodeType
(
pNode
)
&&
NULL
!=
pNode
->
pParent
)
{
pInfo
->
pSplitNode
=
pNode
;
pInfo
->
pSubplan
=
pSubplan
;
return
true
;
}
return
false
;
}
static
int32_t
qnodeSplit
(
SSplitContext
*
pCxt
,
SLogicSubplan
*
pSubplan
)
{
if
(
QUERY_POLICY_QNODE
!=
tsQueryPolicy
)
{
return
TSDB_CODE_SUCCESS
;
}
SQnodeSplitInfo
info
=
{
0
};
if
(
!
splMatch
(
pCxt
,
pSubplan
,
0
,
(
FSplFindSplitNode
)
qndSplFindSplitNode
,
&
info
))
{
return
TSDB_CODE_SUCCESS
;
}
int32_t
code
=
splCreateExchangeNodeForSubplan
(
pCxt
,
info
.
pSubplan
,
info
.
pSplitNode
,
info
.
pSubplan
->
subplanType
);
if
(
TSDB_CODE_SUCCESS
==
code
)
{
code
=
nodesListMakeStrictAppend
(
&
info
.
pSubplan
->
pChildren
,
(
SNode
*
)
splCreateScanSubplan
(
pCxt
,
info
.
pSplitNode
,
0
));
}
++
(
pCxt
->
groupId
);
pCxt
->
split
=
true
;
return
code
;
}
// clang-format off
static
const
SSplitRule
splitRuleSet
[]
=
{
{.
pName
=
"SuperTableSplit"
,
.
splitFunc
=
stableSplit
},
{.
pName
=
"SingleTableJoinSplit"
,
.
splitFunc
=
singleTableJoinSplit
},
{.
pName
=
"UnionAllSplit"
,
.
splitFunc
=
unionAllSplit
},
{.
pName
=
"UnionDistinctSplit"
,
.
splitFunc
=
unionDistinctSplit
},
{.
pName
=
"SmaIndexSplit"
,
.
splitFunc
=
smaIndexSplit
}
{.
pName
=
"SmaIndexSplit"
,
.
splitFunc
=
smaIndexSplit
},
{.
pName
=
"QnodeSplit"
,
.
splitFunc
=
qnodeSplit
}
};
// clang-format on
...
...
source/libs/planner/test/planGroupByTest.cpp
浏览文件 @
0bbc35f6
...
...
@@ -83,5 +83,7 @@ TEST_F(PlanGroupByTest, stable) {
run
(
"SELECT COUNT(*) FROM st1 GROUP BY c1"
);
run
(
"SELECT COUNT(*) FROM st1 PARTITION BY c2 GROUP BY c1"
);
run
(
"SELECT SUM(c1) FROM st1 GROUP BY c2 HAVING SUM(c1) IS NOT NULL"
);
}
source/libs/planner/test/planIntervalTest.cpp
浏览文件 @
0bbc35f6
...
...
@@ -60,4 +60,6 @@ TEST_F(PlanIntervalTest, stable) {
run
(
"SELECT COUNT(*) FROM st1 INTERVAL(10s)"
);
run
(
"SELECT _WSTARTTS, COUNT(*) FROM st1 INTERVAL(10s)"
);
run
(
"SELECT _WSTARTTS, COUNT(*) FROM st1 PARTITION BY TBNAME INTERVAL(10s)"
);
}
source/libs/planner/test/planJoinTest.cpp
浏览文件 @
0bbc35f6
...
...
@@ -50,3 +50,9 @@ TEST_F(PlanJoinTest, multiJoin) {
run
(
"SELECT t1.c1, t2.c1 FROM st1s1 t1 JOIN st1s2 t2 ON t1.ts = t2.ts JOIN st1s3 t3 ON t1.ts = t3.ts"
);
}
TEST_F
(
PlanJoinTest
,
stable
)
{
useDb
(
"root"
,
"test"
);
run
(
"SELECT t1.c1, t2.c1 FROM st1 t1 JOIN st2 t2 ON t1.ts = t2.ts "
);
}
source/libs/planner/test/planOrderByTest.cpp
浏览文件 @
0bbc35f6
...
...
@@ -49,4 +49,6 @@ TEST_F(PlanOrderByTest, stable) {
// ORDER BY key is not in the projection list
run
(
"SELECT c2 FROM st1 ORDER BY c1"
);
run
(
"SELECT c2 FROM st1 PARTITION BY c2 ORDER BY c1"
);
}
source/libs/planner/test/planOtherTest.cpp
浏览文件 @
0bbc35f6
...
...
@@ -83,3 +83,10 @@ TEST_F(PlanOtherTest, delete) {
run
(
"DELETE FROM st1 WHERE ts > now - 2d and ts < now - 1d AND tag1 = 10"
);
}
TEST_F
(
PlanOtherTest
,
queryPolicy
)
{
useDb
(
"root"
,
"test"
);
tsQueryPolicy
=
QUERY_POLICY_QNODE
;
run
(
"SELECT COUNT(*) FROM st1"
);
}
source/libs/planner/test/planTestUtil.h
浏览文件 @
0bbc35f6
...
...
@@ -18,6 +18,10 @@
#include <gtest/gtest.h>
#define ALLOW_FORBID_FUNC
#include "planInt.h"
class
PlannerTestBaseImpl
;
struct
TAOS_MULTI_BIND
;
...
...
tests/script/jenkins/basic.txt
浏览文件 @
0bbc35f6
...
...
@@ -58,7 +58,7 @@
# ---- mnode
./test.sh -f tsim/mnode/basic1.sim
./test.sh -f tsim/mnode/basic2.sim
#
./test.sh -f tsim/mnode/basic3.sim
./test.sh -f tsim/mnode/basic3.sim
./test.sh -f tsim/mnode/basic4.sim
./test.sh -f tsim/mnode/basic5.sim
...
...
@@ -132,7 +132,7 @@
#./test.sh -f tsim/mnode/basic1.sim -m
# --- sma
#
./test.sh -f tsim/sma/tsmaCreateInsertData.sim
./test.sh -f tsim/sma/tsmaCreateInsertData.sim
./test.sh -f tsim/sma/rsmaCreateInsertQuery.sim
# --- valgrind
...
...
tests/script/tsim/dnode/redistribute_vgroup_replica3_v1_follower.sim
浏览文件 @
0bbc35f6
...
...
@@ -163,26 +163,22 @@ endi
print =============== step32: move follower2
print redistribute vgroup 2 dnode $leaderVnode dnode $follower2 dnode 5
sql redistribute vgroup 2 dnode $leaderVnode dnode $follower2 dnode 5
<<<<<<< HEAD:tests/script/tsim/dnode/redistribute_vgroup_replica3_v1_leader.sim
=======
sql show d1.vgroups
print ===> $data00 $data01 $data02 $data03 $data04 $data05 $data06 $data07 $data08 $data09
>>>>>>> origin/3.0:tests/script/tsim/dnode/redistribute_vgroup_replica3_v1_follower.sim
sql show d1.tables
if $rows != 1 then
return -1
endi
return
print =============== step33: move follower1
print redistribute vgroup 2 dnode $leaderVnode dnode $follower1 dnode 5
sql redistribute vgroup 2 dnode $leaderVnode dnode $follower1 dnode 5
<<<<<<< HEAD:tests/script/tsim/dnode/redistribute_vgroup_replica3_v1_leader.sim
=======
sql show d1.vgroups
print ===> $data00 $data01 $data02 $data03 $data04 $data05 $data06 $data07 $data08 $data09
>>>>>>> origin/3.0:tests/script/tsim/dnode/redistribute_vgroup_replica3_v1_follower.sim
sql show d1.tables
if $rows != 1 then
return -1
...
...
@@ -191,12 +187,9 @@ endi
print =============== step34: move follower2
print redistribute vgroup 2 dnode $leaderVnode dnode 5 dnode $follower2
sql redistribute vgroup 2 dnode $leaderVnode dnode 5 dnode $follower2
<<<<<<< HEAD:tests/script/tsim/dnode/redistribute_vgroup_replica3_v1_leader.sim
=======
sql show d1.vgroups
print ===> $data00 $data01 $data02 $data03 $data04 $data05 $data06 $data07 $data08 $data09
>>>>>>> origin/3.0:tests/script/tsim/dnode/redistribute_vgroup_replica3_v1_follower.sim
sql show d1.tables
if $rows != 1 then
return -1
...
...
@@ -205,15 +198,8 @@ endi
print =============== step35: move follower1
print redistribute vgroup 2 dnode $leaderVnode dnode 5 dnode $follower1
sql redistribute vgroup 2 dnode $leaderVnode dnode 5 dnode $follower1
<<<<<<< HEAD:tests/script/tsim/dnode/redistribute_vgroup_replica3_v1_leader.sim
sql show d1.tables
if $rows != 1 then
return -1
endi
=======
sql show d1.vgroups
print ===> $data00 $data01 $data02 $data03 $data04 $data05 $data06 $data07 $data08 $data09
>>>>>>> origin/3.0:tests/script/tsim/dnode/redistribute_vgroup_replica3_v1_follower.sim
sql show d1.tables
if $rows != 1 then
...
...
@@ -242,8 +228,6 @@ if $rows != 1 then
return -1
endi
<<<<<<< HEAD:tests/script/tsim/dnode/redistribute_vgroup_replica3_v1_leader.sim
=======
print =============== step38: move follower2
print redistribute vgroup 2 dnode $leaderVnode dnode 5 dnode $follower2
sql redistribute vgroup 2 dnode $leaderVnode dnode 5 dnode $follower2
...
...
@@ -254,7 +238,6 @@ sql show d1.tables
if $rows != 1 then
return -1
endi
>>>>>>> origin/3.0:tests/script/tsim/dnode/redistribute_vgroup_replica3_v1_follower.sim
print =============== step39: move follower1
print redistribute vgroup 2 dnode $leaderVnode dnode 5 dnode $follower1
...
...
tests/script/tsim/sma/tsmaCreateInsertData.sim
浏览文件 @
0bbc35f6
...
...
@@ -37,6 +37,14 @@ print =============== trigger stream to execute sma aggr task and insert sma dat
sql insert into ct1 values(now+5s, 20, 20.0, 30.0)
#===================================================================
print =============== show streams ================================
sql show streams;
print $data00 $data01 $data02
if $data00 != d1 then
return -1
endi
print =============== select * from ct1 from memory
sql select * from ct1;
print $data00 $data01
...
...
tests/script/tsim/trans/create_db.sim
浏览文件 @
0bbc35f6
...
...
@@ -7,44 +7,28 @@ system sh/exec.sh -n dnode1 -s start
system sh/exec.sh -n dnode2 -s start
sql connect
print =============== show dnodes
sql show dnodes;
if $rows != 1 then
return -1
endi
if $data00 != 1 then
return -1
endi
sql show mnodes;
if $rows != 1 then
return -1
endi
if $data00 != 1 then
return -1
endi
if $data02 != leader then
return -1
endi
print =============== create dnodes
sql create dnode $hostname port 7200
sleep 2000
sql show dnodes;
$x = 0
step1:
$x = $x + 1
sleep 1000
if $x == 10 then
print ====> dnode not ready!
return -1
endi
sql show dnodes
print ===> $data00 $data01 $data02 $data03 $data04 $data05
print ===> $data10 $data11 $data12 $data13 $data14 $data15
if $rows != 2 then
return -1
endi
if $data00 != 1 then
return -1
if $data(1)[4] != ready then
goto step1
endi
if $data10 != 2 then
return -1
if $data(2)[4] != ready then
goto step1
endi
print =============== kill dnode2
...
...
@@ -68,7 +52,7 @@ if $data[0][0] != 7 then
return -1
endi
if $data[0][2] !=
un
doAction then
if $data[0][2] !=
re
doAction then
return -1
endi
...
...
@@ -80,14 +64,34 @@ sql_error create database d1 vgroups 2;
print =============== start dnode2
system sh/exec.sh -n dnode2 -s start
sleep 3000
$x = 0
step2:
$x = $x + 1
sleep 1000
if $x == 10 then
print ====> dnode not ready!
return -1
endi
sql show dnodes
print ===> $data00 $data01 $data02 $data03 $data04 $data05
print ===> $data10 $data11 $data12 $data13 $data14 $data15
if $rows != 2 then
return -1
endi
if $data(1)[4] != ready then
goto step2
endi
if $data(2)[4] != ready then
goto step2
endi
sql show transactions
if $rows != 0 then
return -1
endi
sql create database d1 vgroups 2;
sql
_error
create database d1 vgroups 2;
print =============== kill dnode2
system sh/exec.sh -n dnode2 -s stop -x SIGINT
...
...
@@ -106,22 +110,31 @@ if $rows != 1 then
return -1
endi
if $data[0][0] !=
9
then
if $data[0][0] !=
8
then
return -1
endi
if $data[0][2] !=
un
doAction then
if $data[0][2] !=
re
doAction then
return -1
endi
if $data[0][3] != d2 then
return -1
endi
return
sql show databases ;
if $rows != 4 then
return -1
endi
print d2 ==> $data(d2)[19]
if $data(d2)[19] != creating then
return -1
endi
sql_error create database d2 vgroups 2;
print =============== kill transaction
sql kill transaction
9
;
sql kill transaction
8
;
sleep 2000
sql show transactions
...
...
@@ -131,7 +144,34 @@ endi
print =============== start dnode2
system sh/exec.sh -n dnode2 -s start
sleep 3000
$x = 0
step3:
$x = $x + 1
sleep 1000
if $x == 10 then
print ====> dnode not ready!
return -1
endi
sql show dnodes
print ===> $data00 $data01 $data02 $data03 $data04 $data05
print ===> $data10 $data11 $data12 $data13 $data14 $data15
if $rows != 2 then
return -1
endi
if $data(1)[4] != ready then
goto step3
endi
if $data(2)[4] != ready then
goto step3
endi
sql show transactions
if $rows != 0 then
return -1
endi
sql drop database d2;
sql show transactions
if $rows != 0 then
...
...
@@ -145,6 +185,5 @@ sql_error kill transaction 3;
sql_error kill transaction 4;
sql_error kill transaction 5;
return
system sh/exec.sh -n dnode1 -s stop -x SIGINT
system sh/exec.sh -n dnode2 -s stop -x SIGINT
\ No newline at end of file
tests/system-test/6-cluster/5dnode3mnodeDrop.py
浏览文件 @
0bbc35f6
...
...
@@ -230,7 +230,7 @@ class TDTestCase:
def
five_dnode_three_mnode
(
self
,
dnodenumber
):
def
five_dnode_three_mnode
(
self
):
tdSql
.
query
(
"show dnodes;"
)
tdSql
.
checkData
(
0
,
1
,
'%s:6030'
%
self
.
host
)
tdSql
.
checkData
(
4
,
1
,
'%s:6430'
%
self
.
host
)
...
...
@@ -260,7 +260,9 @@ class TDTestCase:
dropcount
=
0
while
dropcount
<=
10
:
for
i
in
range
(
1
,
3
):
tdLog
.
debug
(
"drop mnode on dnode %d"
%
(
i
+
1
))
tdSql
.
execute
(
"drop mnode on dnode %d"
%
(
i
+
1
))
tdLog
.
debug
(
"create mnode on dnode %d"
%
(
i
+
1
))
tdSql
.
execute
(
"create mnode on dnode %d"
%
(
i
+
1
))
dropcount
+=
1
self
.
check3mnode
()
...
...
@@ -276,7 +278,7 @@ class TDTestCase:
def
run
(
self
):
# print(self.master_dnode.cfgDict)
self
.
buildcluster
(
5
)
self
.
five_dnode_three_mnode
(
5
)
self
.
five_dnode_three_mnode
()
def
stop
(
self
):
tdSql
.
close
()
...
...
tests/system-test/6-cluster/5dnode3mnodeStop.py
浏览文件 @
0bbc35f6
...
...
@@ -145,6 +145,7 @@ class TDTestCase:
tdSql
.
checkData
(
2
,
3
,
'ready'
)
def
check3mnode1off
(
self
):
tdSql
.
error
(
"drop mnode on dnode 1;"
)
count
=
0
while
count
<
10
:
time
.
sleep
(
1
)
...
...
@@ -174,6 +175,7 @@ class TDTestCase:
tdSql
.
checkData
(
2
,
3
,
'ready'
)
def
check3mnode2off
(
self
):
tdSql
.
error
(
"drop mnode on dnode 2;"
)
count
=
0
while
count
<
10
:
time
.
sleep
(
1
)
...
...
@@ -201,6 +203,7 @@ class TDTestCase:
tdSql
.
checkData
(
2
,
3
,
'ready'
)
def
check3mnode3off
(
self
):
tdSql
.
error
(
"drop mnode on dnode 3;"
)
count
=
0
while
count
<
10
:
time
.
sleep
(
1
)
...
...
@@ -255,17 +258,17 @@ class TDTestCase:
print
(
tdSql
.
queryResult
)
tdLog
.
debug
(
"stop and follower of mnode"
)
#
self.TDDnodes.stoptaosd(2)
#
self.check3mnode2off()
#
self.TDDnodes.starttaosd(2)
self
.
TDDnodes
.
stoptaosd
(
2
)
self
.
check3mnode2off
()
self
.
TDDnodes
.
starttaosd
(
2
)
#
self.TDDnodes.stoptaosd(3)
#
self.check3mnode3off()
#
self.TDDnodes.starttaosd(2)
self
.
TDDnodes
.
stoptaosd
(
3
)
self
.
check3mnode3off
()
self
.
TDDnodes
.
starttaosd
(
2
)
#
self.TDDnodes.stoptaosd(1)
#
self.check3mnode1off()
#
self.TDDnodes.starttaosd(1)
self
.
TDDnodes
.
stoptaosd
(
1
)
self
.
check3mnode1off
()
self
.
TDDnodes
.
starttaosd
(
1
)
# self.check3mnode()
stopcount
=
0
...
...
tests/system-test/6-cluster/5dnode3mnodeStopInsert.py
浏览文件 @
0bbc35f6
...
...
@@ -12,7 +12,8 @@ from util.dnodes import TDDnode
import
time
import
socket
import
subprocess
from
multiprocessing
import
Process
import
threading
as
thd
class
MyDnodes
(
TDDnodes
):
def
__init__
(
self
,
dnodes_lists
):
super
(
MyDnodes
,
self
).
__init__
()
...
...
@@ -50,9 +51,9 @@ class TDTestCase:
break
return
buildPath
def
insert_data
(
self
,
count
):
def
insert_data
(
self
,
count
start
,
countstop
):
# fisrt add data : db\stable\childtable\general table
for
couti
in
count
:
for
couti
in
range
(
countstart
,
countstop
)
:
tdSql
.
execute
(
"drop database if exists db%d"
%
couti
)
tdSql
.
execute
(
"create database if not exists db%d replica 1 days 300"
%
couti
)
tdSql
.
execute
(
"use db%d"
%
couti
)
...
...
@@ -258,6 +259,11 @@ class TDTestCase:
stopcount
=
0
while
stopcount
<=
2
:
for
i
in
range
(
dnodenumber
):
threads
=
[]
threads
.
append
(
thd
.
Thread
(
target
=
self
.
insert_data
,
args
=
(
i
*
2
,
i
*
2
+
2
)))
# start_time = time.time()
threads
[
0
].
start
()
# end_time = time.time()
self
.
TDDnodes
.
stoptaosd
(
i
+
1
)
# if i == 1 :
# self.check3mnode2off()
...
...
@@ -265,13 +271,12 @@ class TDTestCase:
# self.check3mnode3off()
# elif i == 0:
# self.check3mnode1off()
self
.
TDDnodes
.
starttaosd
(
i
+
1
)
threads
[
0
].
join
()
# self.check3mnode()
stopcount
+=
1
self
.
check3mnode
()
def
getConnection
(
self
,
dnode
):
host
=
dnode
.
cfgDict
[
"fqdn"
]
port
=
dnode
.
cfgDict
[
"serverPort"
]
...
...
tests/system-test/fulltest.sh
浏览文件 @
0bbc35f6
...
...
@@ -18,7 +18,7 @@ python3 ./test.py -f 0-others/fsync.py
python3 ./test.py
-f
1-insert/influxdb_line_taosc_insert.py
python3 ./test.py
-f
1-insert/opentsdb_telnet_line_taosc_insert.py
python3 ./test.py
-f
1-insert/opentsdb_json_taosc_insert.py
# python3 ./test.py -f 1-insert/test_stmt_muti_insert_query.py
#
BUG
python3 ./test.py -f 1-insert/test_stmt_muti_insert_query.py
python3 ./test.py
-f
1-insert/alter_stable.py
python3 ./test.py
-f
1-insert/alter_table.py
python3 ./test.py
-f
1-insert/insertWithMoreVgroup.py
...
...
@@ -101,7 +101,9 @@ python3 ./test.py -f 2-query/tail.py
python3 ./test.py
-f
6-cluster/5dnode1mnode.py
python3 ./test.py
-f
6-cluster/5dnode2mnode.py
python3 ./test.py
-f
6-cluster/5dnode3mnodeStop.py
# BUG python3 ./test.py -f 6-cluster/5dnode3mnodeStop.py
python3 ./test.py
-f
6-cluster/5dnode3mnodeDrop.py
# BUG python3 ./test.py -f 6-cluster/5dnode3mnodeStopInsert.py
python3 ./test.py
-f
7-tmq/basic5.py
python3 ./test.py
-f
7-tmq/subscribeDb.py
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录