Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
1975e945
T
TDengine
项目概览
taosdata
/
TDengine
大约 2 年 前同步成功
通知
1192
Star
22018
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
1975e945
编写于
5月 13, 2023
作者:
H
Haojun Liao
浏览文件
操作
浏览文件
下载
差异文件
Merge branch 'main' into enh/rocksdbSstate
上级
23f5f7f6
0c744e86
变更
32
隐藏空白更改
内联
并排
Showing
32 changed file
with
1311 addition
and
141 deletion
+1311
-141
README-CN.md
README-CN.md
+1
-1
packaging/docker/bin/entrypoint.sh
packaging/docker/bin/entrypoint.sh
+1
-1
packaging/docker/bin/taos-check
packaging/docker/bin/taos-check
+1
-1
source/client/src/clientSml.c
source/client/src/clientSml.c
+1
-1
source/common/src/tglobal.c
source/common/src/tglobal.c
+1
-1
source/dnode/mgmt/exe/dmMain.c
source/dnode/mgmt/exe/dmMain.c
+0
-12
source/dnode/mnode/impl/src/mndProfile.c
source/dnode/mnode/impl/src/mndProfile.c
+7
-4
source/dnode/vnode/src/inc/tq.h
source/dnode/vnode/src/inc/tq.h
+2
-0
source/dnode/vnode/src/tq/tqPush.c
source/dnode/vnode/src/tq/tqPush.c
+1
-0
source/dnode/vnode/src/tq/tqUtil.c
source/dnode/vnode/src/tq/tqUtil.c
+52
-43
source/libs/catalog/src/ctgAsync.c
source/libs/catalog/src/ctgAsync.c
+2
-2
source/libs/executor/src/executil.c
source/libs/executor/src/executil.c
+10
-1
source/libs/parser/src/parCalcConst.c
source/libs/parser/src/parCalcConst.c
+3
-0
source/libs/parser/src/parTranslater.c
source/libs/parser/src/parTranslater.c
+12
-1
source/libs/planner/src/planner.c
source/libs/planner/src/planner.c
+8
-0
source/libs/wal/src/walMeta.c
source/libs/wal/src/walMeta.c
+37
-9
source/libs/wal/src/walRead.c
source/libs/wal/src/walRead.c
+7
-8
source/libs/wal/src/walWrite.c
source/libs/wal/src/walWrite.c
+25
-34
test1
test1
+0
-0
tests/pytest/util/cluster.py
tests/pytest/util/cluster.py
+4
-2
tests/script/tsim/alter/table.sim
tests/script/tsim/alter/table.sim
+18
-1
tests/script/tsim/parser/alter_column.sim
tests/script/tsim/parser/alter_column.sim
+1
-1
tests/script/tsim/query/unionall_as_table.sim
tests/script/tsim/query/unionall_as_table.sim
+17
-0
tests/system-test/0-others/user_privilege.py
tests/system-test/0-others/user_privilege.py
+36
-12
tests/system-test/6-cluster/clusterCommonCheck.py
tests/system-test/6-cluster/clusterCommonCheck.py
+40
-2
tests/system-test/6-cluster/manually-test/6dnode3mnodeInsertDataRebootModifyMetaAlterRep1to3.py
...est/6dnode3mnodeInsertDataRebootModifyMetaAlterRep1to3.py
+206
-0
tests/system-test/6-cluster/manually-test/6dnode3mnodeInsertDataRebootModifyMetaAlterRep3to1.py
...est/6dnode3mnodeInsertDataRebootModifyMetaAlterRep3to1.py
+206
-0
tests/system-test/6-cluster/manually-test/6dnode3mnodeInsertDatarRebootAlterRep1-3.py
...manually-test/6dnode3mnodeInsertDatarRebootAlterRep1-3.py
+222
-0
tests/system-test/6-cluster/manually-test/6dnode3mnodeInsertLessDataAlterRep3to1to3.py
...anually-test/6dnode3mnodeInsertLessDataAlterRep3to1to3.py
+196
-0
tests/system-test/6-cluster/manually-test/6dnode3mnodeStopDnodeInsertDatatb.py
...luster/manually-test/6dnode3mnodeStopDnodeInsertDatatb.py
+191
-0
tests/system-test/7-tmq/subscribeDb3.py
tests/system-test/7-tmq/subscribeDb3.py
+1
-1
tests/system-test/7-tmq/subscribeStb.py
tests/system-test/7-tmq/subscribeStb.py
+2
-3
未找到文件。
README-CN.md
浏览文件 @
1975e945
...
@@ -352,4 +352,4 @@ TDengine 提供了丰富的应用程序开发接口,其中包括 C/C++、Java
...
@@ -352,4 +352,4 @@ TDengine 提供了丰富的应用程序开发接口,其中包括 C/C++、Java
# 加入技术交流群
# 加入技术交流群
TDengine 官方社群「物联网大数据群」对外开放,欢迎您加入讨论。搜索微信号 "tdengine",加小 T 为好友,即可入群。
TDengine 官方社群「物联网大数据群」对外开放,欢迎您加入讨论。搜索微信号 "tdengine
1
",加小 T 为好友,即可入群。
packaging/docker/bin/entrypoint.sh
浏览文件 @
1975e945
...
@@ -55,7 +55,7 @@ else
...
@@ -55,7 +55,7 @@ else
exit
$?
exit
$?
fi
fi
while
true
;
do
while
true
;
do
es
=
$(
taos
-h
$FIRST_EP_HOST
-P
$FIRST_EP_PORT
--check
)
es
=
$(
taos
-h
$FIRST_EP_HOST
-P
$FIRST_EP_PORT
--check
|
grep
"^[0-9]*:"
)
echo
${
es
}
echo
${
es
}
if
[
"
${
es
%%
:
*
}
"
-eq
2
]
;
then
if
[
"
${
es
%%
:
*
}
"
-eq
2
]
;
then
echo
"execute create dnode"
echo
"execute create dnode"
...
...
packaging/docker/bin/taos-check
浏览文件 @
1975e945
#!/bin/sh
#!/bin/sh
es
=
$(
taos
--check
)
es
=
$(
taos
--check
|
grep
"^[0-9]*:"
)
code
=
${
es
%%
:
*
}
code
=
${
es
%%
:
*
}
if
[
"
$code
"
-ne
"0"
]
&&
[
"
$code
"
-ne
"4"
]
;
then
if
[
"
$code
"
-ne
"0"
]
&&
[
"
$code
"
-ne
"4"
]
;
then
exit
0
exit
0
...
...
source/client/src/clientSml.c
浏览文件 @
1975e945
...
@@ -1351,7 +1351,7 @@ static int32_t smlInsertData(SSmlHandle *info) {
...
@@ -1351,7 +1351,7 @@ static int32_t smlInsertData(SSmlHandle *info) {
}
}
taosArrayPush
(
info
->
pRequest
->
tableList
,
&
pName
);
taosArrayPush
(
info
->
pRequest
->
tableList
,
&
pName
);
tstrncpy
(
pName
.
tname
,
tableData
->
childTableName
,
strlen
(
tableData
->
childTableName
)
+
1
);
strcpy
(
pName
.
tname
,
tableData
->
childTableName
);
SRequestConnInfo
conn
=
{
0
};
SRequestConnInfo
conn
=
{
0
};
conn
.
pTrans
=
info
->
taos
->
pAppInfo
->
pTransporter
;
conn
.
pTrans
=
info
->
taos
->
pAppInfo
->
pTransporter
;
...
...
source/common/src/tglobal.c
浏览文件 @
1975e945
...
@@ -84,7 +84,7 @@ bool tsMonitorComp = false;
...
@@ -84,7 +84,7 @@ bool tsMonitorComp = false;
// telem
// telem
bool
tsEnableTelem
=
true
;
bool
tsEnableTelem
=
true
;
int32_t
tsTelemInterval
=
43200
;
int32_t
tsTelemInterval
=
43200
;
char
tsTelemServer
[
TSDB_FQDN_LEN
]
=
"telemetry.t
aosdata
.com"
;
char
tsTelemServer
[
TSDB_FQDN_LEN
]
=
"telemetry.t
dengine
.com"
;
uint16_t
tsTelemPort
=
80
;
uint16_t
tsTelemPort
=
80
;
char
*
tsTelemUri
=
"/report"
;
char
*
tsTelemUri
=
"/report"
;
...
...
source/dnode/mgmt/exe/dmMain.c
浏览文件 @
1975e945
...
@@ -87,18 +87,6 @@ static void dmStopDnode(int signum, void *sigInfo, void *context) {
...
@@ -87,18 +87,6 @@ static void dmStopDnode(int signum, void *sigInfo, void *context) {
}
}
void
dmLogCrash
(
int
signum
,
void
*
sigInfo
,
void
*
context
)
{
void
dmLogCrash
(
int
signum
,
void
*
sigInfo
,
void
*
context
)
{
taosIgnSignal
(
SIGTERM
);
taosIgnSignal
(
SIGHUP
);
taosIgnSignal
(
SIGINT
);
taosIgnSignal
(
SIGBREAK
);
#ifndef WINDOWS
taosIgnSignal
(
SIGBUS
);
#endif
taosIgnSignal
(
SIGABRT
);
taosIgnSignal
(
SIGFPE
);
taosIgnSignal
(
SIGSEGV
);
char
*
pMsg
=
NULL
;
char
*
pMsg
=
NULL
;
const
char
*
flags
=
"UTL FATAL "
;
const
char
*
flags
=
"UTL FATAL "
;
ELogLevel
level
=
DEBUG_FATAL
;
ELogLevel
level
=
DEBUG_FATAL
;
...
...
source/dnode/mnode/impl/src/mndProfile.c
浏览文件 @
1975e945
...
@@ -256,10 +256,13 @@ static int32_t mndProcessConnectReq(SRpcMsg *pReq) {
...
@@ -256,10 +256,13 @@ static int32_t mndProcessConnectReq(SRpcMsg *pReq) {
snprintf
(
db
,
TSDB_DB_FNAME_LEN
,
"%d%s%s"
,
pUser
->
acctId
,
TS_PATH_DELIMITER
,
connReq
.
db
);
snprintf
(
db
,
TSDB_DB_FNAME_LEN
,
"%d%s%s"
,
pUser
->
acctId
,
TS_PATH_DELIMITER
,
connReq
.
db
);
pDb
=
mndAcquireDb
(
pMnode
,
db
);
pDb
=
mndAcquireDb
(
pMnode
,
db
);
if
(
pDb
==
NULL
)
{
if
(
pDb
==
NULL
)
{
terrno
=
TSDB_CODE_MND_INVALID_DB
;
if
(
0
!=
strcmp
(
connReq
.
db
,
TSDB_INFORMATION_SCHEMA_DB
)
&&
mGError
(
"user:%s, failed to login from %s while use db:%s since %s"
,
pReq
->
info
.
conn
.
user
,
ip
,
connReq
.
db
,
(
0
!=
strcmp
(
connReq
.
db
,
TSDB_PERFORMANCE_SCHEMA_DB
)))
{
terrstr
());
terrno
=
TSDB_CODE_MND_INVALID_DB
;
goto
_OVER
;
mGError
(
"user:%s, failed to login from %s while use db:%s since %s"
,
pReq
->
info
.
conn
.
user
,
ip
,
connReq
.
db
,
terrstr
());
goto
_OVER
;
}
}
}
if
(
mndCheckDbPrivilege
(
pMnode
,
pReq
->
info
.
conn
.
user
,
MND_OPER_READ_OR_WRITE_DB
,
pDb
)
!=
0
)
{
if
(
mndCheckDbPrivilege
(
pMnode
,
pReq
->
info
.
conn
.
user
,
MND_OPER_READ_OR_WRITE_DB
,
pDb
)
!=
0
)
{
...
...
source/dnode/vnode/src/inc/tq.h
浏览文件 @
1975e945
...
@@ -101,6 +101,8 @@ typedef struct {
...
@@ -101,6 +101,8 @@ typedef struct {
STqPushHandle
pushHandle
;
// push
STqPushHandle
pushHandle
;
// push
STqExecHandle
execHandle
;
// exec
STqExecHandle
execHandle
;
// exec
SRpcMsg
*
msg
;
SRpcMsg
*
msg
;
int32_t
noDataPollCnt
;
int8_t
exec
;
}
STqHandle
;
}
STqHandle
;
typedef
struct
{
typedef
struct
{
...
...
source/dnode/vnode/src/tq/tqPush.c
浏览文件 @
1975e945
...
@@ -55,6 +55,7 @@ int32_t tqRegisterPushHandle(STQ* pTq, void* handle, SRpcMsg* pMsg) {
...
@@ -55,6 +55,7 @@ int32_t tqRegisterPushHandle(STQ* pTq, void* handle, SRpcMsg* pMsg) {
memcpy
(
pHandle
->
msg
,
pMsg
,
sizeof
(
SRpcMsg
));
memcpy
(
pHandle
->
msg
,
pMsg
,
sizeof
(
SRpcMsg
));
pHandle
->
msg
->
pCont
=
rpcMallocCont
(
pMsg
->
contLen
);
pHandle
->
msg
->
pCont
=
rpcMallocCont
(
pMsg
->
contLen
);
}
else
{
}
else
{
tqPushDataRsp
(
pTq
,
pHandle
);
void
*
tmp
=
pHandle
->
msg
->
pCont
;
void
*
tmp
=
pHandle
->
msg
->
pCont
;
memcpy
(
pHandle
->
msg
,
pMsg
,
sizeof
(
SRpcMsg
));
memcpy
(
pHandle
->
msg
,
pMsg
,
sizeof
(
SRpcMsg
));
pHandle
->
msg
->
pCont
=
tmp
;
pHandle
->
msg
->
pCont
=
tmp
;
...
...
source/dnode/vnode/src/tq/tqUtil.c
浏览文件 @
1975e945
...
@@ -16,6 +16,7 @@
...
@@ -16,6 +16,7 @@
#include "tq.h"
#include "tq.h"
#define IS_OFFSET_RESET_TYPE(_t) ((_t) < 0)
#define IS_OFFSET_RESET_TYPE(_t) ((_t) < 0)
#define NO_POLL_CNT 5
static
int32_t
tqSendMetaPollRsp
(
STQ
*
pTq
,
const
SRpcMsg
*
pMsg
,
const
SMqPollReq
*
pReq
,
const
SMqMetaRsp
*
pRsp
);
static
int32_t
tqSendMetaPollRsp
(
STQ
*
pTq
,
const
SRpcMsg
*
pMsg
,
const
SMqPollReq
*
pReq
,
const
SMqMetaRsp
*
pRsp
);
...
@@ -161,6 +162,10 @@ static int32_t extractResetOffsetVal(STqOffsetVal* pOffsetVal, STQ* pTq, STqHand
...
@@ -161,6 +162,10 @@ static int32_t extractResetOffsetVal(STqOffsetVal* pOffsetVal, STQ* pTq, STqHand
return
0
;
return
0
;
}
}
static
bool
isHandleExecuting
(
STqHandle
*
pHandle
){
return
1
==
atomic_load_8
(
&
pHandle
->
exec
);
}
static
int32_t
extractDataAndRspForNormalSubscribe
(
STQ
*
pTq
,
STqHandle
*
pHandle
,
const
SMqPollReq
*
pRequest
,
static
int32_t
extractDataAndRspForNormalSubscribe
(
STQ
*
pTq
,
STqHandle
*
pHandle
,
const
SMqPollReq
*
pRequest
,
SRpcMsg
*
pMsg
,
STqOffsetVal
*
pOffset
)
{
SRpcMsg
*
pMsg
,
STqOffsetVal
*
pOffset
)
{
uint64_t
consumerId
=
pRequest
->
consumerId
;
uint64_t
consumerId
=
pRequest
->
consumerId
;
...
@@ -176,6 +181,12 @@ static int32_t extractDataAndRspForNormalSubscribe(STQ* pTq, STqHandle* pHandle,
...
@@ -176,6 +181,12 @@ static int32_t extractDataAndRspForNormalSubscribe(STQ* pTq, STqHandle* pHandle,
return
code
;
return
code
;
}
}
while
(
isHandleExecuting
(
pHandle
)){
tqInfo
(
"sub is executing, pHandle:%p"
,
pHandle
);
taosMsleep
(
5
);
}
atomic_store_8
(
&
pHandle
->
exec
,
1
);
qSetTaskId
(
pHandle
->
execHandle
.
task
,
consumerId
,
pRequest
->
reqId
);
qSetTaskId
(
pHandle
->
execHandle
.
task
,
consumerId
,
pRequest
->
reqId
);
code
=
tqScanData
(
pTq
,
pHandle
,
&
dataRsp
,
pOffset
);
code
=
tqScanData
(
pTq
,
pHandle
,
&
dataRsp
,
pOffset
);
if
(
code
!=
0
)
{
if
(
code
!=
0
)
{
...
@@ -185,17 +196,23 @@ static int32_t extractDataAndRspForNormalSubscribe(STQ* pTq, STqHandle* pHandle,
...
@@ -185,17 +196,23 @@ static int32_t extractDataAndRspForNormalSubscribe(STQ* pTq, STqHandle* pHandle,
// till now, all data has been transferred to consumer, new data needs to push client once arrived.
// till now, all data has been transferred to consumer, new data needs to push client once arrived.
if
(
dataRsp
.
blockNum
==
0
&&
dataRsp
.
reqOffset
.
type
==
TMQ_OFFSET__LOG
&&
if
(
dataRsp
.
blockNum
==
0
&&
dataRsp
.
reqOffset
.
type
==
TMQ_OFFSET__LOG
&&
dataRsp
.
reqOffset
.
version
==
dataRsp
.
rspOffset
.
version
&&
pHandle
->
consumerId
==
pRequest
->
consumerId
)
{
dataRsp
.
reqOffset
.
version
==
dataRsp
.
rspOffset
.
version
&&
pHandle
->
consumerId
==
pRequest
->
consumerId
)
{
// lock
if
(
pHandle
->
noDataPollCnt
>=
NO_POLL_CNT
){
// send poll result to client if no data 5 times to avoid lost data
taosWLockLatch
(
&
pTq
->
lock
);
pHandle
->
noDataPollCnt
=
0
;
code
=
tqRegisterPushHandle
(
pTq
,
pHandle
,
pMsg
);
// lock
taosWUnLockLatch
(
&
pTq
->
lock
);
taosWLockLatch
(
&
pTq
->
lock
);
tDeleteSMqDataRsp
(
&
dataRsp
);
code
=
tqRegisterPushHandle
(
pTq
,
pHandle
,
pMsg
);
return
code
;
taosWUnLockLatch
(
&
pTq
->
lock
);
tDeleteSMqDataRsp
(
&
dataRsp
);
atomic_store_8
(
&
pHandle
->
exec
,
0
);
return
code
;
}
else
{
pHandle
->
noDataPollCnt
++
;
}
}
}
code
=
tqSendDataRsp
(
pTq
,
pMsg
,
pRequest
,
(
SMqDataRsp
*
)
&
dataRsp
,
TMQ_MSG_TYPE__POLL_RSP
);
code
=
tqSendDataRsp
(
pTq
,
pMsg
,
pRequest
,
(
SMqDataRsp
*
)
&
dataRsp
,
TMQ_MSG_TYPE__POLL_RSP
);
// NOTE: this pHandle->consumerId may have been changed already.
// NOTE: this pHandle->consumerId may have been changed already.
end:
end:
...
@@ -207,12 +224,14 @@ static int32_t extractDataAndRspForNormalSubscribe(STQ* pTq, STqHandle* pHandle,
...
@@ -207,12 +224,14 @@ static int32_t extractDataAndRspForNormalSubscribe(STQ* pTq, STqHandle* pHandle,
// taosWUnLockLatch(&pTq->lock);
// taosWUnLockLatch(&pTq->lock);
tDeleteSMqDataRsp
(
&
dataRsp
);
tDeleteSMqDataRsp
(
&
dataRsp
);
}
}
atomic_store_8
(
&
pHandle
->
exec
,
0
);
return
code
;
return
code
;
}
}
static
int32_t
extractDataAndRspForDbStbSubscribe
(
STQ
*
pTq
,
STqHandle
*
pHandle
,
const
SMqPollReq
*
pRequest
,
SRpcMsg
*
pMsg
,
STqOffsetVal
*
offset
)
{
static
int32_t
extractDataAndRspForDbStbSubscribe
(
STQ
*
pTq
,
STqHandle
*
pHandle
,
const
SMqPollReq
*
pRequest
,
SRpcMsg
*
pMsg
,
STqOffsetVal
*
offset
)
{
int
code
=
0
;
int
code
=
0
;
int32_t
vgId
=
TD_VID
(
pTq
->
pVnode
);
int32_t
vgId
=
TD_VID
(
pTq
->
pVnode
);
SWalCkHead
*
pCkHead
=
NULL
;
SWalCkHead
*
pCkHead
=
NULL
;
SMqMetaRsp
metaRsp
=
{
0
};
SMqMetaRsp
metaRsp
=
{
0
};
...
@@ -225,10 +244,16 @@ static int32_t extractDataAndRspForDbStbSubscribe(STQ* pTq, STqHandle* pHandle,
...
@@ -225,10 +244,16 @@ static int32_t extractDataAndRspForDbStbSubscribe(STQ* pTq, STqHandle* pHandle,
return
code
;
return
code
;
}
}
while
(
isHandleExecuting
(
pHandle
)){
tqInfo
(
"sub is executing, pHandle:%p"
,
pHandle
);
taosMsleep
(
5
);
}
atomic_store_8
(
&
pHandle
->
exec
,
1
);
if
(
offset
->
type
!=
TMQ_OFFSET__LOG
)
{
if
(
offset
->
type
!=
TMQ_OFFSET__LOG
)
{
if
(
tqScanTaosx
(
pTq
,
pHandle
,
&
taosxRsp
,
&
metaRsp
,
offset
)
<
0
)
{
if
(
tqScanTaosx
(
pTq
,
pHandle
,
&
taosxRsp
,
&
metaRsp
,
offset
)
<
0
)
{
tDeleteSTaosxRsp
(
&
taosxRsp
)
;
code
=
-
1
;
return
-
1
;
goto
end
;
}
}
if
(
metaRsp
.
metaRspLen
>
0
)
{
if
(
metaRsp
.
metaRspLen
>
0
)
{
...
@@ -236,16 +261,14 @@ static int32_t extractDataAndRspForDbStbSubscribe(STQ* pTq, STqHandle* pHandle,
...
@@ -236,16 +261,14 @@ static int32_t extractDataAndRspForDbStbSubscribe(STQ* pTq, STqHandle* pHandle,
tqDebug
(
"tmq poll: consumer:0x%"
PRIx64
" subkey:%s vgId:%d, send meta offset type:%d,uid:%"
PRId64
",ts:%"
PRId64
,
tqDebug
(
"tmq poll: consumer:0x%"
PRIx64
" subkey:%s vgId:%d, send meta offset type:%d,uid:%"
PRId64
",ts:%"
PRId64
,
pRequest
->
consumerId
,
pHandle
->
subKey
,
vgId
,
metaRsp
.
rspOffset
.
type
,
metaRsp
.
rspOffset
.
uid
,
metaRsp
.
rspOffset
.
ts
);
pRequest
->
consumerId
,
pHandle
->
subKey
,
vgId
,
metaRsp
.
rspOffset
.
type
,
metaRsp
.
rspOffset
.
uid
,
metaRsp
.
rspOffset
.
ts
);
taosMemoryFree
(
metaRsp
.
metaRsp
);
taosMemoryFree
(
metaRsp
.
metaRsp
);
tDeleteSTaosxRsp
(
&
taosxRsp
);
goto
end
;
return
code
;
}
}
tqDebug
(
"taosx poll: consumer:0x%"
PRIx64
" subkey:%s vgId:%d, send data blockNum:%d, offset type:%d,uid:%"
PRId64
tqDebug
(
"taosx poll: consumer:0x%"
PRIx64
" subkey:%s vgId:%d, send data blockNum:%d, offset type:%d,uid:%"
PRId64
",ts:%"
PRId64
,
pRequest
->
consumerId
,
pHandle
->
subKey
,
vgId
,
taosxRsp
.
blockNum
,
taosxRsp
.
rspOffset
.
type
,
taosxRsp
.
rspOffset
.
uid
,
taosxRsp
.
rspOffset
.
ts
);
",ts:%"
PRId64
,
pRequest
->
consumerId
,
pHandle
->
subKey
,
vgId
,
taosxRsp
.
blockNum
,
taosxRsp
.
rspOffset
.
type
,
taosxRsp
.
rspOffset
.
uid
,
taosxRsp
.
rspOffset
.
ts
);
if
(
taosxRsp
.
blockNum
>
0
)
{
if
(
taosxRsp
.
blockNum
>
0
)
{
code
=
tqSendDataRsp
(
pTq
,
pMsg
,
pRequest
,
(
SMqDataRsp
*
)
&
taosxRsp
,
TMQ_MSG_TYPE__TAOSX_RSP
);
code
=
tqSendDataRsp
(
pTq
,
pMsg
,
pRequest
,
(
SMqDataRsp
*
)
&
taosxRsp
,
TMQ_MSG_TYPE__TAOSX_RSP
);
tDeleteSTaosxRsp
(
&
taosxRsp
);
goto
end
;
return
code
;
}
else
{
}
else
{
*
offset
=
taosxRsp
.
rspOffset
;
*
offset
=
taosxRsp
.
rspOffset
;
}
}
...
@@ -257,9 +280,9 @@ static int32_t extractDataAndRspForDbStbSubscribe(STQ* pTq, STqHandle* pHandle,
...
@@ -257,9 +280,9 @@ static int32_t extractDataAndRspForDbStbSubscribe(STQ* pTq, STqHandle* pHandle,
int64_t
fetchVer
=
offset
->
version
+
1
;
int64_t
fetchVer
=
offset
->
version
+
1
;
pCkHead
=
taosMemoryMalloc
(
sizeof
(
SWalCkHead
)
+
2048
);
pCkHead
=
taosMemoryMalloc
(
sizeof
(
SWalCkHead
)
+
2048
);
if
(
pCkHead
==
NULL
)
{
if
(
pCkHead
==
NULL
)
{
tDeleteSTaosxRsp
(
&
taosxRsp
);
terrno
=
TSDB_CODE_OUT_OF_MEMORY
;
terrno
=
TSDB_CODE_OUT_OF_MEMORY
;
return
-
1
;
code
=
-
1
;
goto
end
;
}
}
walSetReaderCapacity
(
pHandle
->
pWalReader
,
2048
);
walSetReaderCapacity
(
pHandle
->
pWalReader
,
2048
);
int
totalRows
=
0
;
int
totalRows
=
0
;
...
@@ -274,9 +297,7 @@ static int32_t extractDataAndRspForDbStbSubscribe(STQ* pTq, STqHandle* pHandle,
...
@@ -274,9 +297,7 @@ static int32_t extractDataAndRspForDbStbSubscribe(STQ* pTq, STqHandle* pHandle,
if
(
tqFetchLog
(
pTq
,
pHandle
,
&
fetchVer
,
&
pCkHead
,
pRequest
->
reqId
)
<
0
)
{
if
(
tqFetchLog
(
pTq
,
pHandle
,
&
fetchVer
,
&
pCkHead
,
pRequest
->
reqId
)
<
0
)
{
tqOffsetResetToLog
(
&
taosxRsp
.
rspOffset
,
fetchVer
);
tqOffsetResetToLog
(
&
taosxRsp
.
rspOffset
,
fetchVer
);
code
=
tqSendDataRsp
(
pTq
,
pMsg
,
pRequest
,
(
SMqDataRsp
*
)
&
taosxRsp
,
TMQ_MSG_TYPE__TAOSX_RSP
);
code
=
tqSendDataRsp
(
pTq
,
pMsg
,
pRequest
,
(
SMqDataRsp
*
)
&
taosxRsp
,
TMQ_MSG_TYPE__TAOSX_RSP
);
tDeleteSTaosxRsp
(
&
taosxRsp
);
goto
end
;
taosMemoryFreeClear
(
pCkHead
);
return
code
;
}
}
SWalCont
*
pHead
=
&
pCkHead
->
head
;
SWalCont
*
pHead
=
&
pCkHead
->
head
;
...
@@ -288,9 +309,7 @@ static int32_t extractDataAndRspForDbStbSubscribe(STQ* pTq, STqHandle* pHandle,
...
@@ -288,9 +309,7 @@ static int32_t extractDataAndRspForDbStbSubscribe(STQ* pTq, STqHandle* pHandle,
if
(
totalRows
>
0
)
{
if
(
totalRows
>
0
)
{
tqOffsetResetToLog
(
&
taosxRsp
.
rspOffset
,
fetchVer
-
1
);
tqOffsetResetToLog
(
&
taosxRsp
.
rspOffset
,
fetchVer
-
1
);
code
=
tqSendDataRsp
(
pTq
,
pMsg
,
pRequest
,
(
SMqDataRsp
*
)
&
taosxRsp
,
TMQ_MSG_TYPE__TAOSX_RSP
);
code
=
tqSendDataRsp
(
pTq
,
pMsg
,
pRequest
,
(
SMqDataRsp
*
)
&
taosxRsp
,
TMQ_MSG_TYPE__TAOSX_RSP
);
tDeleteSTaosxRsp
(
&
taosxRsp
);
goto
end
;
taosMemoryFreeClear
(
pCkHead
);
return
code
;
}
}
tqDebug
(
"fetch meta msg, ver:%"
PRId64
", type:%s"
,
pHead
->
version
,
TMSG_INFO
(
pHead
->
msgType
));
tqDebug
(
"fetch meta msg, ver:%"
PRId64
", type:%s"
,
pHead
->
version
,
TMSG_INFO
(
pHead
->
msgType
));
...
@@ -298,17 +317,8 @@ static int32_t extractDataAndRspForDbStbSubscribe(STQ* pTq, STqHandle* pHandle,
...
@@ -298,17 +317,8 @@ static int32_t extractDataAndRspForDbStbSubscribe(STQ* pTq, STqHandle* pHandle,
metaRsp
.
resMsgType
=
pHead
->
msgType
;
metaRsp
.
resMsgType
=
pHead
->
msgType
;
metaRsp
.
metaRspLen
=
pHead
->
bodyLen
;
metaRsp
.
metaRspLen
=
pHead
->
bodyLen
;
metaRsp
.
metaRsp
=
pHead
->
body
;
metaRsp
.
metaRsp
=
pHead
->
body
;
if
(
tqSendMetaPollRsp
(
pTq
,
pMsg
,
pRequest
,
&
metaRsp
)
<
0
)
{
code
=
tqSendMetaPollRsp
(
pTq
,
pMsg
,
pRequest
,
&
metaRsp
);
code
=
-
1
;
goto
end
;
taosMemoryFreeClear
(
pCkHead
);
tDeleteSTaosxRsp
(
&
taosxRsp
);
return
code
;
}
code
=
0
;
taosMemoryFreeClear
(
pCkHead
);
tDeleteSTaosxRsp
(
&
taosxRsp
);
return
code
;
}
}
// process data
// process data
...
@@ -318,29 +328,28 @@ static int32_t extractDataAndRspForDbStbSubscribe(STQ* pTq, STqHandle* pHandle,
...
@@ -318,29 +328,28 @@ static int32_t extractDataAndRspForDbStbSubscribe(STQ* pTq, STqHandle* pHandle,
.
ver
=
pHead
->
version
,
.
ver
=
pHead
->
version
,
};
};
if
(
tqTaosxScanLog
(
pTq
,
pHandle
,
submit
,
&
taosxRsp
,
&
totalRows
)
<
0
)
{
code
=
tqTaosxScanLog
(
pTq
,
pHandle
,
submit
,
&
taosxRsp
,
&
totalRows
);
tqError
(
"tmq poll: tqTaosxScanLog error %"
PRId64
", in vgId:%d, subkey %s"
,
pRequest
->
consumerId
,
vgId
,
if
(
code
<
0
)
{
pRequest
->
subKey
);
tqError
(
"tmq poll: tqTaosxScanLog error %"
PRId64
", in vgId:%d, subkey %s"
,
pRequest
->
consumerId
,
vgId
,
pRequest
->
subKey
);
taosMemoryFreeClear
(
pCkHead
);
goto
end
;
tDeleteSTaosxRsp
(
&
taosxRsp
);
return
-
1
;
}
}
if
(
totalRows
>=
4096
||
taosxRsp
.
createTableNum
>
0
)
{
if
(
totalRows
>=
4096
||
taosxRsp
.
createTableNum
>
0
)
{
tqOffsetResetToLog
(
&
taosxRsp
.
rspOffset
,
fetchVer
);
tqOffsetResetToLog
(
&
taosxRsp
.
rspOffset
,
fetchVer
);
code
=
tqSendDataRsp
(
pTq
,
pMsg
,
pRequest
,
(
SMqDataRsp
*
)
&
taosxRsp
,
TMQ_MSG_TYPE__TAOSX_RSP
);
code
=
tqSendDataRsp
(
pTq
,
pMsg
,
pRequest
,
(
SMqDataRsp
*
)
&
taosxRsp
,
TMQ_MSG_TYPE__TAOSX_RSP
);
tDeleteSTaosxRsp
(
&
taosxRsp
);
goto
end
;
taosMemoryFreeClear
(
pCkHead
);
return
code
;
}
else
{
}
else
{
fetchVer
++
;
fetchVer
++
;
}
}
}
}
}
}
end:
atomic_store_8
(
&
pHandle
->
exec
,
0
);
tDeleteSTaosxRsp
(
&
taosxRsp
);
tDeleteSTaosxRsp
(
&
taosxRsp
);
taosMemoryFreeClear
(
pCkHead
);
taosMemoryFreeClear
(
pCkHead
);
return
0
;
return
code
;
}
}
int32_t
tqExtractDataForMq
(
STQ
*
pTq
,
STqHandle
*
pHandle
,
const
SMqPollReq
*
pRequest
,
SRpcMsg
*
pMsg
)
{
int32_t
tqExtractDataForMq
(
STQ
*
pTq
,
STqHandle
*
pHandle
,
const
SMqPollReq
*
pRequest
,
SRpcMsg
*
pMsg
)
{
...
...
source/libs/catalog/src/ctgAsync.c
浏览文件 @
1975e945
...
@@ -1437,12 +1437,12 @@ _return:
...
@@ -1437,12 +1437,12 @@ _return:
SMetaRes
*
pRes
=
taosArrayGet
(
ctx
->
pResList
,
pFetch
->
resIdx
);
SMetaRes
*
pRes
=
taosArrayGet
(
ctx
->
pResList
,
pFetch
->
resIdx
);
pRes
->
code
=
code
;
pRes
->
code
=
code
;
pRes
->
pRes
=
NULL
;
pRes
->
pRes
=
NULL
;
ctgTaskError
(
"Get table %d.%s.%s meta failed with error %s"
,
pName
->
acctId
,
pName
->
dbname
,
pName
->
tname
,
tstrerror
(
code
));
if
(
0
==
atomic_sub_fetch_32
(
&
ctx
->
fetchNum
,
1
))
{
if
(
0
==
atomic_sub_fetch_32
(
&
ctx
->
fetchNum
,
1
))
{
TSWAP
(
pTask
->
res
,
ctx
->
pResList
);
TSWAP
(
pTask
->
res
,
ctx
->
pResList
);
taskDone
=
true
;
taskDone
=
true
;
}
}
ctgTaskError
(
"Get table %d.%s.%s meta failed with error %s"
,
pName
->
acctId
,
pName
->
dbname
,
pName
->
tname
,
tstrerror
(
code
));
}
}
if
(
pTask
->
res
&&
taskDone
)
{
if
(
pTask
->
res
&&
taskDone
)
{
...
...
source/libs/executor/src/executil.c
浏览文件 @
1975e945
...
@@ -1484,14 +1484,23 @@ static int32_t setSelectValueColumnInfo(SqlFunctionCtx* pCtx, int32_t numOfOutpu
...
@@ -1484,14 +1484,23 @@ static int32_t setSelectValueColumnInfo(SqlFunctionCtx* pCtx, int32_t numOfOutpu
return
TSDB_CODE_OUT_OF_MEMORY
;
return
TSDB_CODE_OUT_OF_MEMORY
;
}
}
SHashObj
*
pSelectFuncs
=
taosHashInit
(
8
,
taosGetDefaultHashFunction
(
TSDB_DATA_TYPE_BINARY
),
false
,
HASH_ENTRY_LOCK
);
for
(
int32_t
i
=
0
;
i
<
numOfOutput
;
++
i
)
{
for
(
int32_t
i
=
0
;
i
<
numOfOutput
;
++
i
)
{
const
char
*
pName
=
pCtx
[
i
].
pExpr
->
pExpr
->
_function
.
functionName
;
const
char
*
pName
=
pCtx
[
i
].
pExpr
->
pExpr
->
_function
.
functionName
;
if
((
strcmp
(
pName
,
"_select_value"
)
==
0
)
||
(
strcmp
(
pName
,
"_group_key"
)
==
0
))
{
if
((
strcmp
(
pName
,
"_select_value"
)
==
0
)
||
(
strcmp
(
pName
,
"_group_key"
)
==
0
))
{
pValCtx
[
num
++
]
=
&
pCtx
[
i
];
pValCtx
[
num
++
]
=
&
pCtx
[
i
];
}
else
if
(
fmIsSelectFunc
(
pCtx
[
i
].
functionId
))
{
}
else
if
(
fmIsSelectFunc
(
pCtx
[
i
].
functionId
))
{
p
=
&
pCtx
[
i
];
void
*
data
=
taosHashGet
(
pSelectFuncs
,
pName
,
strlen
(
pName
));
if
(
taosHashGetSize
(
pSelectFuncs
)
!=
0
&&
data
==
NULL
)
{
p
=
NULL
;
break
;
}
else
{
taosHashPut
(
pSelectFuncs
,
pName
,
strlen
(
pName
),
&
num
,
sizeof
(
num
));
p
=
&
pCtx
[
i
];
}
}
}
}
}
taosHashCleanup
(
pSelectFuncs
);
if
(
p
!=
NULL
)
{
if
(
p
!=
NULL
)
{
p
->
subsidiaries
.
pCtx
=
pValCtx
;
p
->
subsidiaries
.
pCtx
=
pValCtx
;
...
...
source/libs/parser/src/parCalcConst.c
浏览文件 @
1975e945
...
@@ -388,6 +388,9 @@ static bool isSetUselessCol(SSetOperator* pSetOp, int32_t index, SExprNode* pPro
...
@@ -388,6 +388,9 @@ static bool isSetUselessCol(SSetOperator* pSetOp, int32_t index, SExprNode* pPro
}
}
static
int32_t
calcConstSetOpProjections
(
SCalcConstContext
*
pCxt
,
SSetOperator
*
pSetOp
,
bool
subquery
)
{
static
int32_t
calcConstSetOpProjections
(
SCalcConstContext
*
pCxt
,
SSetOperator
*
pSetOp
,
bool
subquery
)
{
if
(
subquery
&&
pSetOp
->
opType
==
SET_OP_TYPE_UNION
)
{
return
TSDB_CODE_SUCCESS
;
}
int32_t
index
=
0
;
int32_t
index
=
0
;
SNode
*
pProj
=
NULL
;
SNode
*
pProj
=
NULL
;
WHERE_EACH
(
pProj
,
pSetOp
->
pProjectionList
)
{
WHERE_EACH
(
pProj
,
pSetOp
->
pProjectionList
)
{
...
...
source/libs/parser/src/parTranslater.c
浏览文件 @
1975e945
...
@@ -5330,7 +5330,8 @@ static int32_t checkAlterSuperTableBySchema(STranslateContext* pCxt, SAlterTable
...
@@ -5330,7 +5330,8 @@ static int32_t checkAlterSuperTableBySchema(STranslateContext* pCxt, SAlterTable
}
}
if
(
TSDB_ALTER_TABLE_UPDATE_COLUMN_BYTES
==
pStmt
->
alterType
)
{
if
(
TSDB_ALTER_TABLE_UPDATE_COLUMN_BYTES
==
pStmt
->
alterType
)
{
if
(
calcTypeBytes
(
pStmt
->
dataType
)
>
TSDB_MAX_FIELD_LEN
)
{
if
((
TSDB_DATA_TYPE_VARCHAR
==
pStmt
->
dataType
.
type
&&
calcTypeBytes
(
pStmt
->
dataType
)
>
TSDB_MAX_BINARY_LEN
)
||
(
TSDB_DATA_TYPE_NCHAR
==
pStmt
->
dataType
.
type
&&
calcTypeBytes
(
pStmt
->
dataType
)
>
TSDB_MAX_NCHAR_LEN
))
{
return
generateSyntaxErrMsg
(
&
pCxt
->
msgBuf
,
TSDB_CODE_PAR_INVALID_VAR_COLUMN_LEN
);
return
generateSyntaxErrMsg
(
&
pCxt
->
msgBuf
,
TSDB_CODE_PAR_INVALID_VAR_COLUMN_LEN
);
}
}
...
@@ -5355,6 +5356,11 @@ static int32_t checkAlterSuperTableBySchema(STranslateContext* pCxt, SAlterTable
...
@@ -5355,6 +5356,11 @@ static int32_t checkAlterSuperTableBySchema(STranslateContext* pCxt, SAlterTable
return
generateSyntaxErrMsg
(
&
pCxt
->
msgBuf
,
TSDB_CODE_PAR_TOO_MANY_COLUMNS
);
return
generateSyntaxErrMsg
(
&
pCxt
->
msgBuf
,
TSDB_CODE_PAR_TOO_MANY_COLUMNS
);
}
}
if
((
TSDB_DATA_TYPE_VARCHAR
==
pStmt
->
dataType
.
type
&&
calcTypeBytes
(
pStmt
->
dataType
)
>
TSDB_MAX_BINARY_LEN
)
||
(
TSDB_DATA_TYPE_NCHAR
==
pStmt
->
dataType
.
type
&&
calcTypeBytes
(
pStmt
->
dataType
)
>
TSDB_MAX_NCHAR_LEN
))
{
return
generateSyntaxErrMsg
(
&
pCxt
->
msgBuf
,
TSDB_CODE_PAR_INVALID_VAR_COLUMN_LEN
);
}
if
(
pTableMeta
->
tableInfo
.
rowSize
+
calcTypeBytes
(
pStmt
->
dataType
)
>
TSDB_MAX_BYTES_PER_ROW
)
{
if
(
pTableMeta
->
tableInfo
.
rowSize
+
calcTypeBytes
(
pStmt
->
dataType
)
>
TSDB_MAX_BYTES_PER_ROW
)
{
return
generateSyntaxErrMsg
(
&
pCxt
->
msgBuf
,
TSDB_CODE_PAR_INVALID_ROW_LENGTH
,
TSDB_MAX_BYTES_PER_ROW
);
return
generateSyntaxErrMsg
(
&
pCxt
->
msgBuf
,
TSDB_CODE_PAR_INVALID_ROW_LENGTH
,
TSDB_MAX_BYTES_PER_ROW
);
}
}
...
@@ -8359,6 +8365,11 @@ static int32_t buildUpdateColReq(STranslateContext* pCxt, SAlterTableStmt* pStmt
...
@@ -8359,6 +8365,11 @@ static int32_t buildUpdateColReq(STranslateContext* pCxt, SAlterTableStmt* pStmt
return
generateSyntaxErrMsg
(
&
pCxt
->
msgBuf
,
TSDB_CODE_PAR_INVALID_MODIFY_COL
);
return
generateSyntaxErrMsg
(
&
pCxt
->
msgBuf
,
TSDB_CODE_PAR_INVALID_MODIFY_COL
);
}
}
if
((
TSDB_DATA_TYPE_VARCHAR
==
pStmt
->
dataType
.
type
&&
calcTypeBytes
(
pStmt
->
dataType
)
>
TSDB_MAX_BINARY_LEN
)
||
(
TSDB_DATA_TYPE_NCHAR
==
pStmt
->
dataType
.
type
&&
calcTypeBytes
(
pStmt
->
dataType
)
>
TSDB_MAX_NCHAR_LEN
))
{
return
generateSyntaxErrMsg
(
&
pCxt
->
msgBuf
,
TSDB_CODE_PAR_INVALID_VAR_COLUMN_LEN
);
}
if
(
pTableMeta
->
tableInfo
.
rowSize
+
pReq
->
colModBytes
-
pSchema
->
bytes
>
TSDB_MAX_BYTES_PER_ROW
)
{
if
(
pTableMeta
->
tableInfo
.
rowSize
+
pReq
->
colModBytes
-
pSchema
->
bytes
>
TSDB_MAX_BYTES_PER_ROW
)
{
return
generateSyntaxErrMsg
(
&
pCxt
->
msgBuf
,
TSDB_CODE_PAR_INVALID_ROW_LENGTH
,
TSDB_MAX_BYTES_PER_ROW
);
return
generateSyntaxErrMsg
(
&
pCxt
->
msgBuf
,
TSDB_CODE_PAR_INVALID_ROW_LENGTH
,
TSDB_MAX_BYTES_PER_ROW
);
}
}
...
...
source/libs/planner/src/planner.c
浏览文件 @
1975e945
...
@@ -19,6 +19,14 @@
...
@@ -19,6 +19,14 @@
#include "scalar.h"
#include "scalar.h"
#include "tglobal.h"
#include "tglobal.h"
static
void
debugPrintNode
(
SNode
*
pNode
)
{
char
*
pStr
=
NULL
;
nodesNodeToString
(
pNode
,
false
,
&
pStr
,
NULL
);
printf
(
"%s
\n
"
,
pStr
);
taosMemoryFree
(
pStr
);
return
;
}
static
void
dumpQueryPlan
(
SQueryPlan
*
pPlan
)
{
static
void
dumpQueryPlan
(
SQueryPlan
*
pPlan
)
{
if
(
!
tsQueryPlannerTrace
)
{
if
(
!
tsQueryPlannerTrace
)
{
return
;
return
;
...
...
source/libs/wal/src/walMeta.c
浏览文件 @
1975e945
...
@@ -295,6 +295,36 @@ void walAlignVersions(SWal* pWal) {
...
@@ -295,6 +295,36 @@ void walAlignVersions(SWal* pWal) {
wInfo
(
"vgId:%d, reset commitVer to %"
PRId64
,
pWal
->
cfg
.
vgId
,
pWal
->
vers
.
commitVer
);
wInfo
(
"vgId:%d, reset commitVer to %"
PRId64
,
pWal
->
cfg
.
vgId
,
pWal
->
vers
.
commitVer
);
}
}
int
walRepairLogFileTs
(
SWal
*
pWal
,
bool
*
updateMeta
)
{
int32_t
sz
=
taosArrayGetSize
(
pWal
->
fileInfoSet
);
int32_t
fileIdx
=
-
1
;
int32_t
lastCloseTs
=
0
;
char
fnameStr
[
WAL_FILE_LEN
]
=
{
0
};
while
(
++
fileIdx
<
sz
-
1
)
{
SWalFileInfo
*
pFileInfo
=
taosArrayGet
(
pWal
->
fileInfoSet
,
fileIdx
);
if
(
pFileInfo
->
closeTs
!=
-
1
)
{
lastCloseTs
=
pFileInfo
->
closeTs
;
continue
;
}
walBuildLogName
(
pWal
,
pFileInfo
->
firstVer
,
fnameStr
);
int32_t
mtime
=
0
;
if
(
taosStatFile
(
fnameStr
,
NULL
,
&
mtime
)
<
0
)
{
terrno
=
TAOS_SYSTEM_ERROR
(
errno
);
wError
(
"vgId:%d, failed to stat file due to %s, file:%s"
,
pWal
->
cfg
.
vgId
,
strerror
(
errno
),
fnameStr
);
return
-
1
;
}
if
(
updateMeta
!=
NULL
)
*
updateMeta
=
true
;
if
(
pFileInfo
->
createTs
==
-
1
)
pFileInfo
->
createTs
=
lastCloseTs
;
pFileInfo
->
closeTs
=
mtime
;
lastCloseTs
=
pFileInfo
->
closeTs
;
}
return
0
;
}
bool
walLogEntriesComplete
(
const
SWal
*
pWal
)
{
bool
walLogEntriesComplete
(
const
SWal
*
pWal
)
{
int32_t
sz
=
taosArrayGetSize
(
pWal
->
fileInfoSet
);
int32_t
sz
=
taosArrayGetSize
(
pWal
->
fileInfoSet
);
bool
complete
=
true
;
bool
complete
=
true
;
...
@@ -433,15 +463,8 @@ int walCheckAndRepairMeta(SWal* pWal) {
...
@@ -433,15 +463,8 @@ int walCheckAndRepairMeta(SWal* pWal) {
wError
(
"failed to scan wal last ver since %s"
,
terrstr
());
wError
(
"failed to scan wal last ver since %s"
,
terrstr
());
return
-
1
;
return
-
1
;
}
}
// remove the empty wal log, and its idx
// empty log file
wInfo
(
"vgId:%d, wal remove empty file %s"
,
pWal
->
cfg
.
vgId
,
fnameStr
);
lastVer
=
pFileInfo
->
firstVer
-
1
;
taosRemoveFile
(
fnameStr
);
walBuildIdxName
(
pWal
,
pFileInfo
->
firstVer
,
fnameStr
);
wInfo
(
"vgId:%d, wal remove empty file %s"
,
pWal
->
cfg
.
vgId
,
fnameStr
);
taosRemoveFile
(
fnameStr
);
// remove its meta entry
taosArrayRemove
(
pWal
->
fileInfoSet
,
fileIdx
);
continue
;
}
}
// update lastVer
// update lastVer
...
@@ -460,6 +483,11 @@ int walCheckAndRepairMeta(SWal* pWal) {
...
@@ -460,6 +483,11 @@ int walCheckAndRepairMeta(SWal* pWal) {
}
}
(
void
)
walAlignVersions
(
pWal
);
(
void
)
walAlignVersions
(
pWal
);
// repair ts of files
if
(
walRepairLogFileTs
(
pWal
,
&
updateMeta
)
<
0
)
{
return
-
1
;
}
// update meta file
// update meta file
if
(
updateMeta
)
{
if
(
updateMeta
)
{
(
void
)
walSaveMeta
(
pWal
);
(
void
)
walSaveMeta
(
pWal
);
...
...
source/libs/wal/src/walRead.c
浏览文件 @
1975e945
...
@@ -74,18 +74,17 @@ int32_t walNextValidMsg(SWalReader *pReader) {
...
@@ -74,18 +74,17 @@ int32_t walNextValidMsg(SWalReader *pReader) {
int64_t
lastVer
=
walGetLastVer
(
pReader
->
pWal
);
int64_t
lastVer
=
walGetLastVer
(
pReader
->
pWal
);
int64_t
committedVer
=
walGetCommittedVer
(
pReader
->
pWal
);
int64_t
committedVer
=
walGetCommittedVer
(
pReader
->
pWal
);
int64_t
appliedVer
=
walGetAppliedVer
(
pReader
->
pWal
);
int64_t
appliedVer
=
walGetAppliedVer
(
pReader
->
pWal
);
while
(
appliedVer
<
committedVer
){
// wait apply ver equal to commit ver, otherwise may lost data when consume data [TD-24010]
if
(
appliedVer
<
committedVer
){
// wait apply ver equal to commit ver, otherwise may lost data when consume data [TD-24010]
wDebug
(
"vgId:%d, wal apply ver:%"
PRId64
" smaller than commit ver:%"
PRId64
", so sleep 1ms"
,
pReader
->
pWal
->
cfg
.
vgId
,
appliedVer
,
committedVer
);
wDebug
(
"vgId:%d, wal apply ver:%"
PRId64
" smaller than commit ver:%"
PRId64
,
pReader
->
pWal
->
cfg
.
vgId
,
appliedVer
,
committedVer
);
taosMsleep
(
1
);
// taosMsleep(10);
appliedVer
=
walGetAppliedVer
(
pReader
->
pWal
);
}
}
// int64_t endVer = pReader->cond.scanUncommited ? lastVer : committedVer;
// int64_t endVer = pReader->cond.scanUncommited ? lastVer : committedVer;
// endVer = TMIN(appliedVer, en
dVer);
int64_t
endVer
=
TMIN
(
appliedVer
,
committe
dVer
);
wDebug
(
"vgId:%d, wal start to fetch, index:%"
PRId64
", last index:%"
PRId64
" commit index:%"
PRId64
wDebug
(
"vgId:%d, wal start to fetch, index:%"
PRId64
", last index:%"
PRId64
" commit index:%"
PRId64
", applied index:%"
PRId64
,
", applied index:%"
PRId64
", end index:%"
PRId64
,
pReader
->
pWal
->
cfg
.
vgId
,
fetchVer
,
lastVer
,
committedVer
,
appliedVer
);
pReader
->
pWal
->
cfg
.
vgId
,
fetchVer
,
lastVer
,
committedVer
,
appliedVer
,
endVer
);
while
(
fetchVer
<=
committe
dVer
)
{
while
(
fetchVer
<=
en
dVer
)
{
if
(
walFetchHeadNew
(
pReader
,
fetchVer
)
<
0
)
{
if
(
walFetchHeadNew
(
pReader
,
fetchVer
)
<
0
)
{
return
-
1
;
return
-
1
;
}
}
...
...
source/libs/wal/src/walWrite.c
浏览文件 @
1975e945
...
@@ -284,15 +284,15 @@ int32_t walEndSnapshot(SWal *pWal) {
...
@@ -284,15 +284,15 @@ int32_t walEndSnapshot(SWal *pWal) {
if
(
ver
==
-
1
)
{
if
(
ver
==
-
1
)
{
code
=
-
1
;
code
=
-
1
;
goto
END
;
goto
END
;
}
;
}
pWal
->
vers
.
snapshotVer
=
ver
;
pWal
->
vers
.
snapshotVer
=
ver
;
int
ts
=
taosGetTimestampSec
();
int
ts
=
taosGetTimestampSec
();
ver
=
TMAX
(
ver
-
pWal
->
vers
.
logRetention
,
pWal
->
vers
.
firstVer
-
1
);
ver
=
TMAX
(
ver
-
pWal
->
vers
.
logRetention
,
pWal
->
vers
.
firstVer
-
1
);
// compatible mode for refVer
bool
hasTopic
=
false
;
bool
hasTopic
=
false
;
int64_t
refVer
=
ver
;
int64_t
refVer
=
INT64_MAX
;
void
*
pIter
=
NULL
;
void
*
pIter
=
NULL
;
while
(
1
)
{
while
(
1
)
{
pIter
=
taosHashIterate
(
pWal
->
pRefHash
,
pIter
);
pIter
=
taosHashIterate
(
pWal
->
pRefHash
,
pIter
);
...
@@ -300,54 +300,40 @@ int32_t walEndSnapshot(SWal *pWal) {
...
@@ -300,54 +300,40 @@ int32_t walEndSnapshot(SWal *pWal) {
SWalRef
*
pRef
=
*
(
SWalRef
**
)
pIter
;
SWalRef
*
pRef
=
*
(
SWalRef
**
)
pIter
;
if
(
pRef
->
refVer
==
-
1
)
continue
;
if
(
pRef
->
refVer
==
-
1
)
continue
;
refVer
=
TMIN
(
refVer
,
pRef
->
refVer
-
1
);
refVer
=
TMIN
(
refVer
,
pRef
->
refVer
-
1
);
wDebug
(
"vgId:%d, wal found ref %"
PRId64
", refId %"
PRId64
,
pWal
->
cfg
.
vgId
,
pRef
->
refVer
,
pRef
->
refId
);
hasTopic
=
true
;
hasTopic
=
true
;
}
}
// compatible mode
if
(
pWal
->
cfg
.
retentionPeriod
==
0
&&
hasTopic
)
{
if
(
pWal
->
cfg
.
retentionPeriod
==
0
&&
hasTopic
)
{
wInfo
(
"vgId:%d, wal found refVer:%"
PRId64
" in compatible mode, ver:%"
PRId64
,
pWal
->
cfg
.
vgId
,
refVer
,
ver
);
ver
=
TMIN
(
ver
,
refVer
);
ver
=
TMIN
(
ver
,
refVer
);
}
}
// find files safe to delete
int
deleteCnt
=
0
;
int
deleteCnt
=
0
;
int64_t
newTotSize
=
pWal
->
totSize
;
int64_t
newTotSize
=
pWal
->
totSize
;
SWalFileInfo
tmp
;
SWalFileInfo
tmp
=
{
0
}
;
tmp
.
firstVer
=
ver
;
tmp
.
firstVer
=
ver
;
// find files safe to delete
SWalFileInfo
*
pInfo
=
taosArraySearch
(
pWal
->
fileInfoSet
,
&
tmp
,
compareWalFileInfo
,
TD_LE
);
SWalFileInfo
*
pInfo
=
taosArraySearch
(
pWal
->
fileInfoSet
,
&
tmp
,
compareWalFileInfo
,
TD_LE
);
if
(
pInfo
)
{
if
(
pInfo
)
{
SWalFileInfo
*
pLastFileInfo
=
taosArrayGetLast
(
pWal
->
fileInfoSet
);
wDebug
(
"vgId:%d, wal search found file info. ver:%"
PRId64
", first:%"
PRId64
" last:%"
PRId64
,
pWal
->
cfg
.
vgId
,
ver
,
wDebug
(
"vgId:%d, wal search found file info: first:%"
PRId64
" last:%"
PRId64
,
pWal
->
cfg
.
vgId
,
pInfo
->
firstVer
,
pInfo
->
firstVer
,
pInfo
->
lastVer
);
pInfo
->
lastVer
);
ASSERT
(
ver
<=
pInfo
->
lastVer
);
if
(
ver
>
=
pInfo
->
lastVer
)
{
if
(
ver
=
=
pInfo
->
lastVer
)
{
pInfo
++
;
pInfo
++
;
wDebug
(
"vgId:%d, wal remove advance one file: first:%"
PRId64
" last:%"
PRId64
,
pWal
->
cfg
.
vgId
,
pInfo
->
firstVer
,
pInfo
->
lastVer
);
}
if
(
pInfo
<=
pLastFileInfo
)
{
wDebug
(
"vgId:%d, wal end remove for first:%"
PRId64
" last:%"
PRId64
,
pWal
->
cfg
.
vgId
,
pInfo
->
firstVer
,
pInfo
->
lastVer
);
}
else
{
wDebug
(
"vgId:%d, wal no remove"
,
pWal
->
cfg
.
vgId
);
}
}
// iterate files, until the searched result
// iterate files, until the searched result
// delete according to file size or close time
for
(
SWalFileInfo
*
iter
=
pWal
->
fileInfoSet
->
pData
;
iter
<
pInfo
;
iter
++
)
{
for
(
SWalFileInfo
*
iter
=
pWal
->
fileInfoSet
->
pData
;
iter
<
pInfo
;
iter
++
)
{
wDebug
(
"vgId:%d, wal check remove file %"
PRId64
"(file size %"
PRId64
" close ts %"
PRId64
if
((
pWal
->
cfg
.
retentionSize
>
0
&&
newTotSize
>
pWal
->
cfg
.
retentionSize
)
||
"), new tot size %"
PRId64
,
(
pWal
->
cfg
.
retentionPeriod
==
0
||
pWal
->
cfg
.
vgId
,
iter
->
firstVer
,
iter
->
fileSize
,
iter
->
closeTs
,
newTotSize
);
pWal
->
cfg
.
retentionPeriod
>
0
&&
iter
->
closeTs
>=
0
&&
iter
->
closeTs
+
pWal
->
cfg
.
retentionPeriod
<
ts
))
{
if
((
pWal
->
cfg
.
retentionSize
!=
-
1
&&
pWal
->
cfg
.
retentionSize
!=
0
&&
newTotSize
>
pWal
->
cfg
.
retentionSize
)
||
((
pWal
->
cfg
.
retentionPeriod
==
0
)
||
(
pWal
->
cfg
.
retentionPeriod
!=
-
1
&&
iter
->
closeTs
!=
-
1
&&
iter
->
closeTs
+
pWal
->
cfg
.
retentionPeriod
<
ts
)))
{
// delete according to file size or close time
wDebug
(
"vgId:%d, check pass"
,
pWal
->
cfg
.
vgId
);
deleteCnt
++
;
deleteCnt
++
;
newTotSize
-=
iter
->
fileSize
;
newTotSize
-=
iter
->
fileSize
;
taosArrayPush
(
pWal
->
toDeleteFiles
,
iter
);
taosArrayPush
(
pWal
->
toDeleteFiles
,
iter
);
}
}
wDebug
(
"vgId:%d, check not pass"
,
pWal
->
cfg
.
vgId
);
}
}
UPDATE_META:
// make new array, remove files
// make new array, remove files
taosArrayPopFrontBatch
(
pWal
->
fileInfoSet
,
deleteCnt
);
taosArrayPopFrontBatch
(
pWal
->
fileInfoSet
,
deleteCnt
);
if
(
taosArrayGetSize
(
pWal
->
fileInfoSet
)
==
0
)
{
if
(
taosArrayGetSize
(
pWal
->
fileInfoSet
)
==
0
)
{
...
@@ -357,11 +343,12 @@ int32_t walEndSnapshot(SWal *pWal) {
...
@@ -357,11 +343,12 @@ int32_t walEndSnapshot(SWal *pWal) {
pWal
->
vers
.
firstVer
=
((
SWalFileInfo
*
)
taosArrayGet
(
pWal
->
fileInfoSet
,
0
))
->
firstVer
;
pWal
->
vers
.
firstVer
=
((
SWalFileInfo
*
)
taosArrayGet
(
pWal
->
fileInfoSet
,
0
))
->
firstVer
;
}
}
}
}
// update meta
pWal
->
writeCur
=
taosArrayGetSize
(
pWal
->
fileInfoSet
)
-
1
;
pWal
->
writeCur
=
taosArrayGetSize
(
pWal
->
fileInfoSet
)
-
1
;
pWal
->
totSize
=
newTotSize
;
pWal
->
totSize
=
newTotSize
;
pWal
->
vers
.
verInSnapshotting
=
-
1
;
pWal
->
vers
.
verInSnapshotting
=
-
1
;
// save snapshot ver, commit ver
code
=
walSaveMeta
(
pWal
);
code
=
walSaveMeta
(
pWal
);
if
(
code
<
0
)
{
if
(
code
<
0
)
{
goto
END
;
goto
END
;
...
@@ -369,23 +356,27 @@ int32_t walEndSnapshot(SWal *pWal) {
...
@@ -369,23 +356,27 @@ int32_t walEndSnapshot(SWal *pWal) {
// delete files
// delete files
deleteCnt
=
taosArrayGetSize
(
pWal
->
toDeleteFiles
);
deleteCnt
=
taosArrayGetSize
(
pWal
->
toDeleteFiles
);
wDebug
(
"vgId:%d, wal should delete %d files"
,
pWal
->
cfg
.
vgId
,
deleteCnt
);
char
fnameStr
[
WAL_FILE_LEN
]
=
{
0
};
char
fnameStr
[
WAL_FILE_LEN
];
pInfo
=
NULL
;
for
(
int
i
=
0
;
i
<
deleteCnt
;
i
++
)
{
for
(
int
i
=
0
;
i
<
deleteCnt
;
i
++
)
{
pInfo
=
taosArrayGet
(
pWal
->
toDeleteFiles
,
i
);
pInfo
=
taosArrayGet
(
pWal
->
toDeleteFiles
,
i
);
walBuildLogName
(
pWal
,
pInfo
->
firstVer
,
fnameStr
);
walBuildLogName
(
pWal
,
pInfo
->
firstVer
,
fnameStr
);
wDebug
(
"vgId:%d, wal remove file %s"
,
pWal
->
cfg
.
vgId
,
fnameStr
);
if
(
taosRemoveFile
(
fnameStr
)
<
0
&&
errno
!=
ENOENT
)
{
if
(
taosRemoveFile
(
fnameStr
)
<
0
&&
errno
!=
ENOENT
)
{
wError
(
"vgId:%d, failed to remove log file %s due to %s"
,
pWal
->
cfg
.
vgId
,
fnameStr
,
strerror
(
errno
));
wError
(
"vgId:%d, failed to remove log file %s due to %s"
,
pWal
->
cfg
.
vgId
,
fnameStr
,
strerror
(
errno
));
goto
END
;
goto
END
;
}
}
walBuildIdxName
(
pWal
,
pInfo
->
firstVer
,
fnameStr
);
walBuildIdxName
(
pWal
,
pInfo
->
firstVer
,
fnameStr
);
wDebug
(
"vgId:%d, wal remove file %s"
,
pWal
->
cfg
.
vgId
,
fnameStr
);
if
(
taosRemoveFile
(
fnameStr
)
<
0
&&
errno
!=
ENOENT
)
{
if
(
taosRemoveFile
(
fnameStr
)
<
0
&&
errno
!=
ENOENT
)
{
wError
(
"vgId:%d, failed to remove idx file %s due to %s"
,
pWal
->
cfg
.
vgId
,
fnameStr
,
strerror
(
errno
));
wError
(
"vgId:%d, failed to remove idx file %s due to %s"
,
pWal
->
cfg
.
vgId
,
fnameStr
,
strerror
(
errno
));
goto
END
;
goto
END
;
}
}
}
}
if
(
pInfo
!=
NULL
)
{
wInfo
(
"vgId:%d, wal log files recycled. count:%d, until ver:%"
PRId64
", closeTs:%"
PRId64
,
pWal
->
cfg
.
vgId
,
deleteCnt
,
pInfo
->
lastVer
,
pInfo
->
closeTs
);
}
taosArrayClear
(
pWal
->
toDeleteFiles
);
taosArrayClear
(
pWal
->
toDeleteFiles
);
END:
END:
...
...
test1
0 → 100644
浏览文件 @
1975e945
tests/pytest/util/cluster.py
浏览文件 @
1975e945
...
@@ -52,8 +52,9 @@ class ConfigureyCluster:
...
@@ -52,8 +52,9 @@ class ConfigureyCluster:
dnode
.
addExtraCfg
(
"secondEp"
,
f
"
{
hostname
}
:
{
startPort_sec
}
"
)
dnode
.
addExtraCfg
(
"secondEp"
,
f
"
{
hostname
}
:
{
startPort_sec
}
"
)
# configure dnoe of independent mnodes
# configure dnoe of independent mnodes
if
num
<=
self
.
mnodeNums
and
self
.
mnodeNums
!=
0
and
independentMnode
==
True
:
if
num
<=
self
.
mnodeNums
and
self
.
mnodeNums
!=
0
and
independentMnode
==
"True"
:
dnode
.
addExtraCfg
(
"supportVnodes"
,
1024
)
tdLog
.
info
(
"set mnode supportVnodes 0"
)
dnode
.
addExtraCfg
(
"supportVnodes"
,
0
)
# print(dnode)
# print(dnode)
self
.
dnodes
.
append
(
dnode
)
self
.
dnodes
.
append
(
dnode
)
return
self
.
dnodes
return
self
.
dnodes
...
@@ -71,6 +72,7 @@ class ConfigureyCluster:
...
@@ -71,6 +72,7 @@ class ConfigureyCluster:
tdSql
.
init
(
conn
.
cursor
())
tdSql
.
init
(
conn
.
cursor
())
mnodeNums
=
int
(
mnodeNums
)
mnodeNums
=
int
(
mnodeNums
)
for
i
in
range
(
2
,
mnodeNums
+
1
):
for
i
in
range
(
2
,
mnodeNums
+
1
):
tdLog
.
info
(
"create mnode on dnode %d"
%
i
)
tdSql
.
execute
(
" create mnode on dnode %d;"
%
i
)
tdSql
.
execute
(
" create mnode on dnode %d;"
%
i
)
...
...
tests/script/tsim/alter/table.sim
浏览文件 @
1975e945
...
@@ -657,17 +657,34 @@ if $data20 != null then
...
@@ -657,17 +657,34 @@ if $data20 != null then
return -1
return -1
endi
endi
print =============== error
print =============== error
for normal table
sql create table tb2023(ts timestamp, f int);
sql create table tb2023(ts timestamp, f int);
sql_error alter table tb2023 add column v varchar(16375);
sql_error alter table tb2023 add column v varchar(16375);
sql_error alter table tb2023 add column v varchar(16385);
sql_error alter table tb2023 add column v varchar(16385);
sql_error alter table tb2023 add column v varchar(33100);
sql_error alter table tb2023 add column v varchar(33100);
sql alter table tb2023 add column v varchar(16374);
sql alter table tb2023 add column v varchar(16374);
sql_error alter table tb2023 modify column v varchar(16375);
sql desc tb2023
sql desc tb2023
sql alter table tb2023 drop column v
sql alter table tb2023 drop column v
sql_error alter table tb2023 add column v nchar(4094);
sql_error alter table tb2023 add column v nchar(4094);
sql alter table tb2023 add column v nchar(4093);
sql alter table tb2023 add column v nchar(4093);
sql_error alter table tb2023 modify column v nchar(4094);
sql desc tb2023
sql desc tb2023
print =============== error for super table
sql create table stb2023(ts timestamp, f int) tags(t1 int);
sql_error alter table stb2023 add column v varchar(16375);
sql_error alter table stb2023 add column v varchar(16385);
sql_error alter table stb2023 add column v varchar(33100);
sql alter table stb2023 add column v varchar(16374);
sql_error alter table stb2023 modify column v varchar(16375);
sql desc stb2023
sql alter table stb2023 drop column v
sql_error alter table stb2023 add column v nchar(4094);
sql alter table stb2023 add column v nchar(4093);
sql_error alter table stb2023 modify column v nchar(4094);
sql desc stb2023
print ======= over
print ======= over
sql drop database d1
sql drop database d1
sql select * from information_schema.ins_databases
sql select * from information_schema.ins_databases
...
...
tests/script/tsim/parser/alter_column.sim
浏览文件 @
1975e945
...
@@ -48,7 +48,7 @@ sql_error alter table tb modify column c2 binary(10);
...
@@ -48,7 +48,7 @@ sql_error alter table tb modify column c2 binary(10);
sql_error alter table tb modify column c2 binary(9);
sql_error alter table tb modify column c2 binary(9);
sql_error alter table tb modify column c2 binary(-9);
sql_error alter table tb modify column c2 binary(-9);
sql_error alter table tb modify column c2 binary(0);
sql_error alter table tb modify column c2 binary(0);
sql alter table tb modify column c2 binary(17000);
sql
_error
alter table tb modify column c2 binary(17000);
sql_error alter table tb modify column c2 nchar(30);
sql_error alter table tb modify column c2 nchar(30);
sql_error alter table tb modify column c3 double;
sql_error alter table tb modify column c3 double;
sql_error alter table tb modify column c3 nchar(10);
sql_error alter table tb modify column c3 nchar(10);
...
...
tests/script/tsim/query/unionall_as_table.sim
浏览文件 @
1975e945
...
@@ -25,4 +25,21 @@ if $data05 != @0021001@ then
...
@@ -25,4 +25,21 @@ if $data05 != @0021001@ then
return -1
return -1
endi
endi
sql create table st (ts timestamp, f int) tags (t int);
sql insert into ct1 using st tags(1) values(now, 1)(now+1s, 2)
sql insert into ct2 using st tags(2) values(now+2s, 3)(now+3s, 4)
sql select count(*) from (select * from ct1 union all select * from ct2)
if $rows != 1 then
return -1
endi
if $data00 != 4 then
return -1
endi
sql select count(*) from (select * from ct1 union select * from ct2)
if $rows != 1 then
return -1
endi
if $data00 != 4 then
return -1
endi
system sh/exec.sh -n dnode1 -s stop -x SIGINT
system sh/exec.sh -n dnode1 -s stop -x SIGINT
tests/system-test/0-others/user_privilege.py
浏览文件 @
1975e945
...
@@ -29,6 +29,7 @@ class TDTestCase:
...
@@ -29,6 +29,7 @@ class TDTestCase:
self
.
stbname
=
'stb'
self
.
stbname
=
'stb'
self
.
binary_length
=
20
# the length of binary for column_dict
self
.
binary_length
=
20
# the length of binary for column_dict
self
.
nchar_length
=
20
# the length of nchar for column_dict
self
.
nchar_length
=
20
# the length of nchar for column_dict
self
.
dbnames
=
[
'db1'
,
'db2'
]
self
.
column_dict
=
{
self
.
column_dict
=
{
'ts'
:
'timestamp'
,
'ts'
:
'timestamp'
,
'col1'
:
'float'
,
'col1'
:
'float'
,
...
@@ -57,21 +58,25 @@ class TDTestCase:
...
@@ -57,21 +58,25 @@ class TDTestCase:
def
create_user
(
self
):
def
create_user
(
self
):
user_name
=
'test'
user_name
=
'test'
tdSql
.
execute
(
f
'create user
{
user_name
}
pass "test"'
)
tdSql
.
execute
(
f
'create user
{
user_name
}
pass "test"'
)
tdSql
.
execute
(
f
'grant read on db.stb with t2 = "Beijing" to
{
user_name
}
'
)
tdSql
.
execute
(
f
'grant read on
{
self
.
dbnames
[
0
]
}
.
{
self
.
stbname
}
with t2 = "Beijing" to
{
user_name
}
'
)
tdSql
.
execute
(
f
'grant write on
{
self
.
dbnames
[
1
]
}
.
{
self
.
stbname
}
with t1 = 2 to
{
user_name
}
'
)
def
prepare_data
(
self
):
def
prepare_data
(
self
):
tdSql
.
execute
(
self
.
setsql
.
set_create_stable_sql
(
self
.
stbname
,
self
.
column_dict
,
self
.
tag_dict
))
for
db
in
self
.
dbnames
:
for
i
in
range
(
self
.
tbnum
):
tdSql
.
execute
(
f
"create database
{
db
}
"
)
tdSql
.
execute
(
f
'create table
{
self
.
stbname
}
_
{
i
}
using
{
self
.
stbname
}
tags(
{
self
.
tag_list
[
i
]
}
)'
)
tdSql
.
execute
(
f
"use
{
db
}
"
)
for
j
in
self
.
values_list
:
tdSql
.
execute
(
self
.
setsql
.
set_create_stable_sql
(
self
.
stbname
,
self
.
column_dict
,
self
.
tag_dict
))
tdSql
.
execute
(
f
'insert into
{
self
.
stbname
}
_
{
i
}
values(
{
j
}
)'
)
for
i
in
range
(
self
.
tbnum
):
tdSql
.
execute
(
f
'create table
{
self
.
stbname
}
_
{
i
}
using
{
self
.
stbname
}
tags(
{
self
.
tag_list
[
i
]
}
)'
)
for
j
in
self
.
values_list
:
tdSql
.
execute
(
f
'insert into
{
self
.
stbname
}
_
{
i
}
values(
{
j
}
)'
)
def
user_
privilege_check
(
self
):
def
user_
read_privilege_check
(
self
,
dbname
):
testconn
=
taos
.
connect
(
user
=
'test'
,
password
=
'test'
)
testconn
=
taos
.
connect
(
user
=
'test'
,
password
=
'test'
)
expectErrNotOccured
=
False
expectErrNotOccured
=
False
try
:
try
:
sql
=
"select count(*) from db
.stb where t2 = 'Beijing'"
sql
=
f
"select count(*) from
{
dbname
}
.stb where t2 = 'Beijing'"
res
=
testconn
.
query
(
sql
)
res
=
testconn
.
query
(
sql
)
data
=
res
.
fetch_all
()
data
=
res
.
fetch_all
()
count
=
data
[
0
][
0
]
count
=
data
[
0
][
0
]
...
@@ -85,11 +90,30 @@ class TDTestCase:
...
@@ -85,11 +90,30 @@ class TDTestCase:
tdLog
.
exit
(
f
"
{
sql
}
, expect result doesn't match"
)
tdLog
.
exit
(
f
"
{
sql
}
, expect result doesn't match"
)
pass
pass
def
user_write_privilege_check
(
self
,
dbname
):
testconn
=
taos
.
connect
(
user
=
'test'
,
password
=
'test'
)
expectErrNotOccured
=
False
try
:
sql
=
f
"insert into
{
dbname
}
.stb_1 values(now, 1.1, 200, 0.3)"
testconn
.
execute
(
sql
)
except
BaseException
:
expectErrNotOccured
=
True
if
expectErrNotOccured
:
caller
=
inspect
.
getframeinfo
(
inspect
.
stack
()[
1
][
0
])
tdLog
.
exit
(
f
"
{
caller
.
filename
}
(
{
caller
.
lineno
}
) failed: sql:
{
sql
}
, expect error not occured"
)
else
:
pass
def
user_privilege_error_check
(
self
):
def
user_privilege_error_check
(
self
):
testconn
=
taos
.
connect
(
user
=
'test'
,
password
=
'test'
)
testconn
=
taos
.
connect
(
user
=
'test'
,
password
=
'test'
)
expectErrNotOccured
=
False
expectErrNotOccured
=
False
sql_list
=
[
"alter talbe db.stb_1 set t2 = 'Wuhan'"
,
"drop table db.stb_1"
]
sql_list
=
[
f
"alter talbe
{
self
.
dbnames
[
0
]
}
.stb_1 set t2 = 'Wuhan'"
,
f
"insert into
{
self
.
dbnames
[
0
]
}
.stb_1 values(now, 1.1, 200, 0.3)"
,
f
"drop table
{
self
.
dbnames
[
0
]
}
.stb_1"
,
f
"select count(*) from
{
self
.
dbnames
[
1
]
}
.stb"
]
for
sql
in
sql_list
:
for
sql
in
sql_list
:
try
:
try
:
...
@@ -104,11 +128,11 @@ class TDTestCase:
...
@@ -104,11 +128,11 @@ class TDTestCase:
tdLog
.
exit
(
f
"
{
caller
.
filename
}
(
{
caller
.
lineno
}
) failed: sql:
{
sql
}
, expect error not occured"
)
tdLog
.
exit
(
f
"
{
caller
.
filename
}
(
{
caller
.
lineno
}
) failed: sql:
{
sql
}
, expect error not occured"
)
pass
pass
def
run
(
self
):
def
run
(
self
):
tdSql
.
prepare
()
self
.
prepare_data
()
self
.
prepare_data
()
self
.
create_user
()
self
.
create_user
()
self
.
user_privilege_check
()
self
.
user_read_privilege_check
(
self
.
dbnames
[
0
])
self
.
user_write_privilege_check
(
self
.
dbnames
[
1
])
self
.
user_privilege_error_check
()
self
.
user_privilege_error_check
()
def
stop
(
self
):
def
stop
(
self
):
...
...
tests/system-test/6-cluster/clusterCommonCheck.py
浏览文件 @
1975e945
...
@@ -207,7 +207,7 @@ class ClusterComCheck:
...
@@ -207,7 +207,7 @@ class ClusterComCheck:
count
+=
1
count
+=
1
else
:
else
:
tdLog
.
debug
(
tdSql
.
queryResult
)
tdLog
.
debug
(
tdSql
.
queryResult
)
tdLog
.
exit
(
"stop mnodes on dnode %d
failed in 10s "
)
tdLog
.
exit
(
f
"stop mnodes on dnode
{
offlineDnodeNo
}
failed in 10s "
)
def
check3mnode2off
(
self
,
mnodeNums
=
3
):
def
check3mnode2off
(
self
,
mnodeNums
=
3
):
count
=
0
count
=
0
...
@@ -226,7 +226,45 @@ class ClusterComCheck:
...
@@ -226,7 +226,45 @@ class ClusterComCheck:
count
+=
1
count
+=
1
else
:
else
:
tdLog
.
debug
(
tdSql
.
queryResult
)
tdLog
.
debug
(
tdSql
.
queryResult
)
tdLog
.
exit
(
"stop mnodes on dnode %d failed in 10s "
)
tdLog
.
exit
(
"stop mnodes on dnode 2 or 3 failed in 10s"
)
def
check_vgroups_status
(
self
,
vgroup_numbers
=
2
,
db_replica
=
3
,
count_number
=
10
,
db_name
=
"db"
):
""" check vgroups status in 10s after db vgroups status is changed """
vgroup_numbers
=
int
(
vgroup_numbers
)
self
.
db_replica
=
int
(
db_replica
)
tdLog
.
debug
(
"start to check status of vgroups"
)
count
=
0
last_number
=
vgroup_numbers
-
1
while
count
<
count_number
:
time
.
sleep
(
1
)
tdSql
.
query
(
f
"show
{
db_name
}
.vgroups;"
)
if
count
==
0
:
if
tdSql
.
checkRows
(
vgroup_numbers
)
:
tdLog
.
success
(
f
"
{
db_name
}
has
{
vgroup_numbers
}
vgroups"
)
else
:
tdLog
.
exit
(
f
"vgroup number of
{
db_name
}
is not correct"
)
if
self
.
db_replica
==
1
:
if
tdSql
.
queryResult
[
0
][
4
]
==
'leader'
and
tdSql
.
queryResult
[
1
][
4
]
==
'leader'
and
tdSql
.
queryResult
[
last_number
][
4
]
==
'leader'
:
ready_time
=
(
count
+
1
)
tdLog
.
success
(
f
"all vgroups of
{
db_name
}
are leaders in
{
count
+
1
}
s"
)
return
True
count
+=
1
elif
self
.
db_replica
==
3
:
vgroup_status_first
=
[
tdSql
.
queryResult
[
0
][
4
],
tdSql
.
queryResult
[
0
][
6
],
tdSql
.
queryResult
[
0
][
8
]]
vgroup_status_last
=
[
tdSql
.
queryResult
[
last_number
][
4
],
tdSql
.
queryResult
[
last_number
][
6
],
tdSql
.
queryResult
[
last_number
][
8
]]
if
vgroup_status_first
.
count
(
'leader'
)
==
1
and
vgroup_status_first
.
count
(
'follower'
)
==
2
:
if
vgroup_status_last
.
count
(
'leader'
)
==
1
and
vgroup_status_last
.
count
(
'follower'
)
==
2
:
ready_time
=
(
count
+
1
)
tdLog
.
success
(
f
"all vgroups of
{
db_name
}
are ready in
{
ready_time
}
s"
)
return
True
count
+=
1
else
:
tdLog
.
debug
(
tdSql
.
queryResult
)
tdLog
.
notice
(
f
"all vgroups leader of
{
db_name
}
is selected
{
count
}
s "
)
caller
=
inspect
.
getframeinfo
(
inspect
.
stack
()[
1
][
0
])
args
=
(
caller
.
filename
,
caller
.
lineno
)
tdLog
.
exit
(
"%s(%d) failed "
%
args
)
...
...
tests/system-test/6-cluster/manually-test/6dnode3mnodeInsertDataRebootModifyMetaAlterRep1to3.py
0 → 100644
浏览文件 @
1975e945
import
taos
import
sys
import
time
import
os
from
util.log
import
*
from
util.sql
import
*
from
util.cases
import
*
from
util.dnodes
import
TDDnodes
from
util.dnodes
import
TDDnode
from
util.cluster
import
*
sys
.
path
.
append
(
"./6-cluster"
)
from
clusterCommonCreate
import
*
from
clusterCommonCheck
import
clusterComCheck
import
time
import
socket
import
subprocess
from
multiprocessing
import
Process
import
threading
import
time
import
inspect
import
ctypes
class
TDTestCase
:
def
init
(
self
,
conn
,
logSql
,
replicaVar
=
1
):
tdLog
.
debug
(
f
"start to excute
{
__file__
}
"
)
self
.
TDDnodes
=
None
tdSql
.
init
(
conn
.
cursor
())
self
.
host
=
socket
.
gethostname
()
def
getBuildPath
(
self
):
selfPath
=
os
.
path
.
dirname
(
os
.
path
.
realpath
(
__file__
))
if
(
"community"
in
selfPath
):
projPath
=
selfPath
[:
selfPath
.
find
(
"community"
)]
else
:
projPath
=
selfPath
[:
selfPath
.
find
(
"tests"
)]
for
root
,
dirs
,
files
in
os
.
walk
(
projPath
):
if
(
"taosd"
in
files
):
rootRealPath
=
os
.
path
.
dirname
(
os
.
path
.
realpath
(
root
))
if
(
"packaging"
not
in
rootRealPath
):
buildPath
=
root
[:
len
(
root
)
-
len
(
"/build/bin"
)]
break
return
buildPath
def
_async_raise
(
self
,
tid
,
exctype
):
"""raises the exception, performs cleanup if needed"""
if
not
inspect
.
isclass
(
exctype
):
exctype
=
type
(
exctype
)
res
=
ctypes
.
pythonapi
.
PyThreadState_SetAsyncExc
(
tid
,
ctypes
.
py_object
(
exctype
))
if
res
==
0
:
raise
ValueError
(
"invalid thread id"
)
elif
res
!=
1
:
# """if it returns a number greater than one, you're in trouble,
# and you should call it again with exc=NULL to revert the effect"""
ctypes
.
pythonapi
.
PyThreadState_SetAsyncExc
(
tid
,
None
)
raise
SystemError
(
"PyThreadState_SetAsyncExc failed"
)
def
stopThread
(
self
,
thread
):
self
.
_async_raise
(
thread
.
ident
,
SystemExit
)
def
fiveDnodeThreeMnode
(
self
,
dnodeNumbers
,
mnodeNums
,
restartNumbers
,
stopRole
):
tdLog
.
printNoPrefix
(
"======== test case 1: "
)
paraDict
=
{
'dbName'
:
'db0_0'
,
'dropFlag'
:
1
,
'event'
:
''
,
'vgroups'
:
4
,
'replica'
:
1
,
'stbName'
:
'stb'
,
'stbNumbers'
:
2
,
'colPrefix'
:
'c'
,
'tagPrefix'
:
't'
,
'colSchema'
:
[{
'type'
:
'INT'
,
'count'
:
1
},
{
'type'
:
'binary'
,
'len'
:
20
,
'count'
:
1
}],
'tagSchema'
:
[{
'type'
:
'INT'
,
'count'
:
1
},
{
'type'
:
'binary'
,
'len'
:
20
,
'count'
:
1
}],
'ctbPrefix'
:
'ctb'
,
'ctbNum'
:
200
,
'startTs'
:
1640966400000
,
# 2022-01-01 00:00:00.000
"rowsPerTbl"
:
1000
,
"batchNum"
:
5000
}
dnodeNumbers
=
int
(
dnodeNumbers
)
mnodeNums
=
int
(
mnodeNums
)
vnodeNumbers
=
int
(
dnodeNumbers
-
mnodeNums
)
allctbNumbers
=
(
paraDict
[
'stbNumbers'
]
*
paraDict
[
"ctbNum"
])
rowsPerStb
=
paraDict
[
"ctbNum"
]
*
paraDict
[
"rowsPerTbl"
]
rowsall
=
rowsPerStb
*
paraDict
[
'stbNumbers'
]
dbNumbers
=
1
tdLog
.
info
(
"first check dnode and mnode"
)
tdSql
.
query
(
"select * from information_schema.ins_dnodes;"
)
tdSql
.
checkData
(
0
,
1
,
'%s:6030'
%
self
.
host
)
tdSql
.
checkData
(
4
,
1
,
'%s:6430'
%
self
.
host
)
clusterComCheck
.
checkDnodes
(
dnodeNumbers
)
#check mnode status
tdLog
.
info
(
"check mnode status"
)
clusterComCheck
.
checkMnodeStatus
(
mnodeNums
)
# add some error operations and
tdLog
.
info
(
"Confirm the status of the dnode again"
)
tdSql
.
error
(
"create mnode on dnode 2"
)
tdSql
.
query
(
"select * from information_schema.ins_dnodes;"
)
print
(
tdSql
.
queryResult
)
clusterComCheck
.
checkDnodes
(
dnodeNumbers
)
# create database and stable
clusterComCreate
.
create_database
(
tdSql
,
paraDict
[
"dbName"
],
paraDict
[
"dropFlag"
],
paraDict
[
"vgroups"
],
paraDict
[
'replica'
])
tdLog
.
info
(
"Take turns stopping Mnodes "
)
tdDnodes
=
cluster
.
dnodes
stopcount
=
0
threads
=
[]
# create stable:stb_0
stableName
=
paraDict
[
'stbName'
]
newTdSql
=
tdCom
.
newTdSql
()
clusterComCreate
.
create_stables
(
newTdSql
,
paraDict
[
"dbName"
],
stableName
,
paraDict
[
'stbNumbers'
])
#create child table:ctb_0
for
i
in
range
(
paraDict
[
'stbNumbers'
]):
stableName
=
'%s_%d'
%
(
paraDict
[
'stbName'
],
i
)
newTdSql
=
tdCom
.
newTdSql
()
clusterComCreate
.
create_ctable
(
newTdSql
,
paraDict
[
"dbName"
],
stableName
,
stableName
,
paraDict
[
'ctbNum'
])
#insert date
for
i
in
range
(
paraDict
[
'stbNumbers'
]):
stableName
=
'%s_%d'
%
(
paraDict
[
'stbName'
],
i
)
newTdSql
=
tdCom
.
newTdSql
()
threads
.
append
(
threading
.
Thread
(
target
=
clusterComCreate
.
insert_data
,
args
=
(
newTdSql
,
paraDict
[
"dbName"
],
stableName
,
paraDict
[
"ctbNum"
],
paraDict
[
"rowsPerTbl"
],
paraDict
[
"batchNum"
],
paraDict
[
"startTs"
])))
for
tr
in
threads
:
tr
.
start
()
for
tr
in
threads
:
tr
.
join
()
while
stopcount
<
restartNumbers
:
tdLog
.
info
(
" restart loop: %d"
%
stopcount
)
if
stopRole
==
"mnode"
:
for
i
in
range
(
mnodeNums
):
tdDnodes
[
i
].
stoptaosd
()
# sleep(10)
tdDnodes
[
i
].
starttaosd
()
# sleep(10)
elif
stopRole
==
"vnode"
:
for
i
in
range
(
vnodeNumbers
):
tdDnodes
[
i
+
mnodeNums
].
stoptaosd
()
# sleep(10)
tdDnodes
[
i
+
mnodeNums
].
starttaosd
()
# sleep(10)
elif
stopRole
==
"dnode"
:
for
i
in
range
(
dnodeNumbers
):
if
i
==
0
:
stableName
=
'%s_%d'
%
(
paraDict
[
'stbName'
],
0
)
newTdSql
=
tdCom
.
newTdSql
()
# newTdSql.execute('alter database db0_0 replica 3')
clusterComCreate
.
alterStbMetaData
(
newTdSql
,
paraDict
[
"dbName"
],
stableName
,
paraDict
[
"ctbNum"
],
paraDict
[
"rowsPerTbl"
],
paraDict
[
"batchNum"
])
tdDnodes
[
i
].
stoptaosd
()
clusterComCheck
.
checkDbRows
(
dbNumbers
)
# sleep(10)
tdDnodes
[
i
].
starttaosd
()
if
i
==
3
:
TdSqlEx
=
tdCom
.
newTdSql
()
tdLog
.
info
(
"alter database db0_0 replica 3"
)
TdSqlEx
.
execute
(
'alter database db0_0 replica 3'
)
# dnodeNumbers don't include database of schema
if
clusterComCheck
.
checkDnodes
(
dnodeNumbers
):
tdLog
.
info
(
"123"
)
else
:
print
(
"456"
)
self
.
stopThread
(
threads
)
tdLog
.
exit
(
"one or more of dnodes failed to start "
)
# self.check3mnode()
stopcount
+=
1
clusterComCheck
.
checkDnodes
(
dnodeNumbers
)
clusterComCheck
.
checkDbRows
(
dbNumbers
)
# clusterComCheck.checkDb(dbNumbers,1,paraDict["dbName"])
# tdSql.execute("use %s" %(paraDict["dbName"]))
tdSql
.
query
(
"show %s.stables"
%
(
paraDict
[
"dbName"
]))
tdSql
.
checkRows
(
paraDict
[
"stbNumbers"
])
for
i
in
range
(
paraDict
[
'stbNumbers'
]):
stableName
=
'%s.%s_%d'
%
(
paraDict
[
"dbName"
],
paraDict
[
'stbName'
],
i
)
tdSql
.
query
(
"select count(*) from %s"
%
stableName
)
if
i
==
0
:
tdSql
.
checkData
(
0
,
0
,
rowsPerStb
*
2
)
else
:
tdSql
.
checkData
(
0
,
0
,
rowsPerStb
)
clusterComCheck
.
check_vgroups_status
(
vgroup_numbers
=
paraDict
[
"vgroups"
],
db_replica
=
3
,
db_name
=
paraDict
[
"dbName"
],
count_number
=
150
)
def
run
(
self
):
# print(self.master_dnode.cfgDict)
self
.
fiveDnodeThreeMnode
(
dnodeNumbers
=
6
,
mnodeNums
=
3
,
restartNumbers
=
1
,
stopRole
=
'dnode'
)
def
stop
(
self
):
tdSql
.
close
()
tdLog
.
success
(
f
"
{
__file__
}
successfully executed"
)
tdCases
.
addLinux
(
__file__
,
TDTestCase
())
tdCases
.
addWindows
(
__file__
,
TDTestCase
())
tests/system-test/6-cluster/manually-test/6dnode3mnodeInsertDataRebootModifyMetaAlterRep3to1.py
0 → 100644
浏览文件 @
1975e945
import
taos
import
sys
import
time
import
os
from
util.log
import
*
from
util.sql
import
*
from
util.cases
import
*
from
util.dnodes
import
TDDnodes
from
util.dnodes
import
TDDnode
from
util.cluster
import
*
sys
.
path
.
append
(
"./6-cluster"
)
from
clusterCommonCreate
import
*
from
clusterCommonCheck
import
clusterComCheck
import
time
import
socket
import
subprocess
from
multiprocessing
import
Process
import
threading
import
time
import
inspect
import
ctypes
class
TDTestCase
:
def
init
(
self
,
conn
,
logSql
,
replicaVar
=
1
):
tdLog
.
debug
(
f
"start to excute
{
__file__
}
"
)
self
.
TDDnodes
=
None
tdSql
.
init
(
conn
.
cursor
())
self
.
host
=
socket
.
gethostname
()
def
getBuildPath
(
self
):
selfPath
=
os
.
path
.
dirname
(
os
.
path
.
realpath
(
__file__
))
if
(
"community"
in
selfPath
):
projPath
=
selfPath
[:
selfPath
.
find
(
"community"
)]
else
:
projPath
=
selfPath
[:
selfPath
.
find
(
"tests"
)]
for
root
,
dirs
,
files
in
os
.
walk
(
projPath
):
if
(
"taosd"
in
files
):
rootRealPath
=
os
.
path
.
dirname
(
os
.
path
.
realpath
(
root
))
if
(
"packaging"
not
in
rootRealPath
):
buildPath
=
root
[:
len
(
root
)
-
len
(
"/build/bin"
)]
break
return
buildPath
def
_async_raise
(
self
,
tid
,
exctype
):
"""raises the exception, performs cleanup if needed"""
if
not
inspect
.
isclass
(
exctype
):
exctype
=
type
(
exctype
)
res
=
ctypes
.
pythonapi
.
PyThreadState_SetAsyncExc
(
tid
,
ctypes
.
py_object
(
exctype
))
if
res
==
0
:
raise
ValueError
(
"invalid thread id"
)
elif
res
!=
1
:
# """if it returns a number greater than one, you're in trouble,
# and you should call it again with exc=NULL to revert the effect"""
ctypes
.
pythonapi
.
PyThreadState_SetAsyncExc
(
tid
,
None
)
raise
SystemError
(
"PyThreadState_SetAsyncExc failed"
)
def
stopThread
(
self
,
thread
):
self
.
_async_raise
(
thread
.
ident
,
SystemExit
)
def
fiveDnodeThreeMnode
(
self
,
dnodeNumbers
,
mnodeNums
,
restartNumbers
,
stopRole
):
tdLog
.
printNoPrefix
(
"======== test case 1: "
)
paraDict
=
{
'dbName'
:
'db0_0'
,
'dropFlag'
:
1
,
'event'
:
''
,
'vgroups'
:
4
,
'replica'
:
3
,
'stbName'
:
'stb'
,
'stbNumbers'
:
2
,
'colPrefix'
:
'c'
,
'tagPrefix'
:
't'
,
'colSchema'
:
[{
'type'
:
'INT'
,
'count'
:
1
},
{
'type'
:
'binary'
,
'len'
:
20
,
'count'
:
1
}],
'tagSchema'
:
[{
'type'
:
'INT'
,
'count'
:
1
},
{
'type'
:
'binary'
,
'len'
:
20
,
'count'
:
1
}],
'ctbPrefix'
:
'ctb'
,
'ctbNum'
:
200
,
'startTs'
:
1640966400000
,
# 2022-01-01 00:00:00.000
"rowsPerTbl"
:
1000
,
"batchNum"
:
5000
}
dnodeNumbers
=
int
(
dnodeNumbers
)
mnodeNums
=
int
(
mnodeNums
)
vnodeNumbers
=
int
(
dnodeNumbers
-
mnodeNums
)
allctbNumbers
=
(
paraDict
[
'stbNumbers'
]
*
paraDict
[
"ctbNum"
])
rowsPerStb
=
paraDict
[
"ctbNum"
]
*
paraDict
[
"rowsPerTbl"
]
rowsall
=
rowsPerStb
*
paraDict
[
'stbNumbers'
]
dbNumbers
=
1
tdLog
.
info
(
"first check dnode and mnode"
)
tdSql
.
query
(
"select * from information_schema.ins_dnodes;"
)
tdSql
.
checkData
(
0
,
1
,
'%s:6030'
%
self
.
host
)
tdSql
.
checkData
(
4
,
1
,
'%s:6430'
%
self
.
host
)
clusterComCheck
.
checkDnodes
(
dnodeNumbers
)
#check mnode status
tdLog
.
info
(
"check mnode status"
)
clusterComCheck
.
checkMnodeStatus
(
mnodeNums
)
# add some error operations and
tdLog
.
info
(
"Confirm the status of the dnode again"
)
tdSql
.
error
(
"create mnode on dnode 2"
)
tdSql
.
query
(
"select * from information_schema.ins_dnodes;"
)
print
(
tdSql
.
queryResult
)
clusterComCheck
.
checkDnodes
(
dnodeNumbers
)
# create database and stable
clusterComCreate
.
create_database
(
tdSql
,
paraDict
[
"dbName"
],
paraDict
[
"dropFlag"
],
paraDict
[
"vgroups"
],
paraDict
[
'replica'
])
tdLog
.
info
(
"Take turns stopping Mnodes "
)
tdDnodes
=
cluster
.
dnodes
stopcount
=
0
threads
=
[]
# create stable:stb_0
stableName
=
paraDict
[
'stbName'
]
newTdSql
=
tdCom
.
newTdSql
()
clusterComCreate
.
create_stables
(
newTdSql
,
paraDict
[
"dbName"
],
stableName
,
paraDict
[
'stbNumbers'
])
#create child table:ctb_0
for
i
in
range
(
paraDict
[
'stbNumbers'
]):
stableName
=
'%s_%d'
%
(
paraDict
[
'stbName'
],
i
)
newTdSql
=
tdCom
.
newTdSql
()
clusterComCreate
.
create_ctable
(
newTdSql
,
paraDict
[
"dbName"
],
stableName
,
stableName
,
paraDict
[
'ctbNum'
])
#insert date
for
i
in
range
(
paraDict
[
'stbNumbers'
]):
stableName
=
'%s_%d'
%
(
paraDict
[
'stbName'
],
i
)
newTdSql
=
tdCom
.
newTdSql
()
threads
.
append
(
threading
.
Thread
(
target
=
clusterComCreate
.
insert_data
,
args
=
(
newTdSql
,
paraDict
[
"dbName"
],
stableName
,
paraDict
[
"ctbNum"
],
paraDict
[
"rowsPerTbl"
],
paraDict
[
"batchNum"
],
paraDict
[
"startTs"
])))
for
tr
in
threads
:
tr
.
start
()
for
tr
in
threads
:
tr
.
join
()
while
stopcount
<
restartNumbers
:
tdLog
.
info
(
" restart loop: %d"
%
stopcount
)
if
stopRole
==
"mnode"
:
for
i
in
range
(
mnodeNums
):
tdDnodes
[
i
].
stoptaosd
()
# sleep(10)
tdDnodes
[
i
].
starttaosd
()
# sleep(10)
elif
stopRole
==
"vnode"
:
for
i
in
range
(
vnodeNumbers
):
tdDnodes
[
i
+
mnodeNums
].
stoptaosd
()
# sleep(10)
tdDnodes
[
i
+
mnodeNums
].
starttaosd
()
# sleep(10)
elif
stopRole
==
"dnode"
:
for
i
in
range
(
dnodeNumbers
):
tdDnodes
[
i
].
stoptaosd
()
clusterComCheck
.
checkDbRows
(
dbNumbers
)
if
i
==
0
:
stableName
=
'%s_%d'
%
(
paraDict
[
'stbName'
],
0
)
newTdSql
=
tdCom
.
newTdSql
()
# newTdSql.execute('alter database db0_0 replica 3')
clusterComCreate
.
alterStbMetaData
(
newTdSql
,
paraDict
[
"dbName"
],
stableName
,
paraDict
[
"ctbNum"
],
paraDict
[
"rowsPerTbl"
],
paraDict
[
"batchNum"
])
# sleep(10)
tdDnodes
[
i
].
starttaosd
()
if
i
==
3
:
TdSqlEx
=
tdCom
.
newTdSql
()
tdLog
.
info
(
"alter database db0_0 replica 1"
)
TdSqlEx
.
execute
(
'alter database db0_0 replica 1'
)
# dnodeNumbers don't include database of schema
if
clusterComCheck
.
checkDnodes
(
dnodeNumbers
):
tdLog
.
info
(
"123"
)
else
:
print
(
"456"
)
self
.
stopThread
(
threads
)
tdLog
.
exit
(
"one or more of dnodes failed to start "
)
# self.check3mnode()
stopcount
+=
1
clusterComCheck
.
checkDnodes
(
dnodeNumbers
)
clusterComCheck
.
checkDbRows
(
dbNumbers
)
# clusterComCheck.checkDb(dbNumbers,1,paraDict["dbName"])
# tdSql.execute("use %s" %(paraDict["dbName"]))
tdSql
.
query
(
"show %s.stables"
%
(
paraDict
[
"dbName"
]))
tdSql
.
checkRows
(
paraDict
[
"stbNumbers"
])
for
i
in
range
(
paraDict
[
'stbNumbers'
]):
stableName
=
'%s.%s_%d'
%
(
paraDict
[
"dbName"
],
paraDict
[
'stbName'
],
i
)
tdSql
.
query
(
"select count(*) from %s"
%
stableName
)
if
i
==
0
:
tdSql
.
checkData
(
0
,
0
,
rowsPerStb
*
2
)
else
:
tdSql
.
checkData
(
0
,
0
,
rowsPerStb
)
clusterComCheck
.
check_vgroups_status
(
vgroup_numbers
=
paraDict
[
"vgroups"
],
db_replica
=
1
,
db_name
=
paraDict
[
"dbName"
],
count_number
=
150
)
def
run
(
self
):
# print(self.master_dnode.cfgDict)
self
.
fiveDnodeThreeMnode
(
dnodeNumbers
=
6
,
mnodeNums
=
3
,
restartNumbers
=
1
,
stopRole
=
'dnode'
)
def
stop
(
self
):
tdSql
.
close
()
tdLog
.
success
(
f
"
{
__file__
}
successfully executed"
)
tdCases
.
addLinux
(
__file__
,
TDTestCase
())
tdCases
.
addWindows
(
__file__
,
TDTestCase
())
tests/system-test/6-cluster/manually-test/6dnode3mnodeInsertDatarRebootAlterRep1-3.py
0 → 100644
浏览文件 @
1975e945
import
taos
import
sys
import
time
import
os
from
util.log
import
*
from
util.sql
import
*
from
util.cases
import
*
from
util.dnodes
import
TDDnodes
from
util.dnodes
import
TDDnode
from
util.cluster
import
*
sys
.
path
.
append
(
"./6-cluster"
)
from
clusterCommonCreate
import
*
from
clusterCommonCheck
import
clusterComCheck
import
time
import
socket
import
subprocess
from
multiprocessing
import
Process
import
threading
import
time
import
inspect
import
ctypes
class
TDTestCase
:
def
init
(
self
,
conn
,
logSql
,
replicaVar
=
1
):
tdLog
.
debug
(
f
"start to excute
{
__file__
}
"
)
self
.
TDDnodes
=
None
tdSql
.
init
(
conn
.
cursor
())
self
.
host
=
socket
.
gethostname
()
def
getBuildPath
(
self
):
selfPath
=
os
.
path
.
dirname
(
os
.
path
.
realpath
(
__file__
))
if
(
"community"
in
selfPath
):
projPath
=
selfPath
[:
selfPath
.
find
(
"community"
)]
else
:
projPath
=
selfPath
[:
selfPath
.
find
(
"tests"
)]
for
root
,
dirs
,
files
in
os
.
walk
(
projPath
):
if
(
"taosd"
in
files
):
rootRealPath
=
os
.
path
.
dirname
(
os
.
path
.
realpath
(
root
))
if
(
"packaging"
not
in
rootRealPath
):
buildPath
=
root
[:
len
(
root
)
-
len
(
"/build/bin"
)]
break
return
buildPath
def
_async_raise
(
self
,
tid
,
exctype
):
"""raises the exception, performs cleanup if needed"""
if
not
inspect
.
isclass
(
exctype
):
exctype
=
type
(
exctype
)
res
=
ctypes
.
pythonapi
.
PyThreadState_SetAsyncExc
(
tid
,
ctypes
.
py_object
(
exctype
))
if
res
==
0
:
raise
ValueError
(
"invalid thread id"
)
elif
res
!=
1
:
# """if it returns a number greater than one, you're in trouble,
# and you should call it again with exc=NULL to revert the effect"""
ctypes
.
pythonapi
.
PyThreadState_SetAsyncExc
(
tid
,
None
)
raise
SystemError
(
"PyThreadState_SetAsyncExc failed"
)
def
stopThread
(
self
,
thread
):
self
.
_async_raise
(
thread
.
ident
,
SystemExit
)
def
insertData
(
self
,
countstart
,
countstop
):
# fisrt add data : db\stable\childtable\general table
for
couti
in
range
(
countstart
,
countstop
):
tdLog
.
debug
(
"drop database if exists db%d"
%
couti
)
tdSql
.
execute
(
"drop database if exists db%d"
%
couti
)
print
(
"create database if not exists db%d replica 1 duration 300"
%
couti
)
tdSql
.
execute
(
"create database if not exists db%d replica 1 duration 300"
%
couti
)
tdSql
.
execute
(
"use db%d"
%
couti
)
tdSql
.
execute
(
'''create table stb1
(ts timestamp, c1 int, c2 bigint, c3 smallint, c4 tinyint, c5 float, c6 double, c7 bool, c8 binary(16),c9 nchar(32), c10 timestamp)
tags (t1 int)
'''
)
tdSql
.
execute
(
'''
create table t1
(ts timestamp, c1 int, c2 bigint, c3 smallint, c4 tinyint, c5 float, c6 double, c7 bool, c8 binary(16),c9 nchar(32), c10 timestamp)
'''
)
for
i
in
range
(
4
):
tdSql
.
execute
(
f
'create table ct
{
i
+
1
}
using stb1 tags (
{
i
+
1
}
)'
)
def
fiveDnodeThreeMnode
(
self
,
dnodeNumbers
,
mnodeNums
,
restartNumbers
,
stopRole
):
tdLog
.
printNoPrefix
(
"======== test case 1: "
)
paraDict
=
{
'dbName'
:
'db0_0'
,
'dropFlag'
:
1
,
'event'
:
''
,
'vgroups'
:
4
,
'replica'
:
1
,
'stbName'
:
'stb'
,
'stbNumbers'
:
2
,
'colPrefix'
:
'c'
,
'tagPrefix'
:
't'
,
'colSchema'
:
[{
'type'
:
'INT'
,
'count'
:
1
},
{
'type'
:
'binary'
,
'len'
:
20
,
'count'
:
1
}],
'tagSchema'
:
[{
'type'
:
'INT'
,
'count'
:
1
},
{
'type'
:
'binary'
,
'len'
:
20
,
'count'
:
1
}],
'ctbPrefix'
:
'ctb'
,
'ctbNum'
:
1000
,
'startTs'
:
1640966400000
,
# 2022-01-01 00:00:00.000
"rowsPerTbl"
:
100
,
"batchNum"
:
5000
}
dnodeNumbers
=
int
(
dnodeNumbers
)
mnodeNums
=
int
(
mnodeNums
)
vnodeNumbers
=
int
(
dnodeNumbers
-
mnodeNums
)
allctbNumbers
=
(
paraDict
[
'stbNumbers'
]
*
paraDict
[
"ctbNum"
])
rowsPerStb
=
paraDict
[
"ctbNum"
]
*
paraDict
[
"rowsPerTbl"
]
rowsall
=
rowsPerStb
*
paraDict
[
'stbNumbers'
]
dbNumbers
=
1
replica3
=
3
tdLog
.
info
(
"first check dnode and mnode"
)
tdSql
.
query
(
"select * from information_schema.ins_dnodes;"
)
tdSql
.
checkData
(
0
,
1
,
'%s:6030'
%
self
.
host
)
tdSql
.
checkData
(
4
,
1
,
'%s:6430'
%
self
.
host
)
clusterComCheck
.
checkDnodes
(
dnodeNumbers
)
#check mnode status
tdLog
.
info
(
"check mnode status"
)
clusterComCheck
.
checkMnodeStatus
(
mnodeNums
)
# add some error operations and
tdLog
.
info
(
"Confirm the status of the dnode again"
)
tdSql
.
error
(
"create mnode on dnode 2"
)
tdSql
.
query
(
"select * from information_schema.ins_dnodes;"
)
print
(
tdSql
.
queryResult
)
clusterComCheck
.
checkDnodes
(
dnodeNumbers
)
# create database and stable
clusterComCreate
.
create_database
(
tdSql
,
paraDict
[
"dbName"
],
paraDict
[
"dropFlag"
],
paraDict
[
"vgroups"
],
paraDict
[
'replica'
])
tdLog
.
info
(
"Take turns stopping Mnodes "
)
tdDnodes
=
cluster
.
dnodes
stopcount
=
0
threads
=
[]
# create stable:stb_0
stableName
=
paraDict
[
'stbName'
]
newTdSql
=
tdCom
.
newTdSql
()
clusterComCreate
.
create_stables
(
newTdSql
,
paraDict
[
"dbName"
],
stableName
,
paraDict
[
'stbNumbers'
])
#create child table:ctb_0
for
i
in
range
(
paraDict
[
'stbNumbers'
]):
stableName
=
'%s_%d'
%
(
paraDict
[
'stbName'
],
i
)
newTdSql
=
tdCom
.
newTdSql
()
clusterComCreate
.
create_ctable
(
newTdSql
,
paraDict
[
"dbName"
],
stableName
,
stableName
,
paraDict
[
'ctbNum'
])
#insert date
for
i
in
range
(
paraDict
[
'stbNumbers'
]):
stableName
=
'%s_%d'
%
(
paraDict
[
'stbName'
],
i
)
newTdSql
=
tdCom
.
newTdSql
()
threads
.
append
(
threading
.
Thread
(
target
=
clusterComCreate
.
insert_data
,
args
=
(
newTdSql
,
paraDict
[
"dbName"
],
stableName
,
paraDict
[
"ctbNum"
],
paraDict
[
"rowsPerTbl"
],
paraDict
[
"batchNum"
],
paraDict
[
"startTs"
])))
for
tr
in
threads
:
tr
.
start
()
TdSqlEx
=
tdCom
.
newTdSql
()
tdLog
.
info
(
"alter database db0_0 replica 3"
)
TdSqlEx
.
execute
(
'alter database db0_0 replica 3'
)
while
stopcount
<
restartNumbers
:
tdLog
.
info
(
" restart loop: %d"
%
stopcount
)
if
stopRole
==
"mnode"
:
for
i
in
range
(
mnodeNums
):
tdDnodes
[
i
].
stoptaosd
()
# sleep(10)
tdDnodes
[
i
].
starttaosd
()
# sleep(10)
elif
stopRole
==
"vnode"
:
for
i
in
range
(
vnodeNumbers
):
tdDnodes
[
i
+
mnodeNums
].
stoptaosd
()
# sleep(10)
tdDnodes
[
i
+
mnodeNums
].
starttaosd
()
# sleep(10)
elif
stopRole
==
"dnode"
:
for
i
in
range
(
dnodeNumbers
):
tdDnodes
[
i
].
stoptaosd
()
# tdLog.info('select cast(c2 as nchar(10)) from db0_0.stb_1;')
# TdSqlEx.execute('select cast(c2 as nchar(10)) from db0_0.stb_1;')
# tdLog.info('select avg(c1) from db0_0.stb_0 interval(10s);')
# TdSqlEx.execute('select avg(c1) from db0_0.stb_0 interval(10s);')
# sleep(10)
tdDnodes
[
i
].
starttaosd
()
# sleep(10)
# dnodeNumbers don't include database of schema
if
clusterComCheck
.
checkDnodes
(
dnodeNumbers
):
tdLog
.
info
(
"123"
)
else
:
print
(
"456"
)
self
.
stopThread
(
threads
)
tdLog
.
exit
(
"one or more of dnodes failed to start "
)
# self.check3mnode()
stopcount
+=
1
for
tr
in
threads
:
tr
.
join
()
clusterComCheck
.
checkDnodes
(
dnodeNumbers
)
clusterComCheck
.
checkDbRows
(
dbNumbers
)
# clusterComCheck.checkDb(dbNumbers,1,paraDict["dbName"])
# tdSql.execute("use %s" %(paraDict["dbName"]))
tdSql
.
query
(
"show %s.stables"
%
(
paraDict
[
"dbName"
]))
tdSql
.
checkRows
(
paraDict
[
"stbNumbers"
])
# for i in range(paraDict['stbNumbers']):
# stableName= '%s.%s_%d'%(paraDict["dbName"],paraDict['stbName'],i)
# tdSql.query("select count(*) from %s"%stableName)
# tdSql.checkData(0,0,rowsPerStb)
clusterComCheck
.
check_vgroups_status
(
vgroup_numbers
=
paraDict
[
"vgroups"
],
db_replica
=
replica3
,
db_name
=
paraDict
[
"dbName"
],
count_number
=
240
)
def
run
(
self
):
# print(self.master_dnode.cfgDict)
self
.
fiveDnodeThreeMnode
(
dnodeNumbers
=
6
,
mnodeNums
=
3
,
restartNumbers
=
4
,
stopRole
=
'dnode'
)
def
stop
(
self
):
tdSql
.
close
()
tdLog
.
success
(
f
"
{
__file__
}
successfully executed"
)
tdCases
.
addLinux
(
__file__
,
TDTestCase
())
tdCases
.
addWindows
(
__file__
,
TDTestCase
())
tests/system-test/6-cluster/manually-test/6dnode3mnodeInsertLessDataAlterRep3to1to3.py
0 → 100644
浏览文件 @
1975e945
import
taos
import
sys
import
time
import
os
from
util.log
import
*
from
util.sql
import
*
from
util.cases
import
*
from
util.dnodes
import
TDDnodes
from
util.dnodes
import
TDDnode
from
util.cluster
import
*
sys
.
path
.
append
(
"./6-cluster"
)
from
clusterCommonCreate
import
*
from
clusterCommonCheck
import
clusterComCheck
import
time
import
socket
import
subprocess
from
multiprocessing
import
Process
import
threading
import
time
import
inspect
import
ctypes
class
TDTestCase
:
def
init
(
self
,
conn
,
logSql
,
replicaVar
=
1
):
tdLog
.
debug
(
f
"start to excute
{
__file__
}
"
)
self
.
TDDnodes
=
None
tdSql
.
init
(
conn
.
cursor
())
self
.
host
=
socket
.
gethostname
()
def
getBuildPath
(
self
):
selfPath
=
os
.
path
.
dirname
(
os
.
path
.
realpath
(
__file__
))
if
(
"community"
in
selfPath
):
projPath
=
selfPath
[:
selfPath
.
find
(
"community"
)]
else
:
projPath
=
selfPath
[:
selfPath
.
find
(
"tests"
)]
for
root
,
dirs
,
files
in
os
.
walk
(
projPath
):
if
(
"taosd"
in
files
):
rootRealPath
=
os
.
path
.
dirname
(
os
.
path
.
realpath
(
root
))
if
(
"packaging"
not
in
rootRealPath
):
buildPath
=
root
[:
len
(
root
)
-
len
(
"/build/bin"
)]
break
return
buildPath
def
_async_raise
(
self
,
tid
,
exctype
):
"""raises the exception, performs cleanup if needed"""
if
not
inspect
.
isclass
(
exctype
):
exctype
=
type
(
exctype
)
res
=
ctypes
.
pythonapi
.
PyThreadState_SetAsyncExc
(
tid
,
ctypes
.
py_object
(
exctype
))
if
res
==
0
:
raise
ValueError
(
"invalid thread id"
)
elif
res
!=
1
:
# """if it returns a number greater than one, you're in trouble,
# and you should call it again with exc=NULL to revert the effect"""
ctypes
.
pythonapi
.
PyThreadState_SetAsyncExc
(
tid
,
None
)
raise
SystemError
(
"PyThreadState_SetAsyncExc failed"
)
def
stopThread
(
self
,
thread
):
self
.
_async_raise
(
thread
.
ident
,
SystemExit
)
def
insertData
(
self
,
countstart
,
countstop
):
# fisrt add data : db\stable\childtable\general table
for
couti
in
range
(
countstart
,
countstop
):
tdLog
.
debug
(
"drop database if exists db%d"
%
couti
)
tdSql
.
execute
(
"drop database if exists db%d"
%
couti
)
print
(
"create database if not exists db%d replica 1 duration 300"
%
couti
)
tdSql
.
execute
(
"create database if not exists db%d replica 1 duration 300"
%
couti
)
tdSql
.
execute
(
"use db%d"
%
couti
)
tdSql
.
execute
(
'''create table stb1
(ts timestamp, c1 int, c2 bigint, c3 smallint, c4 tinyint, c5 float, c6 double, c7 bool, c8 binary(16),c9 nchar(32), c10 timestamp)
tags (t1 int)
'''
)
tdSql
.
execute
(
'''
create table t1
(ts timestamp, c1 int, c2 bigint, c3 smallint, c4 tinyint, c5 float, c6 double, c7 bool, c8 binary(16),c9 nchar(32), c10 timestamp)
'''
)
for
i
in
range
(
4
):
tdSql
.
execute
(
f
'create table ct
{
i
+
1
}
using stb1 tags (
{
i
+
1
}
)'
)
def
fiveDnodeThreeMnode
(
self
,
dnodeNumbers
,
mnodeNums
,
restartNumbers
,
stopRole
):
tdLog
.
printNoPrefix
(
"======== test case 1: "
)
paraDict
=
{
'dbName'
:
'db0_0'
,
'dropFlag'
:
1
,
'event'
:
''
,
'vgroups'
:
4
,
'replica'
:
3
,
'stbName'
:
'stb'
,
'stbNumbers'
:
2
,
'colPrefix'
:
'c'
,
'tagPrefix'
:
't'
,
'colSchema'
:
[{
'type'
:
'INT'
,
'count'
:
1
},
{
'type'
:
'binary'
,
'len'
:
20
,
'count'
:
1
}],
'tagSchema'
:
[{
'type'
:
'INT'
,
'count'
:
1
},
{
'type'
:
'binary'
,
'len'
:
20
,
'count'
:
1
}],
'ctbPrefix'
:
'ctb'
,
'ctbNum'
:
1
,
'startTs'
:
1640966400000
,
# 2022-01-01 00:00:00.000
"rowsPerTbl"
:
1
,
"batchNum"
:
5000
}
dnodeNumbers
=
int
(
dnodeNumbers
)
mnodeNums
=
int
(
mnodeNums
)
vnodeNumbers
=
int
(
dnodeNumbers
-
mnodeNums
)
replica1
=
1
replica3
=
3
allctbNumbers
=
(
paraDict
[
'stbNumbers'
]
*
paraDict
[
"ctbNum"
])
rowsPerStb
=
paraDict
[
"ctbNum"
]
*
paraDict
[
"rowsPerTbl"
]
rowsall
=
rowsPerStb
*
paraDict
[
'stbNumbers'
]
dbNumbers
=
1
tdLog
.
info
(
"first check dnode and mnode"
)
tdSql
.
query
(
"select * from information_schema.ins_dnodes;"
)
tdSql
.
checkData
(
0
,
1
,
'%s:6030'
%
self
.
host
)
tdSql
.
checkData
(
4
,
1
,
'%s:6430'
%
self
.
host
)
clusterComCheck
.
checkDnodes
(
dnodeNumbers
)
#check mnode status
tdLog
.
info
(
"check mnode status"
)
clusterComCheck
.
checkMnodeStatus
(
mnodeNums
)
# add some error operations and
tdLog
.
info
(
"Confirm the status of the dnode again"
)
tdSql
.
error
(
"create mnode on dnode 2"
)
tdSql
.
query
(
"select * from information_schema.ins_dnodes;"
)
print
(
tdSql
.
queryResult
)
clusterComCheck
.
checkDnodes
(
dnodeNumbers
)
# create database and stable
clusterComCreate
.
create_database
(
tdSql
,
paraDict
[
"dbName"
],
paraDict
[
"dropFlag"
],
paraDict
[
"vgroups"
],
paraDict
[
'replica'
])
tdLog
.
info
(
"Take turns stopping Mnodes "
)
tdDnodes
=
cluster
.
dnodes
stopcount
=
0
threads
=
[]
# create stable:stb_0
stableName
=
paraDict
[
'stbName'
]
newTdSql
=
tdCom
.
newTdSql
()
clusterComCreate
.
create_stables
(
newTdSql
,
paraDict
[
"dbName"
],
stableName
,
paraDict
[
'stbNumbers'
])
#create child table:ctb_0
for
i
in
range
(
paraDict
[
'stbNumbers'
]):
stableName
=
'%s_%d'
%
(
paraDict
[
'stbName'
],
i
)
newTdSql
=
tdCom
.
newTdSql
()
clusterComCreate
.
create_ctable
(
newTdSql
,
paraDict
[
"dbName"
],
stableName
,
stableName
,
paraDict
[
'ctbNum'
])
#insert date
for
i
in
range
(
paraDict
[
'stbNumbers'
]):
stableName
=
'%s_%d'
%
(
paraDict
[
'stbName'
],
i
)
newTdSql
=
tdCom
.
newTdSql
()
threads
.
append
(
threading
.
Thread
(
target
=
clusterComCreate
.
insert_data
,
args
=
(
newTdSql
,
paraDict
[
"dbName"
],
stableName
,
paraDict
[
"ctbNum"
],
paraDict
[
"rowsPerTbl"
],
paraDict
[
"batchNum"
],
paraDict
[
"startTs"
])))
for
tr
in
threads
:
tr
.
start
()
TdSqlEx
=
tdCom
.
newTdSql
()
tdLog
.
info
(
f
"alter database db0_0 replica
{
replica1
}
"
)
TdSqlEx
.
execute
(
f
'alter database db0_0 replica
{
replica1
}
'
)
for
tr
in
threads
:
tr
.
join
()
clusterComCheck
.
checkDnodes
(
dnodeNumbers
)
clusterComCheck
.
checkDbRows
(
dbNumbers
)
# clusterComCheck.checkDb(dbNumbers,1,paraDict["dbName"])
# tdSql.execute("use %s" %(paraDict["dbName"]))
tdSql
.
query
(
"show %s.stables"
%
(
paraDict
[
"dbName"
]))
tdSql
.
checkRows
(
paraDict
[
"stbNumbers"
])
for
i
in
range
(
paraDict
[
'stbNumbers'
]):
stableName
=
'%s.%s_%d'
%
(
paraDict
[
"dbName"
],
paraDict
[
'stbName'
],
i
)
tdSql
.
query
(
"select count(*) from %s"
%
stableName
)
tdSql
.
checkData
(
0
,
0
,
rowsPerStb
)
clusterComCheck
.
check_vgroups_status
(
vgroup_numbers
=
paraDict
[
"vgroups"
],
db_replica
=
replica1
,
db_name
=
paraDict
[
"dbName"
],
count_number
=
20
)
sleep
(
5
)
tdLog
.
info
(
f
"show transactions;alter database db0_0 replica
{
replica3
}
;"
)
TdSqlEx
.
execute
(
f
'show transactions;'
)
TdSqlEx
.
execute
(
f
'alter database db0_0 replica
{
replica3
}
;'
)
clusterComCheck
.
check_vgroups_status
(
vgroup_numbers
=
paraDict
[
"vgroups"
],
db_replica
=
replica3
,
db_name
=
paraDict
[
"dbName"
],
count_number
=
120
)
def
run
(
self
):
# print(self.master_dnode.cfgDict)
self
.
fiveDnodeThreeMnode
(
dnodeNumbers
=
6
,
mnodeNums
=
3
,
restartNumbers
=
4
,
stopRole
=
'dnode'
)
def
stop
(
self
):
tdSql
.
close
()
tdLog
.
success
(
f
"
{
__file__
}
successfully executed"
)
tdCases
.
addLinux
(
__file__
,
TDTestCase
())
tdCases
.
addWindows
(
__file__
,
TDTestCase
())
tests/system-test/6-cluster/manually-test/6dnode3mnodeStopDnodeInsertDatatb.py
0 → 100644
浏览文件 @
1975e945
import
taos
import
sys
import
time
import
os
from
util.log
import
*
from
util.sql
import
*
from
util.cases
import
*
from
util.dnodes
import
TDDnodes
from
util.dnodes
import
TDDnode
from
util.cluster
import
*
sys
.
path
.
append
(
"./6-cluster"
)
from
clusterCommonCreate
import
*
from
clusterCommonCheck
import
clusterComCheck
import
time
import
socket
import
subprocess
from
multiprocessing
import
Process
import
threading
import
time
import
inspect
import
ctypes
class
TDTestCase
:
def
init
(
self
,
conn
,
logSql
,
replicaVar
=
1
):
tdLog
.
debug
(
f
"start to excute
{
__file__
}
"
)
self
.
TDDnodes
=
None
tdSql
.
init
(
conn
.
cursor
())
self
.
host
=
socket
.
gethostname
()
def
getBuildPath
(
self
):
selfPath
=
os
.
path
.
dirname
(
os
.
path
.
realpath
(
__file__
))
if
(
"community"
in
selfPath
):
projPath
=
selfPath
[:
selfPath
.
find
(
"community"
)]
else
:
projPath
=
selfPath
[:
selfPath
.
find
(
"tests"
)]
for
root
,
dirs
,
files
in
os
.
walk
(
projPath
):
if
(
"taosd"
in
files
):
rootRealPath
=
os
.
path
.
dirname
(
os
.
path
.
realpath
(
root
))
if
(
"packaging"
not
in
rootRealPath
):
buildPath
=
root
[:
len
(
root
)
-
len
(
"/build/bin"
)]
break
return
buildPath
def
_async_raise
(
self
,
tid
,
exctype
):
"""raises the exception, performs cleanup if needed"""
if
not
inspect
.
isclass
(
exctype
):
exctype
=
type
(
exctype
)
res
=
ctypes
.
pythonapi
.
PyThreadState_SetAsyncExc
(
tid
,
ctypes
.
py_object
(
exctype
))
if
res
==
0
:
raise
ValueError
(
"invalid thread id"
)
elif
res
!=
1
:
# """if it returns a number greater than one, you're in trouble,
# and you should call it again with exc=NULL to revert the effect"""
ctypes
.
pythonapi
.
PyThreadState_SetAsyncExc
(
tid
,
None
)
raise
SystemError
(
"PyThreadState_SetAsyncExc failed"
)
def
stopThread
(
self
,
thread
):
self
.
_async_raise
(
thread
.
ident
,
SystemExit
)
def
insertData
(
self
,
dbname
,
tableCount
,
rowsPerCount
):
# tableCount : create table number
# rowsPerCount : rows per table
# fisrt add data : db\stable\childtable\general table
os
.
system
(
f
"taosBenchmark -d
{
dbname
}
-n
{
tableCount
}
-t
{
rowsPerCount
}
-z 1 -k 10000 -y "
)
def
fiveDnodeThreeMnode
(
self
,
dnodeNumbers
,
mnodeNums
,
restartNumbers
,
stopRole
):
tdLog
.
printNoPrefix
(
"======== test case 1: "
)
paraDict
=
{
'dbName'
:
'db0_0'
,
'dropFlag'
:
1
,
'event'
:
''
,
'vgroups'
:
4
,
'replica'
:
1
,
'stbName'
:
'stb'
,
'stbNumbers'
:
2
,
'colPrefix'
:
'c'
,
'tagPrefix'
:
't'
,
'colSchema'
:
[{
'type'
:
'INT'
,
'count'
:
1
},
{
'type'
:
'binary'
,
'len'
:
20
,
'count'
:
1
}],
'tagSchema'
:
[{
'type'
:
'INT'
,
'count'
:
1
},
{
'type'
:
'binary'
,
'len'
:
20
,
'count'
:
1
}],
'ctbPrefix'
:
'ctb'
,
'ctbNum'
:
10000
,
'startTs'
:
1640966400000
,
# 2022-01-01 00:00:00.000
"rowsPerTbl"
:
10000
,
"batchNum"
:
5000
}
dnodeNumbers
=
int
(
dnodeNumbers
)
mnodeNums
=
int
(
mnodeNums
)
vnodeNumbers
=
int
(
dnodeNumbers
-
mnodeNums
)
allctbNumbers
=
(
paraDict
[
'stbNumbers'
]
*
paraDict
[
"ctbNum"
])
rowsPerStb
=
paraDict
[
"ctbNum"
]
*
paraDict
[
"rowsPerTbl"
]
rowsall
=
rowsPerStb
*
paraDict
[
'stbNumbers'
]
dbNumbers
=
1
tdLog
.
info
(
"first check dnode and mnode"
)
tdSql
.
query
(
"select * from information_schema.ins_dnodes;"
)
tdSql
.
checkData
(
0
,
1
,
'%s:6030'
%
self
.
host
)
tdSql
.
checkData
(
4
,
1
,
'%s:6430'
%
self
.
host
)
clusterComCheck
.
checkDnodes
(
dnodeNumbers
)
#check mnode status
tdLog
.
info
(
"check mnode status"
)
clusterComCheck
.
checkMnodeStatus
(
mnodeNums
)
# add some error operations and
tdLog
.
info
(
"Confirm the status of the dnode again"
)
tdSql
.
error
(
"create mnode on dnode 2"
)
tdSql
.
query
(
"select * from information_schema.ins_dnodes;"
)
print
(
tdSql
.
queryResult
)
clusterComCheck
.
checkDnodes
(
dnodeNumbers
)
# create database and stable
tdLog
.
info
(
"Take turns stopping Mnodes "
)
tdDnodes
=
cluster
.
dnodes
stopcount
=
0
threads
=
[]
# create stable:stb_0
threads
.
append
(
threading
.
Thread
(
target
=
self
.
insertData
,
args
=
(
paraDict
[
"dbName"
],
paraDict
[
"ctbNum"
],
paraDict
[
"rowsPerTbl"
])))
for
tr
in
threads
:
tr
.
start
()
TdSqlEx
=
tdCom
.
newTdSql
()
tdLog
.
info
(
"alter database db0_0 replica 3"
)
TdSqlEx
.
execute
(
'alter database db0_0 replica 3'
)
while
stopcount
<
restartNumbers
:
tdLog
.
info
(
" restart loop: %d"
%
stopcount
)
if
stopRole
==
"mnode"
:
for
i
in
range
(
mnodeNums
):
tdDnodes
[
i
].
stoptaosd
()
# sleep(10)
tdDnodes
[
i
].
starttaosd
()
# sleep(10)
elif
stopRole
==
"vnode"
:
for
i
in
range
(
vnodeNumbers
):
tdDnodes
[
i
+
mnodeNums
].
stoptaosd
()
# sleep(10)
tdDnodes
[
i
+
mnodeNums
].
starttaosd
()
# sleep(10)
elif
stopRole
==
"dnode"
:
for
i
in
range
(
dnodeNumbers
):
tdDnodes
[
i
].
stoptaosd
()
# tdLog.info('select cast(c2 as nchar(10)) from db0_0.stb_1;')
# TdSqlEx.execute('select cast(c2 as nchar(10)) from db0_0.stb_1;')
# tdLog.info('select avg(c1) from db0_0.stb_0 interval(10s);')
# TdSqlEx.execute('select avg(c1) from db0_0.stb_0 interval(10s);')
# sleep(10)
tdDnodes
[
i
].
starttaosd
()
# sleep(10)
# dnodeNumbers don't include database of schema
if
clusterComCheck
.
checkDnodes
(
dnodeNumbers
):
tdLog
.
info
(
"123"
)
else
:
print
(
"456"
)
self
.
stopThread
(
threads
)
tdLog
.
exit
(
"one or more of dnodes failed to start "
)
# self.check3mnode()
stopcount
+=
1
for
tr
in
threads
:
tr
.
join
()
clusterComCheck
.
checkDnodes
(
dnodeNumbers
)
clusterComCheck
.
checkDbRows
(
dbNumbers
)
# clusterComCheck.checkDb(dbNumbers,1,paraDict["dbName"])
# tdSql.execute("use %s" %(paraDict["dbName"]))
tdSql
.
query
(
"show %s.stables"
%
(
paraDict
[
"dbName"
]))
tdSql
.
checkRows
(
paraDict
[
"stbNumbers"
])
for
i
in
range
(
paraDict
[
'stbNumbers'
]):
stableName
=
'%s.%s_%d'
%
(
paraDict
[
"dbName"
],
paraDict
[
'stbName'
],
i
)
tdSql
.
query
(
"select count(*) from %s"
%
stableName
)
tdSql
.
checkData
(
0
,
0
,
rowsPerStb
)
clusterComCheck
.
check_vgroups_status
(
vgroup_numbers
=
paraDict
[
"vgroups"
],
db_replica
=
3
,
db_name
=
paraDict
[
"dbName"
],
count_number
=
240
)
def
run
(
self
):
# print(self.master_dnode.cfgDict)
self
.
fiveDnodeThreeMnode
(
dnodeNumbers
=
6
,
mnodeNums
=
3
,
restartNumbers
=
4
,
stopRole
=
'dnode'
)
def
stop
(
self
):
tdSql
.
close
()
tdLog
.
success
(
f
"
{
__file__
}
successfully executed"
)
tdCases
.
addLinux
(
__file__
,
TDTestCase
())
tdCases
.
addWindows
(
__file__
,
TDTestCase
())
tests/system-test/7-tmq/subscribeDb3.py
浏览文件 @
1975e945
...
@@ -336,7 +336,7 @@ class TDTestCase:
...
@@ -336,7 +336,7 @@ class TDTestCase:
for
i
in
range
(
expectRows
):
for
i
in
range
(
expectRows
):
totalConsumeRows
+=
resultList
[
i
]
totalConsumeRows
+=
resultList
[
i
]
if
totalConsumeRows
>
=
expectrowcnt
or
totalConsumeRows
<=
0
:
if
totalConsumeRows
>
expectrowcnt
or
totalConsumeRows
<=
0
:
tdLog
.
info
(
"act consume rows: %d, expect consume rows between %d and 0"
%
(
totalConsumeRows
,
expectrowcnt
))
tdLog
.
info
(
"act consume rows: %d, expect consume rows between %d and 0"
%
(
totalConsumeRows
,
expectrowcnt
))
tdLog
.
exit
(
"tmq consume rows error!"
)
tdLog
.
exit
(
"tmq consume rows error!"
)
...
...
tests/system-test/7-tmq/subscribeStb.py
浏览文件 @
1975e945
...
@@ -226,12 +226,11 @@ class TDTestCase:
...
@@ -226,12 +226,11 @@ class TDTestCase:
self
.
insertConsumerInfo
(
consumerId
,
expectrowcnt
,
topicList
,
keyList
,
ifcheckdata
,
ifManualCommit
)
self
.
insertConsumerInfo
(
consumerId
,
expectrowcnt
,
topicList
,
keyList
,
ifcheckdata
,
ifManualCommit
)
tdLog
.
info
(
"start consume processor"
)
tdLog
.
info
(
"start consume processor"
)
pollDelay
=
5
pollDelay
=
10
showMsg
=
1
showMsg
=
1
showRow
=
1
showRow
=
1
self
.
startTmqSimProcess
(
buildPath
,
cfgPath
,
pollDelay
,
parameterDict
[
"dbName"
],
showMsg
,
showRow
)
self
.
startTmqSimProcess
(
buildPath
,
cfgPath
,
pollDelay
,
parameterDict
[
"dbName"
],
showMsg
,
showRow
)
time
.
sleep
(
5
)
self
.
create_ctables
(
tdSql
,
parameterDict
[
"dbName"
],
parameterDict
[
"stbName"
],
parameterDict
[
"ctbNum"
])
self
.
create_ctables
(
tdSql
,
parameterDict
[
"dbName"
],
parameterDict
[
"stbName"
],
parameterDict
[
"ctbNum"
])
self
.
insert_data
(
tdSql
,
\
self
.
insert_data
(
tdSql
,
\
parameterDict
[
"dbName"
],
\
parameterDict
[
"dbName"
],
\
...
@@ -307,7 +306,7 @@ class TDTestCase:
...
@@ -307,7 +306,7 @@ class TDTestCase:
self
.
insertConsumerInfo
(
consumerId
,
expectrowcnt
,
topicList
,
keyList
,
ifcheckdata
,
ifManualCommit
)
self
.
insertConsumerInfo
(
consumerId
,
expectrowcnt
,
topicList
,
keyList
,
ifcheckdata
,
ifManualCommit
)
tdLog
.
info
(
"start consume processor"
)
tdLog
.
info
(
"start consume processor"
)
pollDelay
=
5
pollDelay
=
10
showMsg
=
1
showMsg
=
1
showRow
=
1
showRow
=
1
self
.
startTmqSimProcess
(
buildPath
,
cfgPath
,
pollDelay
,
parameterDict
[
"dbName"
],
showMsg
,
showRow
)
self
.
startTmqSimProcess
(
buildPath
,
cfgPath
,
pollDelay
,
parameterDict
[
"dbName"
],
showMsg
,
showRow
)
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录