Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
86622ddd
T
TDengine
项目概览
taosdata
/
TDengine
1 年多 前同步成功
通知
1185
Star
22016
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
86622ddd
编写于
7月 02, 2022
作者:
P
plum-lihui
浏览文件
操作
浏览文件
下载
差异文件
Merge branch '3.0' into test3.0/lihui
上级
b9cc3e6e
fbc206d4
变更
13
显示空白变更内容
内联
并排
Showing
13 changed file
with
206 addition
and
120 deletion
+206
-120
include/libs/qcom/query.h
include/libs/qcom/query.h
+0
-7
source/client/src/clientImpl.c
source/client/src/clientImpl.c
+3
-0
source/libs/scheduler/inc/schedulerInt.h
source/libs/scheduler/inc/schedulerInt.h
+4
-3
source/libs/scheduler/src/schJob.c
source/libs/scheduler/src/schJob.c
+1
-1
source/libs/scheduler/src/schRemote.c
source/libs/scheduler/src/schRemote.c
+10
-8
tests/system-test/1-insert/insertWithMoreVgroup.py
tests/system-test/1-insert/insertWithMoreVgroup.py
+0
-34
tests/system-test/1-insert/test_stmt_insert_query_ex.py
tests/system-test/1-insert/test_stmt_insert_query_ex.py
+24
-24
tests/system-test/1-insert/test_stmt_muti_insert_query.py
tests/system-test/1-insert/test_stmt_muti_insert_query.py
+6
-6
tests/system-test/2-query/queryQnode.py
tests/system-test/2-query/queryQnode.py
+153
-8
tests/system-test/6-cluster/5dnode3mnodeSep1VnodeStopVnodeCreateDb.py
...-test/6-cluster/5dnode3mnodeSep1VnodeStopVnodeCreateDb.py
+2
-26
tests/system-test/6-cluster/clusterCommonCheck.py
tests/system-test/6-cluster/clusterCommonCheck.py
+2
-1
tests/system-test/6-cluster/clusterCommonCreate.py
tests/system-test/6-cluster/clusterCommonCreate.py
+0
-1
tests/system-test/fulltest.sh
tests/system-test/fulltest.sh
+1
-1
未找到文件。
include/libs/qcom/query.h
浏览文件 @
86622ddd
...
...
@@ -249,13 +249,6 @@ extern int32_t (*queryProcessMsgRsp[TDMT_MAX])(void* output, char* msg, int32_t
((_code) == TSDB_CODE_RPC_REDIRECT || (_code) == TSDB_CODE_NODE_NOT_DEPLOYED || \
(_code) == TSDB_CODE_SYN_NOT_LEADER || (_code) == TSDB_CODE_APP_NOT_READY)
#define NEED_SCHEDULER_RETRY_ERROR(_code) \
(NEED_SCHEDULER_REDIRECT_ERROR(_code) || (_code) == TSDB_CODE_RPC_NETWORK_UNAVAIL || \
(_code) == TSDB_CODE_SCH_TIMEOUT_ERROR || (_code) == TSDB_CODE_RPC_BROKEN_LINK)
#define REQUEST_TOTAL_EXEC_TIMES 2
#define qFatal(...) \
...
...
source/client/src/clientImpl.c
浏览文件 @
86622ddd
...
...
@@ -786,6 +786,7 @@ int32_t handleQueryExecRsp(SRequestObj* pRequest) {
void
schedulerExecCb
(
SQueryResult
*
pResult
,
void
*
param
,
int32_t
code
)
{
SRequestObj
*
pRequest
=
(
SRequestObj
*
)
param
;
pRequest
->
code
=
code
;
pRequest
->
body
.
resInfo
.
execRes
=
pResult
->
res
;
if
(
TDMT_VND_SUBMIT
==
pRequest
->
type
||
TDMT_VND_DELETE
==
pRequest
->
type
||
TDMT_VND_CREATE_TABLE
==
pRequest
->
type
)
{
...
...
@@ -797,6 +798,8 @@ void schedulerExecCb(SQueryResult* pResult, void* param, int32_t code) {
}
}
taosMemoryFree
(
pResult
);
tscDebug
(
"0x%"
PRIx64
" enter scheduler exec cb, code:%d - %s, reqId:0x%"
PRIx64
,
pRequest
->
self
,
code
,
tstrerror
(
code
),
pRequest
->
requestId
);
...
...
source/libs/scheduler/inc/schedulerInt.h
浏览文件 @
86622ddd
...
...
@@ -276,8 +276,6 @@ extern SSchedulerMgmt schMgmt;
#define SCH_TASK_ID(_task) ((_task) ? (_task)->taskId : -1)
#define SCH_TASK_EID(_task) ((_task) ? (_task)->execId : -1)
#define SCH_SET_TASK_LASTMSG_TYPE(_task, _type) do { if(_task) { atomic_store_32(&(_task)->lastMsgType, _type); } } while (0)
#define SCH_GET_TASK_LASTMSG_TYPE(_task) ((_task) ? atomic_load_32(&(_task)->lastMsgType) : -1)
#define SCH_IS_DATA_SRC_QRY_TASK(task) ((task)->plan->subplanType == SUBPLAN_TYPE_SCAN)
#define SCH_IS_DATA_SRC_TASK(task) (((task)->plan->subplanType == SUBPLAN_TYPE_SCAN) || ((task)->plan->subplanType == SUBPLAN_TYPE_MODIFY))
...
...
@@ -309,7 +307,10 @@ extern SSchedulerMgmt schMgmt;
#define SCH_IS_NEED_DROP_JOB(_job) (SCH_IS_QUERY_JOB(_job))
#define SCH_IS_EXPLAIN_JOB(_job) (EXPLAIN_MODE_ANALYZE == (_job)->attr.explainMode)
#define SCH_NETWORK_ERR(_code) ((_code) == TSDB_CODE_RPC_BROKEN_LINK || (_code) == TSDB_CODE_RPC_NETWORK_UNAVAIL)
#define SCH_SUB_TASK_NETWORK_ERR(_code, _len) (((_code) == TSDB_CODE_RPC_NETWORK_UNAVAIL || (_code) == TSDB_CODE_RPC_BROKEN_LINK) && ((_len) > 0))
#define SCH_SUB_TASK_NETWORK_ERR(_code, _len) (SCH_NETWORK_ERR(_code) && ((_len) > 0))
#define SCH_NEED_REDIRECT_MSGTYPE(_msgType) ((_msgType) == TDMT_SCH_QUERY || (_msgType) == TDMT_SCH_MERGE_QUERY || (_msgType) == TDMT_SCH_FETCH)
#define SCH_NEED_REDIRECT(_msgType, _code, _rspLen) (SCH_NEED_REDIRECT_MSGTYPE(_msgType) && (NEED_SCHEDULER_REDIRECT_ERROR(_code) || SCH_SUB_TASK_NETWORK_ERR(_code, _rspLen)))
#define SCH_NEED_RETRY(_msgType, _code) ((SCH_NETWORK_ERR(_code) && SCH_NEED_REDIRECT_MSGTYPE(_msgType)) || (_code) == TSDB_CODE_SCH_TIMEOUT_ERROR)
#define SCH_IS_LEVEL_UNFINISHED(_level) ((_level)->taskLaunchedNum < (_level)->taskNum)
#define SCH_GET_CUR_EP(_addr) (&(_addr)->epSet.eps[(_addr)->epSet.inUse])
...
...
source/libs/scheduler/src/schJob.c
浏览文件 @
86622ddd
...
...
@@ -835,7 +835,7 @@ int32_t schTaskCheckSetRetry(SSchJob *pJob, SSchTask *pTask, int32_t errCode, bo
return
TSDB_CODE_SUCCESS
;
}
if
(
!
NEED_SCHEDULER_RETRY_ERROR
(
errCode
))
{
if
(
!
SCH_NEED_RETRY
(
pTask
->
lastMsgType
,
errCode
))
{
*
needRetry
=
false
;
SCH_TASK_DLOG
(
"task no more retry cause of errCode, errCode:%x - %s"
,
errCode
,
tstrerror
(
errCode
));
return
TSDB_CODE_SUCCESS
;
...
...
source/libs/scheduler/src/schRemote.c
浏览文件 @
86622ddd
...
...
@@ -23,7 +23,7 @@
int32_t
schValidateReceivedMsgType
(
SSchJob
*
pJob
,
SSchTask
*
pTask
,
int32_t
msgType
)
{
int32_t
lastMsgType
=
SCH_GET_TASK_LASTMSG_TYPE
(
pTask
)
;
int32_t
lastMsgType
=
pTask
->
lastMsgType
;
int32_t
taskStatus
=
SCH_GET_TASK_STATUS
(
pTask
);
int32_t
reqMsgType
=
msgType
-
1
;
switch
(
msgType
)
{
...
...
@@ -42,7 +42,7 @@ int32_t schValidateReceivedMsgType(SSchJob *pJob, SSchTask *pTask, int32_t msgTy
TMSG_INFO
(
msgType
));
}
SCH_SET_TASK_LASTMSG_TYPE
(
pTask
,
-
1
);
//
SCH_SET_TASK_LASTMSG_TYPE(pTask, -1);
return
TSDB_CODE_SUCCESS
;
case
TDMT_SCH_FETCH_RSP
:
if
(
lastMsgType
!=
reqMsgType
&&
-
1
!=
lastMsgType
)
{
...
...
@@ -57,7 +57,7 @@ int32_t schValidateReceivedMsgType(SSchJob *pJob, SSchTask *pTask, int32_t msgTy
SCH_ERR_RET
(
TSDB_CODE_SCH_STATUS_ERROR
);
}
SCH_SET_TASK_LASTMSG_TYPE
(
pTask
,
-
1
);
//
SCH_SET_TASK_LASTMSG_TYPE(pTask, -1);
return
TSDB_CODE_SUCCESS
;
case
TDMT_VND_CREATE_TABLE_RSP
:
case
TDMT_VND_DROP_TABLE_RSP
:
...
...
@@ -82,7 +82,7 @@ int32_t schValidateReceivedMsgType(SSchJob *pJob, SSchTask *pTask, int32_t msgTy
SCH_ERR_RET
(
TSDB_CODE_SCH_STATUS_ERROR
);
}
SCH_SET_TASK_LASTMSG_TYPE
(
pTask
,
-
1
);
//
SCH_SET_TASK_LASTMSG_TYPE(pTask, -1);
return
TSDB_CODE_SUCCESS
;
}
...
...
@@ -396,7 +396,8 @@ int32_t schHandleCallback(void *param, SDataBuf *pMsg, int32_t rspCode) {
SCH_ERR_JRET
(
schValidateReceivedMsgType
(
pJob
,
pTask
,
msgType
));
if
(
NEED_SCHEDULER_REDIRECT_ERROR
(
rspCode
)
||
SCH_SUB_TASK_NETWORK_ERR
(
rspCode
,
pMsg
->
len
>
0
))
{
int32_t
reqType
=
IsReq
(
pMsg
)
?
pMsg
->
msgType
:
(
pMsg
->
msgType
-
1
);
if
(
SCH_NEED_REDIRECT
(
reqType
,
rspCode
,
pMsg
->
len
))
{
code
=
schHandleRedirect
(
pJob
,
pTask
,
(
SDataBuf
*
)
pMsg
,
rspCode
);
goto
_return
;
}
...
...
@@ -855,6 +856,9 @@ int32_t schAsyncSendMsg(SSchJob *pJob, SSchTask *pTask, SSchTrans *trans, SQuery
addr
->
nodeId
,
epSet
->
eps
[
epSet
->
inUse
].
fqdn
,
epSet
->
eps
[
epSet
->
inUse
].
port
,
trans
->
pTrans
,
trans
->
pHandle
);
if
(
pTask
)
{
pTask
->
lastMsgType
=
msgType
;
}
int64_t
transporterId
=
0
;
code
=
asyncSendMsgToServerExt
(
trans
->
pTrans
,
epSet
,
&
transporterId
,
pMsgSendInfo
,
persistHandle
,
ctx
);
...
...
@@ -1098,8 +1102,6 @@ int32_t schBuildAndSendMsg(SSchJob *pJob, SSchTask *pTask, SQueryNodeAddr *addr,
break
;
}
SCH_SET_TASK_LASTMSG_TYPE
(
pTask
,
msgType
);
SSchTrans
trans
=
{.
pTrans
=
pJob
->
conn
.
pTrans
,
.
pHandle
=
SCH_GET_TASK_HANDLE
(
pTask
)};
SCH_ERR_JRET
(
schAsyncSendMsg
(
pJob
,
pTask
,
&
trans
,
addr
,
msgType
,
msg
,
msgSize
,
persistHandle
,
(
rpcCtx
.
args
?
&
rpcCtx
:
NULL
)));
...
...
@@ -1112,7 +1114,7 @@ int32_t schBuildAndSendMsg(SSchJob *pJob, SSchTask *pTask, SQueryNodeAddr *addr,
_return:
SCH_SET_TASK_LASTMSG_TYPE
(
pTask
,
-
1
)
;
pTask
->
lastMsgType
=
-
1
;
schFreeRpcCtx
(
&
rpcCtx
);
taosMemoryFreeClear
(
msg
);
...
...
tests/system-test/1-insert/insertWithMoreVgroup.py
浏览文件 @
86622ddd
...
...
@@ -355,40 +355,6 @@ class TDTestCase:
return
def
test_case4
(
self
):
self
.
taosBenchCreate
(
"127.0.0.1"
,
"no"
,
"db1"
,
"stb1"
,
1
,
2
,
1
*
10
)
tdSql
.
execute
(
"use db1;"
)
tdSql
.
query
(
"show dnodes;"
)
dnodeId
=
tdSql
.
getData
(
0
,
0
)
print
(
dnodeId
)
tdSql
.
execute
(
"create qnode on dnode %s"
%
dnodeId
)
tdSql
.
query
(
"select max(c1) from stb10;"
)
maxQnode
=
tdSql
.
getData
(
0
,
0
)
tdSql
.
query
(
"select min(c1) from stb11;"
)
minQnode
=
tdSql
.
getData
(
0
,
0
)
tdSql
.
query
(
"select c0,c1 from stb11_1 where (c0>1000) union select c0,c1 from stb11_1 where c0>2000;"
)
unionQnode
=
tdSql
.
queryResult
tdSql
.
query
(
"select c0,c1 from stb11_1 where (c0>1000) union all select c0,c1 from stb11_1 where c0>2000;"
)
unionallQnode
=
tdSql
.
queryResult
# tdSql.query("show qnodes;")
# qnodeId=tdSql.getData(0,0)
tdSql
.
execute
(
"drop qnode on dnode %s"
%
dnodeId
)
tdSql
.
execute
(
"reset query cache"
)
tdSql
.
query
(
"select max(c1) from stb10;"
)
tdSql
.
checkData
(
0
,
0
,
"%s"
%
maxQnode
)
tdSql
.
query
(
"select min(c1) from stb11;"
)
tdSql
.
checkData
(
0
,
0
,
"%s"
%
minQnode
)
tdSql
.
query
(
"select c0,c1 from stb11_1 where (c0>1000) union select c0,c1 from stb11_1 where c0>2000;"
)
unionVnode
=
tdSql
.
queryResult
assert
unionQnode
==
unionVnode
tdSql
.
query
(
"select c0,c1 from stb11_1 where (c0>1000) union all select c0,c1 from stb11_1 where c0>2000;"
)
unionallVnode
=
tdSql
.
queryResult
assert
unionallQnode
==
unionallVnode
# tdSql.execute("create qnode on dnode %s"%dnodeId)
# run case
def
run
(
self
):
...
...
tests/system-test/1-insert/test_stmt_insert_query_ex.py
浏览文件 @
86622ddd
...
...
@@ -82,7 +82,7 @@ class TDTestCase:
return
con
def
test_stmt_set_tbname_tag
(
self
,
conn
):
dbname
=
"stmt_
set_tbname_
tag"
dbname
=
"stmt_tag"
try
:
conn
.
execute
(
"drop database if exists %s"
%
dbname
)
...
...
@@ -196,31 +196,31 @@ class TDTestCase:
assert
rows9
[
0
][
0
]
==
12
,
'fourth case is failed'
assert
rows9
[
1
][
0
]
==
12
,
'fourth case is failed'
#
#
query: conversion Functions
#
querystmt4=conn.statement("select cast( ? as bigint) from log ")
#
queryparam4=new_bind_params(1)
#
print(type(queryparam4))
#
queryparam4[0].binary('1232a')
#
querystmt4.bind_param(queryparam4)
#
querystmt4.execute()
#
result4=querystmt4.use_result()
#
rows4=result4.fetch_all()
#
print("5",rows4)
#
assert rows4[0][0] == 1232
#
assert rows4[1][0] == 1232
#
querystmt4=conn.statement("select cast( ? as binary(10)) from log ")
#
queryparam4=new_bind_params(1)
#
print(type(queryparam4))
#
queryparam4[0].int(123)
#
querystmt4.bind_param(queryparam4)
#
querystmt4.execute()
#
result4=querystmt4.use_result()
#
rows4=result4.fetch_all()
#
print("6",rows4)
#
assert rows4[0][0] == '123'
#
assert rows4[1][0] == '123'
#query: conversion Functions
querystmt4
=
conn
.
statement
(
"select cast( ? as bigint) from log "
)
queryparam4
=
new_bind_params
(
1
)
print
(
type
(
queryparam4
))
queryparam4
[
0
].
binary
(
'1232a'
)
querystmt4
.
bind_param
(
queryparam4
)
querystmt4
.
execute
()
result4
=
querystmt4
.
use_result
()
rows4
=
result4
.
fetch_all
()
print
(
"5"
,
rows4
)
assert
rows4
[
0
][
0
]
==
1232
assert
rows4
[
1
][
0
]
==
1232
querystmt4
=
conn
.
statement
(
"select cast( ? as binary(10)) from log "
)
queryparam4
=
new_bind_params
(
1
)
print
(
type
(
queryparam4
))
queryparam4
[
0
].
int
(
123
)
querystmt4
.
bind_param
(
queryparam4
)
querystmt4
.
execute
()
result4
=
querystmt4
.
use_result
()
rows4
=
result4
.
fetch_all
()
print
(
"6"
,
rows4
)
assert
rows4
[
0
][
0
]
==
'123'
assert
rows4
[
1
][
0
]
==
'123'
# #query: datatime Functions
...
...
tests/system-test/1-insert/test_stmt_muti_insert_query.py
浏览文件 @
86622ddd
...
...
@@ -84,21 +84,21 @@ class TDTestCase:
def
test_stmt_insert_multi
(
self
,
conn
):
# type: (TaosConnection) -> None
dbname
=
"
pytest_taos_stmt_multi
"
dbname
=
"
db_stmt
"
try
:
conn
.
execute
(
"drop database if exists %s"
%
dbname
)
conn
.
execute
(
"create database if not exists %s"
%
dbname
)
conn
.
select_db
(
dbname
)
conn
.
execute
(
"create table if not exists
log
(ts timestamp, bo bool, nil tinyint, ti tinyint, si smallint, ii int,
\
"create table if not exists
stb1
(ts timestamp, bo bool, nil tinyint, ti tinyint, si smallint, ii int,
\
bi bigint, tu tinyint unsigned, su smallint unsigned, iu int unsigned, bu bigint unsigned,
\
ff float, dd double, bb binary(100), nn nchar(100), tt timestamp)"
,
)
# conn.load_table_info("log")
start
=
datetime
.
now
()
stmt
=
conn
.
statement
(
"insert into
log
values(?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)"
)
stmt
=
conn
.
statement
(
"insert into
stb1
values(?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)"
)
params
=
new_multi_binds
(
16
)
params
[
0
].
timestamp
((
1626861392589
,
1626861392590
,
1626861392591
))
...
...
@@ -125,7 +125,7 @@ class TDTestCase:
assert
stmt
.
affected_rows
==
3
#query 1
querystmt
=
conn
.
statement
(
"select ?,bu from
log
"
)
querystmt
=
conn
.
statement
(
"select ?,bu from
stb1
"
)
queryparam
=
new_bind_params
(
1
)
print
(
type
(
queryparam
))
queryparam
[
0
].
binary
(
"ts"
)
...
...
@@ -135,7 +135,7 @@ class TDTestCase:
# rows=result.fetch_all()
# print( querystmt.use_result())
# result = conn.query("select * from
log
")
# result = conn.query("select * from
stb1
")
rows
=
result
.
fetch_all
()
# rows=result.fetch_all()
print
(
rows
)
...
...
@@ -144,7 +144,7 @@ class TDTestCase:
assert
rows
[
2
][
1
]
==
None
#query 2
querystmt1
=
conn
.
statement
(
"select * from
log
where bu < ?"
)
querystmt1
=
conn
.
statement
(
"select * from
stb1
where bu < ?"
)
queryparam1
=
new_bind_params
(
1
)
print
(
type
(
queryparam1
))
queryparam1
[
0
].
int
(
4
)
...
...
tests/system-test/2-query/queryQnode.py
浏览文件 @
86622ddd
...
...
@@ -17,6 +17,8 @@ import threading as thd
import
multiprocessing
as
mp
from
numpy.lib.function_base
import
insert
import
taos
from
util.dnodes
import
TDDnode
from
util.dnodes
import
*
from
util.log
import
*
from
util.cases
import
*
from
util.sql
import
*
...
...
@@ -30,9 +32,9 @@ class TDTestCase:
#
# --------------- main frame -------------------
#
clientCfgDict
=
{
'query
prox
y'
:
'1'
,
'debugFlag'
:
135
}
clientCfgDict
[
"query
proxy"
]
=
'2
'
clientCfgDict
[
"debugFlag"
]
=
1
43
clientCfgDict
=
{
'query
Polic
y'
:
'1'
,
'debugFlag'
:
135
}
clientCfgDict
[
"query
Policy"
]
=
'1
'
clientCfgDict
[
"debugFlag"
]
=
1
31
updatecfgDict
=
{
'clientCfg'
:
{}}
updatecfgDict
=
{
'debugFlag'
:
143
}
...
...
@@ -62,7 +64,7 @@ class TDTestCase:
return
buildPath
# init
def
init
(
self
,
conn
,
logSql
):
def
init
(
self
,
conn
,
logSql
=
True
):
tdLog
.
debug
(
"start to execute %s"
%
__file__
)
tdSql
.
init
(
conn
.
cursor
())
# tdSql.prepare()
...
...
@@ -292,12 +294,13 @@ class TDTestCase:
tdLog
.
debug
(
"-----create database and muti-thread create tables test------- "
)
def
test_case
4
(
self
):
def
test_case
1
(
self
):
self
.
taosBenchCreate
(
"127.0.0.1"
,
"no"
,
"db1"
,
"stb1"
,
1
,
2
,
1
*
10
)
tdSql
.
execute
(
"use db1;"
)
tdSql
.
query
(
"show dnodes;"
)
dnodeId
=
tdSql
.
getData
(
0
,
0
)
print
(
dnodeId
)
tdLog
.
debug
(
"create qnode on dnode %s"
%
dnodeId
)
tdSql
.
execute
(
"create qnode on dnode %s"
%
dnodeId
)
tdSql
.
query
(
"select max(c1) from stb10;"
)
maxQnode
=
tdSql
.
getData
(
0
,
0
)
...
...
@@ -310,6 +313,7 @@ class TDTestCase:
# tdSql.query("show qnodes;")
# qnodeId=tdSql.getData(0,0)
tdLog
.
debug
(
"drop qnode on dnode %s"
%
dnodeId
)
tdSql
.
execute
(
"drop qnode on dnode %s"
%
dnodeId
)
tdSql
.
execute
(
"reset query cache"
)
tdSql
.
query
(
"select max(c1) from stb10;"
)
...
...
@@ -323,15 +327,156 @@ class TDTestCase:
unionallVnode
=
tdSql
.
queryResult
assert
unionallQnode
==
unionallVnode
queryPolicy
=
2
simClientCfg
=
"%s/taos.cfg"
%
tdDnodes
.
getSimCfgPath
()
cmd
=
'sed -i "s/^queryPolicy.*/queryPolicy 2/g" %s'
%
simClientCfg
os
.
system
(
cmd
)
# tdDnodes.stop(1)
# tdDnodes.start(1)
tdSql
.
execute
(
"reset query cache"
)
tdSql
.
execute
(
'alter local "queryPolicy" "%d"'
%
queryPolicy
)
tdSql
.
query
(
"show local variables;"
)
for
i
in
range
(
tdSql
.
queryRows
):
if
tdSql
.
queryResult
[
i
][
0
]
==
"queryPolicy"
:
if
int
(
tdSql
.
queryResult
[
i
][
1
])
==
int
(
queryPolicy
):
tdLog
.
success
(
'alter queryPolicy to %d successfully'
%
queryPolicy
)
else
:
tdLog
.
debug
(
tdSql
.
queryResult
)
tdLog
.
exit
(
"alter queryPolicy to %d failed"
%
queryPolicy
)
tdSql
.
execute
(
"reset query cache"
)
tdSql
.
execute
(
"use db1;"
)
tdSql
.
query
(
"show dnodes;"
)
dnodeId
=
tdSql
.
getData
(
0
,
0
)
tdLog
.
debug
(
"create qnode on dnode %s"
%
dnodeId
)
tdSql
.
execute
(
"create qnode on dnode %s"
%
dnodeId
)
tdSql
.
query
(
"select max(c1) from stb10;"
)
assert
maxQnode
==
tdSql
.
getData
(
0
,
0
)
tdSql
.
query
(
"select min(c1) from stb11;"
)
assert
minQnode
==
tdSql
.
getData
(
0
,
0
)
tdSql
.
query
(
"select c0,c1 from stb11_1 where (c0>1000) union select c0,c1 from stb11_1 where c0>2000;"
)
assert
unionQnode
==
tdSql
.
queryResult
tdSql
.
query
(
"select c0,c1 from stb11_1 where (c0>1000) union all select c0,c1 from stb11_1 where c0>2000;"
)
assert
unionallQnode
==
tdSql
.
queryResult
# tdSql.query("show qnodes;")
# qnodeId=tdSql.getData(0,0)
tdLog
.
debug
(
"drop qnode on dnode %s"
%
dnodeId
)
tdSql
.
execute
(
"drop qnode on dnode %s"
%
dnodeId
)
tdSql
.
execute
(
"reset query cache"
)
tdSql
.
query
(
"select max(c1) from stb10;"
)
assert
maxQnode
==
tdSql
.
getData
(
0
,
0
)
tdSql
.
query
(
"select min(c1) from stb11;"
)
assert
minQnode
==
tdSql
.
getData
(
0
,
0
)
tdSql
.
error
(
"select c0,c1 from stb11_1 where (c0>1000) union select c0,c1 from stb11_1 where c0>2000;"
)
tdSql
.
error
(
"select c0,c1 from stb11_1 where (c0>1000) union all select c0,c1 from stb11_1 where c0>2000;"
)
# tdSql.execute("create qnode on dnode %s"%dnodeId)
queryPolicy
=
3
simClientCfg
=
"%s/taos.cfg"
%
tdDnodes
.
getSimCfgPath
()
cmd
=
'sed -i "s/^queryPolicy.*/queryPolicy 2/g" %s'
%
simClientCfg
os
.
system
(
cmd
)
# tdDnodes.stop(1)
# tdDnodes.start(1)
tdSql
.
execute
(
"reset query cache"
)
tdSql
.
execute
(
'alter local "queryPolicy" "%d"'
%
queryPolicy
)
tdSql
.
query
(
"show local variables;"
)
for
i
in
range
(
tdSql
.
queryRows
):
if
tdSql
.
queryResult
[
i
][
0
]
==
"queryPolicy"
:
if
int
(
tdSql
.
queryResult
[
i
][
1
])
==
int
(
queryPolicy
):
tdLog
.
success
(
'alter queryPolicy to %d successfully'
%
queryPolicy
)
else
:
tdLog
.
debug
(
tdSql
.
queryResult
)
tdLog
.
exit
(
"alter queryPolicy to %d failed"
%
queryPolicy
)
tdSql
.
execute
(
"reset query cache"
)
tdSql
.
execute
(
"use db1;"
)
tdSql
.
query
(
"show dnodes;"
)
dnodeId
=
tdSql
.
getData
(
0
,
0
)
tdLog
.
debug
(
"create qnode on dnode %s"
%
dnodeId
)
tdSql
.
execute
(
"create qnode on dnode %s"
%
dnodeId
)
tdSql
.
query
(
"select max(c1) from stb10;"
)
assert
maxQnode
==
tdSql
.
getData
(
0
,
0
)
tdSql
.
query
(
"select min(c1) from stb11;"
)
assert
minQnode
==
tdSql
.
getData
(
0
,
0
)
tdSql
.
query
(
"select c0,c1 from stb11_1 where (c0>1000) union select c0,c1 from stb11_1 where c0>2000;"
)
assert
unionQnode
==
tdSql
.
queryResult
tdSql
.
query
(
"select c0,c1 from stb11_1 where (c0>1000) union all select c0,c1 from stb11_1 where c0>2000;"
)
assert
unionallQnode
==
tdSql
.
queryResult
def
test_case2
(
self
):
self
.
taosBenchCreate
(
"127.0.0.1"
,
"no"
,
"db1"
,
"stb1"
,
10
,
2
,
1
*
10
)
tdSql
.
query
(
"show qnodes"
)
if
tdSql
.
queryRows
==
1
:
tdLog
.
debug
(
"drop qnode on dnode 1"
)
tdSql
.
execute
(
"drop qnode on dnode 1"
)
queryPolicy
=
2
simClientCfg
=
"%s/taos.cfg"
%
tdDnodes
.
getSimCfgPath
()
cmd
=
'sed -i "s/^queryPolicy.*/queryPolicy 2/g" %s'
%
simClientCfg
os
.
system
(
cmd
)
# tdDnodes.stop(1)
# tdDnodes.start(1)
tdSql
.
execute
(
"reset query cache"
)
tdSql
.
execute
(
'alter local "queryPolicy" "%d"'
%
queryPolicy
)
tdSql
.
query
(
"show local variables;"
)
for
i
in
range
(
tdSql
.
queryRows
):
if
tdSql
.
queryResult
[
i
][
0
]
==
"queryPolicy"
:
if
int
(
tdSql
.
queryResult
[
i
][
1
])
==
int
(
queryPolicy
):
tdLog
.
success
(
'alter queryPolicy to %d successfully'
%
queryPolicy
)
else
:
tdLog
.
debug
(
tdSql
.
queryResult
)
tdLog
.
exit
(
"alter queryPolicy to %d failed"
%
queryPolicy
)
tdSql
.
execute
(
"use db1;"
)
tdSql
.
error
(
"select max(c1) from stb10;"
)
tdSql
.
error
(
"select min(c1) from stb11;"
)
tdSql
.
error
(
"select c0,c1 from stb11_1 where (c0>1000) union select c0,c1 from stb11_1 where c0>2000;"
)
tdSql
.
error
(
"select c0,c1 from stb11_1 where (c0>1000) union all select c0,c1 from stb11_1 where c0>2000;"
)
tdSql
.
query
(
"select max(c1) from stb10_0;"
)
tdSql
.
query
(
"select min(c1) from stb11_0;"
)
def
test_case3
(
self
):
tdSql
.
execute
(
'alter local "queryPolicy" "3"'
)
tdLog
.
debug
(
"create qnode on dnode 1"
)
tdSql
.
execute
(
"create qnode on dnode 1"
)
tdSql
.
execute
(
"use db1;"
)
tdSql
.
query
(
"show dnodes;"
)
dnodeId
=
tdSql
.
getData
(
0
,
0
)
print
(
dnodeId
)
tdSql
.
query
(
"select max(c1) from stb10;"
)
maxQnode
=
tdSql
.
getData
(
0
,
0
)
tdSql
.
query
(
"select min(c1) from stb11;"
)
minQnode
=
tdSql
.
getData
(
0
,
0
)
tdSql
.
query
(
"select c0,c1 from stb11_1 where (c0>1000) union select c0,c1 from stb11_1 where c0>2000;"
)
unionQnode
=
tdSql
.
queryResult
tdSql
.
query
(
"select c0,c1 from stb11_1 where (c0>1000) union all select c0,c1 from stb11_1 where c0>2000;"
)
unionallQnode
=
tdSql
.
queryResult
# tdSql.query("show qnodes;")
# qnodeId=tdSql.getData(0,0)
tdLog
.
debug
(
"drop qnode on dnode %s"
%
dnodeId
)
tdSql
.
execute
(
"drop qnode on dnode %s"
%
dnodeId
)
tdSql
.
execute
(
"reset query cache"
)
tdSql
.
error
(
"select max(c1) from stb10;"
)
tdSql
.
error
(
"select min(c1) from stb11;"
)
tdSql
.
error
(
"select c0,c1 from stb11_1 where (c0>1000) union select c0,c1 from stb11_1 where c0>2000;"
)
tdSql
.
error
(
"select c0,c1 from stb11_1 where (c0>1000) union all select c0,c1 from stb11_1 where c0>2000;"
)
# run case
def
run
(
self
):
# test qnode
self
.
test_case4
()
tdLog
.
debug
(
" LIMIT test_case3 ............ [OK]"
)
self
.
test_case1
()
self
.
test_case2
()
self
.
test_case3
()
# tdLog.debug(" LIMIT test_case3 ............ [OK]")
return
...
...
tests/system-test/6-cluster/5dnode3mnodeSep1VnodeStopVnodeCreateDb.py
浏览文件 @
86622ddd
...
...
@@ -65,31 +65,6 @@ class TDTestCase:
self
.
_async_raise
(
thread
.
ident
,
SystemExit
)
def
insertData
(
self
,
countstart
,
countstop
):
# fisrt add data : db\stable\childtable\general table
for
couti
in
range
(
countstart
,
countstop
):
tdLog
.
debug
(
"drop database if exists db%d"
%
couti
)
tdSql
.
execute
(
"drop database if exists db%d"
%
couti
)
print
(
"create database if not exists db%d replica 1 duration 300"
%
couti
)
tdSql
.
execute
(
"create database if not exists db%d replica 1 duration 300"
%
couti
)
tdSql
.
execute
(
"use db%d"
%
couti
)
tdSql
.
execute
(
'''create table stb1
(ts timestamp, c1 int, c2 bigint, c3 smallint, c4 tinyint, c5 float, c6 double, c7 bool, c8 binary(16),c9 nchar(32), c10 timestamp)
tags (t1 int)
'''
)
tdSql
.
execute
(
'''
create table t1
(ts timestamp, c1 int, c2 bigint, c3 smallint, c4 tinyint, c5 float, c6 double, c7 bool, c8 binary(16),c9 nchar(32), c10 timestamp)
'''
)
for
i
in
range
(
4
):
tdSql
.
execute
(
f
'create table ct
{
i
+
1
}
using stb1 tags (
{
i
+
1
}
)'
)
def
fiveDnodeThreeMnode
(
self
,
dnodeNumbers
,
mnodeNums
,
restartNumbers
,
stopRole
):
tdLog
.
printNoPrefix
(
"======== test case 1: "
)
paraDict
=
{
'dbName'
:
'db'
,
...
...
@@ -143,7 +118,8 @@ class TDTestCase:
threads
=
[]
for
i
in
range
(
restartNumbers
):
dbNameIndex
=
'%s%d'
%
(
paraDict
[
"dbName"
],
i
)
threads
.
append
(
threading
.
Thread
(
target
=
clusterComCreate
.
create_databases
,
args
=
(
tdSql
,
dbNameIndex
,
paraDict
[
"dbNumbers"
],
paraDict
[
"dropFlag"
],
paraDict
[
"vgroups"
],
paraDict
[
'replica'
])))
newTdSql
=
tdCom
.
newTdSql
()
threads
.
append
(
threading
.
Thread
(
target
=
clusterComCreate
.
create_databases
,
args
=
(
newTdSql
,
dbNameIndex
,
paraDict
[
"dbNumbers"
],
paraDict
[
"dropFlag"
],
paraDict
[
"vgroups"
],
paraDict
[
'replica'
])))
for
tr
in
threads
:
tr
.
start
()
...
...
tests/system-test/6-cluster/clusterCommonCheck.py
浏览文件 @
86622ddd
...
...
@@ -39,6 +39,7 @@ class ClusterComCheck:
def
checkDnodes
(
self
,
dnodeNumbers
):
count
=
0
# print(tdSql)
while
count
<
5
:
tdSql
.
query
(
"show dnodes"
)
# tdLog.debug(tdSql.queryResult)
...
...
@@ -85,7 +86,7 @@ class ClusterComCheck:
tdLog
.
debug
(
"check %s_%d that status is ready "
%
(
dbNameIndex
,
j
))
else
:
continue
print
(
query_status
)
#
print(query_status)
count
+=
1
if
query_status
==
dbNumbers
:
tdLog
.
success
(
"we find cluster with %d dnode and check all databases are ready within 5s! "
%
dbNumbers
)
...
...
tests/system-test/6-cluster/clusterCommonCreate.py
浏览文件 @
86622ddd
...
...
@@ -125,7 +125,6 @@ class ClusterComCreate:
def
create_databases
(
self
,
tsql
,
dbNameIndex
,
dbNumbers
,
dropFlag
=
1
,
vgroups
=
4
,
replica
=
1
):
for
i
in
range
(
dbNumbers
):
print
(
dbNumbers
)
if
dropFlag
==
1
:
tsql
.
execute
(
"drop database if exists %s_%d"
%
(
dbNameIndex
,
i
))
tsql
.
execute
(
"create database if not exists %s_%d vgroups %d replica %d"
%
(
dbNameIndex
,
i
,
vgroups
,
replica
))
...
...
tests/system-test/fulltest.sh
浏览文件 @
86622ddd
...
...
@@ -121,7 +121,7 @@ python3 ./test.py -f 6-cluster/5dnode3mnodeStop.py -N 5 -M 3
python3 ./test.py
-f
6-cluster/5dnode3mnodeStopLoop.py
-N
5
-M
3
# BUG python3 ./test.py -f 6-cluster/5dnode3mnodeSep1VnodeStopDnodeCreateDb.py -N 5 -M 3
# BUG python3 ./test.py -f 6-cluster/5dnode3mnodeSep1VnodeStopMnodeCreateDb.py -N 5 -M 3
#
python3 ./test.py -f 6-cluster/5dnode3mnodeSep1VnodeStopVnodeCreateDb.py -N 5 -M 3
python3 ./test.py
-f
6-cluster/5dnode3mnodeSep1VnodeStopVnodeCreateDb.py
-N
5
-M
3
# BUG python3 ./test.py -f 6-cluster/5dnode3mnodeSep1VnodeStopDnodeCreateStb.py -N 5 -M 3
# BUG python3 ./test.py -f 6-cluster/5dnode3mnodeSep1VnodeStopMnodeCreateStb.py -N 5 -M 3
# python3 ./test.py -f 6-cluster/5dnode3mnodeSep1VnodeStopVnodeCreateStb.py -N 5 -M 3
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录