Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
慢慢CG
TDengine
提交
8b7ab163
T
TDengine
项目概览
慢慢CG
/
TDengine
与 Fork 源项目一致
Fork自
taosdata / TDengine
通知
1
Star
0
Fork
0
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
0
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
0
Issue
0
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
8b7ab163
编写于
7月 14, 2020
作者:
陶建辉(Jeff)
浏览文件
操作
浏览文件
下载
差异文件
Merge branch 'develop' into hotfix/wal
上级
bd8a9769
3302f60e
变更
20
展开全部
隐藏空白更改
内联
并排
Showing
20 changed file
with
452 addition
and
487 deletion
+452
-487
src/client/src/tscAsync.c
src/client/src/tscAsync.c
+46
-50
src/client/src/tscFunctionImpl.c
src/client/src/tscFunctionImpl.c
+109
-106
src/client/src/tscSQLParser.c
src/client/src/tscSQLParser.c
+2
-2
src/client/src/tscServer.c
src/client/src/tscServer.c
+5
-3
src/client/src/tscSql.c
src/client/src/tscSql.c
+5
-16
src/client/src/tscSubquery.c
src/client/src/tscSubquery.c
+5
-3
src/client/src/tscUtil.c
src/client/src/tscUtil.c
+7
-6
src/mnode/src/mnodeProfile.c
src/mnode/src/mnodeProfile.c
+1
-1
src/plugins/http/src/httpContext.c
src/plugins/http/src/httpContext.c
+1
-1
src/plugins/http/src/httpSql.c
src/plugins/http/src/httpSql.c
+12
-6
src/query/inc/qExecutor.h
src/query/inc/qExecutor.h
+1
-0
src/query/inc/tsqlfunction.h
src/query/inc/tsqlfunction.h
+7
-20
src/query/src/qExecutor.c
src/query/src/qExecutor.c
+105
-119
src/tsdb/src/tsdbRead.c
src/tsdb/src/tsdbRead.c
+10
-2
src/util/src/tcache.c
src/util/src/tcache.c
+82
-46
src/vnode/src/vnodeRead.c
src/vnode/src/vnodeRead.c
+7
-2
tests/script/unique/arbitrator/sync_replica3_dnodeChang_DropAddAlterTableDropDb.sim
...ator/sync_replica3_dnodeChang_DropAddAlterTableDropDb.sim
+4
-3
tests/script/unique/cluster/client5.sim
tests/script/unique/cluster/client5.sim
+41
-101
tests/script/unique/cluster/cluster_main1.sim
tests/script/unique/cluster/cluster_main1.sim
+1
-0
tests/script/unique/cluster/cluster_main2.sim
tests/script/unique/cluster/cluster_main2.sim
+1
-0
未找到文件。
src/client/src/tscAsync.c
浏览文件 @
8b7ab163
...
...
@@ -45,6 +45,7 @@ void doAsyncQuery(STscObj* pObj, SSqlObj* pSql, void (*fp)(), void* param, const
pSql
->
pTscObj
=
pObj
;
pSql
->
maxRetry
=
TSDB_MAX_REPLICA
;
pSql
->
fp
=
fp
;
pSql
->
fetchFp
=
fp
;
pSql
->
sqlstr
=
calloc
(
1
,
sqlLen
+
1
);
if
(
pSql
->
sqlstr
==
NULL
)
{
...
...
@@ -159,7 +160,7 @@ static void tscProcessAsyncRetrieveImpl(void *param, TAOS_RES *tres, int numOfRo
pRes
->
code
=
numOfRows
;
}
tscQueueAsync
Error
(
pSql
->
fetchFp
,
param
,
pRes
->
code
);
tscQueueAsync
Res
(
pSql
);
return
;
}
...
...
@@ -167,6 +168,7 @@ static void tscProcessAsyncRetrieveImpl(void *param, TAOS_RES *tres, int numOfRo
if
(
pCmd
->
command
!=
TSDB_SQL_RETRIEVE_LOCALMERGE
&&
pCmd
->
command
<
TSDB_SQL_LOCAL
)
{
pCmd
->
command
=
(
pCmd
->
command
>
TSDB_SQL_MGMT
)
?
TSDB_SQL_RETRIEVE
:
TSDB_SQL_FETCH
;
}
tscProcessSql
(
pSql
);
}
...
...
@@ -196,6 +198,10 @@ void taos_fetch_rows_a(TAOS_RES *taosa, void (*fp)(void *, TAOS_RES *, int), voi
SSqlRes
*
pRes
=
&
pSql
->
res
;
SSqlCmd
*
pCmd
=
&
pSql
->
cmd
;
// user-defined callback function is stored in fetchFp
pSql
->
fetchFp
=
fp
;
pSql
->
fp
=
tscAsyncFetchRowsProxy
;
if
(
pRes
->
qhandle
==
0
)
{
tscError
(
"qhandle is NULL"
);
pRes
->
code
=
TSDB_CODE_TSC_INVALID_QHANDLE
;
...
...
@@ -203,10 +209,6 @@ void taos_fetch_rows_a(TAOS_RES *taosa, void (*fp)(void *, TAOS_RES *, int), voi
return
;
}
// user-defined callback function is stored in fetchFp
pSql
->
fetchFp
=
fp
;
pSql
->
fp
=
tscAsyncFetchRowsProxy
;
pSql
->
param
=
param
;
tscResetForNextRetrieve
(
pRes
);
...
...
@@ -346,31 +348,32 @@ void tscProcessFetchRow(SSchedMsg *pMsg) {
void
tscProcessAsyncRes
(
SSchedMsg
*
pMsg
)
{
SSqlObj
*
pSql
=
(
SSqlObj
*
)
pMsg
->
ahandle
;
SSqlCmd
*
pCmd
=
&
pSql
->
cmd
;
//
SSqlCmd *pCmd = &pSql->cmd;
SSqlRes
*
pRes
=
&
pSql
->
res
;
void
*
taosres
=
pSql
;
//
void *taosres = pSql;
// pCmd may be released, so cache pCmd->command
int
cmd
=
pCmd
->
command
;
int
code
=
pRes
->
code
;
//
int cmd = pCmd->command;
//
int code = pRes->code;
// in case of async insert, restore the user specified callback function
bool
shouldFree
=
tscShouldBeFreed
(
pSql
);
if
(
cmd
==
TSDB_SQL_INSERT
)
{
assert
(
pSql
->
fp
!=
NULL
);
pSql
->
fp
=
pSql
->
fetchFp
;
}
if
(
pSql
->
fp
)
{
(
*
pSql
->
fp
)(
pSql
->
param
,
taosres
,
code
);
}
if
(
shouldFree
)
{
tscDebug
(
"%p sqlObj is automatically freed in async res"
,
pSql
);
tscFreeSqlObj
(
pSql
);
}
// bool shouldFree = tscShouldBeFreed(pSql);
// if (pCmd->command == TSDB_SQL_INSERT) {
// assert(pSql->fp != NULL);
assert
(
pSql
->
fp
!=
NULL
&&
pSql
->
fetchFp
!=
NULL
);
// }
// if (pSql->fp) {
pSql
->
fp
=
pSql
->
fetchFp
;
(
*
pSql
->
fp
)(
pSql
->
param
,
pSql
,
pRes
->
code
);
// }
// if (shouldFree) {
// tscDebug("%p sqlObj is automatically freed in async res", pSql);
// tscFreeSqlObj(pSql);
// }
}
static
void
tscProcessAsyncError
(
SSchedMsg
*
pMsg
)
{
...
...
@@ -420,15 +423,15 @@ void tscTableMetaCallBack(void *param, TAOS_RES *res, int code) {
SSqlCmd
*
pCmd
=
&
pSql
->
cmd
;
SSqlRes
*
pRes
=
&
pSql
->
res
;
pRes
->
code
=
code
;
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
pRes
->
code
=
code
;
tscQueueAsyncRes
(
pSql
);
return
;
tscError
(
"%p ge tableMeta failed, code:%s"
,
pSql
,
tstrerror
(
code
));
goto
_error
;
}
else
{
tscDebug
(
"%p get tableMeta successfully"
,
pSql
);
}
tscDebug
(
"%p get tableMeta successfully"
,
pSql
);
if
(
pSql
->
pStream
==
NULL
)
{
SQueryInfo
*
pQueryInfo
=
tscGetQueryInfoDetail
(
pCmd
,
pCmd
->
clauseIndex
);
...
...
@@ -453,11 +456,9 @@ void tscTableMetaCallBack(void *param, TAOS_RES *res, int code) {
assert
(
pParObj
->
signature
==
pParObj
&&
trs
->
subqueryIndex
==
pTableMetaInfo
->
vgroupIndex
&&
pTableMetaInfo
->
vgroupIndex
>=
0
&&
pTableMetaInfo
->
vgroupList
!=
NULL
);
if
((
code
=
tscProcessSql
(
pSql
))
==
TSDB_CODE_SUCCESS
)
{
return
;
}
goto
_error
;
// tscProcessSql can add error into async res
tscProcessSql
(
pSql
);
return
;
}
else
{
// continue to process normal async query
if
(
pCmd
->
parseFinished
)
{
tscDebug
(
"%p update table meta in local cache, continue to process sql and send corresponding query"
,
pSql
);
...
...
@@ -481,26 +482,21 @@ void tscTableMetaCallBack(void *param, TAOS_RES *res, int code) {
if
(
code
==
TSDB_CODE_TSC_ACTION_IN_PROGRESS
)
{
return
;
}
else
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
goto
_error
;
}
if
(
code
==
TSDB_CODE_SUCCESS
)
{
/*
* Discard previous built submit blocks, and then parse the sql string again and build up all submit blocks,
* and send the required submit block according to index value in supporter to server.
*/
pSql
->
fp
=
pSql
->
fetchFp
;
// restore the fp
if
((
code
=
tscHandleInsertRetry
(
pSql
))
==
TSDB_CODE_SUCCESS
)
{
return
;
}
}
/*
* Discard previous built submit blocks, and then parse the sql string again and build up all submit blocks,
* and send the required submit block according to index value in supporter to server.
*/
pSql
->
fp
=
pSql
->
fetchFp
;
// restore the fp
tscHandleInsertRetry
(
pSql
);
}
else
{
// in case of other query type, continue
if
((
code
=
tscProcessSql
(
pSql
))
==
TSDB_CODE_SUCCESS
)
{
return
;
}
tscProcessSql
(
pSql
);
}
goto
_error
;
return
;
}
else
{
tscDebug
(
"%p continue parse sql after get table meta"
,
pSql
);
...
...
@@ -538,7 +534,7 @@ void tscTableMetaCallBack(void *param, TAOS_RES *res, int code) {
goto
_error
;
}
if
(
code
==
TSDB_CODE_SUCCESS
&&
UTIL_TABLE_IS_SUPER_TABLE
(
pTableMetaInfo
))
{
if
(
UTIL_TABLE_IS_SUPER_TABLE
(
pTableMetaInfo
))
{
code
=
tscGetSTableVgroupInfo
(
pSql
,
pCmd
->
clauseIndex
);
if
(
code
==
TSDB_CODE_TSC_ACTION_IN_PROGRESS
)
{
return
;
...
...
src/client/src/tscFunctionImpl.c
浏览文件 @
8b7ab163
此差异已折叠。
点击以展开。
src/client/src/tscSQLParser.c
浏览文件 @
8b7ab163
...
...
@@ -2471,7 +2471,7 @@ static bool functionCompatibleCheck(SQueryInfo* pQueryInfo) {
startIdx
++
;
}
int32_t
factor
=
func
CompatDef
List
[
tscSqlExprGet
(
pQueryInfo
,
startIdx
)
->
functionId
];
int32_t
factor
=
func
tionCompat
List
[
tscSqlExprGet
(
pQueryInfo
,
startIdx
)
->
functionId
];
// diff function cannot be executed with other function
// arithmetic function can be executed with other arithmetic functions
...
...
@@ -2489,7 +2489,7 @@ static bool functionCompatibleCheck(SQueryInfo* pQueryInfo) {
continue
;
}
if
(
func
CompatDef
List
[
functionId
]
!=
factor
)
{
if
(
func
tionCompat
List
[
functionId
]
!=
factor
)
{
return
false
;
}
}
...
...
src/client/src/tscServer.c
浏览文件 @
8b7ab163
...
...
@@ -339,7 +339,7 @@ void tscProcessMsgFromServer(SRpcMsg *rpcMsg, SRpcIpSet *pIpSet) {
}
if
(
rpcMsg
->
code
!=
TSDB_CODE_TSC_ACTION_IN_PROGRESS
)
{
rpcMsg
->
code
=
(
pRes
->
code
==
TSDB_CODE_SUCCESS
)
?
pRes
->
numOfRows
:
pRes
->
code
;
rpcMsg
->
code
=
(
pRes
->
code
==
TSDB_CODE_SUCCESS
)
?
pRes
->
numOfRows
:
pRes
->
code
;
bool
shouldFree
=
tscShouldBeFreed
(
pSql
);
(
*
pSql
->
fp
)(
pSql
->
param
,
pSql
,
rpcMsg
->
code
);
...
...
@@ -412,7 +412,7 @@ int tscProcessSql(SSqlObj *pSql) {
return
pSql
->
res
.
code
;
}
}
else
if
(
pCmd
->
command
<
TSDB_SQL_LOCAL
)
{
pSql
->
ipList
=
tscMgmtIpSet
;
//?
pSql
->
ipList
=
tscMgmtIpSet
;
}
else
{
// local handler
return
(
*
tscProcessMsgRsp
[
pCmd
->
command
])(
pSql
);
}
...
...
@@ -476,6 +476,8 @@ int tscBuildFetchMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
int32_t
vgIndex
=
pTableMetaInfo
->
vgroupIndex
;
SVgroupsInfo
*
pVgroupInfo
=
pTableMetaInfo
->
vgroupList
;
assert
(
pVgroupInfo
->
vgroups
[
vgIndex
].
vgId
>
0
&&
vgIndex
<
pTableMetaInfo
->
vgroupList
->
numOfVgroups
);
pRetrieveMsg
->
header
.
vgId
=
htonl
(
pVgroupInfo
->
vgroups
[
vgIndex
].
vgId
);
}
else
{
STableMeta
*
pTableMeta
=
pTableMetaInfo
->
pTableMeta
;
...
...
@@ -549,6 +551,7 @@ static char *doSerializeTableInfo(SQueryTableMsg* pQueryMsg, SSqlObj *pSql, char
assert
(
index
>=
0
);
if
(
pTableMetaInfo
->
vgroupList
->
numOfVgroups
>
0
)
{
assert
(
index
<
pTableMetaInfo
->
vgroupList
->
numOfVgroups
);
pVgroupInfo
=
&
pTableMetaInfo
->
vgroupList
->
vgroups
[
index
];
}
tscDebug
(
"%p query on stable, vgIndex:%d, numOfVgroups:%d"
,
pSql
,
index
,
pTableMetaInfo
->
vgroupList
->
numOfVgroups
);
...
...
@@ -1372,7 +1375,6 @@ static int tscLocalResultCommonBuilder(SSqlObj *pSql, int32_t numOfRes) {
SQueryInfo
*
pQueryInfo
=
tscGetQueryInfoDetail
(
pCmd
,
pCmd
->
clauseIndex
);
pRes
->
code
=
TSDB_CODE_SUCCESS
;
if
(
pRes
->
rspType
==
0
)
{
pRes
->
numOfRows
=
numOfRes
;
pRes
->
row
=
0
;
...
...
src/client/src/tscSql.c
浏览文件 @
8b7ab163
...
...
@@ -481,25 +481,14 @@ static bool tscFreeQhandleInVnode(SSqlObj* pSql) {
if
(
pRes
->
code
==
TSDB_CODE_SUCCESS
&&
pRes
->
completed
==
false
&&
!
tscIsTwoStageSTableQuery
(
pQueryInfo
,
0
)
&&
(
pCmd
->
command
==
TSDB_SQL_SELECT
||
pCmd
->
command
==
TSDB_SQL_SHOW
||
pCmd
->
command
==
TSDB_SQL_RETRIEVE
||
pCmd
->
command
==
TSDB_SQL_FETCH
)
&&
(
p
Cmd
->
command
==
TSDB_SQL_SELECT
&&
p
Sql
->
pStream
==
NULL
&&
pTableMetaInfo
->
pTableMeta
!=
NULL
))
{
pCmd
->
command
==
TSDB_SQL_SHOW
||
pCmd
->
command
==
TSDB_SQL_RETRIEVE
||
pCmd
->
command
==
TSDB_SQL_FETCH
)
&&
(
pSql
->
pStream
==
NULL
&&
pTableMetaInfo
->
pTableMeta
!=
NULL
))
{
pCmd
->
command
=
(
pCmd
->
command
>
TSDB_SQL_MGMT
)
?
TSDB_SQL_RETRIEVE
:
TSDB_SQL_FETCH
;
tscDebug
(
"%p send msg to dnode to free qhandle ASAP, command:%s"
,
pSql
,
sqlCmd
[
pCmd
->
command
]);
tscDebug
(
"%p send msg to dnode to free qhandle ASAP, command:%s
,
"
,
pSql
,
sqlCmd
[
pCmd
->
command
]);
tscProcessSql
(
pSql
);
// in case of sync model query, waits for response and then goes on
// if (pSql->fp == waitForQueryRsp || pSql->fp == waitForRetrieveRsp) {
// sem_wait(&pSql->rspSem);
// tscFreeSqlObj(pSql);
// tscDebug("%p sqlObj is freed by app", pSql);
// } else {
tscDebug
(
"%p sqlObj will be freed while rsp received"
,
pSql
);
// }
return
true
;
}
...
...
src/client/src/tscSubquery.c
浏览文件 @
8b7ab163
...
...
@@ -1895,9 +1895,11 @@ int32_t tscHandleInsertRetry(SSqlObj* pSql) {
assert
(
pSupporter
->
index
<
pSupporter
->
pState
->
numOfTotal
);
STableDataBlocks
*
pTableDataBlock
=
taosArrayGetP
(
pCmd
->
pDataBlocks
,
pSupporter
->
index
);
pRes
->
code
=
tscCopyDataBlockToPayload
(
pSql
,
pTableDataBlock
);
if
(
pRes
->
code
!=
TSDB_CODE_SUCCESS
)
{
return
pRes
->
code
;
int32_t
code
=
tscCopyDataBlockToPayload
(
pSql
,
pTableDataBlock
);
if
((
pRes
->
code
=
code
)
!=
TSDB_CODE_SUCCESS
)
{
tscQueueAsyncRes
(
pSql
);
return
code
;
// here the pSql may have been released already.
}
return
tscProcessSql
(
pSql
);
...
...
src/client/src/tscUtil.c
浏览文件 @
8b7ab163
...
...
@@ -1648,6 +1648,7 @@ SSqlObj* createSimpleSubObj(SSqlObj* pSql, void (*fp)(), void* param, int32_t cm
}
pNew
->
fp
=
fp
;
pNew
->
fetchFp
=
fp
;
pNew
->
param
=
param
;
pNew
->
maxRetry
=
TSDB_MAX_REPLICA
;
...
...
@@ -1803,6 +1804,8 @@ SSqlObj* createSubqueryObj(SSqlObj* pSql, int16_t tableIndex, void (*fp)(), void
}
pNew
->
fp
=
fp
;
pNew
->
fetchFp
=
fp
;
pNew
->
param
=
param
;
pNew
->
maxRetry
=
TSDB_MAX_REPLICA
;
...
...
@@ -2005,7 +2008,7 @@ void tscTryQueryNextVnode(SSqlObj* pSql, __async_cb_func_t fp) {
STableMetaInfo
*
pTableMetaInfo
=
tscGetMetaInfo
(
pQueryInfo
,
0
);
int32_t
totalVgroups
=
pTableMetaInfo
->
vgroupList
->
numOfVgroups
;
while
(
++
pTableMetaInfo
->
vgroupIndex
<
totalVgroups
)
{
if
(
++
pTableMetaInfo
->
vgroupIndex
<
totalVgroups
)
{
tscDebug
(
"%p results from vgroup index:%d completed, try next:%d. total vgroups:%d. current numOfRes:%"
PRId64
,
pSql
,
pTableMetaInfo
->
vgroupIndex
-
1
,
pTableMetaInfo
->
vgroupIndex
,
totalVgroups
,
pRes
->
numOfClauseTotal
);
...
...
@@ -2041,11 +2044,9 @@ void tscTryQueryNextVnode(SSqlObj* pSql, __async_cb_func_t fp) {
// set the callback function
pSql
->
fp
=
fp
;
int32_t
ret
=
tscProcessSql
(
pSql
);
if
(
ret
==
TSDB_CODE_SUCCESS
)
{
return
;
}
else
{
// todo check for failure
}
tscProcessSql
(
pSql
);
}
else
{
tscDebug
(
"%p try all %d vnodes, query complete. current numOfRes:%"
PRId64
,
pSql
,
totalVgroups
,
pRes
->
numOfClauseTotal
);
}
}
...
...
src/mnode/src/mnodeProfile.c
浏览文件 @
8b7ab163
...
...
@@ -115,7 +115,7 @@ SConnObj *mnodeAccquireConn(int32_t connId, char *user, uint32_t ip, uint16_t po
uint64_t
expireTime
=
CONN_KEEP_TIME
*
1000
+
(
uint64_t
)
taosGetTimestampMs
();
SConnObj
*
pConn
=
taosCacheUpdateExpireTimeByName
(
tsMnodeConnCache
,
&
connId
,
sizeof
(
int32_t
),
expireTime
);
if
(
pConn
==
NULL
)
{
m
Error
(
"connId:%d, is already destroyed, user:%s ip:%s:%u"
,
connId
,
user
,
taosIpStr
(
ip
),
port
);
m
Debug
(
"connId:%d, is already destroyed, user:%s ip:%s:%u"
,
connId
,
user
,
taosIpStr
(
ip
),
port
);
return
NULL
;
}
...
...
src/plugins/http/src/httpContext.c
浏览文件 @
8b7ab163
...
...
@@ -137,7 +137,7 @@ void httpReleaseContext(HttpContext *pContext) {
assert
(
refCount
>=
0
);
HttpContext
**
ppContext
=
pContext
->
ppContext
;
httpDebug
(
"context:%p, is releasd, data:%p refCount:%d"
,
pContext
,
ppContext
,
refCount
);
httpDebug
(
"context:%p, is releas
e
d, data:%p refCount:%d"
,
pContext
,
ppContext
,
refCount
);
if
(
tsHttpServer
.
contextCache
!=
NULL
)
{
taosCacheRelease
(
tsHttpServer
.
contextCache
,
(
void
**
)(
&
ppContext
),
false
);
...
...
src/plugins/http/src/httpSql.c
浏览文件 @
8b7ab163
...
...
@@ -47,6 +47,10 @@ void httpProcessMultiSqlRetrieveCallBack(void *param, TAOS_RES *result, int numO
}
}
if
(
tscResultsetFetchCompleted
(
result
))
{
isContinue
=
false
;
}
if
(
isContinue
)
{
// retrieve next batch of rows
httpDebug
(
"context:%p, fd:%d, ip:%s, user:%s, process pos:%d, continue retrieve, numOfRows:%d, sql:%s"
,
...
...
@@ -75,7 +79,8 @@ void httpProcessMultiSqlCallBack(void *param, TAOS_RES *result, int code) {
HttpContext
*
pContext
=
(
HttpContext
*
)
param
;
if
(
pContext
==
NULL
)
return
;
HttpSqlCmds
*
multiCmds
=
pContext
->
multiCmds
;
code
=
taos_errno
(
result
);
HttpSqlCmds
*
multiCmds
=
pContext
->
multiCmds
;
HttpEncodeMethod
*
encode
=
pContext
->
encodeMethod
;
HttpSqlCmd
*
singleCmd
=
multiCmds
->
cmds
+
multiCmds
->
pos
;
...
...
@@ -109,8 +114,8 @@ void httpProcessMultiSqlCallBack(void *param, TAOS_RES *result, int code) {
return
;
}
int
num_fields
=
taos_field_count
(
result
);
if
(
num_fields
==
0
)
{
bool
isUpdate
=
tscIsUpdateQuery
(
result
);
if
(
isUpdate
)
{
// not select or show commands
int
affectRows
=
taos_affected_rows
(
result
);
httpDebug
(
"context:%p, fd:%d, ip:%s, user:%s, process pos:%d, affect rows:%d, sql:%s"
,
...
...
@@ -221,9 +226,9 @@ void httpProcessSingleSqlRetrieveCallBack(void *param, TAOS_RES *result, int num
if
(
numOfRows
<
0
)
{
httpError
(
"context:%p, fd:%d, ip:%s, user:%s, retrieve failed, code:%s"
,
pContext
,
pContext
->
fd
,
pContext
->
ipstr
,
pContext
->
user
,
tstrerror
(
numOfRows
));
}
taos_free_result
(
result
);
}
taos_free_result
(
result
);
if
(
encode
->
stopJsonFp
)
{
(
encode
->
stopJsonFp
)(
pContext
,
&
pContext
->
singleCmd
);
...
...
@@ -238,6 +243,7 @@ void httpProcessSingleSqlCallBack(void *param, TAOS_RES *result, int unUsedCode)
if
(
pContext
==
NULL
)
return
;
int32_t
code
=
taos_errno
(
result
);
HttpEncodeMethod
*
encode
=
pContext
->
encodeMethod
;
if
(
code
==
TSDB_CODE_TSC_ACTION_IN_PROGRESS
)
{
...
...
src/query/inc/qExecutor.h
浏览文件 @
8b7ab163
...
...
@@ -121,6 +121,7 @@ typedef struct SQueryCostInfo {
uint32_t
loadBlockStatis
;
uint32_t
discardBlocks
;
uint64_t
elapsedTime
;
uint64_t
ioTime
;
uint64_t
computTime
;
}
SQueryCostInfo
;
...
...
src/query/inc/tsqlfunction.h
浏览文件 @
8b7ab163
...
...
@@ -125,7 +125,8 @@ typedef struct SArithmeticSupport {
}
SArithmeticSupport
;
typedef
struct
SQLPreAggVal
{
bool
isSet
;
bool
isSet
;
// statistics info set or not
bool
dataBlockLoaded
;
// data block is loaded or not
SDataStatis
statis
;
}
SQLPreAggVal
;
...
...
@@ -224,25 +225,14 @@ int32_t getResultDataInfo(int32_t dataType, int32_t dataBytes, int32_t functionI
#define IS_SINGLEOUTPUT(x) (((x)&TSDB_FUNCSTATE_SO) != 0)
#define IS_OUTER_FORWARD(x) (((x)&TSDB_FUNCSTATE_OF) != 0)
/*
* the status of one block, used in metric query. all blocks are mixed together,
* we need the status to decide if one block is a first/end/inter block of one meter
*/
enum
{
BLK_FILE_BLOCK
=
0x1
,
BLK_BLOCK_LOADED
=
0x2
,
BLK_CACHE_BLOCK
=
0x4
,
// in case of cache block, block must be loaded
};
/* determine the real data need to calculated the result */
enum
{
BLK_DATA_NO_NEEDED
=
0x0
,
BLK_DATA_NO_NEEDED
=
0x0
,
BLK_DATA_STATIS_NEEDED
=
0x1
,
BLK_DATA_ALL_NEEDED
=
0x3
,
BLK_DATA_ALL_NEEDED
=
0x3
,
BLK_DATA_DISCARD
=
0x4
,
// discard current data block since it is not qualified for filter
};
#define SET_DATA_BLOCK_NOT_LOADED(x) ((x) &= (~BLK_BLOCK_LOADED));
typedef
struct
STwaInfo
{
TSKEY
lastKey
;
int8_t
hasResult
;
// flag to denote has value
...
...
@@ -264,12 +254,9 @@ typedef struct STwaInfo {
/* global sql function array */
extern
struct
SQLAggFuncElem
aAggs
[];
/* compatible check array list */
extern
int32_t
funcCompatDefList
[];
bool
top_bot_datablock_filter
(
SQLFunctionCtx
*
pCtx
,
int32_t
functionId
,
char
*
minval
,
char
*
maxval
);
extern
int32_t
functionCompatList
[];
// compatible check array list
bool
stableQueryFunctChanged
(
int32_t
funcId
);
bool
topbot_datablock_filter
(
SQLFunctionCtx
*
pCtx
,
int32_t
functionId
,
const
char
*
minval
,
const
char
*
maxval
);
void
resetResultInfo
(
SResultInfo
*
pResInfo
);
void
setResultInfoBuf
(
SResultInfo
*
pResInfo
,
int32_t
size
,
bool
superTable
,
char
*
buf
);
...
...
src/query/src/qExecutor.c
浏览文件 @
8b7ab163
...
...
@@ -1358,6 +1358,8 @@ void setExecParams(SQuery *pQuery, SQLFunctionCtx *pCtx, void* inputData, TSKEY
pCtx
->
preAggVals
.
isSet
=
false
;
}
pCtx
->
preAggVals
.
dataBlockLoaded
=
(
inputData
!=
NULL
);
// limit/offset query will affect this value
pCtx
->
startOffset
=
QUERY_IS_ASC_QUERY
(
pQuery
)
?
pQuery
->
pos
:
0
;
pCtx
->
size
=
QUERY_IS_ASC_QUERY
(
pQuery
)
?
pBlockInfo
->
rows
-
pQuery
->
pos
:
pQuery
->
pos
+
1
;
...
...
@@ -1928,73 +1930,46 @@ char *getPosInResultPage(SQueryRuntimeEnv *pRuntimeEnv, int32_t columnIndex, SWi
pQuery
->
pSelectExpr
[
columnIndex
].
bytes
*
realRowId
;
}
/**
* decrease the refcount for each table involved in this query
* @param pQInfo
*/
UNUSED_FUNC
void
vnodeDecMeterRefcnt
(
SQInfo
*
pQInfo
)
{
if
(
pQInfo
!=
NULL
)
{
// assert(taosHashGetSize(pQInfo->tableqinfoGroupInfo) >= 1);
}
#if 0
if (pQInfo == NULL || pQInfo->tableqinfoGroupInfo.numOfTables == 1) {
atomic_fetch_sub_32(&pQInfo->pObj->numOfQueries, 1);
qDebug("QInfo:%p vid:%d sid:%d meterId:%s, query is over, numOfQueries:%d", pQInfo, pQInfo->pObj->vnode,
pQInfo->pObj->sid, pQInfo->pObj->meterId, pQInfo->pObj->numOfQueries);
} else {
int32_t num = 0;
for (int32_t i = 0; i < pQInfo->tableqinfoGroupInfo.numOfTables; ++i) {
SMeterObj *pMeter = getMeterObj(pQInfo->tableqinfoGroupInfo, pQInfo->pSidSet->pTableIdList[i]->sid);
atomic_fetch_sub_32(&(pMeter->numOfQueries), 1);
if (pMeter->numOfQueries > 0) {
qDebug("QInfo:%p vid:%d sid:%d meterId:%s, query is over, numOfQueries:%d", pQInfo, pMeter->vnode, pMeter->sid,
pMeter->meterId, pMeter->numOfQueries);
num++;
}
}
/*
* in order to reduce log output, for all meters of which numOfQueries count are 0,
* we do not output corresponding information
*/
num = pQInfo->tableqinfoGroupInfo.numOfTables - num;
qDebug("QInfo:%p metric query is over, dec query ref for %d meters, numOfQueries on %d meters are 0", pQInfo,
pQInfo->tableqinfoGroupInfo.numOfTables, num);
}
#endif
}
#define IS_PREFILTER_TYPE(_t) ((_t) != TSDB_DATA_TYPE_BINARY && (_t) != TSDB_DATA_TYPE_NCHAR)
static
bool
needToLoadDataBlock
(
SQuery
*
pQuery
,
SDataStatis
*
pDataStatis
,
SQLFunctionCtx
*
pCtx
,
int32_t
numOfTotalPoints
)
{
if
(
pDataStatis
==
NULL
)
{
static
bool
needToLoadDataBlock
(
SQueryRuntimeEnv
*
pRuntimeEnv
,
SDataStatis
*
pDataStatis
,
SQLFunctionCtx
*
pCtx
,
int32_t
numOfRows
)
{
SQuery
*
pQuery
=
pRuntimeEnv
->
pQuery
;
if
(
pDataStatis
==
NULL
||
(
pQuery
->
numOfFilterCols
==
0
&&
(
!
pRuntimeEnv
->
topBotQuery
)))
{
return
true
;
}
#if 0
for
(
int32_t
k
=
0
;
k
<
pQuery
->
numOfFilterCols
;
++
k
)
{
SSingleColumnFilterInfo
*
pFilterInfo
=
&
pQuery
->
pFilterInfo
[
k
];
int32_t colIndex = pFilterInfo->info.colIndex;
// this column not valid in current data block
if (colIndex < 0 || pDataStatis[colIndex].colId != pFilterInfo->info.data.colId) {
continue;
int32_t
index
=
-
1
;
for
(
int32_t
i
=
0
;
i
<
pQuery
->
numOfCols
;
++
i
)
{
if
(
pDataStatis
[
i
].
colId
==
pFilterInfo
->
info
.
colId
)
{
index
=
i
;
break
;
}
}
// no statistics data
if
(
index
==
-
1
)
{
return
true
;
}
// not support pre-filter operation on binary/nchar data type
if (!
vnodeSupportPrefilter(pFilterInfo->info.data
.type)) {
contin
ue;
if
(
!
IS_PREFILTER_TYPE
(
pFilterInfo
->
info
.
type
))
{
return
tr
ue
;
}
// all points in current column are NULL, no need to check its boundary value
if (pDataStatis[
colIndex].numOfNull == numOfTotalPoint
s) {
if
(
pDataStatis
[
index
].
numOfNull
==
numOfRow
s
)
{
continue
;
}
if (pFilterInfo->info.info.type == TSDB_DATA_TYPE_FLOAT) {
float minval = *(double *)(&pDataStatis[colIndex].min);
float maxval = *(double *)(&pDataStatis[colIndex].max);
SDataStatis
*
pDataBlockst
=
&
pDataStatis
[
index
];
if
(
pFilterInfo
->
info
.
type
==
TSDB_DATA_TYPE_FLOAT
)
{
float
minval
=
*
(
double
*
)(
&
pDataBlockst
->
min
);
float
maxval
=
*
(
double
*
)(
&
pDataBlockst
->
max
);
for
(
int32_t
i
=
0
;
i
<
pFilterInfo
->
numOfFilters
;
++
i
)
{
if
(
pFilterInfo
->
pFilters
[
i
].
fp
(
&
pFilterInfo
->
pFilters
[
i
],
(
char
*
)
&
minval
,
(
char
*
)
&
maxval
))
{
...
...
@@ -2003,53 +1978,50 @@ static bool needToLoadDataBlock(SQuery *pQuery, SDataStatis *pDataStatis, SQLFun
}
}
else
{
for
(
int32_t
i
=
0
;
i
<
pFilterInfo
->
numOfFilters
;
++
i
)
{
if (pFilterInfo->pFilters[i].fp(&pFilterInfo->pFilters[i], (char *)&pDataStatis[colIndex].min,
(char *)&pDataStatis[colIndex].max)) {
if
(
pFilterInfo
->
pFilters
[
i
].
fp
(
&
pFilterInfo
->
pFilters
[
i
],
(
char
*
)
&
pDataBlockst
->
min
,
(
char
*
)
&
pDataBlockst
->
max
))
{
return
true
;
}
}
}
}
// todo disable this opt code block temporarily
// for (int32_t i = 0; i < pQuery->numOfOutput; ++i) {
// int32_t functId = pQuery->pSelectExpr[i].base.functionId;
// if (functId == TSDB_FUNC_TOP || functId == TSDB_FUNC_BOTTOM) {
// return top_bot_datablock_filter(&pCtx[i], functId, (char *)&pField[i].min, (char *)&pField[i].max);
// }
// }
if
(
pRuntimeEnv
->
topBotQuery
)
{
for
(
int32_t
i
=
0
;
i
<
pQuery
->
numOfOutput
;
++
i
)
{
int32_t
functionId
=
pQuery
->
pSelectExpr
[
i
].
base
.
functionId
;
if
(
functionId
==
TSDB_FUNC_TOP
||
functionId
==
TSDB_FUNC_BOTTOM
)
{
return
topbot_datablock_filter
(
&
pCtx
[
i
],
functionId
,
(
char
*
)
&
pDataStatis
[
i
].
min
,
(
char
*
)
&
pDataStatis
[
i
].
max
);
}
}
}
#endif
return
true
;
return
false
;
}
SArray
*
loadDataBlockOnDemand
(
SQueryRuntimeEnv
*
pRuntimeEnv
,
void
*
pQueryHandle
,
SDataBlockInfo
*
pBlockInfo
,
SDataStatis
**
pStatis
)
{
int32_t
loadDataBlockOnDemand
(
SQueryRuntimeEnv
*
pRuntimeEnv
,
void
*
pQueryHandle
,
SDataBlockInfo
*
pBlockInfo
,
SDataStatis
**
pStatis
,
SArray
**
pDataBlock
)
{
SQuery
*
pQuery
=
pRuntimeEnv
->
pQuery
;
uint32_t
r
=
0
;
SArray
*
pDataBlock
=
NULL
;
uint32_t
status
=
0
;
if
(
pQuery
->
numOfFilterCols
>
0
)
{
r
=
BLK_DATA_ALL_NEEDED
;
}
else
{
// check if this data block is required to load
status
=
BLK_DATA_ALL_NEEDED
;
}
else
{
// check if this data block is required to load
for
(
int32_t
i
=
0
;
i
<
pQuery
->
numOfOutput
;
++
i
)
{
SSqlFuncMsg
*
pSqlFunc
=
&
pQuery
->
pSelectExpr
[
i
].
base
;
int32_t
functionId
=
pSqlFunc
->
functionId
;
int32_t
colId
=
pSqlFunc
->
colInfo
.
colId
;
r
|=
aAggs
[
functionId
].
dataReqFunc
(
&
pRuntimeEnv
->
pCtx
[
i
],
pQuery
->
window
.
skey
,
pQuery
->
window
.
ekey
,
colId
);
status
|=
aAggs
[
functionId
].
dataReqFunc
(
&
pRuntimeEnv
->
pCtx
[
i
],
pBlockInfo
->
window
.
skey
,
pBlockInfo
->
window
.
ekey
,
colId
);
}
if
(
pRuntimeEnv
->
pTSBuf
>
0
||
QUERY_IS_INTERVAL_QUERY
(
pQuery
))
{
r
|=
BLK_DATA_ALL_NEEDED
;
status
|=
BLK_DATA_ALL_NEEDED
;
}
}
if
(
r
==
BLK_DATA_NO_NEEDED
)
{
qDebug
(
"QInfo:%p data block discard, rows:%d"
,
GET_QINFO_ADDR
(
pRuntimeEnv
),
pBlockInfo
->
rows
);
if
(
status
==
BLK_DATA_NO_NEEDED
)
{
qDebug
(
"QInfo:%p data block discard, brange:%"
PRId64
"-%"
PRId64
", rows:%d"
,
GET_QINFO_ADDR
(
pRuntimeEnv
),
pBlockInfo
->
window
.
skey
,
pBlockInfo
->
window
.
ekey
,
pBlockInfo
->
rows
);
pRuntimeEnv
->
summary
.
discardBlocks
+=
1
;
}
else
if
(
r
==
BLK_DATA_STATIS_NEEDED
)
{
}
else
if
(
status
==
BLK_DATA_STATIS_NEEDED
)
{
if
(
tsdbRetrieveDataBlockStatisInfo
(
pQueryHandle
,
pStatis
)
!=
TSDB_CODE_SUCCESS
)
{
// return DISK_DATA_LOAD_FAILED;
}
...
...
@@ -2057,32 +2029,34 @@ SArray *loadDataBlockOnDemand(SQueryRuntimeEnv *pRuntimeEnv, void* pQueryHandle,
pRuntimeEnv
->
summary
.
loadBlockStatis
+=
1
;
if
(
*
pStatis
==
NULL
)
{
// data block statistics does not exist, load data block
pDataBlock
=
tsdbRetrieveDataBlock
(
pQueryHandle
,
NULL
);
*
pDataBlock
=
tsdbRetrieveDataBlock
(
pQueryHandle
,
NULL
);
pRuntimeEnv
->
summary
.
totalCheckedRows
+=
pBlockInfo
->
rows
;
}
}
else
{
assert
(
r
==
BLK_DATA_ALL_NEEDED
);
assert
(
status
==
BLK_DATA_ALL_NEEDED
);
// load the data block statistics to perform further filter
pRuntimeEnv
->
summary
.
loadBlockStatis
+=
1
;
pRuntimeEnv
->
summary
.
loadBlockStatis
+=
1
;
if
(
tsdbRetrieveDataBlockStatisInfo
(
pQueryHandle
,
pStatis
)
!=
TSDB_CODE_SUCCESS
)
{
}
if
(
!
needToLoadDataBlock
(
p
Query
,
*
pStatis
,
pRuntimeEnv
->
pCtx
,
pBlockInfo
->
rows
))
{
if
(
!
needToLoadDataBlock
(
p
RuntimeEnv
,
*
pStatis
,
pRuntimeEnv
->
pCtx
,
pBlockInfo
->
rows
))
{
#if defined(_DEBUG_VIEW)
qDebug
(
"QInfo:%p block discarded by per-filter"
,
GET_QINFO_ADDR
(
pRuntimeEnv
));
#endif
// current block has been discard due to filter applied
pRuntimeEnv
->
summary
.
discardBlocks
+=
1
;
// return DISK_DATA_DISCARDED;
qDebug
(
"QInfo:%p data block discard, brange:%"
PRId64
"-%"
PRId64
", rows:%d"
,
GET_QINFO_ADDR
(
pRuntimeEnv
),
pBlockInfo
->
window
.
skey
,
pBlockInfo
->
window
.
ekey
,
pBlockInfo
->
rows
);
return
BLK_DATA_DISCARD
;
}
pRuntimeEnv
->
summary
.
totalCheckedRows
+=
pBlockInfo
->
rows
;
pRuntimeEnv
->
summary
.
loadBlocks
+=
1
;
pDataBlock
=
tsdbRetrieveDataBlock
(
pQueryHandle
,
NULL
);
*
pDataBlock
=
tsdbRetrieveDataBlock
(
pQueryHandle
,
NULL
);
}
return
pDataBlock
;
return
TSDB_CODE_SUCCESS
;
}
int32_t
binarySearchForKey
(
char
*
pValue
,
int
num
,
TSKEY
key
,
int
order
)
{
...
...
@@ -2225,13 +2199,13 @@ static int64_t doScanAllDataBlocks(SQueryRuntimeEnv *pRuntimeEnv) {
pQuery
->
order
.
order
);
TsdbQueryHandleT
pQueryHandle
=
IS_MASTER_SCAN
(
pRuntimeEnv
)
?
pRuntimeEnv
->
pQueryHandle
:
pRuntimeEnv
->
pSecQueryHandle
;
int32_t
step
=
GET_FORWARD_DIRECTION_FACTOR
(
pQuery
->
order
.
order
);
SDataBlockInfo
blockInfo
=
SDATA_BLOCK_INITIALIZER
;
while
(
tsdbNextDataBlock
(
pQueryHandle
))
{
summary
->
totalBlocks
+=
1
;
if
(
IS_QUERY_KILLED
(
GET_QINFO_ADDR
(
pRuntimeEnv
)))
{
finalizeQueryResult
(
pRuntimeEnv
);
// clean up allocated resource during query
longjmp
(
pRuntimeEnv
->
env
,
TSDB_CODE_TSC_QUERY_CANCELLED
);
}
...
...
@@ -2259,7 +2233,11 @@ static int64_t doScanAllDataBlocks(SQueryRuntimeEnv *pRuntimeEnv) {
ensureOutputBuffer
(
pRuntimeEnv
,
&
blockInfo
);
SDataStatis
*
pStatis
=
NULL
;
SArray
*
pDataBlock
=
loadDataBlockOnDemand
(
pRuntimeEnv
,
pQueryHandle
,
&
blockInfo
,
&
pStatis
);
SArray
*
pDataBlock
=
NULL
;
if
(
loadDataBlockOnDemand
(
pRuntimeEnv
,
pQueryHandle
,
&
blockInfo
,
&
pStatis
,
&
pDataBlock
)
==
BLK_DATA_DISCARD
)
{
pQuery
->
current
->
lastKey
=
QUERY_IS_ASC_QUERY
(
pQuery
)
?
blockInfo
.
window
.
ekey
+
step
:
blockInfo
.
window
.
skey
+
step
;
continue
;
}
// query start position can not move into tableApplyFunctionsOnBlock due to limit/offset condition
pQuery
->
pos
=
QUERY_IS_ASC_QUERY
(
pQuery
)
?
0
:
blockInfo
.
rows
-
1
;
...
...
@@ -2282,8 +2260,6 @@ static int64_t doScanAllDataBlocks(SQueryRuntimeEnv *pRuntimeEnv) {
if
(
QUERY_IS_INTERVAL_QUERY
(
pQuery
)
&&
IS_MASTER_SCAN
(
pRuntimeEnv
))
{
if
(
Q_STATUS_EQUAL
(
pQuery
->
status
,
QUERY_COMPLETED
))
{
// int32_t step = QUERY_IS_ASC_QUERY(pQuery) ? QUERY_ASC_FORWARD_STEP : QUERY_DESC_FORWARD_STEP;
closeAllTimeWindow
(
&
pRuntimeEnv
->
windowResInfo
);
// removeRedundantWindow(&pRuntimeEnv->windowResInfo, pTableQueryInfo->lastKey - step, step);
pRuntimeEnv
->
windowResInfo
.
curIndex
=
pRuntimeEnv
->
windowResInfo
.
size
-
1
;
// point to the last time window
...
...
@@ -3393,12 +3369,10 @@ void destroyTableQueryInfo(STableQueryInfo *pTableQueryInfo, int32_t numOfCols)
cleanupTimeWindowInfo
(
&
pTableQueryInfo
->
windowResInfo
,
numOfCols
);
}
#define SET_CURRENT_QUERY_TABLE_INFO(_runtime, _tableInfo) \
do { \
SQuery *_query = (_runtime)->pQuery; \
_query->current = _tableInfo; \
assert((((_tableInfo)->lastKey >= (_tableInfo)->win.skey) && QUERY_IS_ASC_QUERY(_query)) || \
(((_tableInfo)->lastKey <= (_tableInfo)->win.skey) && !QUERY_IS_ASC_QUERY(_query))); \
#define CHECK_QUERY_TIME_RANGE(_q, _tableInfo) \
do { \
assert((((_tableInfo)->lastKey >= (_tableInfo)->win.skey) && QUERY_IS_ASC_QUERY(_q)) || \
(((_tableInfo)->lastKey <= (_tableInfo)->win.skey) && !QUERY_IS_ASC_QUERY(_q))); \
} while (0)
/**
...
...
@@ -3710,7 +3684,7 @@ static void updateWindowResNumOfRes(SQueryRuntimeEnv *pRuntimeEnv) {
}
}
void
stableApplyFunctionsOnBlock
(
SQueryRuntimeEnv
*
pRuntimeEnv
,
SDataBlockInfo
*
pDataBlockInfo
,
SDataStatis
*
pStatis
,
static
void
stableApplyFunctionsOnBlock
(
SQueryRuntimeEnv
*
pRuntimeEnv
,
SDataBlockInfo
*
pDataBlockInfo
,
SDataStatis
*
pStatis
,
SArray
*
pDataBlock
,
__block_search_fn_t
searchFn
)
{
SQuery
*
pQuery
=
pRuntimeEnv
->
pQuery
;
STableQueryInfo
*
pTableQueryInfo
=
pQuery
->
current
;
...
...
@@ -3869,9 +3843,10 @@ static void queryCostStatis(SQInfo *pQInfo) {
// pQInfo, pSummary->readDiskBlocks, pSummary->totalBlockSize, pSummary->loadBlocksUs / 1000.0,
// pSummary->skippedFileBlocks, pSummary->totalGenData);
qDebug
(
"QInfo:%p :cost summary: elpased time:%"
PRId64
" us, total blocks:%d, use block statis:%d, use block data:%d, "
"total rows:%"
PRId64
", check rows:%"
PRId64
,
pQInfo
,
pSummary
->
elapsedTime
,
pSummary
->
totalBlocks
,
pSummary
->
loadBlockStatis
,
pSummary
->
loadBlocks
,
pSummary
->
totalRows
,
pSummary
->
totalCheckedRows
);
qDebug
(
"QInfo:%p :cost summary: elapsed time:%"
PRId64
" us, io time:%"
PRId64
" us, total blocks:%d, load block statis:%d,"
" load data block:%d, total rows:%"
PRId64
", check rows:%"
PRId64
,
pQInfo
,
pSummary
->
elapsedTime
,
pSummary
->
ioTime
,
pSummary
->
totalBlocks
,
pSummary
->
loadBlockStatis
,
pSummary
->
loadBlocks
,
pSummary
->
totalRows
,
pSummary
->
totalCheckedRows
);
// qDebug("QInfo:%p cost: temp file:%d Bytes", pQInfo, pSummary->tmpBufferInDisk);
//
...
...
@@ -4253,6 +4228,23 @@ static void enableExecutionForNextTable(SQueryRuntimeEnv *pRuntimeEnv) {
}
}
static
FORCE_INLINE
void
setEnvForEachBlock
(
SQInfo
*
pQInfo
,
STableQueryInfo
*
pTableQueryInfo
,
SDataBlockInfo
*
pBlockInfo
)
{
SQueryRuntimeEnv
*
pRuntimeEnv
=
&
pQInfo
->
runtimeEnv
;
SQuery
*
pQuery
=
pQInfo
->
runtimeEnv
.
pQuery
;
int32_t
step
=
GET_FORWARD_DIRECTION_FACTOR
(
pQuery
->
order
.
order
);
if
(
!
QUERY_IS_INTERVAL_QUERY
(
pQuery
))
{
setExecutionContext
(
pQInfo
,
pTableQueryInfo
->
groupIndex
,
pBlockInfo
->
window
.
ekey
+
step
);
}
else
{
// interval query
TSKEY
nextKey
=
pBlockInfo
->
window
.
skey
;
setIntervalQueryRange
(
pQInfo
,
nextKey
);
if
(
pRuntimeEnv
->
hasTagResults
||
pRuntimeEnv
->
pTSBuf
!=
NULL
)
{
setAdditionalInfo
(
pQInfo
,
pTableQueryInfo
->
pTable
,
pTableQueryInfo
);
}
}
}
static
int64_t
scanMultiTableDataBlocks
(
SQInfo
*
pQInfo
)
{
SQueryRuntimeEnv
*
pRuntimeEnv
=
&
pQInfo
->
runtimeEnv
;
SQuery
*
pQuery
=
pRuntimeEnv
->
pQuery
;
...
...
@@ -4263,10 +4255,12 @@ static int64_t scanMultiTableDataBlocks(SQInfo *pQInfo) {
TsdbQueryHandleT
pQueryHandle
=
IS_MASTER_SCAN
(
pRuntimeEnv
)
?
pRuntimeEnv
->
pQueryHandle
:
pRuntimeEnv
->
pSecQueryHandle
;
SDataBlockInfo
blockInfo
=
SDATA_BLOCK_INITIALIZER
;
int32_t
step
=
GET_FORWARD_DIRECTION_FACTOR
(
pQuery
->
order
.
order
);
while
(
tsdbNextDataBlock
(
pQueryHandle
))
{
summary
->
totalBlocks
+=
1
;
if
(
IS_QUERY_KILLED
(
pQInfo
))
{
finalizeQueryResult
(
pRuntimeEnv
);
// clean up allocated resource during query
longjmp
(
pRuntimeEnv
->
env
,
TSDB_CODE_TSC_QUERY_CANCELLED
);
}
...
...
@@ -4276,24 +4270,19 @@ static int64_t scanMultiTableDataBlocks(SQInfo *pQInfo) {
break
;
}
assert
(
*
pTableQueryInfo
!=
NULL
);
SET_CURRENT_QUERY_TABLE_INFO
(
pRuntimeEnv
,
*
pTableQueryInfo
);
pQuery
->
current
=
*
pTableQueryInfo
;
CHECK_QUERY_TIME_RANGE
(
pQuery
,
*
pTableQueryInfo
);
if
(
!
pRuntimeEnv
->
groupbyNormalCol
)
{
setEnvForEachBlock
(
pQInfo
,
*
pTableQueryInfo
,
&
blockInfo
);
}
SDataStatis
*
pStatis
=
NULL
;
SArray
*
pDataBlock
=
loadDataBlockOnDemand
(
pRuntimeEnv
,
pQueryHandle
,
&
blockInfo
,
&
pStatis
)
;
SArray
*
pDataBlock
=
NULL
;
if
(
!
pRuntimeEnv
->
groupbyNormalCol
)
{
if
(
!
QUERY_IS_INTERVAL_QUERY
(
pQuery
))
{
int32_t
step
=
QUERY_IS_ASC_QUERY
(
pQuery
)
?
1
:-
1
;
setExecutionContext
(
pQInfo
,
(
*
pTableQueryInfo
)
->
groupIndex
,
blockInfo
.
window
.
ekey
+
step
);
}
else
{
// interval query
TSKEY
nextKey
=
blockInfo
.
window
.
skey
;
setIntervalQueryRange
(
pQInfo
,
nextKey
);
if
(
pRuntimeEnv
->
hasTagResults
||
pRuntimeEnv
->
pTSBuf
!=
NULL
)
{
setAdditionalInfo
(
pQInfo
,
(
*
pTableQueryInfo
)
->
pTable
,
*
pTableQueryInfo
);
}
}
if
(
loadDataBlockOnDemand
(
pRuntimeEnv
,
pQueryHandle
,
&
blockInfo
,
&
pStatis
,
&
pDataBlock
)
==
BLK_DATA_DISCARD
)
{
pQuery
->
current
->
lastKey
=
QUERY_IS_ASC_QUERY
(
pQuery
)
?
blockInfo
.
window
.
ekey
+
step
:
blockInfo
.
window
.
skey
+
step
;
continue
;
}
summary
->
totalRows
+=
blockInfo
.
rows
;
...
...
@@ -4553,7 +4542,6 @@ static void sequentialTableProcess(SQInfo *pQInfo) {
while
(
pQInfo
->
tableIndex
<
pQInfo
->
tableqinfoGroupInfo
.
numOfTables
)
{
if
(
IS_QUERY_KILLED
(
pQInfo
))
{
finalizeQueryResult
(
pRuntimeEnv
);
// clean up allocated resource during query
longjmp
(
pRuntimeEnv
->
env
,
TSDB_CODE_TSC_QUERY_CANCELLED
);
}
...
...
@@ -5051,6 +5039,7 @@ static void stableQueryImpl(SQInfo *pQInfo) {
isFirstLastRowQuery
(
pQuery
)
||
pRuntimeEnv
->
groupbyNormalCol
);
sequentialTableProcess
(
pQInfo
);
}
// record the total elapsed time
...
...
@@ -5382,7 +5371,7 @@ static int32_t buildAirthmeticExprFromMsg(SExprInfo *pArithExprInfo, SQueryTable
qDebug
(
"qmsg:%p create arithmetic expr from binary string: %s"
,
pQueryMsg
,
pArithExprInfo
->
base
.
arg
[
0
].
argValue
.
pz
);
tExprNode
*
pExprNode
=
NULL
;
TRY
(
32
)
{
TRY
(
TSDB_MAX_TAGS
)
{
pExprNode
=
exprTreeFromBinary
(
pArithExprInfo
->
base
.
arg
[
0
].
argValue
.
pz
,
pArithExprInfo
->
base
.
arg
[
0
].
argBytes
);
}
CATCH
(
code
)
{
CLEANUP_EXECUTE
();
...
...
@@ -6176,11 +6165,6 @@ _over:
//pQInfo already freed in initQInfo, but *pQInfo may not pointer to null;
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
*
pQInfo
=
NULL
;
}
else
{
// SQInfo* pq = (SQInfo*) (*pQInfo);
// T_REF_INC(pq);
// T_REF_INC(pq);
}
// if failed to add ref for all meters in this query, abort current query
...
...
@@ -6349,6 +6333,7 @@ int32_t qKillQuery(qinfo_t qinfo) {
return
TSDB_CODE_QRY_INVALID_QHANDLE
;
}
sem_post
(
&
pQInfo
->
dataReady
);
setQueryKilled
(
pQInfo
);
return
TSDB_CODE_SUCCESS
;
}
...
...
@@ -6561,13 +6546,14 @@ void** qRegisterQInfo(void* pMgmt, uint64_t qInfo) {
SQueryMgmt
*
pQueryMgmt
=
pMgmt
;
if
(
pQueryMgmt
->
qinfoPool
==
NULL
)
{
qError
(
"QInfo:%p failed to add qhandle into qMgmt, since qMgmt is closed"
,
(
void
*
)
qInfo
);
return
NULL
;
}
pthread_mutex_lock
(
&
pQueryMgmt
->
lock
);
if
(
pQueryMgmt
->
closed
)
{
pthread_mutex_unlock
(
&
pQueryMgmt
->
lock
);
qError
(
"QInfo:%p failed to add qhandle into cache, since qMgmt is colsing"
,
(
void
*
)
qInfo
);
return
NULL
;
}
else
{
uint64_t
handleVal
=
(
uint64_t
)
qInfo
;
...
...
src/tsdb/src/tsdbRead.c
浏览文件 @
8b7ab163
...
...
@@ -1801,7 +1801,8 @@ int32_t tsdbRetrieveDataBlockStatisInfo(TsdbQueryHandleT* pQueryHandle, SDataSta
}
tsdbLoadCompData
(
&
pHandle
->
rhelper
,
pBlockInfo
->
compBlock
,
NULL
);
// todo opt perf
size_t
numOfCols
=
QH_GET_NUM_OF_COLS
(
pHandle
);
for
(
int32_t
i
=
0
;
i
<
numOfCols
;
++
i
)
{
SDataStatis
*
st
=
&
pHandle
->
statis
[
i
];
...
...
@@ -1820,6 +1821,13 @@ int32_t tsdbRetrieveDataBlockStatisInfo(TsdbQueryHandleT* pQueryHandle, SDataSta
if
(
pHandle
->
statis
[
i
].
numOfNull
==
-
1
)
{
// set the column data are all NULL
pHandle
->
statis
[
i
].
numOfNull
=
pBlockInfo
->
compBlock
->
numOfRows
;
}
// todo opt perf
SColumnInfo
*
pColInfo
=
taosArrayGet
(
pHandle
->
pColumns
,
i
);
if
(
pColInfo
->
type
==
TSDB_DATA_TYPE_TIMESTAMP
)
{
pHandle
->
statis
[
i
].
min
=
pBlockInfo
->
compBlock
->
keyFirst
;
pHandle
->
statis
[
i
].
max
=
pBlockInfo
->
compBlock
->
keyLast
;
}
}
return
TSDB_CODE_SUCCESS
;
...
...
@@ -2193,7 +2201,7 @@ int32_t tsdbQuerySTableByTagCond(TSDB_REPO_T* tsdb, uint64_t uid, const char* pT
int32_t
ret
=
TSDB_CODE_SUCCESS
;
tExprNode
*
expr
=
NULL
;
TRY
(
32
)
{
TRY
(
TSDB_MAX_TAGS
)
{
expr
=
exprTreeFromTableName
(
tbnameCond
);
if
(
expr
==
NULL
)
{
expr
=
exprTreeFromBinary
(
pTagCond
,
len
);
...
...
src/util/src/tcache.c
浏览文件 @
8b7ab163
...
...
@@ -294,7 +294,7 @@ void *taosCachePut(SCacheObj *pCacheObj, const void *key, size_t keyLen, const v
}
}
else
{
// old data exists, update the node
pNode
=
taosUpdateCacheImpl
(
pCacheObj
,
pOld
,
key
,
keyLen
,
pData
,
dataSize
,
duration
*
1000L
);
uDebug
(
"cache:%s, key:%p, %p exist in cache, updated
"
,
pCacheObj
->
name
,
key
,
pNode
->
data
);
uDebug
(
"cache:%s, key:%p, %p exist in cache, updated
old:%p"
,
pCacheObj
->
name
,
key
,
pNode
->
data
,
pOld
);
}
__cache_unlock
(
pCacheObj
);
...
...
@@ -307,26 +307,30 @@ void *taosCacheAcquireByKey(SCacheObj *pCacheObj, const void *key, size_t keyLen
return
NULL
;
}
void
*
pData
=
NULL
;
__cache_rd_lock
(
pCacheObj
);
SCacheDataNode
**
ptNode
=
(
SCacheDataNode
**
)
taosHashGet
(
pCacheObj
->
pHashTable
,
key
,
keyLen
);
int32_t
ref
=
0
;
if
(
ptNode
!=
NULL
)
{
ref
=
T_REF_INC
(
*
ptNode
);
pData
=
(
*
ptNode
)
->
data
;
}
__cache_unlock
(
pCacheObj
);
if
(
p
tNode
!=
NULL
)
{
if
(
p
Data
!=
NULL
)
{
atomic_add_fetch_32
(
&
pCacheObj
->
statistics
.
hitCount
,
1
);
uDebug
(
"cache:%s, key:%p, %p is retrieved from cache, refcnt:%d"
,
pCacheObj
->
name
,
key
,
(
*
ptNode
)
->
d
ata
,
ref
);
uDebug
(
"cache:%s, key:%p, %p is retrieved from cache, refcnt:%d"
,
pCacheObj
->
name
,
key
,
pD
ata
,
ref
);
}
else
{
atomic_add_fetch_32
(
&
pCacheObj
->
statistics
.
missCount
,
1
);
uDebug
(
"cache:%s, key:%p, not in cache, retrieved failed"
,
pCacheObj
->
name
,
key
);
}
atomic_add_fetch_32
(
&
pCacheObj
->
statistics
.
totalAccess
,
1
);
return
(
ptNode
!=
NULL
)
?
(
*
ptNode
)
->
data
:
NULL
;
return
pData
;
}
void
*
taosCacheUpdateExpireTimeByName
(
SCacheObj
*
pCacheObj
,
void
*
key
,
size_t
keyLen
,
uint64_t
expireTime
)
{
...
...
@@ -413,57 +417,89 @@ void taosCacheRelease(SCacheObj *pCacheObj, void **data, bool _remove) {
*
data
=
NULL
;
// note: extend lifespan before dec ref count
if
(
pCacheObj
->
extendLifespan
)
{
bool
inTrashCan
=
pNode
->
inTrashCan
;
if
(
pCacheObj
->
extendLifespan
&&
(
!
inTrashCan
))
{
atomic_store_64
(
&
pNode
->
expireTime
,
pNode
->
lifespan
+
taosGetTimestampMs
());
uDebug
(
"cache:%s data:%p extend life time to %"
PRId64
" before release"
,
pCacheObj
->
name
,
pNode
->
data
,
pNode
->
expireTime
);
}
bool
inTrashCan
=
pNode
->
inTrashCan
;
uDebug
(
"cache:%s, key:%p, %p is released, refcnt:%d"
,
pCacheObj
->
name
,
pNode
->
key
,
pNode
->
data
,
T_REF_VAL_GET
(
pNode
)
-
1
);
if
(
_remove
)
{
__cache_wr_lock
(
pCacheObj
);
// NOTE: once refcount is decrease, pNode may be free by other thread immediately.
int32_t
ref
=
T_REF_DEC
(
pNode
);
// NOTE: once refcount is decrease, pNode may be freed by other thread immediately.
int32_t
ref
=
T_REF_DEC
(
pNode
);
uDebug
(
"cache:%s, key:%p, %p is released, refcnt:%d"
,
pCacheObj
->
name
,
pNode
->
key
,
pNode
->
data
,
ref
);
if
(
inTrashCan
)
{
// Remove it if the ref count is 0.
// The ref count does not need to load and check again after lock acquired, since ref count can not be increased when
// the node is in trashcan.
if
(
ref
==
0
)
{
__cache_wr_lock
(
pCacheObj
);
assert
(
pNode
->
pTNodeHeader
->
pData
==
pNode
);
taosRemoveFromTrashCan
(
pCacheObj
,
pNode
->
pTNodeHeader
);
__cache_unlock
(
pCacheObj
);
/*
* If it is not referenced by other users, remove it immediately. Otherwise move this node to trashcan wait for all users
* releasing this resources.
*
* NOTE: previous ref is 0, and current ref is still 0, remove it. If previous is not 0, there is another thread
* that tries to do the same thing.
*/
if
(
pNode
->
inTrashCan
)
{
if
(
ref
==
0
)
{
assert
(
pNode
->
pTNodeHeader
->
pData
==
pNode
);
taosRemoveFromTrashCan
(
pCacheObj
,
pNode
->
pTNodeHeader
);
}
}
else
{
if
(
ref
>
0
)
{
assert
(
pNode
->
pTNodeHeader
==
NULL
);
taosCacheMoveToTrash
(
pCacheObj
,
pNode
);
}
else
{
taosCacheReleaseNode
(
pCacheObj
,
pNode
);
}
}
__cache_unlock
(
pCacheObj
);
}
else
{
assert
(
pNode
->
pTNodeHeader
==
NULL
);
if
(
_remove
)
{
// not in trash can, but need to remove it
__cache_wr_lock
(
pCacheObj
);
/*
* If not referenced by other users. Otherwise move this node to trashcan wait for all users
* releasing this resources.
*
* NOTE: previous ref is 0, and current ref is still 0, remove it. If previous is not 0, there is another thread
* that tries to do the same thing.
*/
if
(
ref
==
0
)
{
if
(
T_REF_VAL_GET
(
pNode
)
==
0
)
{
taosCacheReleaseNode
(
pCacheObj
,
pNode
);
}
else
{
taosCacheMoveToTrash
(
pCacheObj
,
pNode
);
}
}
uDebug
(
"cache:%s, key:%p, %p is released, refcnt:%d"
,
pCacheObj
->
name
,
pNode
->
key
,
pNode
->
data
,
T_REF_VAL_GET
(
pNode
)
-
1
);
__cache_unlock
(
pCacheObj
);
// } else { // extend its life time
// if (pCacheObj->extendLifespan) {
// atomic_store_64(&pNode->expireTime, pNode->lifespan + taosGetTimestampMs());
// uDebug("cache:%s data:%p extend life time to %"PRId64 " after release", pCacheObj->name, pNode->data, pNode->expireTime);
// }
__cache_wr_lock
(
pCacheObj
);
// NOTE: once refcount is decrease, pNode may be freed by other thread immediately.
int32_t
ref
=
T_REF_DEC
(
pNode
);
if
(
inTrashCan
&&
(
ref
==
0
))
{
// Remove it if the ref count is 0.
// The ref count does not need to load and check again after lock acquired, since ref count can not be increased when
// the node is in trashcan.
assert
(
pNode
->
pTNodeHeader
->
pData
==
pNode
);
taosRemoveFromTrashCan
(
pCacheObj
,
pNode
->
pTNodeHeader
);
}
__cache_unlock
(
pCacheObj
);
}
// else {
// if (_remove) { // not in trash can, but need to remove it
// __cache_wr_lock(pCacheObj);
//
// /*
// * If not referenced by other users. Otherwise move this node to trashcan wait for all users
// * releasing this resources.
// *
// * NOTE: previous ref is 0, and current ref is still 0, remove it. If previous is not 0, there is another thread
// * that tries to do the same thing.
// */
// if (ref == 0) {
// if (T_REF_VAL_GET(pNode) == 0) {
// taosCacheReleaseNode(pCacheObj, pNode);
// } else {
// taosCacheMoveToTrash(pCacheObj, pNode);
// }
// } else if (ref > 0) {
// if (!pNode->inTrashCan) {
// assert(pNode->pTNodeHeader == NULL);
// taosCacheMoveToTrash(pCacheObj, pNode);
// }
// }
//
// __cache_unlock(pCacheObj);
// }
// }
}
void
taosCacheEmpty
(
SCacheObj
*
pCacheObj
)
{
...
...
src/vnode/src/vnodeRead.c
浏览文件 @
8b7ab163
...
...
@@ -108,6 +108,8 @@ static int32_t vnodeProcessQueryMsg(SVnodeObj *pVnode, SReadMsg *pReadMsg) {
if
(
code
==
TSDB_CODE_SUCCESS
)
{
handle
=
qRegisterQInfo
(
pVnode
->
qMgmt
,
(
uint64_t
)
pQInfo
);
if
(
handle
==
NULL
)
{
// failed to register qhandle
vError
(
"vgId:%d QInfo:%p register qhandle failed, return to app, code:%s"
,
pVnode
->
vgId
,
(
void
*
)
pQInfo
,
tstrerror
(
pRsp
->
code
));
pRsp
->
code
=
TSDB_CODE_QRY_INVALID_QHANDLE
;
qDestroyQueryInfo
(
pQInfo
);
// destroy it directly
}
else
{
...
...
@@ -125,12 +127,14 @@ static int32_t vnodeProcessQueryMsg(SVnodeObj *pVnode, SReadMsg *pReadMsg) {
}
else
{
assert
(
pQInfo
==
NULL
);
}
if
(
handle
!=
NULL
)
{
vDebug
(
"vgId:%d, QInfo:%p, dnode query msg disposed, register qhandle and return to app"
,
vgId
,
*
handle
);
dnodePutItemIntoReadQueue
(
pVnode
,
*
handle
);
qReleaseQInfo
(
pVnode
->
qMgmt
,
(
void
**
)
&
handle
,
false
);
}
vDebug
(
"vgId:%d, QInfo:%p, dnode query msg disposed"
,
vgId
,
pQInfo
);
}
else
{
assert
(
pCont
!=
NULL
);
handle
=
qAcquireQInfo
(
pVnode
->
qMgmt
,
(
uint64_t
)
pCont
);
...
...
@@ -138,12 +142,13 @@ static int32_t vnodeProcessQueryMsg(SVnodeObj *pVnode, SReadMsg *pReadMsg) {
vWarn
(
"QInfo:%p invalid qhandle in continuing exec query, conn:%p"
,
(
void
*
)
pCont
,
pReadMsg
->
rpcMsg
.
handle
);
code
=
TSDB_CODE_QRY_INVALID_QHANDLE
;
}
else
{
vDebug
(
"vgId:%d, QInfo:%p, dnode
query msg in progress
"
,
pVnode
->
vgId
,
(
void
*
)
pCont
);
vDebug
(
"vgId:%d, QInfo:%p, dnode
continue exec query
"
,
pVnode
->
vgId
,
(
void
*
)
pCont
);
code
=
TSDB_CODE_VND_ACTION_IN_PROGRESS
;
qTableQuery
(
*
handle
);
// do execute query
}
qReleaseQInfo
(
pVnode
->
qMgmt
,
(
void
**
)
&
handle
,
false
);
}
return
code
;
}
...
...
tests/script/unique/arbitrator/sync_replica3_dnodeChang_DropAddAlterTableDropDb.sim
浏览文件 @
8b7ab163
...
...
@@ -29,7 +29,7 @@ system sh/cfg.sh -n dnode2 -c alternativeRole -v 2
system sh/cfg.sh -n dnode3 -c alternativeRole -v 2
system sh/cfg.sh -n dnode4 -c alternativeRole -v 2
$totalTableNum =
1
0
$totalTableNum =
4
0
system sh/cfg.sh -n dnode1 -c maxtablesPerVnode -v $totalTableNum
system sh/cfg.sh -n dnode2 -c maxtablesPerVnode -v $totalTableNum
system sh/cfg.sh -n dnode3 -c maxtablesPerVnode -v $totalTableNum
...
...
@@ -65,9 +65,10 @@ sleep 3000
$totalTableNum = 20
$sleepTimer = 3000
$maxTables = $totalTableNum * 2
$db = db
print create database $db replica 3 maxTables $
totalTableNum
sql create database $db replica 3 maxTables $
totalTableNum
print create database $db replica 3 maxTables $
maxTables
sql create database $db replica 3 maxTables $
maxTables
sql use $db
# create table , insert data
...
...
tests/script/unique/cluster/client5.sim
浏览文件 @
8b7ab163
$tblStart = 0
$tblEnd = 2000
$tsStart = 1325347200000 # 2012-01-01 00:00:00.000
$tsEnd = 1325347210000
###############################################################
sql connect
$db = db1
$stb = stb1
#subtable: tb0 - tb4999
#print create table $stb (ts timestamp, c1 int) tags(t1 int, t2 binary(16))
sleep 20000 # wait other client insert data
loop_lable:
print ====================== client5 start loop query
$db = db2
$stb = stb2
print create database if not exists $db replica 2
sql create database if not exists $db replica 2
sql use $db
print ==== client4start create table
$i = $tblStart
while $i < $tblEnd
$tb = dtb . $i
sql create table $tb (ts timestamp, c1 int)
$i = $i + 1
endw
print ==== client4start insert, include multi table data in one insert sql
$totalRows = 0
$totalRowsPerTbl = 0
$rowsPerLoop = 100
$ts = $tsStart
$i = $tblStart
while $i < $tblEnd
$tb0 = dtb . $i
$i = $i + 1
$tb1 = dtb . $i
$i = $i + 1
$tb2 = dtb . $i
$i = $i + 1
$tb3 = dtb . $i
$i = $i + 1
$tb4 = dtb . $i
$i = $i + 1
$x = 0
while $x < $rowsPerLoop
sql insert into $tb0 values ( $ts + 0a , $x ) ( $ts + 2a , $x ) ( $ts + 4a , $x ) ( $ts + 6a , $x ) ( $ts + 8a , $x ) ( $ts + 10a , $x ) ( $ts + 12a , $x ) ( $ts + 14a , $x ) ( $ts + 16a , $x ) ( $ts + 18a , $x ) ( $ts + 20a , $x ) ( $ts + 22a , $x ) ( $ts + 24a , $x ) ( $ts + 26a , $x ) ( $ts + 28a , $x ) ( $ts + 30a , $x ) ( $ts + 32a , $x ) ( $ts + 34a , $x ) ( $ts + 36a , $x ) ( $ts + 38a , $x ) $tb1 values ( $ts + 0a , $x ) ( $ts + 2a , $x ) ( $ts + 4a , $x ) ( $ts + 6a , $x ) ( $ts + 8a , $x ) ( $ts + 10a , $x ) ( $ts + 12a , $x ) ( $ts + 14a , $x ) ( $ts + 16a , $x ) ( $ts + 18a , $x ) ( $ts + 20a , $x ) ( $ts + 22a , $x ) ( $ts + 24a , $x ) ( $ts + 26a , $x ) ( $ts + 28a , $x ) ( $ts + 30a , $x ) ( $ts + 32a , $x ) ( $ts + 34a , $x ) ( $ts + 36a , $x ) ( $ts + 38a , $x ) $tb2 values ( $ts + 0a , $x ) ( $ts + 2a , $x ) ( $ts + 4a , $x ) ( $ts + 6a , $x ) ( $ts + 8a , $x ) ( $ts + 10a , $x ) ( $ts + 12a , $x ) ( $ts + 14a , $x ) ( $ts + 16a , $x ) ( $ts + 18a , $x ) ( $ts + 20a , $x ) ( $ts + 22a , $x ) ( $ts + 24a , $x ) ( $ts + 26a , $x ) ( $ts + 28a , $x ) ( $ts + 30a , $x ) ( $ts + 32a , $x ) ( $ts + 34a , $x ) ( $ts + 36a , $x ) ( $ts + 38a , $x ) $tb3 values ( $ts + 0a , $x ) ( $ts + 2a , $x ) ( $ts + 4a , $x ) ( $ts + 6a , $x ) ( $ts + 8a , $x ) ( $ts + 10a , $x ) ( $ts + 12a , $x ) ( $ts + 14a , $x ) ( $ts + 16a , $x ) ( $ts + 18a , $x ) ( $ts + 20a , $x ) ( $ts + 22a , $x ) ( $ts + 24a , $x ) ( $ts + 26a , $x ) ( $ts + 28a , $x ) ( $ts + 30a , $x ) ( $ts + 32a , $x ) ( $ts + 34a , $x ) ( $ts + 36a , $x ) ( $ts + 38a , $x ) $tb4 values ( $ts + 0a , $x ) ( $ts + 2a , $x ) ( $ts + 4a , $x ) ( $ts + 6a , $x ) ( $ts + 8a , $x ) ( $ts + 10a , $x ) ( $ts + 12a , $x ) ( $ts + 14a , $x ) ( $ts + 16a , $x ) ( $ts + 18a , $x ) ( $ts + 20a , $x ) ( $ts + 22a , $x ) ( $ts + 24a , $x ) ( $ts + 26a , $x ) ( $ts + 28a , $x ) ( $ts + 30a , $x ) ( $ts + 32a , $x ) ( $ts + 34a , $x ) ( $ts + 36a , $x ) ( $ts + 38a , $x )
$x = $x + 20
$ts = $ts + 40a
endw
$totalRowsPerTbl = $totalRowsPerTbl + $x
$x = $x * 5
$totalRows = $totalRows + $x
endw
sql select count(*) from tb10
if $data00 != $totalRowsPerTbl then
print data00 $data00 totalRowsPerTbl $totalRowsPerTbl
print ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
print ************ client4 insert loss: $deltaRows *****
print ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
endi
print ====client4 start alter table
$i = $tblStart
while $i < $tblEnd
$tb = dtb . $i
sql alter table $tb add c2 float
$i = $i + 1
endw
print ====client4 continue insert, include multi table data in one insert sql
$i = $tblStart
while $i < $tblEnd
$tb0 = dtb . $i
$i = $i + 1
$tb1 = dtb . $i
$i = $i + 1
$tb2 = dtb . $i
$i = $i + 1
$tb3 = dtb . $i
$i = $i + 1
$tb4 = dtb . $i
$i = $i + 1
$x = 0
while $x < $rowsPerLoop
sql insert into $tb0 values ( $ts + 0a , $x ) ( $ts + 2a , $x ) ( $ts + 4a , $x ) ( $ts + 6a , $x ) ( $ts + 8a , $x ) ( $ts + 10a , $x ) ( $ts + 12a , $x ) ( $ts + 14a , $x ) ( $ts + 16a , $x ) ( $ts + 18a , $x ) ( $ts + 20a , $x ) ( $ts + 22a , $x ) ( $ts + 24a , $x ) ( $ts + 26a , $x ) ( $ts + 28a , $x ) ( $ts + 30a , $x ) ( $ts + 32a , $x ) ( $ts + 34a , $x ) ( $ts + 36a , $x ) ( $ts + 38a , $x ) $tb1 values ( $ts + 0a , $x ) ( $ts + 2a , $x ) ( $ts + 4a , $x ) ( $ts + 6a , $x ) ( $ts + 8a , $x ) ( $ts + 10a , $x ) ( $ts + 12a , $x ) ( $ts + 14a , $x ) ( $ts + 16a , $x ) ( $ts + 18a , $x ) ( $ts + 20a , $x ) ( $ts + 22a , $x ) ( $ts + 24a , $x ) ( $ts + 26a , $x ) ( $ts + 28a , $x ) ( $ts + 30a , $x ) ( $ts + 32a , $x ) ( $ts + 34a , $x ) ( $ts + 36a , $x ) ( $ts + 38a , $x ) $tb2 values ( $ts + 0a , $x ) ( $ts + 2a , $x ) ( $ts + 4a , $x ) ( $ts + 6a , $x ) ( $ts + 8a , $x ) ( $ts + 10a , $x ) ( $ts + 12a , $x ) ( $ts + 14a , $x ) ( $ts + 16a , $x ) ( $ts + 18a , $x ) ( $ts + 20a , $x ) ( $ts + 22a , $x ) ( $ts + 24a , $x ) ( $ts + 26a , $x ) ( $ts + 28a , $x ) ( $ts + 30a , $x ) ( $ts + 32a , $x ) ( $ts + 34a , $x ) ( $ts + 36a , $x ) ( $ts + 38a , $x ) $tb3 values ( $ts + 0a , $x ) ( $ts + 2a , $x ) ( $ts + 4a , $x ) ( $ts + 6a , $x ) ( $ts + 8a , $x ) ( $ts + 10a , $x ) ( $ts + 12a , $x ) ( $ts + 14a , $x ) ( $ts + 16a , $x ) ( $ts + 18a , $x ) ( $ts + 20a , $x ) ( $ts + 22a , $x ) ( $ts + 24a , $x ) ( $ts + 26a , $x ) ( $ts + 28a , $x ) ( $ts + 30a , $x ) ( $ts + 32a , $x ) ( $ts + 34a , $x ) ( $ts + 36a , $x ) ( $ts + 38a , $x ) $tb4 values ( $ts + 0a , $x ) ( $ts + 2a , $x ) ( $ts + 4a , $x ) ( $ts + 6a , $x ) ( $ts + 8a , $x ) ( $ts + 10a , $x ) ( $ts + 12a , $x ) ( $ts + 14a , $x ) ( $ts + 16a , $x ) ( $ts + 18a , $x ) ( $ts + 20a , $x ) ( $ts + 22a , $x ) ( $ts + 24a , $x ) ( $ts + 26a , $x ) ( $ts + 28a , $x ) ( $ts + 30a , $x ) ( $ts + 32a , $x ) ( $ts + 34a , $x ) ( $ts + 36a , $x ) ( $ts + 38a , $x )
$x = $x + 20
$ts = $ts + 40a
endw
$totalRowsPerTbl = $totalRowsPerTbl + $x
$x = $x * 5
$totalRows = $totalRows + $x
endw
sql select count(*) from tb10
if $data00 != $totalRowsPerTbl then
print data00 $data00 totalRowsPerTbl $totalRowsPerTbl
print ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
print ************ client4 insert loss: $deltaRows *****
print ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
endi
sql use $db
sql select count(*) from $stb
$tsQueryStart = $tsStart
$tsQueryStart = $tsStart + 90000
sql select * from $stb where ts > $tsQueryStart and ts < $tsQueryStart order by ts asc limit 500 offset 7
sql select * from $stb where ts > $tsQueryStart and ts < $tsQueryStart order by ts desc limit 500 offset 7
sql select * from $stb where ts > $tsQueryStart and ts < $tsQueryStart and t2 == 'client1_0'
sql select * from $stb where ts > $tsQueryStart and ts < $tsQueryStart and t2 == 'client1_1'
sql select * from $stb where ts > $tsQueryStart and ts < $tsQueryStart and t2 == 'client1_2'
sql select * from $stb where ts > $tsQueryStart and ts < $tsQueryStart and t2 == 'client1_3'
sql select * from $stb where ts > $tsQueryStart and ts < $tsQueryStart and t2 == 'client2_0'
sql select * from $stb where ts > $tsQueryStart and ts < $tsQueryStart and t2 == 'client2_1'
sql select * from $stb where ts > $tsQueryStart and ts < $tsQueryStart and t2 == 'client2_2'
sql select * from $stb where ts > $tsQueryStart and ts < $tsQueryStart and t2 == 'client2_3'
sql select min(c1) from $stb
sql select max(c1) from $stb
sql select first(*) from $stb
sql select last(*) from $stb
sql select last_row(*) from $stb
sql select sum(c1) from $stb
sql select avg(c1) from $stb
sql select min(c1) from tb1
sql select max(c1) from tb10
sql select first(*) from tb100
sql select last(*) from tb1000
sql select last_row(*) from tb20
sql select sum(c1) from tb200
sql select avg(c1) from tb2000
print ====================== client4 drop database
sql drop if exists database $db
goto loop_lable
\ No newline at end of file
tests/script/unique/cluster/cluster_main1.sim
浏览文件 @
8b7ab163
...
...
@@ -80,6 +80,7 @@ run_back unique/cluster/main1_client1_2.sim
run_back unique/cluster/main1_client1_3.sim
run_back unique/cluster/client3.sim
run_back unique/cluster/client4.sim
run_back unique/cluster/client5.sim
sleep 20000
...
...
tests/script/unique/cluster/cluster_main2.sim
浏览文件 @
8b7ab163
...
...
@@ -84,6 +84,7 @@ run_back unique/cluster/main2_client2_2.sim
run_back unique/cluster/main2_client2_3.sim
run_back unique/cluster/client3.sim
run_back unique/cluster/client4.sim
run_back unique/cluster/client5.sim
sleep 20000
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录