Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
320296a6
T
TDengine
项目概览
taosdata
/
TDengine
1 年多 前同步成功
通知
1185
Star
22016
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
320296a6
编写于
10月 21, 2020
作者:
H
Haojun Liao
浏览文件
操作
浏览文件
下载
差异文件
[td-225] merge develop
上级
fccb4851
e2ed3f71
变更
19
隐藏空白更改
内联
并排
Showing
19 changed file
with
312 addition
and
146 deletion
+312
-146
Jenkinsfile
Jenkinsfile
+16
-0
documentation20/webdocs/markdowndocs/cluster-ch.md
documentation20/webdocs/markdowndocs/cluster-ch.md
+2
-4
src/balance/src/balance.c
src/balance/src/balance.c
+2
-2
src/client/src/tscSql.c
src/client/src/tscSql.c
+4
-1
src/client/src/tscUtil.c
src/client/src/tscUtil.c
+3
-3
src/connector/python/linux/python2/taos/cursor.py
src/connector/python/linux/python2/taos/cursor.py
+4
-2
src/connector/python/linux/python3/taos/cursor.py
src/connector/python/linux/python3/taos/cursor.py
+4
-2
src/connector/python/windows/python2/taos/cursor.py
src/connector/python/windows/python2/taos/cursor.py
+3
-0
src/connector/python/windows/python3/taos/cursor.py
src/connector/python/windows/python3/taos/cursor.py
+3
-0
src/dnode/src/dnodeVRead.c
src/dnode/src/dnodeVRead.c
+0
-1
src/inc/vnode.h
src/inc/vnode.h
+2
-0
src/mnode/src/mnodeProfile.c
src/mnode/src/mnodeProfile.c
+2
-3
src/query/src/qExecutor.c
src/query/src/qExecutor.c
+3
-0
src/vnode/src/vnodeMain.c
src/vnode/src/vnodeMain.c
+74
-41
src/vnode/src/vnodeRead.c
src/vnode/src/vnodeRead.c
+36
-9
src/vnode/src/vnodeWrite.c
src/vnode/src/vnodeWrite.c
+2
-2
tests/pytest/concurrent_inquiry.py
tests/pytest/concurrent_inquiry.py
+78
-76
tests/pytest/fulltest.sh
tests/pytest/fulltest.sh
+1
-0
tests/pytest/query/bug1471.py
tests/pytest/query/bug1471.py
+73
-0
未找到文件。
Jenkinsfile
浏览文件 @
320296a6
...
...
@@ -118,6 +118,22 @@ pipeline {
date'''
}
}
stage
(
'connector'
){
agent
{
label
"release"
}
steps
{
sh
'''
cd ${WORKSPACE}
git checkout develop
cd tests/gotest
bash batchtest.sh
cd ${WORKSPACE}/tests/examples/JDBC/JDBCDemo/
mvn clean package assembly:single >/dev/null
java -jar target/jdbcChecker-SNAPSHOT-jar-with-dependencies.jar -host 127.0.0.1
cd ${WORKSPACE}/tests/examples/python/PYTHONConnectorChecker
python3 PythonChecker.py
'''
}
}
}
}
...
...
documentation20/webdocs/markdowndocs/cluster-ch.md
浏览文件 @
320296a6
...
...
@@ -213,8 +213,6 @@ SHOW MNODES;
## Arbitrator的使用
如果副本数为偶数,当一个vnode group里一半
或超过一半的vnode不工作时,是无法从中选出master的。同理,一半或超过一半的
mnode不工作时,是无法选出mnode的master的,因为存在“split brain”问题。为解决这个问题,TDengine引入了arbitrator的概念。Arbitrator模拟一个vnode或mnode在工作,但只简单的负责网络连接,不处理任何数据插入或访问。只要包含arbitrator在内,超过半数的vnode或mnode工作,那么该vnode group或mnode组就可以正常的提供数据插入或查询服务。比如对于副本数为2的情形,如果一个节点A离线,但另外一个节点B正常,而且能连接到arbitrator, 那么节点B就能正常工作。
如果副本数为偶数,当一个vnode group里一半
vnode不工作时,是无法从中选出master的。同理,一半
mnode不工作时,是无法选出mnode的master的,因为存在“split brain”问题。为解决这个问题,TDengine引入了arbitrator的概念。Arbitrator模拟一个vnode或mnode在工作,但只简单的负责网络连接,不处理任何数据插入或访问。只要包含arbitrator在内,超过半数的vnode或mnode工作,那么该vnode group或mnode组就可以正常的提供数据插入或查询服务。比如对于副本数为2的情形,如果一个节点A离线,但另外一个节点B正常,而且能连接到arbitrator, 那么节点B就能正常工作。
下载最新arbitrator及之前版本的安装包,请点击
[
安装包下载
](
https://www.taosdata.com/cn/all-downloads/
)
,在TDengine Arbitrator Linux一节中,选择适合的版本下载并安装。
TDengine Arbitrator安装包里带有一个执行程序tarbitrator, 找任何一台Linux服务器运行它即可。该程序对系统资源几乎没有要求,只需要保证有网络连接即可。该应用的命令行参数
`-p`
可以指定其对外服务的端口号,缺省是6042。配置每个taosd实例时,可以在配置文件taos.cfg里将参数arbitrator设置为arbitrator的End Point。如果该参数配置了,当副本数为偶数数,系统将自动连接配置的arbitrator。
TDengine提供一个执行程序tarbitrator, 找任何一台Linux服务器运行它即可。请点击
[
安装包下载
](
https://www.taosdata.com/cn/all-downloads/
)
,在TDengine Arbitrator Linux一节中,选择适合的版本下载并安装。该程序对系统资源几乎没有要求,只需要保证有网络连接即可。该应用的命令行参数
`-p`
可以指定其对外服务的端口号,缺省是6042。配置每个taosd实例时,可以在配置文件taos.cfg里将参数arbitrator设置为arbitrator的End Point。如果该参数配置了,当副本数为偶数数,系统将自动连接配置的arbitrator。如果副本数为奇数,即使配置了arbitrator, 系统也不会去建立连接。
src/balance/src/balance.c
浏览文件 @
320296a6
...
...
@@ -216,8 +216,8 @@ static bool balanceCheckVgroupReady(SVgObj *pVgroup, SVnodeGid *pRmVnode) {
SVnodeGid
*
pVnode
=
pVgroup
->
vnodeGid
+
i
;
if
(
pVnode
==
pRmVnode
)
continue
;
mTrace
(
"vgId:%d, ch
ange vgroup status, dnode:%d status:%d
"
,
pVgroup
->
vgId
,
pVnode
->
pDnode
->
dnodeId
,
pVnode
->
pDnode
->
status
);
mTrace
(
"vgId:%d, ch
eck vgroup status, dnode:%d status:%d, vnode role:%s
"
,
pVgroup
->
vgId
,
pVnode
->
pDnode
->
dnodeId
,
pVnode
->
pDnode
->
status
,
syncRole
[
pVnode
->
role
]
);
if
(
pVnode
->
pDnode
->
status
==
TAOS_DN_STATUS_DROPPING
)
continue
;
if
(
pVnode
->
pDnode
->
status
==
TAOS_DN_STATUS_OFFLINE
)
continue
;
...
...
src/client/src/tscSql.c
浏览文件 @
320296a6
...
...
@@ -604,7 +604,8 @@ static UNUSED_FUNC bool tscKillQueryInDnode(SSqlObj* pSql) {
return
true
;
}
SQueryInfo
*
pQueryInfo
=
tscGetQueryInfoDetail
(
pCmd
,
0
);
SQueryInfo
*
pQueryInfo
=
tscGetQueryInfoDetail
(
pCmd
,
0
);
if
((
pQueryInfo
==
NULL
)
||
tscIsTwoStageSTableQuery
(
pQueryInfo
,
0
))
{
return
true
;
}
...
...
@@ -718,6 +719,7 @@ static void tscKillSTableQuery(SSqlObj *pSql) {
SSqlCmd
*
pCmd
=
&
pSql
->
cmd
;
SQueryInfo
*
pQueryInfo
=
tscGetQueryInfoDetail
(
pCmd
,
pCmd
->
clauseIndex
);
if
(
!
tscIsTwoStageSTableQuery
(
pQueryInfo
,
0
))
{
return
;
}
...
...
@@ -766,6 +768,7 @@ void taos_stop_query(TAOS_RES *res) {
pSql
->
res
.
code
=
TSDB_CODE_TSC_QUERY_CANCELLED
;
SQueryInfo
*
pQueryInfo
=
tscGetQueryInfoDetail
(
pCmd
,
pCmd
->
clauseIndex
);
if
(
tscIsTwoStageSTableQuery
(
pQueryInfo
,
0
))
{
assert
(
pSql
->
pRpcCtx
==
NULL
);
tscKillSTableQuery
(
pSql
);
...
...
src/client/src/tscUtil.c
浏览文件 @
320296a6
...
...
@@ -114,9 +114,9 @@ bool tscIsTwoStageSTableQuery(SQueryInfo* pQueryInfo, int32_t tableIndex) {
}
// for select query super table, the super table vgroup list can not be null in any cases.
if
(
pQueryInfo
->
command
==
TSDB_SQL_SELECT
&&
UTIL_TABLE_IS_SUPER_TABLE
(
pTableMetaInfo
))
{
// assert(pTableMetaInfo->vgroupList != NULL); // if retrieve vgroupInfo failed, the value may be null
}
//
if (pQueryInfo->command == TSDB_SQL_SELECT && UTIL_TABLE_IS_SUPER_TABLE(pTableMetaInfo)) {
// assert(pTableMetaInfo->vgroupList != NULL);
//
}
if
((
pQueryInfo
->
type
&
TSDB_QUERY_TYPE_FREE_RESOURCE
)
==
TSDB_QUERY_TYPE_FREE_RESOURCE
)
{
return
false
;
...
...
src/connector/python/linux/python2/taos/cursor.py
浏览文件 @
320296a6
...
...
@@ -192,8 +192,10 @@ class TDengineCursor(object):
buffer
=
[[]
for
i
in
range
(
len
(
self
.
_fields
))]
self
.
_rowcount
=
0
while
True
:
block
,
num_of_fields
=
CTaosInterface
.
fetchBlock
(
self
.
_result
,
self
.
_fields
)
block
,
num_of_fields
=
CTaosInterface
.
fetchBlock
(
self
.
_result
,
self
.
_fields
)
errno
=
CTaosInterface
.
libtaos
.
taos_errno
(
self
.
_result
)
if
errno
!=
0
:
raise
ProgrammingError
(
CTaosInterface
.
errStr
(
self
.
_result
),
errno
)
if
num_of_fields
==
0
:
break
self
.
_rowcount
+=
num_of_fields
...
...
src/connector/python/linux/python3/taos/cursor.py
浏览文件 @
320296a6
...
...
@@ -207,8 +207,10 @@ class TDengineCursor(object):
buffer
=
[[]
for
i
in
range
(
len
(
self
.
_fields
))]
self
.
_rowcount
=
0
while
True
:
block
,
num_of_fields
=
CTaosInterface
.
fetchBlock
(
self
.
_result
,
self
.
_fields
)
block
,
num_of_fields
=
CTaosInterface
.
fetchBlock
(
self
.
_result
,
self
.
_fields
)
errno
=
CTaosInterface
.
libtaos
.
taos_errno
(
self
.
_result
)
if
errno
!=
0
:
raise
ProgrammingError
(
CTaosInterface
.
errStr
(
self
.
_result
),
errno
)
if
num_of_fields
==
0
:
break
self
.
_rowcount
+=
num_of_fields
...
...
src/connector/python/windows/python2/taos/cursor.py
浏览文件 @
320296a6
...
...
@@ -142,6 +142,9 @@ class TDengineCursor(object):
self
.
_rowcount
=
0
while
True
:
block
,
num_of_fields
=
CTaosInterface
.
fetchBlock
(
self
.
_result
,
self
.
_fields
)
errno
=
CTaosInterface
.
libtaos
.
taos_errno
(
self
.
_result
)
if
errno
!=
0
:
raise
ProgrammingError
(
CTaosInterface
.
errStr
(
self
.
_result
),
errno
)
if
num_of_fields
==
0
:
break
self
.
_rowcount
+=
num_of_fields
for
i
in
range
(
len
(
self
.
_fields
)):
...
...
src/connector/python/windows/python3/taos/cursor.py
浏览文件 @
320296a6
...
...
@@ -142,6 +142,9 @@ class TDengineCursor(object):
self
.
_rowcount
=
0
while
True
:
block
,
num_of_fields
=
CTaosInterface
.
fetchBlock
(
self
.
_result
,
self
.
_fields
)
errno
=
CTaosInterface
.
libtaos
.
taos_errno
(
self
.
_result
)
if
errno
!=
0
:
raise
ProgrammingError
(
CTaosInterface
.
errStr
(
self
.
_result
),
errno
)
if
num_of_fields
==
0
:
break
self
.
_rowcount
+=
num_of_fields
for
i
in
range
(
len
(
self
.
_fields
)):
...
...
src/dnode/src/dnodeVRead.c
浏览文件 @
320296a6
...
...
@@ -189,7 +189,6 @@ void dnodeSendRpcReadRsp(void *pVnode, SReadMsg *pRead, int32_t code) {
void
dnodeDispatchNonRspMsg
(
void
*
pVnode
,
SReadMsg
*
pRead
,
int32_t
code
)
{
rpcFreeCont
(
pRead
->
rpcMsg
.
pCont
);
vnodeRelease
(
pVnode
);
return
;
}
static
void
*
dnodeProcessReadQueue
(
void
*
param
)
{
...
...
src/inc/vnode.h
浏览文件 @
320296a6
...
...
@@ -41,6 +41,8 @@ typedef struct {
SRpcMsg
rpcMsg
;
}
SReadMsg
;
extern
char
*
vnodeStatus
[];
int32_t
vnodeCreate
(
SMDCreateVnodeMsg
*
pVnodeCfg
);
int32_t
vnodeDrop
(
int32_t
vgId
);
int32_t
vnodeOpen
(
int32_t
vgId
,
char
*
rootDir
);
...
...
src/mnode/src/mnodeProfile.c
浏览文件 @
320296a6
...
...
@@ -111,7 +111,6 @@ void mnodeReleaseConn(SConnObj *pConn) {
}
SConnObj
*
mnodeAccquireConn
(
int32_t
connId
,
char
*
user
,
uint32_t
ip
,
uint16_t
port
)
{
uint64_t
expireTime
=
CONN_KEEP_TIME
*
1000
+
(
uint64_t
)
taosGetTimestampMs
();
SConnObj
*
pConn
=
taosCacheAcquireByKey
(
tsMnodeConnCache
,
&
connId
,
sizeof
(
int32_t
));
if
(
pConn
==
NULL
)
{
mDebug
(
"connId:%d, is already destroyed, user:%s ip:%s:%u"
,
connId
,
user
,
taosIpStr
(
ip
),
port
);
...
...
@@ -126,7 +125,7 @@ SConnObj *mnodeAccquireConn(int32_t connId, char *user, uint32_t ip, uint16_t po
}
// mDebug("connId:%d, is incoming, user:%s ip:%s:%u", connId, pConn->user, taosIpStr(pConn->ip), pConn->port);
pConn
->
lastAccess
=
expireTime
;
pConn
->
lastAccess
=
CONN_KEEP_TIME
*
1000
+
(
uint64_t
)
taosGetTimestampMs
()
;
return
pConn
;
}
...
...
@@ -626,7 +625,7 @@ static int32_t mnodeProcessKillConnectionMsg(SMnodeMsg *pMsg) {
SCMKillConnMsg
*
pKill
=
pMsg
->
rpcMsg
.
pCont
;
int32_t
connId
=
atoi
(
pKill
->
queryId
);
SConnObj
*
pConn
=
taosCacheAcquireByKey
(
tsMnodeConnCache
,
&
connId
,
sizeof
(
int32_t
));
SConnObj
*
pConn
=
taosCacheAcquireByKey
(
tsMnodeConnCache
,
&
connId
,
sizeof
(
int32_t
));
if
(
pConn
==
NULL
)
{
mError
(
"connId:%s, failed to kill, conn not exist"
,
pKill
->
queryId
);
return
TSDB_CODE_MND_INVALID_CONN_ID
;
...
...
src/query/src/qExecutor.c
浏览文件 @
320296a6
...
...
@@ -7118,6 +7118,7 @@ void qCleanupQueryMgmt(void* pQMgmt) {
void
**
qRegisterQInfo
(
void
*
pMgmt
,
uint64_t
qInfo
)
{
if
(
pMgmt
==
NULL
)
{
terrno
=
TSDB_CODE_VND_INVALID_VGROUP_ID
;
return
NULL
;
}
...
...
@@ -7126,6 +7127,7 @@ void** qRegisterQInfo(void* pMgmt, uint64_t qInfo) {
SQueryMgmt
*
pQueryMgmt
=
pMgmt
;
if
(
pQueryMgmt
->
qinfoPool
==
NULL
)
{
qError
(
"QInfo:%p failed to add qhandle into qMgmt, since qMgmt is closed"
,
(
void
*
)
qInfo
);
terrno
=
TSDB_CODE_VND_INVALID_VGROUP_ID
;
return
NULL
;
}
...
...
@@ -7133,6 +7135,7 @@ void** qRegisterQInfo(void* pMgmt, uint64_t qInfo) {
if
(
pQueryMgmt
->
closed
)
{
// pthread_mutex_unlock(&pQueryMgmt->lock);
qError
(
"QInfo:%p failed to add qhandle into cache, since qMgmt is colsing"
,
(
void
*
)
qInfo
);
terrno
=
TSDB_CODE_VND_INVALID_VGROUP_ID
;
return
NULL
;
}
else
{
TSDB_CACHE_PTR_TYPE
handleVal
=
(
TSDB_CACHE_PTR_TYPE
)
qInfo
;
...
...
src/vnode/src/vnodeMain.c
浏览文件 @
320296a6
...
...
@@ -56,6 +56,14 @@ int syncGetNodesRole(tsync_h shandle, SNodesRole * cfg) { return 0; }
void
syncConfirmForward
(
tsync_h
shandle
,
uint64_t
version
,
int32_t
code
)
{}
#endif
char
*
vnodeStatus
[]
=
{
"init"
,
"ready"
,
"closing"
,
"updating"
,
"reset"
};
int32_t
vnodeInitResources
()
{
int
code
=
syncInit
();
if
(
code
!=
0
)
return
code
;
...
...
@@ -74,6 +82,7 @@ int32_t vnodeInitResources() {
void
vnodeCleanupResources
()
{
if
(
tsDnodeVnodesHash
!=
NULL
)
{
vDebug
(
"vnode list is cleanup"
);
taosHashCleanup
(
tsDnodeVnodesHash
);
tsDnodeVnodesHash
=
NULL
;
}
...
...
@@ -84,9 +93,10 @@ void vnodeCleanupResources() {
int32_t
vnodeCreate
(
SMDCreateVnodeMsg
*
pVnodeCfg
)
{
int32_t
code
;
SVnodeObj
*
pTemp
=
(
SVnodeObj
*
)
taosHashGet
(
tsDnodeVnodesHash
,
(
const
char
*
)
&
pVnodeCfg
->
cfg
.
vgId
,
sizeof
(
int32_t
));
if
(
pTemp
!=
NULL
)
{
vInfo
(
"vgId:%d, vnode already exist, pVnode:%p"
,
pVnodeCfg
->
cfg
.
vgId
,
pTemp
);
SVnodeObj
*
pVnode
=
vnodeAcquire
(
pVnodeCfg
->
cfg
.
vgId
);
if
(
pVnode
!=
NULL
)
{
vDebug
(
"vgId:%d, vnode already exist, refCount:%d pVnode:%p"
,
pVnodeCfg
->
cfg
.
vgId
,
pVnode
->
refCount
,
pVnode
);
vnodeRelease
(
pVnode
);
return
TSDB_CODE_SUCCESS
;
}
...
...
@@ -143,22 +153,24 @@ int32_t vnodeCreate(SMDCreateVnodeMsg *pVnodeCfg) {
return
TSDB_CODE_VND_INIT_FAILED
;
}
vInfo
(
"vgId:%d, vnode is created, walLevel:%d fsyncPeriod:%d"
,
pVnodeCfg
->
cfg
.
vgId
,
pVnodeCfg
->
cfg
.
walLevel
,
pVnodeCfg
->
cfg
.
fsyncPeriod
);
vInfo
(
"vgId:%d, vnode dir is created, walLevel:%d fsyncPeriod:%d"
,
pVnodeCfg
->
cfg
.
vgId
,
pVnodeCfg
->
cfg
.
walLevel
,
pVnodeCfg
->
cfg
.
fsyncPeriod
);
code
=
vnodeOpen
(
pVnodeCfg
->
cfg
.
vgId
,
rootDir
);
return
code
;
}
int32_t
vnodeDrop
(
int32_t
vgId
)
{
SVnodeObj
*
*
ppVnode
=
(
SVnodeObj
**
)
taosHashGet
(
tsDnodeVnodesHash
,
(
const
char
*
)
&
vgId
,
sizeof
(
int32_t
)
);
if
(
p
pVnode
==
NULL
||
*
pp
Vnode
==
NULL
)
{
vDebug
(
"vgId:%d, failed to drop, v
gId
not find"
,
vgId
);
SVnodeObj
*
pVnode
=
vnodeAcquire
(
vgId
);
if
(
pVnode
==
NULL
)
{
vDebug
(
"vgId:%d, failed to drop, v
node
not find"
,
vgId
);
return
TSDB_CODE_VND_INVALID_VGROUP_ID
;
}
SVnodeObj
*
pVnode
=
*
ppVnode
;
vTrace
(
"vgId:%d, vnode will be dropped, refCount:%d"
,
pVnode
->
vgId
,
pVnode
->
refCount
);
vInfo
(
"vgId:%d, vnode will be dropped, refCount:%d pVnode:%p"
,
pVnode
->
vgId
,
pVnode
->
refCount
,
pVnode
);
pVnode
->
dropped
=
1
;
vnodeRelease
(
pVnode
);
vnodeCleanUp
(
pVnode
);
return
TSDB_CODE_SUCCESS
;
...
...
@@ -340,11 +352,11 @@ int32_t vnodeOpen(int32_t vnode, char *rootDir) {
}
int32_t
vnodeClose
(
int32_t
vgId
)
{
SVnodeObj
*
*
ppVnode
=
(
SVnodeObj
**
)
taosHashGet
(
tsDnodeVnodesHash
,
(
const
char
*
)
&
vgId
,
sizeof
(
int32_t
)
);
if
(
p
pVnode
==
NULL
||
*
pp
Vnode
==
NULL
)
return
0
;
SVnodeObj
*
pVnode
=
vnodeAcquire
(
vgId
);
if
(
pVnode
==
NULL
)
return
0
;
SVnodeObj
*
pVnode
=
*
ppVnode
;
v
Debug
(
"vgId:%d, vnode will be closed"
,
pVnode
->
vgId
);
vDebug
(
"vgId:%d, vnode will be closed, pVnode:%p"
,
pVnode
->
vgId
,
pVnode
)
;
v
nodeRelease
(
pVnode
);
vnodeCleanUp
(
pVnode
);
return
0
;
...
...
@@ -355,21 +367,27 @@ void vnodeRelease(void *pVnodeRaw) {
int32_t
vgId
=
pVnode
->
vgId
;
int32_t
refCount
=
atomic_sub_fetch_32
(
&
pVnode
->
refCount
,
1
);
vTrace
(
"vgId:%d, release vnode, refCount:%d pVnode:%p"
,
vgId
,
refCount
,
pVnode
);
assert
(
refCount
>=
0
);
if
(
refCount
>
0
)
{
vDebug
(
"vgId:%d, release vnode, refCount:%d"
,
vgId
,
refCount
);
if
(
pVnode
->
status
==
TAOS_VN_STATUS_RESET
&&
refCount
==
2
)
if
(
pVnode
->
status
==
TAOS_VN_STATUS_RESET
&&
refCount
==
2
)
{
tsem_post
(
&
pVnode
->
sem
);
}
return
;
}
qCleanupQueryMgmt
(
pVnode
->
qMgmt
);
pVnode
->
qMgmt
=
NULL
;
vDebug
(
"vgId:%d, vnode will be destroyed, refCount:%d pVnode:%p"
,
vgId
,
refCount
,
pVnode
);
if
(
pVnode
->
qMgmt
)
{
qCleanupQueryMgmt
(
pVnode
->
qMgmt
);
pVnode
->
qMgmt
=
NULL
;
}
if
(
pVnode
->
tsdb
)
if
(
pVnode
->
tsdb
)
{
tsdbCloseRepo
(
pVnode
->
tsdb
,
1
);
pVnode
->
tsdb
=
NULL
;
pVnode
->
tsdb
=
NULL
;
}
// stop continuous query
if
(
pVnode
->
cq
)
{
...
...
@@ -378,18 +396,21 @@ void vnodeRelease(void *pVnodeRaw) {
cqClose
(
cq
);
}
if
(
pVnode
->
wal
)
if
(
pVnode
->
wal
)
{
walClose
(
pVnode
->
wal
);
pVnode
->
wal
=
NULL
;
pVnode
->
wal
=
NULL
;
}
if
(
pVnode
->
wqueue
)
if
(
pVnode
->
wqueue
)
{
dnodeFreeVnodeWqueue
(
pVnode
->
wqueue
);
pVnode
->
wqueue
=
NULL
;
pVnode
->
wqueue
=
NULL
;
}
if
(
pVnode
->
rqueue
)
if
(
pVnode
->
rqueue
)
{
dnodeFreeVnodeRqueue
(
pVnode
->
rqueue
);
pVnode
->
rqueue
=
NULL
;
pVnode
->
rqueue
=
NULL
;
}
taosTFree
(
pVnode
->
rootDir
);
if
(
pVnode
->
dropped
)
{
...
...
@@ -413,22 +434,31 @@ void vnodeRelease(void *pVnodeRaw) {
free
(
pVnode
);
int32_t
count
=
taosHashGetSize
(
tsDnodeVnodesHash
);
vDebug
(
"vgId:%d, vnode is released, vnodes:%d"
,
vgId
,
count
);
vDebug
(
"vgId:%d, vnode is destroyed, vnodes:%d"
,
vgId
,
count
);
}
static
void
vnodeIncRef
(
void
*
ptNode
)
{
assert
(
ptNode
!=
NULL
);
SVnodeObj
**
ppVnode
=
(
SVnodeObj
**
)
ptNode
;
assert
(
ppVnode
);
assert
(
*
ppVnode
);
SVnodeObj
*
pVnode
=
*
ppVnode
;
atomic_add_fetch_32
(
&
pVnode
->
refCount
,
1
);
vTrace
(
"vgId:%d, get vnode, refCount:%d pVnode:%p"
,
pVnode
->
vgId
,
pVnode
->
refCount
,
pVnode
);
}
void
*
vnodeAcquire
(
int32_t
vgId
)
{
SVnodeObj
**
ppVnode
=
(
SVnodeObj
**
)
taosHashGet
(
tsDnodeVnodesHash
,
(
const
char
*
)
&
vgId
,
sizeof
(
int32_t
));
SVnodeObj
**
ppVnode
=
taosHashGetCB
(
tsDnodeVnodesHash
,
&
vgId
,
sizeof
(
int32_t
),
vnodeIncRef
,
NULL
,
sizeof
(
void
*
));
if
(
ppVnode
==
NULL
||
*
ppVnode
==
NULL
)
{
terrno
=
TSDB_CODE_VND_INVALID_VGROUP_ID
;
v
Info
(
"vgId:%d, not exist"
,
vgId
);
v
Debug
(
"vgId:%d, not exist"
,
vgId
);
return
NULL
;
}
SVnodeObj
*
pVnode
=
*
ppVnode
;
atomic_add_fetch_32
(
&
pVnode
->
refCount
,
1
);
vDebug
(
"vgId:%d, get vnode, refCount:%d"
,
pVnode
->
vgId
,
pVnode
->
refCount
);
return
pVnode
;
return
*
ppVnode
;
}
void
*
vnodeAcquireRqueue
(
int32_t
vgId
)
{
...
...
@@ -528,7 +558,7 @@ void vnodeSetAccess(SDMVgroupAccess *pAccess, int32_t numOfVnodes) {
if
(
pVnode
!=
NULL
)
{
pVnode
->
accessState
=
pAccess
[
i
].
accessState
;
if
(
pVnode
->
accessState
!=
TSDB_VN_ALL_ACCCESS
)
{
vDebug
(
"vgId:%d, access state is set to %d"
,
pAccess
[
i
].
vgId
,
pVnode
->
accessState
)
vDebug
(
"vgId:%d, access state is set to %d"
,
pAccess
[
i
].
vgId
,
pVnode
->
accessState
)
;
}
vnodeRelease
(
pVnode
);
}
...
...
@@ -538,11 +568,12 @@ void vnodeSetAccess(SDMVgroupAccess *pAccess, int32_t numOfVnodes) {
static
void
vnodeCleanUp
(
SVnodeObj
*
pVnode
)
{
// remove from hash, so new messages wont be consumed
taosHashRemove
(
tsDnodeVnodesHash
,
(
const
char
*
)
&
pVnode
->
vgId
,
sizeof
(
int32_t
));
int
i
=
0
;
if
(
pVnode
->
status
!=
TAOS_VN_STATUS_INIT
)
{
// it may be in updateing or reset state, then it shall wait
while
(
atomic_val_compare_exchange_8
(
&
pVnode
->
status
,
TAOS_VN_STATUS_READY
,
TAOS_VN_STATUS_CLOSING
)
!=
TAOS_VN_STATUS_READY
)
{
int
i
=
0
;
while
(
atomic_val_compare_exchange_8
(
&
pVnode
->
status
,
TAOS_VN_STATUS_READY
,
TAOS_VN_STATUS_CLOSING
)
!=
TAOS_VN_STATUS_READY
)
{
if
(
++
i
%
1000
==
0
)
{
sched_yield
();
}
...
...
@@ -556,7 +587,7 @@ static void vnodeCleanUp(SVnodeObj *pVnode) {
syncStop
(
sync
);
}
v
Trace
(
"vgId:%d, vnode will cleanup, refCount:%d"
,
pVnode
->
vgId
,
pVnode
->
refCount
);
v
Debug
(
"vgId:%d, vnode will cleanup, refCount:%d pVnode:%p"
,
pVnode
->
vgId
,
pVnode
->
refCount
,
pVnode
);
// release local resources only after cutting off outside connections
qQueryMgmtNotifyClosed
(
pVnode
->
qMgmt
);
...
...
@@ -613,17 +644,19 @@ static int vnodeResetTsdb(SVnodeObj *pVnode)
char
rootDir
[
128
]
=
"
\0
"
;
sprintf
(
rootDir
,
"%s/tsdb"
,
pVnode
->
rootDir
);
if
(
atomic_val_compare_exchange_8
(
&
pVnode
->
status
,
TAOS_VN_STATUS_READY
,
TAOS_VN_STATUS_RESET
)
!=
TAOS_VN_STATUS_READY
)
if
(
atomic_val_compare_exchange_8
(
&
pVnode
->
status
,
TAOS_VN_STATUS_READY
,
TAOS_VN_STATUS_RESET
)
!=
TAOS_VN_STATUS_READY
)
{
return
-
1
;
}
void
*
tsdb
=
pVnode
->
tsdb
;
pVnode
->
tsdb
=
NULL
;
// acquire vnode
int32_t
refCount
=
atomic_add_fetch_32
(
&
pVnode
->
refCount
,
1
);
int32_t
refCount
=
atomic_add_fetch_32
(
&
pVnode
->
refCount
,
1
);
if
(
refCount
>
2
)
if
(
refCount
>
2
)
{
tsem_wait
(
&
pVnode
->
sem
);
}
// close tsdb, then open tsdb
tsdbCloseRepo
(
tsdb
,
0
);
...
...
src/vnode/src/vnodeRead.c
浏览文件 @
320296a6
...
...
@@ -39,8 +39,7 @@ void vnodeInitReadFp(void) {
vnodeProcessReadMsgFp
[
TSDB_MSG_TYPE_FETCH
]
=
vnodeProcessFetchMsg
;
}
int32_t
vnodeProcessRead
(
void
*
param
,
SReadMsg
*
pReadMsg
)
{
SVnodeObj
*
pVnode
=
(
SVnodeObj
*
)
param
;
static
int32_t
vnodeProcessReadImp
(
SVnodeObj
*
pVnode
,
SReadMsg
*
pReadMsg
)
{
int
msgType
=
pReadMsg
->
rpcMsg
.
msgType
;
if
(
vnodeProcessReadMsgFp
[
msgType
]
==
NULL
)
{
...
...
@@ -49,16 +48,23 @@ int32_t vnodeProcessRead(void *param, SReadMsg *pReadMsg) {
}
if
(
pVnode
->
status
!=
TAOS_VN_STATUS_READY
)
{
vDebug
(
"vgId:%d, msgType:%s not processed, vnode status is %d"
,
pVnode
->
vgId
,
taosMsg
[
msgType
],
pVnode
->
status
);
vDebug
(
"vgId:%d, msgType:%s not processed, vnode status is %s"
,
pVnode
->
vgId
,
taosMsg
[
msgType
],
vnodeStatus
[
pVnode
->
status
]);
return
TSDB_CODE_APP_NOT_READY
;
}
// tsdb may be in reset state
if
(
pVnode
->
tsdb
==
NULL
)
return
TSDB_CODE_APP_NOT_READY
;
if
(
pVnode
->
status
==
TAOS_VN_STATUS_CLOSING
)
return
TSDB_CODE_APP_NOT_READY
;
if
(
pVnode
->
tsdb
==
NULL
)
{
vDebug
(
"vgId:%d, msgType:%s not processed, tsdb is null"
,
pVnode
->
vgId
,
taosMsg
[
msgType
]);
return
TSDB_CODE_APP_NOT_READY
;
}
if
(
pVnode
->
status
==
TAOS_VN_STATUS_CLOSING
)
{
vDebug
(
"vgId:%d, msgType:%s not processed, vstatus is %s"
,
pVnode
->
vgId
,
taosMsg
[
msgType
],
vnodeStatus
[
pVnode
->
status
]);
return
TSDB_CODE_APP_NOT_READY
;
}
// TODO: Later, let slave to support query
// if (pVnode->syncCfg.replica > 1 && pVnode->role != TAOS_SYNC_ROLE_MASTER) {
if
(
pVnode
->
role
!=
TAOS_SYNC_ROLE_SLAVE
&&
pVnode
->
role
!=
TAOS_SYNC_ROLE_MASTER
)
{
vDebug
(
"vgId:%d, msgType:%s not processed, replica:%d role:%s"
,
pVnode
->
vgId
,
taosMsg
[
msgType
],
pVnode
->
syncCfg
.
replica
,
syncRole
[
pVnode
->
role
]);
...
...
@@ -68,6 +74,25 @@ int32_t vnodeProcessRead(void *param, SReadMsg *pReadMsg) {
return
(
*
vnodeProcessReadMsgFp
[
msgType
])(
pVnode
,
pReadMsg
);
}
int32_t
vnodeProcessRead
(
void
*
param
,
SReadMsg
*
pRead
)
{
SVnodeObj
*
pVnode
=
(
SVnodeObj
*
)
param
;
int32_t
code
=
vnodeProcessReadImp
(
pVnode
,
pRead
);
if
(
code
==
TSDB_CODE_APP_NOT_READY
&&
pRead
->
rpcMsg
.
msgType
==
TSDB_MSG_TYPE_QUERY
)
{
// After the fetch request enters the vnode queue
// If the vnode cannot provide services, the following operations are still required
// Or, there will be a deadlock
void
**
qhandle
=
(
void
**
)
pRead
->
pCont
;
vError
(
"QInfo:%p msg:%p will be killed for vstatus is %s"
,
*
qhandle
,
pRead
,
vnodeStatus
[
pVnode
->
status
]);
// qKillQuery(*qhandle);
// qReleaseQInfo(pVnode->qMgmt, (void **)&qhandle, true);
return
TSDB_CODE_APP_NOT_READY
;
}
else
{
return
code
;
}
}
static
void
vnodePutItemIntoReadQueue
(
SVnodeObj
*
pVnode
,
void
**
qhandle
,
void
*
ahandle
)
{
SReadMsg
*
pRead
=
(
SReadMsg
*
)
taosAllocateQitem
(
sizeof
(
SReadMsg
));
pRead
->
rpcMsg
.
msgType
=
TSDB_MSG_TYPE_QUERY
;
...
...
@@ -175,11 +200,13 @@ static int32_t vnodeProcessQueryMsg(SVnodeObj *pVnode, SReadMsg *pReadMsg) {
// current connect is broken
if
(
code
==
TSDB_CODE_SUCCESS
)
{
handle
=
qRegisterQInfo
(
pVnode
->
qMgmt
,
(
uint64_t
)
pQInfo
);
if
(
handle
==
NULL
)
{
// failed to register qhandle, todo add error test case
if
(
handle
==
NULL
)
{
// failed to register qhandle
pRsp
->
code
=
terrno
;
terrno
=
0
;
vError
(
"vgId:%d QInfo:%p register qhandle failed, return to app, code:%s"
,
pVnode
->
vgId
,
(
void
*
)
pQInfo
,
tstrerror
(
pRsp
->
code
));
pRsp
->
code
=
TSDB_CODE_QRY_INVALID_QHANDLE
;
qDestroyQueryInfo
(
pQInfo
);
// destroy it directly
return
pRsp
->
code
;
}
else
{
assert
(
*
handle
==
pQInfo
);
pRsp
->
qhandle
=
htobe64
((
uint64_t
)
pQInfo
);
...
...
src/vnode/src/vnodeWrite.c
浏览文件 @
320296a6
...
...
@@ -202,7 +202,7 @@ int vnodeWriteCqMsgToQueue(void *param, void *data, int type) {
memcpy
(
pWal
,
pHead
,
size
);
atomic_add_fetch_32
(
&
pVnode
->
refCount
,
1
);
v
Debug
(
"CQ: vgId:%d, get vnode wqueue, refCount:%d"
,
pVnode
->
vgId
,
pVnode
->
refCount
);
v
Trace
(
"CQ: vgId:%d, get vnode wqueue, refCount:%d pVnode:%p"
,
pVnode
->
vgId
,
pVnode
->
refCount
,
pVnode
);
taosWriteQitem
(
pVnode
->
wqueue
,
type
,
pSync
);
...
...
@@ -219,7 +219,7 @@ int vnodeWriteToQueue(void *param, void *data, int type) {
memcpy
(
pWal
,
pHead
,
size
);
atomic_add_fetch_32
(
&
pVnode
->
refCount
,
1
);
v
Debug
(
"vgId:%d, get vnode wqueue, refCount:%d"
,
pVnode
->
vgId
,
pVnode
->
refCount
);
v
Trace
(
"vgId:%d, get vnode wqueue, refCount:%d pVnode:%p"
,
pVnode
->
vgId
,
pVnode
->
refCount
,
pVnode
);
taosWriteQitem
(
pVnode
->
wqueue
,
type
,
pWal
);
...
...
tests/pytest/concurrent_inquiry.py
浏览文件 @
320296a6
...
...
@@ -12,108 +12,105 @@
# -*- coding: utf-8 -*-
import
threading
import
taos
import
sys
import
json
import
time
import
random
# query sql
query_sql
=
[
# first supertable
"select count(*) from test.meters where c1 > 50;"
,
"select count(*) from test.meters where c2 >= 50 and c2 < 100;"
,
"select count(*) from test.meters where c3 != 5;"
,
"select count(*) from test.meters ;"
,
"select count(*) from test.meters where t3 > 2;"
,
"select count(*) from test.meters where ts <> '2020-05-13 10:00:00.002';"
,
"select count(*) from test.meters where t7 like 'fi%';"
,
"select count(*) from test.meters where t7 like '_econd';"
,
"select count(*) from test.meters where t7 like 'taos_1%';"
,
"select count(*) from test.meters where t7 like '_____2';"
,
"select count(*) from test.meters where t8 like '%思%';"
,
"select count(*) from test.meters interval(1n) order by ts desc;"
,
"select max(c0) from test.meters group by tbname"
,
"select first(
*) from test.meters
;"
,
"select last(
*) from test.meters
;"
,
#
"select max(c0) from test.meters group by tbname",
"select first(
ts) from test.meters where t5 >5000 and t5<5100
;"
,
"select last(
ts) from test.meters where t5 >5000 and t5<5100
;"
,
"select last_row(*) from test.meters;"
,
"select twa(c1) from test.t1 where ts > 1500000001000 and ts < 1500000101000"
,
"select avg(c1) from test.meters;"
,
"select avg(c1) from test.meters
where t5 >5000 and t5<5100
;"
,
"select bottom(c1, 2) from test.t1;"
,
"select diff(c1) from test.t1;"
,
"select leastsquares(c1, 1, 1) from test.t1 ;"
,
"select max(c1) from test.meters;"
,
"select min(c1) from test.meters;"
,
"select c1 + c2
* c3
+ c1 / c5 + c4 + c2 from test.t1;"
,
"select max(c1) from test.meters
where t5 >5000 and t5<5100
;"
,
"select min(c1) from test.meters
where t5 >5000 and t5<5100
;"
,
"select c1 + c2 + c1 / c5 + c4 + c2 from test.t1;"
,
"select percentile(c1, 50) from test.t1;"
,
"select spread(c1) from test.t1 ;"
,
"select stddev(c1) from test.t1;"
,
"select sum(c1) from test.meters;"
,
"select top(c1, 2) from test.meters;"
"select twa(c
6
) from test.t1 where ts > 1500000001000 and ts < 1500000101000"
,
"select avg(c
6) from test.meters
;"
,
"select bottom(c
6, 2) from test.t1
;"
,
"select diff(c
6) from test.t1
;"
,
"select leastsquares(c
6
, 1, 1) from test.t1 ;"
,
"select max(c
6) from test.meters
;"
,
"select min(c
6) from test.meters
;"
,
"select c
6 + c2 * c3 + c6 / c5 + c4 + c2 from test.t1
;"
,
"select percentile(c
6
, 50) from test.t1;"
,
"select spread(c
6
) from test.t1 ;"
,
"select stddev(c
6) from test.t1
;"
,
"select sum(c
6) from test.meters
;"
,
"select top(c
6, 2) from test.meters
;"
,
"select sum(c1) from test.meters
where t5 >5000 and t5<5100
;"
,
"select top(c1, 2) from test.meters
where t5 >5000 and t5<5100
;"
"select twa(c
4
) from test.t1 where ts > 1500000001000 and ts < 1500000101000"
,
"select avg(c
4) from test.meters where t5 >5000 and t5<5100
;"
,
"select bottom(c
4, 2) from test.t1 where t5 >5000 and t5<5100
;"
,
"select diff(c
4) from test.t1 where t5 >5000 and t5<5100
;"
,
"select leastsquares(c
4
, 1, 1) from test.t1 ;"
,
"select max(c
4) from test.meters where t5 >5000 and t5<5100
;"
,
"select min(c
4) from test.meters where t5 >5000 and t5<5100
;"
,
"select c
5 + c2 + c4 / c5 + c4 + c2 from test.t1
;"
,
"select percentile(c
5
, 50) from test.t1;"
,
"select spread(c
5
) from test.t1 ;"
,
"select stddev(c
5) from test.t1 where t5 >5000 and t5<5100
;"
,
"select sum(c
5) from test.meters where t5 >5000 and t5<5100
;"
,
"select top(c
5, 2) from test.meters where t5 >5000 and t5<5100
;"
,
#all vnode
"select count(*) from test.meters where t5 >
2500 and t5<75
00"
,
"select max(c0),avg(c1) from test.meters where t5 >
2500 and t5<75
00"
,
"select sum(c5),avg(c1) from test.meters where t5 >
2500 and t5<75
00"
,
"select max(c0),min(c
6) from test.meters where t5 >2500 and t5<75
00"
,
"select min(c0),avg(c
6) from test.meters where t5 >2500 and t5<75
00"
,
"select count(*) from test.meters where t5 >
5000 and t5<51
00"
,
"select max(c0),avg(c1) from test.meters where t5 >
5000 and t5<51
00"
,
"select sum(c5),avg(c1) from test.meters where t5 >
5000 and t5<51
00"
,
"select max(c0),min(c
5) from test.meters where t5 >5000 and t5<51
00"
,
"select min(c0),avg(c
5) from test.meters where t5 >5000 and t5<51
00"
,
# second supertable
"select count(*) from test.meters1 where c1 > 50;"
,
"select count(*) from test.meters1 where c2 >= 50 and c2 < 100;"
,
"select count(*) from test.meters1 where c3 != 5;"
,
"select count(*) from test.meters1 where t3 > 2;"
,
"select count(*) from test.meters1 where ts <> '2020-05-13 10:00:00.002';"
,
"select count(*) from test.meters1 where t7 like 'fi%';"
,
"select count(*) from test.meters1 where t7 like '_econd';"
,
"select count(*) from test.meters where t7 like 'taos_1%';"
,
"select count(*) from test.meters where t7 like '_____2';"
,
"select count(*) from test.meters where t8 like '%思%';"
,
"select count(*) from test.meters1 interval(1n) order by ts desc;"
,
"select max(c0) from test.meters1 group by tbname"
,
"select first(
*) from test.meters1
;"
,
"select last(
*) from test.meters1
;"
,
"select last_row(*) from test.meters1;"
,
#
"select max(c0) from test.meters1 group by tbname",
"select first(
ts) from test.meters1 where t5 >5000 and t5<5100
;"
,
"select last(
ts) from test.meters1 where t5 >5000 and t5<5100
;"
,
"select last_row(*) from test.meters1
;"
,
"select twa(c1) from test.m1 where ts > 1500000001000 and ts < 1500000101000"
,
"select avg(c1) from test.meters1;"
,
"select bottom(c1, 2) from test.m1;"
,
"select diff(c1) from test.m1;"
,
"select avg(c1) from test.meters1
where t5 >5000 and t5<5100
;"
,
"select bottom(c1, 2) from test.m1
where t5 >5000 and t5<5100
;"
,
"select diff(c1) from test.m1
;"
,
"select leastsquares(c1, 1, 1) from test.m1 ;"
,
"select max(c1) from test.meters1;"
,
"select min(c1) from test.meters1;"
,
"select c1 + c2
* c3 + c1 / c5 + c3 + c2 from test.m1
;"
,
"select max(c1) from test.meters1
where t5 >5000 and t5<5100
;"
,
"select min(c1) from test.meters1
where t5 >5000 and t5<5100
;"
,
"select c1 + c2
+ c1 / c0 + c2 from test.m1
;"
,
"select percentile(c1, 50) from test.m1;"
,
"select spread(c1) from test.m1 ;"
,
"select stddev(c1) from test.m1;"
,
"select sum(c1) from test.meters1;"
,
"select top(c1, 2) from test.meters1;"
,
"select twa(c
6
) from test.m1 where ts > 1500000001000 and ts < 1500000101000"
,
"select avg(c
6) from test.meters1
;"
,
"select bottom(c
6
, 2) from test.m1;"
,
"select diff(c
6
) from test.m1;"
,
"select leastsquares(c
6
, 1, 1) from test.m1 ;"
,
"select max(c
6) from test.meters1
;"
,
"select min(c
6) from test.meters1
;"
,
"select c
6 + c2 * c3 + c6 / c5 + c3 + c2
from test.m1;"
,
"select percentile(c
6
, 50) from test.m1;"
,
"select spread(c
6
) from test.m1 ;"
,
"select stddev(c
6
) from test.m1;"
,
"select sum(c
6) from test.meters1
;"
,
"select top(c
6, 2) from test.meters1
;"
,
"select count(*) from test.meters1 where t5 >
2500 and t5<75
00"
,
"select sum(c1) from test.meters1
where t5 >5000 and t5<5100
;"
,
"select top(c1, 2) from test.meters1
where t5 >5000 and t5<5100
;"
,
"select twa(c
5
) from test.m1 where ts > 1500000001000 and ts < 1500000101000"
,
"select avg(c
5) from test.meters1 where t5 >5000 and t5<5100
;"
,
"select bottom(c
5
, 2) from test.m1;"
,
"select diff(c
5
) from test.m1;"
,
"select leastsquares(c
5
, 1, 1) from test.m1 ;"
,
"select max(c
5) from test.meters1 where t5 >5000 and t5<5100
;"
,
"select min(c
5) from test.meters1 where t5 >5000 and t5<5100
;"
,
"select c
5 + c2 + c4 / c5 + c0
from test.m1;"
,
"select percentile(c
4
, 50) from test.m1;"
,
"select spread(c
4
) from test.m1 ;"
,
"select stddev(c
4
) from test.m1;"
,
"select sum(c
4) from test.meters1 where t5 >5100 and t5<5300
;"
,
"select top(c
4, 2) from test.meters1 where t5 >5100 and t5<5300
;"
,
"select count(*) from test.meters1 where t5 >
5100 and t5<53
00"
,
#all vnode
"select count(*) from test.meters1 where t5 >
2500 and t5<75
00"
,
"select max(c0),avg(c1) from test.meters1 where t5 >
2500 and t5<75
00"
,
"select sum(c5),avg(c1) from test.meters1 where t5 >
2500 and t5<75
00"
,
"select max(c0),min(c
6) from test.meters1 where t5 >2500 and t5<75
00"
,
"select min(c0),avg(c
6) from test.meters1 where t5 >2500 and t5<75
00"
,
"select count(*) from test.meters1 where t5 >
5100 and t5<53
00"
,
"select max(c0),avg(c1) from test.meters1 where t5 >
5000 and t5<51
00"
,
"select sum(c5),avg(c1) from test.meters1 where t5 >
5000 and t5<51
00"
,
"select max(c0),min(c
5) from test.meters1 where t5 >5000 and t5<51
00"
,
"select min(c0),avg(c
5) from test.meters1 where t5 >5000 and t5<51
00"
,
#join
"select * from meters,meters1 where meters.ts = meters1.ts and meters.t5 = meters1.t5"
,
"select * from meters,meters1 where meters.ts = meters1.ts and meters.t7 = meters1.t7"
,
"select * from meters,meters1 where meters.ts = meters1.ts and meters.t8 = meters1.t8"
,
"select meters.ts,meters1.c2 from meters,meters1 where meters.ts = meters1.ts and meters.t8 = meters1.t8"
#
"select * from meters,meters1 where meters.ts = meters1.ts and meters.t5 = meters1.t5",
#
"select * from meters,meters1 where meters.ts = meters1.ts and meters.t7 = meters1.t7",
#
"select * from meters,meters1 where meters.ts = meters1.ts and meters.t8 = meters1.t8",
#
"select meters.ts,meters1.c2 from meters,meters1 where meters.ts = meters1.ts and meters.t8 = meters1.t8"
]
class
ConcurrentInquiry
:
...
...
@@ -121,7 +118,8 @@ class ConcurrentInquiry:
self
.
numOfTherads
=
50
self
.
ts
=
1500000001000
def
SetThreadsNum
(
self
,
num
):
self
.
numOfTherads
=
num
def
query_thread
(
self
,
threadID
):
host
=
"10.211.55.14"
user
=
"root"
...
...
@@ -142,12 +140,16 @@ class ConcurrentInquiry:
for
i
in
ran_query_sql
:
print
(
"Thread %d : %s"
%
(
threadID
,
i
))
try
:
start
=
time
.
time
()
cl
.
execute
(
i
)
cl
.
fetchall
end
=
time
.
time
()
print
(
"time cost :"
,
end
-
start
)
except
Exception
as
e
:
print
(
"Failure thread%d, sql: %s,exception: %s"
%
(
threadID
,
str
(
i
),
str
(
e
)))
exit
(
-
1
)
print
(
"Thread %d: finishing"
%
threadID
)
...
...
@@ -155,9 +157,9 @@ class ConcurrentInquiry:
def
run
(
self
):
threads
=
[]
for
i
in
range
(
50
):
for
i
in
range
(
self
.
numOfTherads
):
thread
=
threading
.
Thread
(
target
=
self
.
query_thread
,
args
=
(
i
,))
threads
.
append
(
thread
)
thread
.
start
()
...
...
tests/pytest/fulltest.sh
浏览文件 @
320296a6
...
...
@@ -149,6 +149,7 @@ python3 ./test.py -f query/queryNullValueTest.py
python3 ./test.py
-f
query/queryInsertValue.py
python3 ./test.py
-f
query/queryConnection.py
python3 ./test.py
-f
query/natualInterval.py
python3 ./test.py
-f
query/bug1471.py
#stream
python3 ./test.py
-f
stream/metric_1.py
...
...
tests/pytest/query/bug1471.py
0 → 100644
浏览文件 @
320296a6
###################################################################
# Copyright (c) 2016 by TAOS Technologies, Inc.
# All rights reserved.
#
# This file is proprietary and confidential to TAOS Technologies.
# No part of this file may be reproduced, stored, transmitted,
# disclosed or used in any form or by any means other than as
# expressly provided by the written permission from Jianhui Tao
#
###################################################################
# -*- coding: utf-8 -*-
import
sys
import
taos
from
util.log
import
*
from
util.cases
import
*
from
util.sql
import
*
import
time
import
threading
class
myThread
(
threading
.
Thread
):
def
__init__
(
self
,
conn
):
threading
.
Thread
.
__init__
(
self
)
self
.
event
=
threading
.
Event
()
self
.
conn
=
taos
.
connect
(
conn
.
_host
,
port
=
conn
.
_port
,
config
=
conn
.
_config
)
def
run
(
self
):
cur
=
self
.
conn
.
cursor
()
self
.
event
.
wait
()
cur
.
execute
(
"drop database db"
)
cur
.
close
()
self
.
conn
.
close
()
class
TDTestCase
:
def
init
(
self
,
conn
,
logSql
):
tdLog
.
debug
(
"start to execute %s"
%
__file__
)
tdSql
.
init
(
conn
.
cursor
())
def
run
(
self
):
for
i
in
range
(
50
):
print
(
"round"
,
i
)
thread
=
myThread
(
tdSql
.
cursor
.
_connection
)
thread
.
start
()
tdSql
.
execute
(
'reset query cache'
)
tdSql
.
execute
(
'drop database if exists db'
)
tdSql
.
execute
(
'create database db'
)
tdSql
.
execute
(
'use db'
)
tdSql
.
execute
(
"create table car (ts timestamp, s int)"
)
tdSql
.
execute
(
"insert into car values('2020-10-19 17:00:00', 123)"
)
thread
.
event
.
set
()
try
:
tdSql
.
query
(
"select s from car where ts = '2020-10-19 17:00:00'"
)
except
Exception
as
e
:
pass
else
:
tdSql
.
checkData
(
0
,
0
,
123
)
thread
.
join
()
time
.
sleep
(
0.2
)
def
stop
(
self
):
tdSql
.
close
()
tdLog
.
success
(
"%s successfully executed"
%
__file__
)
tdCases
.
addWindows
(
__file__
,
TDTestCase
())
tdCases
.
addLinux
(
__file__
,
TDTestCase
())
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录