Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
慢慢CG
TDengine
提交
6c009cd0
T
TDengine
项目概览
慢慢CG
/
TDengine
与 Fork 源项目一致
Fork自
taosdata / TDengine
通知
1
Star
0
Fork
0
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
0
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
0
Issue
0
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
未验证
提交
6c009cd0
编写于
10月 26, 2020
作者:
S
Shengliang Guan
提交者:
GitHub
10月 26, 2020
浏览文件
操作
浏览文件
下载
差异文件
Merge pull request #3995 from taosdata/feature/query
Feature/query
上级
42967159
57f15cfc
变更
18
隐藏空白更改
内联
并排
Showing
18 changed file
with
570 addition
and
181 deletion
+570
-181
src/client/inc/tscUtil.h
src/client/inc/tscUtil.h
+4
-1
src/client/src/tscFunctionImpl.c
src/client/src/tscFunctionImpl.c
+3
-3
src/client/src/tscLocalMerge.c
src/client/src/tscLocalMerge.c
+2
-1
src/client/src/tscSchemaUtil.c
src/client/src/tscSchemaUtil.c
+1
-1
src/client/src/tscServer.c
src/client/src/tscServer.c
+22
-10
src/client/src/tscSubquery.c
src/client/src/tscSubquery.c
+202
-63
src/client/src/tscUtil.c
src/client/src/tscUtil.c
+61
-14
src/common/inc/tglobal.h
src/common/inc/tglobal.h
+5
-2
src/common/src/tglobal.c
src/common/src/tglobal.c
+24
-8
src/common/src/tvariant.c
src/common/src/tvariant.c
+14
-13
src/inc/query.h
src/inc/query.h
+0
-1
src/inc/taoserror.h
src/inc/taoserror.h
+1
-0
src/query/inc/qTsbuf.h
src/query/inc/qTsbuf.h
+9
-10
src/query/src/qExecutor.c
src/query/src/qExecutor.c
+156
-15
src/query/src/qTsbuf.c
src/query/src/qTsbuf.c
+51
-27
src/query/tests/tsBufTest.cpp
src/query/tests/tsBufTest.cpp
+7
-9
src/util/inc/tconfig.h
src/util/inc/tconfig.h
+1
-1
src/util/src/tcache.c
src/util/src/tcache.c
+7
-2
未找到文件。
src/client/inc/tscUtil.h
浏览文件 @
6c009cd0
...
...
@@ -82,6 +82,7 @@ typedef struct SJoinSupporter {
char
*
pIdTagList
;
// result of first stage tags
int32_t
totalLen
;
int32_t
num
;
SArray
*
pVgroupTables
;
}
SJoinSupporter
;
typedef
struct
SVgroupTableInfo
{
...
...
@@ -215,7 +216,7 @@ SQueryInfo *tscGetQueryInfoDetailSafely(SSqlCmd *pCmd, int32_t subClauseIndex);
void
tscClearTableMetaInfo
(
STableMetaInfo
*
pTableMetaInfo
,
bool
removeFromCache
);
STableMetaInfo
*
tscAddTableMetaInfo
(
SQueryInfo
*
pQueryInfo
,
const
char
*
name
,
STableMeta
*
pTableMeta
,
SVgroupsInfo
*
vgroupList
,
SArray
*
pTagCols
);
SVgroupsInfo
*
vgroupList
,
SArray
*
pTagCols
,
SArray
*
pVgroupTables
);
STableMetaInfo
*
tscAddEmptyMetaInfo
(
SQueryInfo
*
pQueryInfo
);
int32_t
tscAddSubqueryInfo
(
SSqlCmd
*
pCmd
);
...
...
@@ -224,6 +225,8 @@ void tscInitQueryInfo(SQueryInfo* pQueryInfo);
void
tscClearSubqueryInfo
(
SSqlCmd
*
pCmd
);
void
tscFreeVgroupTableInfo
(
SArray
*
pVgroupTables
);
SArray
*
tscCloneVgroupTableInfo
(
SArray
*
pVgroupTables
);
void
tscRemoveVgroupTableGroup
(
SArray
*
pVgroupTable
,
int32_t
index
);
int
tscGetSTableVgroupInfo
(
SSqlObj
*
pSql
,
int32_t
clauseIndex
);
int
tscGetTableMeta
(
SSqlObj
*
pSql
,
STableMetaInfo
*
pTableMetaInfo
);
...
...
src/client/src/tscFunctionImpl.c
浏览文件 @
6c009cd0
...
...
@@ -4025,11 +4025,11 @@ static void ts_comp_function(SQLFunctionCtx *pCtx) {
// primary ts must be existed, so no need to check its existance
if
(
pCtx
->
order
==
TSDB_ORDER_ASC
)
{
tsBufAppend
(
pTSbuf
,
0
,
&
pCtx
->
tag
,
input
,
pCtx
->
size
*
TSDB_KEYSIZE
);
tsBufAppend
(
pTSbuf
,
(
int32_t
)
pCtx
->
param
[
0
].
i64Key
,
&
pCtx
->
tag
,
input
,
pCtx
->
size
*
TSDB_KEYSIZE
);
}
else
{
for
(
int32_t
i
=
pCtx
->
size
-
1
;
i
>=
0
;
--
i
)
{
char
*
d
=
GET_INPUT_CHAR_INDEX
(
pCtx
,
i
);
tsBufAppend
(
pTSbuf
,
0
,
&
pCtx
->
tag
,
d
,
TSDB_KEYSIZE
);
tsBufAppend
(
pTSbuf
,
(
int32_t
)
pCtx
->
param
[
0
].
i64Key
,
&
pCtx
->
tag
,
d
,
(
int32_t
)
TSDB_KEYSIZE
);
}
}
...
...
@@ -4048,7 +4048,7 @@ static void ts_comp_function_f(SQLFunctionCtx *pCtx, int32_t index) {
STSBuf
*
pTSbuf
=
pInfo
->
pTSBuf
;
tsBufAppend
(
pTSbuf
,
0
,
&
pCtx
->
tag
,
pData
,
TSDB_KEYSIZE
);
tsBufAppend
(
pTSbuf
,
(
int32_t
)
pCtx
->
param
[
0
].
i64Key
,
&
pCtx
->
tag
,
pData
,
TSDB_KEYSIZE
);
SET_VAL
(
pCtx
,
pCtx
->
size
,
1
);
pResInfo
->
hasResult
=
DATA_SET_FLAG
;
...
...
src/client/src/tscLocalMerge.c
浏览文件 @
6c009cd0
...
...
@@ -698,7 +698,8 @@ int32_t tscLocalReducerEnvCreate(SSqlObj *pSql, tExtMemBuffer ***pMemBuffer, tOr
pg
*=
2
;
}
size_t
numOfSubs
=
pTableMetaInfo
->
vgroupList
->
numOfVgroups
;
size_t
numOfSubs
=
pSql
->
subState
.
numOfSub
;
assert
(
numOfSubs
<=
pTableMetaInfo
->
vgroupList
->
numOfVgroups
);
for
(
int32_t
i
=
0
;
i
<
numOfSubs
;
++
i
)
{
(
*
pMemBuffer
)[
i
]
=
createExtMemBuffer
(
nBufferSizes
,
rlen
,
pg
,
pModel
);
(
*
pMemBuffer
)[
i
]
->
flushModel
=
MULTIPLE_APPEND_MODEL
;
...
...
src/client/src/tscSchemaUtil.c
浏览文件 @
6c009cd0
...
...
@@ -177,7 +177,7 @@ STableMeta* tscCreateTableMetaFromMsg(STableMetaMsg* pTableMetaMsg, size_t* size
pVgroupInfo
->
epAddr
[
i
].
port
=
pEpMsg
->
port
;
}
tscInitCorVgroupInfo
(
&
pTableMeta
->
corVgroupInfo
,
&
pTableMeta
->
v
groupInfo
);
tscInitCorVgroupInfo
(
&
pTableMeta
->
corVgroupInfo
,
pV
groupInfo
);
pTableMeta
->
sversion
=
pTableMetaMsg
->
sversion
;
pTableMeta
->
tversion
=
pTableMetaMsg
->
tversion
;
...
...
src/client/src/tscServer.c
浏览文件 @
6c009cd0
...
...
@@ -481,14 +481,25 @@ int tscBuildFetchMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
if
(
UTIL_TABLE_IS_SUPER_TABLE
(
pTableMetaInfo
))
{
int32_t
vgIndex
=
pTableMetaInfo
->
vgroupIndex
;
SVgroupsInfo
*
pVgroupInfo
=
pTableMetaInfo
->
vgroupList
;
assert
(
pVgroupInfo
->
vgroups
[
vgIndex
].
vgId
>
0
&&
vgIndex
<
pTableMetaInfo
->
vgroupList
->
numOfVgroups
);
if
(
pTableMetaInfo
->
pVgroupTables
==
NULL
)
{
SVgroupsInfo
*
pVgroupInfo
=
pTableMetaInfo
->
vgroupList
;
assert
(
pVgroupInfo
->
vgroups
[
vgIndex
].
vgId
>
0
&&
vgIndex
<
pTableMetaInfo
->
vgroupList
->
numOfVgroups
);
pRetrieveMsg
->
header
.
vgId
=
htonl
(
pVgroupInfo
->
vgroups
[
vgIndex
].
vgId
);
tscDebug
(
"%p build fetch msg from vgId:%d, vgIndex:%d"
,
pSql
,
pVgroupInfo
->
vgroups
[
vgIndex
].
vgId
,
vgIndex
);
}
else
{
int32_t
numOfVgroups
=
(
int32_t
)
taosArrayGetSize
(
pTableMetaInfo
->
pVgroupTables
);
assert
(
vgIndex
>=
0
&&
vgIndex
<
numOfVgroups
);
pRetrieveMsg
->
header
.
vgId
=
htonl
(
pVgroupInfo
->
vgroups
[
vgIndex
].
vgId
);
SVgroupTableInfo
*
pTableIdList
=
taosArrayGet
(
pTableMetaInfo
->
pVgroupTables
,
vgIndex
);
pRetrieveMsg
->
header
.
vgId
=
htonl
(
pTableIdList
->
vgInfo
.
vgId
);
tscDebug
(
"%p build fetch msg from vgId:%d, vgIndex:%d"
,
pSql
,
pTableIdList
->
vgInfo
.
vgId
,
vgIndex
);
}
}
else
{
STableMeta
*
pTableMeta
=
pTableMetaInfo
->
pTableMeta
;
pRetrieveMsg
->
header
.
vgId
=
htonl
(
pTableMeta
->
vgroupInfo
.
vgId
);
tscDebug
(
"%p build fetch msg from only one vgroup, vgId:%d"
,
pSql
,
pTableMeta
->
vgroupInfo
.
vgId
);
}
pSql
->
cmd
.
payloadLen
=
sizeof
(
SRetrieveTableMsg
);
...
...
@@ -662,12 +673,12 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
pQueryMsg
->
limit
=
htobe64
(
pQueryInfo
->
limit
.
limit
);
pQueryMsg
->
offset
=
htobe64
(
pQueryInfo
->
limit
.
offset
);
pQueryMsg
->
numOfCols
=
htons
((
int16_t
)
taosArrayGetSize
(
pQueryInfo
->
colList
));
pQueryMsg
->
interval
.
interval
=
htobe64
(
pQueryInfo
->
interval
.
interval
);
pQueryMsg
->
interval
.
sliding
=
htobe64
(
pQueryInfo
->
interval
.
sliding
);
pQueryMsg
->
interval
.
interval
=
htobe64
(
pQueryInfo
->
interval
.
interval
);
pQueryMsg
->
interval
.
sliding
=
htobe64
(
pQueryInfo
->
interval
.
sliding
);
pQueryMsg
->
interval
.
offset
=
htobe64
(
pQueryInfo
->
interval
.
offset
);
pQueryMsg
->
interval
.
intervalUnit
=
pQueryInfo
->
interval
.
intervalUnit
;
pQueryMsg
->
interval
.
slidingUnit
=
pQueryInfo
->
interval
.
slidingUnit
;
pQueryMsg
->
interval
.
offsetUnit
=
pQueryInfo
->
interval
.
offsetUnit
;
pQueryMsg
->
interval
.
slidingUnit
=
pQueryInfo
->
interval
.
slidingUnit
;
pQueryMsg
->
interval
.
offsetUnit
=
pQueryInfo
->
interval
.
offsetUnit
;
pQueryMsg
->
numOfGroupCols
=
htons
(
pQueryInfo
->
groupbyExpr
.
numOfGroupCols
);
pQueryMsg
->
numOfTags
=
htonl
(
numOfTags
);
pQueryMsg
->
tagNameRelType
=
htons
(
pQueryInfo
->
tagCond
.
relType
);
...
...
@@ -850,7 +861,8 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
int32_t
numOfBlocks
=
0
;
if
(
pQueryInfo
->
tsBuf
!=
NULL
)
{
STSVnodeBlockInfo
*
pBlockInfo
=
tsBufGetVnodeBlockInfo
(
pQueryInfo
->
tsBuf
,
pTableMetaInfo
->
vgroupIndex
);
int32_t
vnodeId
=
htonl
(
pQueryMsg
->
head
.
vgId
);
STSVnodeBlockInfo
*
pBlockInfo
=
tsBufGetVnodeBlockInfo
(
pQueryInfo
->
tsBuf
,
vnodeId
);
assert
(
QUERY_IS_JOIN_QUERY
(
pQueryInfo
->
type
)
&&
pBlockInfo
!=
NULL
);
// this query should not be sent
// todo refactor
...
...
@@ -2271,7 +2283,7 @@ int tscGetSTableVgroupInfo(SSqlObj *pSql, int32_t clauseIndex) {
for
(
int32_t
i
=
0
;
i
<
pQueryInfo
->
numOfTables
;
++
i
)
{
STableMetaInfo
*
pMInfo
=
tscGetMetaInfo
(
pQueryInfo
,
i
);
STableMeta
*
pTableMeta
=
taosCacheAcquireByData
(
tscMetaCache
,
pMInfo
->
pTableMeta
);
tscAddTableMetaInfo
(
pNewQueryInfo
,
pMInfo
->
name
,
pTableMeta
,
NULL
,
pMInfo
->
tagColList
);
tscAddTableMetaInfo
(
pNewQueryInfo
,
pMInfo
->
name
,
pTableMeta
,
NULL
,
pMInfo
->
tagColList
,
pMInfo
->
pVgroupTables
);
}
if
((
code
=
tscAllocPayload
(
&
pNew
->
cmd
,
TSDB_DEFAULT_PAYLOAD_SIZE
))
!=
TSDB_CODE_SUCCESS
)
{
...
...
src/client/src/tscSubquery.c
浏览文件 @
6c009cd0
...
...
@@ -23,7 +23,6 @@
#include "tscSubquery.h"
#include "tschemautil.h"
#include "tsclient.h"
#include "tscSubquery.h"
typedef
struct
SInsertSupporter
{
SSqlObj
*
pSql
;
...
...
@@ -59,6 +58,8 @@ static int64_t doTSBlockIntersect(SSqlObj* pSql, SJoinSupporter* pSupporter1, SJ
pSubQueryInfo1
->
tsBuf
=
output1
;
pSubQueryInfo2
->
tsBuf
=
output2
;
TSKEY
st
=
taosGetTimestampUs
();
// no result generated, return directly
if
(
pSupporter1
->
pTSBuf
==
NULL
||
pSupporter2
->
pTSBuf
==
NULL
)
{
tscDebug
(
"%p at least one ts-comp is empty, 0 for secondary query after ts blocks intersecting"
,
pSql
);
...
...
@@ -95,7 +96,7 @@ static int64_t doTSBlockIntersect(SSqlObj* pSql, SJoinSupporter* pSupporter1, SJ
tscInfo
(
"%"
PRId64
", tags:%"
PRId64
"
\t
%"
PRId64
", tags:%"
PRId64
,
elem1
.
ts
,
elem1
.
tag
.
i64Key
,
elem2
.
ts
,
elem2
.
tag
.
i64Key
);
#endif
int32_t
res
=
tVariantCompare
(
&
elem1
.
tag
,
&
elem2
.
tag
);
int32_t
res
=
tVariantCompare
(
elem1
.
tag
,
elem2
.
tag
);
if
(
res
==
-
1
||
(
res
==
0
&&
tsCompare
(
order
,
elem1
.
ts
,
elem2
.
ts
)))
{
if
(
!
tsBufNextPos
(
pSupporter1
->
pTSBuf
))
{
break
;
...
...
@@ -122,8 +123,9 @@ static int64_t doTSBlockIntersect(SSqlObj* pSql, SJoinSupporter* pSupporter1, SJ
win
->
ekey
=
elem1
.
ts
;
}
tsBufAppend
(
output1
,
elem1
.
vnode
,
&
elem1
.
tag
,
(
const
char
*
)
&
elem1
.
ts
,
sizeof
(
elem1
.
ts
));
tsBufAppend
(
output2
,
elem2
.
vnode
,
&
elem2
.
tag
,
(
const
char
*
)
&
elem2
.
ts
,
sizeof
(
elem2
.
ts
));
tsBufAppend
(
output1
,
elem1
.
vnode
,
elem1
.
tag
,
(
const
char
*
)
&
elem1
.
ts
,
sizeof
(
elem1
.
ts
));
tsBufAppend
(
output2
,
elem2
.
vnode
,
elem2
.
tag
,
(
const
char
*
)
&
elem2
.
ts
,
sizeof
(
elem2
.
ts
));
}
else
{
pLimit
->
offset
-=
1
;
}
...
...
@@ -158,9 +160,10 @@ static int64_t doTSBlockIntersect(SSqlObj* pSql, SJoinSupporter* pSupporter1, SJ
tsBufDestroy
(
pSupporter1
->
pTSBuf
);
tsBufDestroy
(
pSupporter2
->
pTSBuf
);
tscDebug
(
"%p input1:%"
PRId64
", input2:%"
PRId64
", final:%"
PRId64
" for secondary query after ts blocks "
"intersecting, skey:%"
PRId64
", ekey:%"
PRId64
,
pSql
,
numOfInput1
,
numOfInput2
,
output1
->
numOfTotal
,
win
->
skey
,
win
->
ekey
);
TSKEY
et
=
taosGetTimestampUs
();
tscDebug
(
"%p input1:%"
PRId64
", input2:%"
PRId64
", final:%"
PRId64
" in %d vnodes for secondary query after ts blocks "
"intersecting, skey:%"
PRId64
", ekey:%"
PRId64
", numOfVnode:%d, elasped time:%"
PRId64
" us"
,
pSql
,
numOfInput1
,
numOfInput2
,
output1
->
numOfTotal
,
output1
->
numOfVnodes
,
win
->
skey
,
win
->
ekey
,
tsBufGetNumOfVnodes
(
output1
),
et
-
st
);
return
output1
->
numOfTotal
;
}
...
...
@@ -305,7 +308,6 @@ static int32_t tscLaunchRealSubqueries(SSqlObj* pSql) {
// set the second stage sub query for join process
TSDB_QUERY_SET_TYPE
(
pQueryInfo
->
type
,
TSDB_QUERY_TYPE_JOIN_SEC_STAGE
);
memcpy
(
&
pQueryInfo
->
interval
,
&
pSupporter
->
interval
,
sizeof
(
pQueryInfo
->
interval
));
tscTagCondCopy
(
&
pQueryInfo
->
tagCond
,
&
pSupporter
->
tagCond
);
...
...
@@ -324,7 +326,8 @@ static int32_t tscLaunchRealSubqueries(SSqlObj* pSql) {
tscFieldInfoUpdateOffset
(
pNewQueryInfo
);
STableMetaInfo
*
pTableMetaInfo
=
tscGetMetaInfo
(
pNewQueryInfo
,
0
);
pTableMetaInfo
->
pVgroupTables
=
pSupporter
->
pVgroupTables
;
/*
* When handling the projection query, the offset value will be modified for table-table join, which is changed
* during the timestamp intersection.
...
...
@@ -360,6 +363,54 @@ static int32_t tscLaunchRealSubqueries(SSqlObj* pSql) {
pExpr
->
numOfParams
=
1
;
}
int32_t
num
=
0
;
int32_t
*
list
=
NULL
;
tsBufGetVnodeIdList
(
pNewQueryInfo
->
tsBuf
,
&
num
,
&
list
);
if
(
pTableMetaInfo
->
pVgroupTables
!=
NULL
)
{
for
(
int32_t
k
=
0
;
k
<
taosArrayGetSize
(
pTableMetaInfo
->
pVgroupTables
);)
{
SVgroupTableInfo
*
p
=
taosArrayGet
(
pTableMetaInfo
->
pVgroupTables
,
k
);
bool
found
=
false
;
for
(
int32_t
f
=
0
;
f
<
num
;
++
f
)
{
if
(
p
->
vgInfo
.
vgId
==
list
[
f
])
{
found
=
true
;
break
;
}
}
if
(
!
found
)
{
tscRemoveVgroupTableGroup
(
pTableMetaInfo
->
pVgroupTables
,
k
);
}
else
{
k
++
;
}
}
assert
(
taosArrayGetSize
(
pTableMetaInfo
->
pVgroupTables
)
>
0
);
TSDB_QUERY_SET_TYPE
(
pQueryInfo
->
type
,
TSDB_QUERY_TYPE_MULTITABLE_QUERY
);
}
else
{
// TODO remove unnecessarily accessed vnode
// pTableMetaInfo->vgroupList->
// for(int32_t k = 0; k < taosArrayGetSize(pTableMetaInfo->pVgroupTables);) {
// SVgroupTableInfo* p = taosArrayGet(pTableMetaInfo->pVgroupTables, k);
//
// bool found = false;
// for(int32_t f = 0; f < num; ++f) {
// if (p->vgInfo.vgId == list[f]) {
// found = true;
// break;
// }
// }
//
// if (!found) {
// tscRemoveVgroupTableGroup(pTableMetaInfo->pVgroupTables, k);
// } else {
// k++;
// }
// }
}
taosTFree
(
list
);
size_t
numOfCols
=
taosArrayGetSize
(
pNewQueryInfo
->
colList
);
tscDebug
(
"%p subquery:%p tableIndex:%d, vgroupIndex:%d, type:%d, exprInfo:%"
PRIzu
", colList:%"
PRIzu
", fieldsInfo:%d, name:%s"
,
pSql
,
pNew
,
0
,
pTableMetaInfo
->
vgroupIndex
,
pNewQueryInfo
->
type
,
taosArrayGetSize
(
pNewQueryInfo
->
exprList
),
...
...
@@ -418,6 +469,8 @@ static void quitAllSubquery(SSqlObj* pSqlObj, SJoinSupporter* pSupporter) {
static
void
updateQueryTimeRange
(
SQueryInfo
*
pQueryInfo
,
STimeWindow
*
win
)
{
assert
(
pQueryInfo
->
window
.
skey
<=
win
->
skey
&&
pQueryInfo
->
window
.
ekey
>=
win
->
ekey
);
pQueryInfo
->
window
=
*
win
;
}
int32_t
tscCompareTidTags
(
const
void
*
p1
,
const
void
*
p2
)
{
...
...
@@ -474,10 +527,11 @@ static void issueTSCompQuery(SSqlObj* pSql, SJoinSupporter* pSupporter, SSqlObj*
SSqlCmd
*
pCmd
=
&
pSql
->
cmd
;
tscClearSubqueryInfo
(
pCmd
);
tscFreeSqlResult
(
pSql
);
SQueryInfo
*
pQueryInfo
=
tscGetQueryInfoDetail
(
pCmd
,
0
);
assert
(
pQueryInfo
->
numOfTables
==
1
);
STableMetaInfo
*
pTableMetaInfo
=
tscGetMetaInfo
(
pQueryInfo
,
0
);
tscInitQueryInfo
(
pQueryInfo
);
TSDB_QUERY_CLEAR_TYPE
(
pQueryInfo
->
type
,
TSDB_QUERY_TYPE_TAG_FILTER_QUERY
);
...
...
@@ -524,13 +578,7 @@ static void issueTSCompQuery(SSqlObj* pSql, SJoinSupporter* pSupporter, SSqlObj*
tscProcessSql
(
pSql
);
}
static
bool
checkForDuplicateTagVal
(
SQueryInfo
*
pQueryInfo
,
SJoinSupporter
*
p1
,
SSqlObj
*
pPSqlObj
)
{
STableMetaInfo
*
pTableMetaInfo
=
tscGetMetaInfo
(
pQueryInfo
,
0
);
SSchema
*
pSchema
=
tscGetTableTagSchema
(
pTableMetaInfo
->
pTableMeta
);
// todo: tags mismatch, tags not completed
SColumn
*
pCol
=
taosArrayGetP
(
pTableMetaInfo
->
tagColList
,
0
);
SSchema
*
pColSchema
=
&
pSchema
[
pCol
->
colIndex
.
columnIndex
];
static
bool
checkForDuplicateTagVal
(
SSchema
*
pColSchema
,
SJoinSupporter
*
p1
,
SSqlObj
*
pPSqlObj
)
{
for
(
int32_t
i
=
1
;
i
<
p1
->
num
;
++
i
)
{
STidTags
*
prev
=
(
STidTags
*
)
varDataVal
(
p1
->
pIdTagList
+
(
i
-
1
)
*
p1
->
tagSize
);
STidTags
*
p
=
(
STidTags
*
)
varDataVal
(
p1
->
pIdTagList
+
i
*
p1
->
tagSize
);
...
...
@@ -564,7 +612,7 @@ static int32_t getIntersectionOfTableTuple(SQueryInfo* pQueryInfo, SSqlObj* pPar
*
s1
=
taosArrayInit
(
p1
->
num
,
p1
->
tagSize
-
sizeof
(
int16_t
));
*
s2
=
taosArrayInit
(
p2
->
num
,
p2
->
tagSize
-
sizeof
(
int16_t
));
if
(
!
(
checkForDuplicateTagVal
(
p
QueryInfo
,
p1
,
pParentSql
)
&&
checkForDuplicateTagVal
(
pQueryInfo
,
p2
,
pParentSql
)))
{
if
(
!
(
checkForDuplicateTagVal
(
p
ColSchema
,
p1
,
pParentSql
)
&&
checkForDuplicateTagVal
(
pColSchema
,
p2
,
pParentSql
)))
{
return
TSDB_CODE_QRY_DUP_JOIN_KEY
;
}
...
...
@@ -708,6 +756,12 @@ static void tidTagRetrieveCallback(void* param, TAOS_RES* tres, int32_t numOfRow
STableMetaInfo
*
pTableMetaInfo2
=
tscGetMetaInfo
(
pQueryInfo2
,
0
);
tscBuildVgroupTableInfo
(
pParentSql
,
pTableMetaInfo2
,
s2
);
SSqlObj
*
psub1
=
pParentSql
->
pSubs
[
0
];
((
SJoinSupporter
*
)
psub1
->
param
)
->
pVgroupTables
=
tscCloneVgroupTableInfo
(
pTableMetaInfo1
->
pVgroupTables
);
SSqlObj
*
psub2
=
pParentSql
->
pSubs
[
1
];
((
SJoinSupporter
*
)
psub2
->
param
)
->
pVgroupTables
=
tscCloneVgroupTableInfo
(
pTableMetaInfo2
->
pVgroupTables
);
pParentSql
->
subState
.
numOfSub
=
2
;
pParentSql
->
subState
.
numOfRemain
=
pParentSql
->
subState
.
numOfSub
;
...
...
@@ -766,9 +820,7 @@ static void tsCompRetrieveCallback(void* param, TAOS_RES* tres, int32_t numOfRow
pSupporter
->
pTSBuf
=
pBuf
;
}
else
{
assert
(
pQueryInfo
->
numOfTables
==
1
);
// for subquery, only one
STableMetaInfo
*
pTableMetaInfo
=
tscGetMetaInfo
(
pQueryInfo
,
0
);
tsBufMerge
(
pSupporter
->
pTSBuf
,
pBuf
,
pTableMetaInfo
->
vgroupIndex
);
tsBufMerge
(
pSupporter
->
pTSBuf
,
pBuf
);
tsBufDestroy
(
pBuf
);
}
...
...
@@ -835,6 +887,8 @@ static void tsCompRetrieveCallback(void* param, TAOS_RES* tres, int32_t numOfRow
// launch the query the retrieve actual results from vnode along with the filtered timestamp
SQueryInfo
*
pPQueryInfo
=
tscGetQueryInfoDetail
(
&
pParentSql
->
cmd
,
pParentSql
->
cmd
.
clauseIndex
);
updateQueryTimeRange
(
pPQueryInfo
,
&
win
);
//update the vgroup that involved in real data query
tscLaunchRealSubqueries
(
pParentSql
);
}
...
...
@@ -868,20 +922,27 @@ static void joinRetrieveFinalResCallback(void* param, TAOS_RES* tres, int numOfR
assert
(
pQueryInfo
->
numOfTables
==
1
);
// for projection query, need to try next vnode if current vnode is exhausted
if
((
++
pTableMetaInfo
->
vgroupIndex
)
<
pTableMetaInfo
->
vgroupList
->
numOfVgroups
)
{
pState
->
numOfRemain
=
1
;
pState
->
numOfSub
=
1
;
int32_t
numOfVgroups
=
0
;
// TODO refactor
if
(
pTableMetaInfo
->
pVgroupTables
!=
NULL
)
{
numOfVgroups
=
(
int32_t
)
taosArrayGetSize
(
pTableMetaInfo
->
pVgroupTables
);
}
else
{
numOfVgroups
=
pTableMetaInfo
->
vgroupList
->
numOfVgroups
;
}
if
((
++
pTableMetaInfo
->
vgroupIndex
)
<
numOfVgroups
)
{
tscDebug
(
"%p no result in current vnode anymore, try next vnode, vgIndex:%d"
,
pSql
,
pTableMetaInfo
->
vgroupIndex
);
pSql
->
cmd
.
command
=
TSDB_SQL_SELECT
;
pSql
->
fp
=
tscJoinQueryCallback
;
tscProcessSql
(
pSql
);
tscProcessSql
(
pSql
);
return
;
}
else
{
tscDebug
(
"%p no result in current subquery anymore"
,
pSql
);
}
}
if
(
atomic_sub_fetch_32
(
&
p
ParentSql
->
subState
.
numOfRemain
,
1
)
>
0
)
{
tscDebug
(
"%p sub:%p completed, remain:%d, total:%d"
,
pParentSql
,
tres
,
p
ParentSql
->
subState
.
numOfRemain
,
pState
->
numOfSub
);
if
(
atomic_sub_fetch_32
(
&
p
State
->
numOfRemain
,
1
)
>
0
)
{
tscDebug
(
"%p sub:%p completed, remain:%d, total:%d"
,
pParentSql
,
tres
,
p
State
->
numOfRemain
,
pState
->
numOfSub
);
return
;
}
...
...
@@ -895,60 +956,60 @@ static void joinRetrieveFinalResCallback(void* param, TAOS_RES* tres, int numOfR
// update the records for each subquery in parent sql object.
for
(
int32_t
i
=
0
;
i
<
pState
->
numOfSub
;
++
i
)
{
if
(
pParentSql
->
pSubs
[
i
]
==
NULL
)
{
tscDebug
(
"%p %p sub:%d not retrieve data"
,
pParentSql
,
NULL
,
i
);
continue
;
}
SSqlRes
*
pRes1
=
&
pParentSql
->
pSubs
[
i
]
->
res
;
pRes1
->
numOfClauseTotal
+=
pRes1
->
numOfRows
;
}
// data has retrieved to client, build the join results
tscBuildResFromSubqueries
(
pParentSql
);
}
static
SJoinSupporter
*
tscUpdateSubqueryStatus
(
SSqlObj
*
pSql
,
int32_t
numOfFetch
)
{
int32_t
notInvolved
=
0
;
SJoinSupporter
*
pSupporter
=
NULL
;
SSubqueryState
*
pState
=
&
pSql
->
subState
;
for
(
int32_t
i
=
0
;
i
<
pSql
->
subState
.
numOfSub
;
++
i
)
{
if
(
pSql
->
pSubs
[
i
]
==
NULL
)
{
notInvolved
++
;
if
(
pRes1
->
row
>
0
&&
pRes1
->
numOfRows
>
0
)
{
tscDebug
(
"%p sub:%p index:%d numOfRows:%"
PRId64
" total:%"
PRId64
" (not retrieve)"
,
pParentSql
,
pParentSql
->
pSubs
[
i
],
i
,
pRes1
->
numOfRows
,
pRes1
->
numOfTotal
);
assert
(
pRes1
->
row
<
pRes1
->
numOfRows
);
}
else
{
pSupporter
=
(
SJoinSupporter
*
)
pSql
->
pSubs
[
i
]
->
param
;
pRes1
->
numOfClauseTotal
+=
pRes1
->
numOfRows
;
tscDebug
(
"%p sub:%p index:%d numOfRows:%"
PRId64
" total:%"
PRId64
,
pParentSql
,
pParentSql
->
pSubs
[
i
],
i
,
pRes1
->
numOfRows
,
pRes1
->
numOfTotal
);
}
}
pState
->
numOfRemain
=
numOfFetch
;
return
pSupporter
;
// data has retrieved to client, build the join results
tscBuildResFromSubqueries
(
pParentSql
)
;
}
void
tscFetchDatablockFromSubquery
(
SSqlObj
*
pSql
)
{
assert
(
pSql
->
subState
.
numOfSub
>=
1
);
int32_t
numOfFetch
=
0
;
bool
hasData
=
true
;
bool
hasData
=
true
;
bool
reachLimit
=
false
;
// if the subquery is NULL, it does not involved in the final result generation
for
(
int32_t
i
=
0
;
i
<
pSql
->
subState
.
numOfSub
;
++
i
)
{
// if the subquery is NULL, it does not involved in the final result generation
SSqlObj
*
pSub
=
pSql
->
pSubs
[
i
];
if
(
pSub
==
NULL
)
{
continue
;
}
SSqlRes
*
pRes
=
&
pSub
->
res
;
SQueryInfo
*
pQueryInfo
=
tscGetQueryInfoDetail
(
&
pSub
->
cmd
,
0
);
if
(
!
tscHasReachLimitation
(
pQueryInfo
,
pRes
))
{
if
(
pRes
->
row
>=
pRes
->
numOfRows
)
{
// no data left in current result buffer
hasData
=
false
;
// The current query is completed for the active vnode, try next vnode if exists
// If it is completed, no need to fetch anymore.
if
(
!
pRes
->
completed
)
{
numOfFetch
++
;
}
}
}
else
{
// has reach the limitation, no data anymore
if
(
pRes
->
row
>=
pRes
->
numOfRows
)
{
hasData
=
false
;
reachLimit
=
true
;
hasData
=
false
;
break
;
}
}
...
...
@@ -958,10 +1019,67 @@ void tscFetchDatablockFromSubquery(SSqlObj* pSql) {
if
(
hasData
)
{
tscBuildResFromSubqueries
(
pSql
);
return
;
}
else
if
(
numOfFetch
<=
0
)
{
}
// If at least one subquery is completed in current vnode, try the next vnode in case of multi-vnode
// super table projection query.
if
(
numOfFetch
<=
0
&&
!
reachLimit
)
{
bool
tryNextVnode
=
false
;
SSqlObj
*
pp
=
pSql
->
pSubs
[
0
];
SQueryInfo
*
pi
=
tscGetQueryInfoDetail
(
&
pp
->
cmd
,
0
);
// get the number of subquery that need to retrieve the next vnode.
if
(
tscNonOrderedProjectionQueryOnSTable
(
pi
,
0
))
{
for
(
int32_t
i
=
0
;
i
<
pSql
->
subState
.
numOfSub
;
++
i
)
{
SSqlObj
*
pSub
=
pSql
->
pSubs
[
i
];
if
(
pSub
!=
NULL
&&
pSub
->
res
.
row
>=
pSub
->
res
.
numOfRows
&&
pSub
->
res
.
completed
)
{
pSql
->
subState
.
numOfRemain
++
;
}
}
}
for
(
int32_t
i
=
0
;
i
<
pSql
->
subState
.
numOfSub
;
++
i
)
{
SSqlObj
*
pSub
=
pSql
->
pSubs
[
i
];
if
(
pSub
==
NULL
)
{
continue
;
}
SQueryInfo
*
pQueryInfo
=
tscGetQueryInfoDetail
(
&
pSub
->
cmd
,
0
);
if
(
tscNonOrderedProjectionQueryOnSTable
(
pQueryInfo
,
0
)
&&
pSub
->
res
.
row
>=
pSub
->
res
.
numOfRows
&&
pSub
->
res
.
completed
)
{
STableMetaInfo
*
pTableMetaInfo
=
tscGetMetaInfo
(
pQueryInfo
,
0
);
assert
(
pQueryInfo
->
numOfTables
==
1
);
// for projection query, need to try next vnode if current vnode is exhausted
int32_t
numOfVgroups
=
0
;
// TODO refactor
if
(
pTableMetaInfo
->
pVgroupTables
!=
NULL
)
{
numOfVgroups
=
(
int32_t
)
taosArrayGetSize
(
pTableMetaInfo
->
pVgroupTables
);
}
else
{
numOfVgroups
=
pTableMetaInfo
->
vgroupList
->
numOfVgroups
;
}
if
((
++
pTableMetaInfo
->
vgroupIndex
)
<
numOfVgroups
)
{
tscDebug
(
"%p no result in current vnode anymore, try next vnode, vgIndex:%d"
,
pSub
,
pTableMetaInfo
->
vgroupIndex
);
pSub
->
cmd
.
command
=
TSDB_SQL_SELECT
;
pSub
->
fp
=
tscJoinQueryCallback
;
tscProcessSql
(
pSub
);
tryNextVnode
=
true
;
}
else
{
tscDebug
(
"%p no result in current subquery anymore"
,
pSub
);
}
}
}
if
(
tryNextVnode
)
{
return
;
}
pSql
->
res
.
completed
=
true
;
freeJoinSubqueryObj
(
pSql
);
if
(
pSql
->
res
.
code
==
TSDB_CODE_SUCCESS
)
{
(
*
pSql
->
fp
)(
pSql
->
param
,
pSql
,
0
);
}
else
{
...
...
@@ -972,15 +1090,17 @@ void tscFetchDatablockFromSubquery(SSqlObj* pSql) {
}
// TODO multi-vnode retrieve for projection query with limitation has bugs, since the global limiation is not handled
// retrieve data from current vnode.
tscDebug
(
"%p retrieve data from %d subqueries"
,
pSql
,
numOfFetch
);
SJoinSupporter
*
pSupporter
=
tscUpdateSubqueryStatus
(
pSql
,
numOfFetch
);
SJoinSupporter
*
pSupporter
=
NULL
;
pSql
->
subState
.
numOfRemain
=
numOfFetch
;
for
(
int32_t
i
=
0
;
i
<
pSql
->
subState
.
numOfSub
;
++
i
)
{
SSqlObj
*
pSql1
=
pSql
->
pSubs
[
i
];
if
(
pSql1
==
NULL
)
{
continue
;
}
SSqlRes
*
pRes1
=
&
pSql1
->
res
;
SSqlCmd
*
pCmd1
=
&
pSql1
->
cmd
;
...
...
@@ -1122,7 +1242,7 @@ void tscJoinQueryCallback(void* param, TAOS_RES* tres, int code) {
* data instead of returning to its invoker
*/
if
(
pTableMetaInfo
->
vgroupIndex
>
0
&&
tscNonOrderedProjectionQueryOnSTable
(
pQueryInfo
,
0
))
{
pParentSql
->
subState
.
numOfRemain
=
pParentSql
->
subState
.
numOfSub
;
// reset the record value
//
pParentSql->subState.numOfRemain = pParentSql->subState.numOfSub; // reset the record value
pSql
->
fp
=
joinRetrieveFinalResCallback
;
// continue retrieve data
pSql
->
cmd
.
command
=
TSDB_SQL_FETCH
;
...
...
@@ -1386,7 +1506,13 @@ int32_t tscHandleMasterSTableQuery(SSqlObj *pSql) {
STableMetaInfo
*
pTableMetaInfo
=
tscGetMetaInfo
(
pQueryInfo
,
0
);
SSubqueryState
*
pState
=
&
pSql
->
subState
;
pState
->
numOfSub
=
pTableMetaInfo
->
vgroupList
->
numOfVgroups
;
pState
->
numOfSub
=
0
;
if
(
pTableMetaInfo
->
pVgroupTables
==
NULL
)
{
pState
->
numOfSub
=
pTableMetaInfo
->
vgroupList
->
numOfVgroups
;
}
else
{
pState
->
numOfSub
=
(
int32_t
)
taosArrayGetSize
(
pTableMetaInfo
->
pVgroupTables
);
}
assert
(
pState
->
numOfSub
>
0
);
int32_t
ret
=
tscLocalReducerEnvCreate
(
pSql
,
&
pMemoryBuf
,
&
pDesc
,
&
pModel
,
nBufferSize
);
...
...
@@ -2017,7 +2143,7 @@ static char* getResultBlockPosition(SSqlCmd* pCmd, SSqlRes* pRes, int32_t column
assert
(
pInfo
->
pSqlExpr
!=
NULL
);
*
bytes
=
pInfo
->
pSqlExpr
->
resBytes
;
char
*
pData
=
pRes
->
data
+
pInfo
->
pSqlExpr
->
offset
*
pRes
->
numOfRows
;
char
*
pData
=
pRes
->
data
+
pInfo
->
pSqlExpr
->
offset
*
pRes
->
numOfRows
+
pRes
->
row
*
(
*
bytes
)
;
return
pData
;
}
...
...
@@ -2029,11 +2155,13 @@ static void doBuildResFromSubqueries(SSqlObj* pSql) {
int32_t
numOfRes
=
INT32_MAX
;
for
(
int32_t
i
=
0
;
i
<
pSql
->
subState
.
numOfSub
;
++
i
)
{
if
(
pSql
->
pSubs
[
i
]
==
NULL
)
{
SSqlObj
*
pSub
=
pSql
->
pSubs
[
i
];
if
(
pSub
==
NULL
)
{
continue
;
}
numOfRes
=
(
int32_t
)(
MIN
(
numOfRes
,
pSql
->
pSubs
[
i
]
->
res
.
numOfRows
));
int32_t
remain
=
(
int32_t
)(
pSub
->
res
.
numOfRows
-
pSub
->
res
.
row
);
numOfRes
=
(
int32_t
)(
MIN
(
numOfRes
,
remain
));
}
if
(
numOfRes
==
0
)
{
...
...
@@ -2059,14 +2187,23 @@ static void doBuildResFromSubqueries(SSqlObj* pSql) {
size_t
numOfExprs
=
tscSqlExprNumOfExprs
(
pQueryInfo
);
for
(
int32_t
i
=
0
;
i
<
numOfExprs
;
++
i
)
{
SColumnIndex
*
pIndex
=
&
pRes
->
pColumnIndex
[
i
];
SSqlRes
*
pRes1
=
&
pSql
->
pSubs
[
pIndex
->
tableIndex
]
->
res
;
SSqlCmd
*
pCmd1
=
&
pSql
->
pSubs
[
pIndex
->
tableIndex
]
->
cmd
;
SSqlRes
*
pRes1
=
&
pSql
->
pSubs
[
pIndex
->
tableIndex
]
->
res
;
SSqlCmd
*
pCmd1
=
&
pSql
->
pSubs
[
pIndex
->
tableIndex
]
->
cmd
;
char
*
pData
=
getResultBlockPosition
(
pCmd1
,
pRes1
,
pIndex
->
columnIndex
,
&
bytes
);
memcpy
(
data
,
pData
,
bytes
*
numOfRes
);
data
+=
bytes
*
numOfRes
;
pRes1
->
row
=
numOfRes
;
}
for
(
int32_t
i
=
0
;
i
<
pSql
->
subState
.
numOfSub
;
++
i
)
{
SSqlObj
*
pSub
=
pSql
->
pSubs
[
i
];
if
(
pSub
==
NULL
)
{
continue
;
}
pSub
->
res
.
row
+=
numOfRes
;
assert
(
pSub
->
res
.
row
<=
pSub
->
res
.
numOfRows
);
}
pRes
->
numOfRows
=
numOfRes
;
...
...
@@ -2085,6 +2222,8 @@ void tscBuildResFromSubqueries(SSqlObj *pSql) {
SQueryInfo
*
pQueryInfo
=
tscGetQueryInfoDetail
(
&
pSql
->
cmd
,
pSql
->
cmd
.
clauseIndex
);
size_t
numOfExprs
=
tscSqlExprNumOfExprs
(
pQueryInfo
);
pRes
->
numOfCols
=
(
int32_t
)
numOfExprs
;
pRes
->
tsrow
=
calloc
(
numOfExprs
,
POINTER_BYTES
);
pRes
->
buffer
=
calloc
(
numOfExprs
,
POINTER_BYTES
);
pRes
->
length
=
calloc
(
numOfExprs
,
sizeof
(
int32_t
));
...
...
src/client/src/tscUtil.c
浏览文件 @
6c009cd0
...
...
@@ -1678,19 +1678,62 @@ void tscClearSubqueryInfo(SSqlCmd* pCmd) {
}
void
tscFreeVgroupTableInfo
(
SArray
*
pVgroupTables
)
{
if
(
pVgroupTables
!=
NULL
)
{
size_t
num
=
taosArrayGetSize
(
pVgroupTables
);
for
(
size_t
i
=
0
;
i
<
num
;
i
++
)
{
SVgroupTableInfo
*
pInfo
=
taosArrayGet
(
pVgroupTables
,
i
);
if
(
pVgroupTables
==
NULL
)
{
return
;
}
for
(
int32_t
j
=
0
;
j
<
pInfo
->
vgInfo
.
numOfEps
;
++
j
)
{
taosTFree
(
pInfo
->
vgInfo
.
epAddr
[
j
].
fqdn
);
}
size_t
num
=
taosArrayGetSize
(
pVgroupTables
);
for
(
size_t
i
=
0
;
i
<
num
;
i
++
)
{
SVgroupTableInfo
*
pInfo
=
taosArrayGet
(
pVgroupTables
,
i
);
taosArrayDestroy
(
pInfo
->
itemList
);
for
(
int32_t
j
=
0
;
j
<
pInfo
->
vgInfo
.
numOfEps
;
++
j
)
{
taosTFree
(
pInfo
->
vgInfo
.
epAddr
[
j
].
fqdn
);
}
taosArrayDestroy
(
pVgroupTables
);
taosArrayDestroy
(
pInfo
->
itemList
);
}
taosArrayDestroy
(
pVgroupTables
);
}
void
tscRemoveVgroupTableGroup
(
SArray
*
pVgroupTable
,
int32_t
index
)
{
assert
(
pVgroupTable
!=
NULL
&&
index
>=
0
);
size_t
size
=
taosArrayGetSize
(
pVgroupTable
);
assert
(
size
>
index
);
SVgroupTableInfo
*
pInfo
=
taosArrayGet
(
pVgroupTable
,
index
);
for
(
int32_t
j
=
0
;
j
<
pInfo
->
vgInfo
.
numOfEps
;
++
j
)
{
taosTFree
(
pInfo
->
vgInfo
.
epAddr
[
j
].
fqdn
);
}
taosArrayDestroy
(
pInfo
->
itemList
);
taosArrayRemove
(
pVgroupTable
,
index
);
}
SArray
*
tscCloneVgroupTableInfo
(
SArray
*
pVgroupTables
)
{
if
(
pVgroupTables
==
NULL
)
{
return
NULL
;
}
size_t
num
=
taosArrayGetSize
(
pVgroupTables
);
SArray
*
pa
=
taosArrayInit
(
num
,
sizeof
(
SVgroupTableInfo
));
SVgroupTableInfo
info
;
for
(
size_t
i
=
0
;
i
<
num
;
i
++
)
{
SVgroupTableInfo
*
pInfo
=
taosArrayGet
(
pVgroupTables
,
i
);
memset
(
&
info
,
0
,
sizeof
(
SVgroupTableInfo
));
info
.
vgInfo
=
pInfo
->
vgInfo
;
for
(
int32_t
j
=
0
;
j
<
pInfo
->
vgInfo
.
numOfEps
;
++
j
)
{
info
.
vgInfo
.
epAddr
[
j
].
fqdn
=
strdup
(
pInfo
->
vgInfo
.
epAddr
[
j
].
fqdn
);
}
info
.
itemList
=
taosArrayClone
(
pInfo
->
itemList
);
taosArrayPush
(
pa
,
&
info
);
}
return
pa
;
}
void
clearAllTableMetaInfo
(
SQueryInfo
*
pQueryInfo
,
const
char
*
address
,
bool
removeFromCache
)
{
...
...
@@ -1708,7 +1751,7 @@ void clearAllTableMetaInfo(SQueryInfo* pQueryInfo, const char* address, bool rem
}
STableMetaInfo
*
tscAddTableMetaInfo
(
SQueryInfo
*
pQueryInfo
,
const
char
*
name
,
STableMeta
*
pTableMeta
,
SVgroupsInfo
*
vgroupList
,
SArray
*
pTagCols
)
{
SVgroupsInfo
*
vgroupList
,
SArray
*
pTagCols
,
SArray
*
pVgroupTables
)
{
void
*
pAlloc
=
realloc
(
pQueryInfo
->
pTableMetaInfo
,
(
pQueryInfo
->
numOfTables
+
1
)
*
POINTER_BYTES
);
if
(
pAlloc
==
NULL
)
{
terrno
=
TSDB_CODE_TSC_OUT_OF_MEMORY
;
...
...
@@ -1742,13 +1785,15 @@ STableMetaInfo* tscAddTableMetaInfo(SQueryInfo* pQueryInfo, const char* name, ST
if
(
pTagCols
!=
NULL
)
{
tscColumnListCopy
(
pTableMetaInfo
->
tagColList
,
pTagCols
,
-
1
);
}
pTableMetaInfo
->
pVgroupTables
=
tscCloneVgroupTableInfo
(
pVgroupTables
);
pQueryInfo
->
numOfTables
+=
1
;
return
pTableMetaInfo
;
}
STableMetaInfo
*
tscAddEmptyMetaInfo
(
SQueryInfo
*
pQueryInfo
)
{
return
tscAddTableMetaInfo
(
pQueryInfo
,
NULL
,
NULL
,
NULL
,
NULL
);
return
tscAddTableMetaInfo
(
pQueryInfo
,
NULL
,
NULL
,
NULL
,
NULL
,
NULL
);
}
void
tscClearTableMetaInfo
(
STableMetaInfo
*
pTableMetaInfo
,
bool
removeFromCache
)
{
...
...
@@ -1822,7 +1867,7 @@ SSqlObj* createSimpleSubObj(SSqlObj* pSql, void (*fp)(), void* param, int32_t cm
assert
(
pSql
->
cmd
.
clauseIndex
==
0
);
STableMetaInfo
*
pMasterTableMetaInfo
=
tscGetTableMetaInfoFromCmd
(
&
pSql
->
cmd
,
pSql
->
cmd
.
clauseIndex
,
0
);
tscAddTableMetaInfo
(
pQueryInfo
,
pMasterTableMetaInfo
->
name
,
NULL
,
NULL
,
NULL
);
tscAddTableMetaInfo
(
pQueryInfo
,
pMasterTableMetaInfo
->
name
,
NULL
,
NULL
,
NULL
,
NULL
);
registerSqlObj
(
pNew
);
return
pNew
;
...
...
@@ -1987,14 +2032,16 @@ SSqlObj* createSubqueryObj(SSqlObj* pSql, int16_t tableIndex, void (*fp)(), void
STableMeta
*
pTableMeta
=
taosCacheAcquireByData
(
tscMetaCache
,
pTableMetaInfo
->
pTableMeta
);
// get by name may failed due to the cache cleanup
assert
(
pTableMeta
!=
NULL
);
pFinalInfo
=
tscAddTableMetaInfo
(
pNewQueryInfo
,
name
,
pTableMeta
,
pTableMetaInfo
->
vgroupList
,
pTableMetaInfo
->
tagColList
);
pFinalInfo
=
tscAddTableMetaInfo
(
pNewQueryInfo
,
name
,
pTableMeta
,
pTableMetaInfo
->
vgroupList
,
pTableMetaInfo
->
tagColList
,
pTableMetaInfo
->
pVgroupTables
);
}
else
{
// transfer the ownership of pTableMeta to the newly create sql object.
STableMetaInfo
*
pPrevInfo
=
tscGetTableMetaInfoFromCmd
(
&
pPrevSql
->
cmd
,
pPrevSql
->
cmd
.
clauseIndex
,
0
);
STableMeta
*
pPrevTableMeta
=
taosCacheTransfer
(
tscMetaCache
,
(
void
**
)
&
pPrevInfo
->
pTableMeta
);
SVgroupsInfo
*
pVgroupsInfo
=
pPrevInfo
->
vgroupList
;
pFinalInfo
=
tscAddTableMetaInfo
(
pNewQueryInfo
,
name
,
pPrevTableMeta
,
pVgroupsInfo
,
pTableMetaInfo
->
tagColList
);
pFinalInfo
=
tscAddTableMetaInfo
(
pNewQueryInfo
,
name
,
pPrevTableMeta
,
pVgroupsInfo
,
pTableMetaInfo
->
tagColList
,
pTableMetaInfo
->
pVgroupTables
);
}
if
(
pFinalInfo
->
pTableMeta
==
NULL
)
{
...
...
src/common/inc/tglobal.h
浏览文件 @
6c009cd0
...
...
@@ -44,14 +44,17 @@ extern int32_t tsMaxShellConns;
extern
int32_t
tsShellActivityTimer
;
extern
uint32_t
tsMaxTmrCtrl
;
extern
float
tsNumOfThreadsPerCore
;
extern
float
tsRatioOfQueryThreads
;
extern
float
tsRatioOfQueryThreads
;
// todo remove it
extern
int8_t
tsDaylight
;
extern
char
tsTimezone
[];
extern
char
tsLocale
[];
extern
char
tsCharset
[];
// default encode string
extern
char
tsCharset
[];
// default encode string
extern
int32_t
tsEnableCoreFile
;
extern
int32_t
tsCompressMsgSize
;
//query buffer management
extern
int32_t
tsQueryBufferSize
;
// maximum allowed usage buffer for each data node during query processing
// client
extern
int32_t
tsTableMetaKeepTimer
;
extern
int32_t
tsMaxSQLStringLen
;
...
...
src/common/src/tglobal.c
浏览文件 @
6c009cd0
...
...
@@ -45,14 +45,14 @@ int32_t tsEnableTelemetryReporting = 1;
char
tsEmail
[
TSDB_FQDN_LEN
]
=
{
0
};
// common
int32_t
tsRpcTimer
=
1000
;
int32_t
tsRpcMaxTime
=
600
;
// seconds;
int32_t
tsMaxShellConns
=
5000
;
int32_t
tsRpcTimer
=
1000
;
int32_t
tsRpcMaxTime
=
600
;
// seconds;
int32_t
tsMaxShellConns
=
5000
;
int32_t
tsMaxConnections
=
5000
;
int32_t
tsShellActivityTimer
=
3
;
// second
float
tsNumOfThreadsPerCore
=
1
.
0
;
float
tsRatioOfQueryThreads
=
0
.
5
;
int8_t
tsDaylight
=
0
;
int32_t
tsShellActivityTimer
=
3
;
// second
float
tsNumOfThreadsPerCore
=
1
.
0
f
;
float
tsRatioOfQueryThreads
=
0
.
5
f
;
int8_t
tsDaylight
=
0
;
char
tsTimezone
[
TSDB_TIMEZONE_LEN
]
=
{
0
};
char
tsLocale
[
TSDB_LOCALE_LEN
]
=
{
0
};
char
tsCharset
[
TSDB_LOCALE_LEN
]
=
{
0
};
// default encode string
...
...
@@ -99,6 +99,12 @@ float tsStreamComputDelayRatio = 0.1f;
int32_t
tsProjectExecInterval
=
10000
;
// every 10sec, the projection will be executed once
int64_t
tsMaxRetentWindow
=
24
*
3600L
;
// maximum time window tolerance
// the maximum allowed query buffer size during query processing for each data node.
// -1 no limit (default)
// 0 no query allowed, queries are disabled
// positive value (in MB)
int32_t
tsQueryBufferSize
=
-
1
;
// db parameters
int32_t
tsCacheBlockSize
=
TSDB_DEFAULT_CACHE_BLOCK_SIZE
;
int32_t
tsBlocksPerVnode
=
TSDB_DEFAULT_TOTAL_BLOCKS
;
...
...
@@ -676,7 +682,7 @@ static void doInitGlobalConfig(void) {
cfg
.
minValue
=
TSDB_MIN_CACHE_BLOCK_SIZE
;
cfg
.
maxValue
=
TSDB_MAX_CACHE_BLOCK_SIZE
;
cfg
.
ptrLength
=
0
;
cfg
.
unitType
=
TAOS_CFG_UTYPE_M
b
;
cfg
.
unitType
=
TAOS_CFG_UTYPE_M
B
;
taosInitConfigOption
(
cfg
);
cfg
.
option
=
"blocks"
;
...
...
@@ -839,6 +845,16 @@ static void doInitGlobalConfig(void) {
cfg
.
unitType
=
TAOS_CFG_UTYPE_NONE
;
taosInitConfigOption
(
cfg
);
cfg
.
option
=
"queryBufferSize"
;
cfg
.
ptr
=
&
tsQueryBufferSize
;
cfg
.
valType
=
TAOS_CFG_VTYPE_INT32
;
cfg
.
cfgType
=
TSDB_CFG_CTYPE_B_CONFIG
|
TSDB_CFG_CTYPE_B_SHOW
;
cfg
.
minValue
=
-
1
;
cfg
.
maxValue
=
500000000000
.
0
f
;
cfg
.
ptrLength
=
0
;
cfg
.
unitType
=
TAOS_CFG_UTYPE_BYTE
;
taosInitConfigOption
(
cfg
);
// locale & charset
cfg
.
option
=
"timezone"
;
cfg
.
ptr
=
tsTimezone
;
...
...
src/common/src/tvariant.c
浏览文件 @
6c009cd0
...
...
@@ -144,21 +144,24 @@ void tVariantDestroy(tVariant *pVar) {
void
tVariantAssign
(
tVariant
*
pDst
,
const
tVariant
*
pSrc
)
{
if
(
pSrc
==
NULL
||
pDst
==
NULL
)
return
;
*
pDst
=
*
pSrc
;
pDst
->
nType
=
pSrc
->
nType
;
if
(
pSrc
->
nType
==
TSDB_DATA_TYPE_BINARY
||
pSrc
->
nType
==
TSDB_DATA_TYPE_NCHAR
)
{
int32_t
len
=
pSrc
->
nLen
+
1
;
if
(
pSrc
->
nType
==
TSDB_DATA_TYPE_NCHAR
)
{
len
=
len
*
TSDB_NCHAR_SIZE
;
}
pDst
->
pz
=
calloc
(
1
,
len
);
memcpy
(
pDst
->
pz
,
pSrc
->
pz
,
len
);
int32_t
len
=
pSrc
->
nLen
+
TSDB_NCHAR_SIZE
;
char
*
p
=
realloc
(
pDst
->
pz
,
len
);
assert
(
p
);
memset
(
p
,
0
,
len
);
pDst
->
pz
=
p
;
memcpy
(
pDst
->
pz
,
pSrc
->
pz
,
pSrc
->
nLen
);
pDst
->
nLen
=
pSrc
->
nLen
;
return
;
}
// this is only for string array
if
(
pSrc
->
nType
==
TSDB_DATA_TYPE_ARRAY
)
{
if
(
pSrc
->
nType
>=
TSDB_DATA_TYPE_BOOL
&&
pSrc
->
nType
<=
TSDB_DATA_TYPE_DOUBLE
)
{
pDst
->
i64Key
=
pSrc
->
i64Key
;
}
else
if
(
pSrc
->
nType
==
TSDB_DATA_TYPE_ARRAY
)
{
// this is only for string array
size_t
num
=
taosArrayGetSize
(
pSrc
->
arr
);
pDst
->
arr
=
taosArrayInit
(
num
,
sizeof
(
char
*
));
for
(
size_t
i
=
0
;
i
<
num
;
i
++
)
{
...
...
@@ -166,8 +169,6 @@ void tVariantAssign(tVariant *pDst, const tVariant *pSrc) {
char
*
n
=
strdup
(
p
);
taosArrayPush
(
pDst
->
arr
,
&
n
);
}
return
;
}
pDst
->
nLen
=
tDataTypeDesc
[
pDst
->
nType
].
nSize
;
...
...
src/inc/query.h
浏览文件 @
6c009cd0
...
...
@@ -78,7 +78,6 @@ int32_t qKillQuery(qinfo_t qinfo);
int32_t
qQueryCompleted
(
qinfo_t
qinfo
);
/**
* destroy query info structure
* @param qHandle
...
...
src/inc/taoserror.h
浏览文件 @
6c009cd0
...
...
@@ -230,6 +230,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_QRY_NOT_READY, 0, 0x0707, "Query not
TAOS_DEFINE_ERROR
(
TSDB_CODE_QRY_HAS_RSP
,
0
,
0x0708
,
"Query should response"
)
TAOS_DEFINE_ERROR
(
TSDB_CODE_QRY_IN_EXEC
,
0
,
0x0709
,
"Multiple retrieval of this query"
)
TAOS_DEFINE_ERROR
(
TSDB_CODE_QRY_TOO_MANY_TIMEWINDOW
,
0
,
0x070A
,
"Too many time window in query"
)
TAOS_DEFINE_ERROR
(
TSDB_CODE_QRY_NOT_ENOUGH_BUFFER
,
0
,
0x070B
,
"Query buffer limit has reached"
)
// grant
TAOS_DEFINE_ERROR
(
TSDB_CODE_GRANT_EXPIRED
,
0
,
0x0800
,
"License expired"
)
...
...
src/query/inc/qTsbuf.h
浏览文件 @
6c009cd0
...
...
@@ -35,16 +35,9 @@ typedef struct STSList {
int32_t
len
;
}
STSList
;
typedef
struct
STSRawBlock
{
int32_t
vnode
;
int64_t
tag
;
TSKEY
*
ts
;
int32_t
len
;
}
STSRawBlock
;
typedef
struct
STSElem
{
TSKEY
ts
;
tVariant
tag
;
tVariant
*
tag
;
int32_t
vnode
;
}
STSElem
;
...
...
@@ -84,6 +77,7 @@ typedef struct STSBuf {
char
path
[
PATH_MAX
];
uint32_t
fileSize
;
// todo use array
STSVnodeBlockInfoEx
*
pData
;
uint32_t
numOfAlloc
;
uint32_t
numOfVnodes
;
...
...
@@ -106,12 +100,12 @@ typedef struct STSBufFileHeader {
STSBuf
*
tsBufCreate
(
bool
autoDelete
,
int32_t
order
);
STSBuf
*
tsBufCreateFromFile
(
const
char
*
path
,
bool
autoDelete
);
STSBuf
*
tsBufCreateFromCompBlocks
(
const
char
*
pData
,
int32_t
numOfBlocks
,
int32_t
len
,
int32_t
tsOrder
);
STSBuf
*
tsBufCreateFromCompBlocks
(
const
char
*
pData
,
int32_t
numOfBlocks
,
int32_t
len
,
int32_t
tsOrder
,
int32_t
vnodeId
);
void
*
tsBufDestroy
(
STSBuf
*
pTSBuf
);
void
tsBufAppend
(
STSBuf
*
pTSBuf
,
int32_t
vnodeId
,
tVariant
*
tag
,
const
char
*
pData
,
int32_t
len
);
int32_t
tsBufMerge
(
STSBuf
*
pDestBuf
,
const
STSBuf
*
pSrcBuf
,
int32_t
vnodeIdx
);
int32_t
tsBufMerge
(
STSBuf
*
pDestBuf
,
const
STSBuf
*
pSrcBuf
);
STSBuf
*
tsBufClone
(
STSBuf
*
pTSBuf
);
...
...
@@ -121,6 +115,7 @@ void tsBufFlush(STSBuf* pTSBuf);
void
tsBufResetPos
(
STSBuf
*
pTSBuf
);
STSElem
tsBufGetElem
(
STSBuf
*
pTSBuf
);
bool
tsBufNextPos
(
STSBuf
*
pTSBuf
);
STSElem
tsBufGetElemStartPos
(
STSBuf
*
pTSBuf
,
int32_t
vnodeId
,
tVariant
*
tag
);
...
...
@@ -136,6 +131,10 @@ void tsBufSetCursor(STSBuf* pTSBuf, STSCursor* pCur);
*/
void
tsBufDisplay
(
STSBuf
*
pTSBuf
);
int32_t
tsBufGetNumOfVnodes
(
STSBuf
*
pTSBuf
);
void
tsBufGetVnodeIdList
(
STSBuf
*
pTSBuf
,
int32_t
*
num
,
int32_t
**
vnodeId
);
#ifdef __cplusplus
}
#endif
...
...
src/query/src/qExecutor.c
浏览文件 @
6c009cd0
...
...
@@ -184,7 +184,7 @@ static void resetMergeResultBuf(SQuery *pQuery, SQLFunctionCtx *pCtx, SResultInf
static
bool
functionNeedToExecute
(
SQueryRuntimeEnv
*
pRuntimeEnv
,
SQLFunctionCtx
*
pCtx
,
int32_t
functionId
);
static
void
setExecParams
(
SQuery
*
pQuery
,
SQLFunctionCtx
*
pCtx
,
void
*
inputData
,
TSKEY
*
tsCol
,
SDataBlockInfo
*
pBlockInfo
,
SDataStatis
*
pStatis
,
void
*
param
,
int32_t
colIndex
);
SDataStatis
*
pStatis
,
void
*
param
,
int32_t
colIndex
,
int32_t
vgId
);
static
void
initCtxOutputBuf
(
SQueryRuntimeEnv
*
pRuntimeEnv
);
static
void
destroyTableQueryInfoImpl
(
STableQueryInfo
*
pTableQueryInfo
);
...
...
@@ -194,6 +194,8 @@ static void buildTagQueryResult(SQInfo *pQInfo);
static
int32_t
setAdditionalInfo
(
SQInfo
*
pQInfo
,
void
*
pTable
,
STableQueryInfo
*
pTableQueryInfo
);
static
int32_t
flushFromResultBuf
(
SQueryRuntimeEnv
*
pRuntimeEnv
,
SGroupResInfo
*
pGroupResInfo
);
static
int32_t
checkForQueryBuf
(
size_t
numOfTables
);
static
void
releaseQueryBuf
(
size_t
numOfTables
);
bool
doFilterData
(
SQuery
*
pQuery
,
int32_t
elemPos
)
{
for
(
int32_t
k
=
0
;
k
<
pQuery
->
numOfFilterCols
;
++
k
)
{
...
...
@@ -1005,9 +1007,10 @@ static void blockwiseApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, SDataStatis *
longjmp
(
pRuntimeEnv
->
env
,
TSDB_CODE_QRY_OUT_OF_MEMORY
);
}
SQInfo
*
pQInfo
=
GET_QINFO_ADDR
(
pRuntimeEnv
);
for
(
int32_t
k
=
0
;
k
<
pQuery
->
numOfOutput
;
++
k
)
{
char
*
dataBlock
=
getDataBlock
(
pRuntimeEnv
,
&
sasArray
[
k
],
k
,
pDataBlockInfo
->
rows
,
pDataBlock
);
setExecParams
(
pQuery
,
&
pCtx
[
k
],
dataBlock
,
tsCols
,
pDataBlockInfo
,
pStatis
,
&
sasArray
[
k
],
k
);
setExecParams
(
pQuery
,
&
pCtx
[
k
],
dataBlock
,
tsCols
,
pDataBlockInfo
,
pStatis
,
&
sasArray
[
k
],
k
,
pQInfo
->
vgId
);
}
int32_t
step
=
GET_FORWARD_DIRECTION_FACTOR
(
pQuery
->
order
.
order
);
...
...
@@ -1200,7 +1203,7 @@ static int32_t doTSJoinFilter(SQueryRuntimeEnv *pRuntimeEnv, int32_t offset) {
SQLFunctionCtx
*
pCtx
=
pRuntimeEnv
->
pCtx
;
// compare tag first
if
(
tVariantCompare
(
&
pCtx
[
0
].
tag
,
&
elem
.
tag
)
!=
0
)
{
if
(
tVariantCompare
(
&
pCtx
[
0
].
tag
,
elem
.
tag
)
!=
0
)
{
return
TS_JOIN_TAG_NOT_EQUALS
;
}
...
...
@@ -1286,9 +1289,10 @@ static void rowwiseApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, SDataStatis *pS
groupbyColumnData
=
getGroupbyColumnData
(
pQuery
,
&
type
,
&
bytes
,
pDataBlock
);
}
SQInfo
*
pQInfo
=
GET_QINFO_ADDR
(
pRuntimeEnv
);
for
(
int32_t
k
=
0
;
k
<
pQuery
->
numOfOutput
;
++
k
)
{
char
*
dataBlock
=
getDataBlock
(
pRuntimeEnv
,
&
sasArray
[
k
],
k
,
pDataBlockInfo
->
rows
,
pDataBlock
);
setExecParams
(
pQuery
,
&
pCtx
[
k
],
dataBlock
,
tsCols
,
pDataBlockInfo
,
pStatis
,
&
sasArray
[
k
],
k
);
setExecParams
(
pQuery
,
&
pCtx
[
k
],
dataBlock
,
tsCols
,
pDataBlockInfo
,
pStatis
,
&
sasArray
[
k
],
k
,
pQInfo
->
vgId
);
}
// set the input column data
...
...
@@ -1303,7 +1307,6 @@ static void rowwiseApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, SDataStatis *pS
// from top to bottom in desc
// from bottom to top in asc order
if
(
pRuntimeEnv
->
pTSBuf
!=
NULL
)
{
SQInfo
*
pQInfo
=
(
SQInfo
*
)
GET_QINFO_ADDR
(
pRuntimeEnv
);
qDebug
(
"QInfo:%p process data rows, numOfRows:%d, query order:%d, ts comp order:%d"
,
pQInfo
,
pDataBlockInfo
->
rows
,
pQuery
->
order
.
order
,
pRuntimeEnv
->
pTSBuf
->
cur
.
order
);
}
...
...
@@ -1409,6 +1412,10 @@ static void rowwiseApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, SDataStatis *pS
item
->
lastKey
=
(
QUERY_IS_ASC_QUERY
(
pQuery
)
?
pDataBlockInfo
->
window
.
ekey
:
pDataBlockInfo
->
window
.
skey
)
+
step
;
}
if
(
pRuntimeEnv
->
pTSBuf
!=
NULL
)
{
item
->
cur
=
tsBufGetCursor
(
pRuntimeEnv
->
pTSBuf
);
}
// todo refactor: extract method
for
(
int32_t
i
=
0
;
i
<
pQuery
->
numOfOutput
;
++
i
)
{
if
(
pQuery
->
pSelectExpr
[
i
].
base
.
functionId
!=
TSDB_FUNC_ARITHM
)
{
...
...
@@ -1469,7 +1476,7 @@ static int32_t tableApplyFunctionsOnBlock(SQueryRuntimeEnv *pRuntimeEnv, SDataBl
}
void
setExecParams
(
SQuery
*
pQuery
,
SQLFunctionCtx
*
pCtx
,
void
*
inputData
,
TSKEY
*
tsCol
,
SDataBlockInfo
*
pBlockInfo
,
SDataStatis
*
pStatis
,
void
*
param
,
int32_t
colIndex
)
{
SDataStatis
*
pStatis
,
void
*
param
,
int32_t
colIndex
,
int32_t
vgId
)
{
int32_t
functionId
=
pQuery
->
pSelectExpr
[
colIndex
].
base
.
functionId
;
int32_t
colId
=
pQuery
->
pSelectExpr
[
colIndex
].
base
.
colInfo
.
colId
;
...
...
@@ -1542,6 +1549,9 @@ void setExecParams(SQuery *pQuery, SQLFunctionCtx *pCtx, void* inputData, TSKEY
}
}
}
}
else
if
(
functionId
==
TSDB_FUNC_TS_COMP
)
{
pCtx
->
param
[
0
].
i64Key
=
vgId
;
pCtx
->
param
[
0
].
nType
=
TSDB_DATA_TYPE_BIGINT
;
}
#if defined(_DEBUG_VIEW)
...
...
@@ -2625,8 +2635,13 @@ void setTagVal(SQueryRuntimeEnv *pRuntimeEnv, void *pTable, void *tsdb) {
SColumnInfo
*
pColInfo
=
doGetTagColumnInfoById
(
pQuery
->
tagColList
,
pQuery
->
numOfTags
,
tagColId
);
doSetTagValueInParam
(
tsdb
,
pTable
,
tagColId
,
&
pRuntimeEnv
->
pCtx
[
0
].
tag
,
pColInfo
->
type
,
pColInfo
->
bytes
);
if
(
pRuntimeEnv
->
pCtx
[
0
].
tag
.
nType
==
TSDB_DATA_TYPE_BINARY
||
pRuntimeEnv
->
pCtx
[
0
].
tag
.
nType
==
TSDB_DATA_TYPE_NCHAR
)
{}
qDebug
(
"QInfo:%p set tag value for join comparison, colId:%"
PRId64
", val:%s"
,
pQInfo
,
pExprInfo
->
base
.
arg
->
argValue
.
i64
,
pRuntimeEnv
->
pCtx
[
0
].
tag
.
pz
);
}
else
{
qDebug
(
"QInfo:%p set tag value for join comparison, colId:%"
PRId64
", val:%"
PRId64
,
pQInfo
,
pExprInfo
->
base
.
arg
->
argValue
.
i64
,
pRuntimeEnv
->
pCtx
[
0
].
tag
.
i64Key
)
pRuntimeEnv
->
pCtx
[
0
].
tag
.
i64Key
)
;
}
}
}
...
...
@@ -3860,14 +3875,40 @@ int32_t setAdditionalInfo(SQInfo *pQInfo, void* pTable, STableQueryInfo *pTableQ
// both the master and supplement scan needs to set the correct ts comp start position
if
(
pRuntimeEnv
->
pTSBuf
!=
NULL
)
{
tVariant
*
pTag
=
&
pRuntimeEnv
->
pCtx
[
0
].
tag
;
if
(
pTableQueryInfo
->
cur
.
vgroupIndex
==
-
1
)
{
tVariantAssign
(
&
pTableQueryInfo
->
tag
,
&
pRuntimeEnv
->
pCtx
[
0
].
tag
);
tsBufGetElemStartPos
(
pRuntimeEnv
->
pTSBuf
,
0
,
&
pTableQueryInfo
->
tag
);
tVariantAssign
(
&
pTableQueryInfo
->
tag
,
pTag
);
STSElem
elem
=
tsBufGetElemStartPos
(
pRuntimeEnv
->
pTSBuf
,
pQInfo
->
vgId
,
&
pTableQueryInfo
->
tag
);
// failed to find data with the specified tag value and vnodeId
if
(
elem
.
vnode
<
0
)
{
if
(
pTag
->
nType
==
TSDB_DATA_TYPE_BINARY
||
pTag
->
nType
==
TSDB_DATA_TYPE_NCHAR
)
{
qError
(
"QInfo:%p failed to find tag:%s in ts_comp"
,
pQInfo
,
pTag
->
pz
);
}
else
{
qError
(
"QInfo:%p failed to find tag:%"
PRId64
" in ts_comp"
,
pQInfo
,
pTag
->
i64Key
);
}
return
false
;
}
// keep the cursor info of current meter
pTableQueryInfo
->
cur
=
pRuntimeEnv
->
pTSBuf
->
cur
;
pTableQueryInfo
->
cur
=
tsBufGetCursor
(
pRuntimeEnv
->
pTSBuf
);
if
(
pTag
->
nType
==
TSDB_DATA_TYPE_BINARY
||
pTag
->
nType
==
TSDB_DATA_TYPE_NCHAR
)
{
qDebug
(
"QInfo:%p find tag:%s start pos in ts_comp, blockIndex:%d, tsIndex:%d"
,
pQInfo
,
pTag
->
pz
,
pTableQueryInfo
->
cur
.
blockIndex
,
pTableQueryInfo
->
cur
.
tsIndex
);
}
else
{
qDebug
(
"QInfo:%p find tag:%"
PRId64
" start pos in ts_comp, blockIndex:%d, tsIndex:%d"
,
pQInfo
,
pTag
->
i64Key
,
pTableQueryInfo
->
cur
.
blockIndex
,
pTableQueryInfo
->
cur
.
tsIndex
);
}
}
else
{
tsBufSetCursor
(
pRuntimeEnv
->
pTSBuf
,
&
pTableQueryInfo
->
cur
);
if
(
pTag
->
nType
==
TSDB_DATA_TYPE_BINARY
||
pTag
->
nType
==
TSDB_DATA_TYPE_NCHAR
)
{
qDebug
(
"QInfo:%p find tag:%s start pos in ts_comp, blockIndex:%d, tsIndex:%d"
,
pQInfo
,
pTag
->
pz
,
pTableQueryInfo
->
cur
.
blockIndex
,
pTableQueryInfo
->
cur
.
tsIndex
);
}
else
{
qDebug
(
"QInfo:%p find tag:%"
PRId64
" start pos in ts_comp, blockIndex:%d, tsIndex:%d"
,
pQInfo
,
pTag
->
i64Key
,
pTableQueryInfo
->
cur
.
blockIndex
,
pTableQueryInfo
->
cur
.
tsIndex
);
}
}
}
...
...
@@ -4763,15 +4804,62 @@ static bool multiTableMultioutputHelper(SQInfo *pQInfo, int32_t index) {
}
if
(
pRuntimeEnv
->
pTSBuf
!=
NULL
)
{
if
(
pRuntimeEnv
->
cur
.
vgroupIndex
==
-
1
)
{
STSElem
elem
=
tsBufGetElemStartPos
(
pRuntimeEnv
->
pTSBuf
,
0
,
&
pRuntimeEnv
->
pCtx
[
0
].
tag
);
tVariant
*
pTag
=
&
pRuntimeEnv
->
pCtx
[
0
].
tag
;
// failed to find data with the specified tag value
if
(
pRuntimeEnv
->
cur
.
vgroupIndex
==
-
1
)
{
STSElem
elem
=
tsBufGetElemStartPos
(
pRuntimeEnv
->
pTSBuf
,
pQInfo
->
vgId
,
pTag
);
// failed to find data with the specified tag value and vnodeId
if
(
elem
.
vnode
<
0
)
{
if
(
pTag
->
nType
==
TSDB_DATA_TYPE_BINARY
||
pTag
->
nType
==
TSDB_DATA_TYPE_NCHAR
)
{
qError
(
"QInfo:%p failed to find tag:%s in ts_comp"
,
pQInfo
,
pTag
->
pz
);
}
else
{
qError
(
"QInfo:%p failed to find tag:%"
PRId64
" in ts_comp"
,
pQInfo
,
pTag
->
i64Key
);
}
return
false
;
}
else
{
STSCursor
cur
=
tsBufGetCursor
(
pRuntimeEnv
->
pTSBuf
);
if
(
pTag
->
nType
==
TSDB_DATA_TYPE_BINARY
||
pTag
->
nType
==
TSDB_DATA_TYPE_NCHAR
)
{
qDebug
(
"QInfo:%p find tag:%s start pos in ts_comp, blockIndex:%d, tsIndex:%d"
,
pQInfo
,
pTag
->
pz
,
cur
.
blockIndex
,
cur
.
tsIndex
);
}
else
{
qDebug
(
"QInfo:%p find tag:%"
PRId64
" start pos in ts_comp, blockIndex:%d, tsIndex:%d"
,
pQInfo
,
pTag
->
i64Key
,
cur
.
blockIndex
,
cur
.
tsIndex
);
}
}
}
else
{
tsBufSetCursor
(
pRuntimeEnv
->
pTSBuf
,
&
pRuntimeEnv
->
cur
);
STSElem
elem
=
tsBufGetElem
(
pRuntimeEnv
->
pTSBuf
);
if
(
tVariantCompare
(
elem
.
tag
,
&
pRuntimeEnv
->
pCtx
[
0
].
tag
)
!=
0
)
{
STSElem
elem1
=
tsBufGetElemStartPos
(
pRuntimeEnv
->
pTSBuf
,
pQInfo
->
vgId
,
pTag
);
// failed to find data with the specified tag value and vnodeId
if
(
elem1
.
vnode
<
0
)
{
if
(
pTag
->
nType
==
TSDB_DATA_TYPE_BINARY
||
pTag
->
nType
==
TSDB_DATA_TYPE_NCHAR
)
{
qError
(
"QInfo:%p failed to find tag:%s in ts_comp"
,
pQInfo
,
pTag
->
pz
);
}
else
{
qError
(
"QInfo:%p failed to find tag:%"
PRId64
" in ts_comp"
,
pQInfo
,
pTag
->
i64Key
);
}
return
false
;
}
else
{
STSCursor
cur
=
tsBufGetCursor
(
pRuntimeEnv
->
pTSBuf
);
if
(
pTag
->
nType
==
TSDB_DATA_TYPE_BINARY
||
pTag
->
nType
==
TSDB_DATA_TYPE_NCHAR
)
{
qDebug
(
"QInfo:%p find tag:%s start pos in ts_comp, blockIndex:%d, tsIndex:%d"
,
pQInfo
,
pTag
->
pz
,
cur
.
blockIndex
,
cur
.
tsIndex
);
}
else
{
qDebug
(
"QInfo:%p find tag:%"
PRId64
" start pos in ts_comp, blockIndex:%d, tsIndex:%d"
,
pQInfo
,
pTag
->
i64Key
,
cur
.
blockIndex
,
cur
.
tsIndex
);
}
}
}
else
{
tsBufSetCursor
(
pRuntimeEnv
->
pTSBuf
,
&
pRuntimeEnv
->
cur
);
STSCursor
cur
=
tsBufGetCursor
(
pRuntimeEnv
->
pTSBuf
);
if
(
pTag
->
nType
==
TSDB_DATA_TYPE_BINARY
||
pTag
->
nType
==
TSDB_DATA_TYPE_NCHAR
)
{
qDebug
(
"QInfo:%p continue scan ts_comp file, tag:%s blockIndex:%d, tsIndex:%d"
,
pQInfo
,
pTag
->
pz
,
cur
.
blockIndex
,
cur
.
tsIndex
);
}
else
{
qDebug
(
"QInfo:%p continue scan ts_comp file, tag:%"
PRId64
" blockIndex:%d, tsIndex:%d"
,
pQInfo
,
pTag
->
i64Key
,
cur
.
blockIndex
,
cur
.
tsIndex
);
}
}
}
}
...
...
@@ -5027,6 +5115,10 @@ static void sequentialTableProcess(SQInfo *pQInfo) {
break
;
}
if
(
pRuntimeEnv
->
pTSBuf
!=
NULL
)
{
pRuntimeEnv
->
cur
=
pRuntimeEnv
->
pTSBuf
->
cur
;
}
}
else
{
// all data in the result buffer are skipped due to the offset, continue to retrieve data from current meter
if
(
pQuery
->
rec
.
rows
==
0
)
{
...
...
@@ -6320,7 +6412,7 @@ static int32_t initQInfo(SQueryTableMsg *pQueryMsg, void *tsdb, int32_t vgId, SQ
STSBuf
*
pTSBuf
=
NULL
;
if
(
pQueryMsg
->
tsLen
>
0
)
{
// open new file to save the result
char
*
tsBlock
=
(
char
*
)
pQueryMsg
+
pQueryMsg
->
tsOffset
;
pTSBuf
=
tsBufCreateFromCompBlocks
(
tsBlock
,
pQueryMsg
->
tsNumOfBlocks
,
pQueryMsg
->
tsLen
,
pQueryMsg
->
tsOrder
);
pTSBuf
=
tsBufCreateFromCompBlocks
(
tsBlock
,
pQueryMsg
->
tsNumOfBlocks
,
pQueryMsg
->
tsLen
,
pQueryMsg
->
tsOrder
,
vgId
);
tsBufResetPos
(
pTSBuf
);
bool
ret
=
tsBufNextPos
(
pTSBuf
);
...
...
@@ -6402,6 +6494,8 @@ static void freeQInfo(SQInfo *pQInfo) {
qDebug
(
"QInfo:%p start to free QInfo"
,
pQInfo
);
releaseQueryBuf
(
pQInfo
->
tableqinfoGroupInfo
.
numOfTables
);
teardownQueryRuntimeEnv
(
&
pQInfo
->
runtimeEnv
);
SQuery
*
pQuery
=
pQInfo
->
runtimeEnv
.
pQuery
;
...
...
@@ -6636,6 +6730,11 @@ int32_t qCreateQueryInfo(void* tsdb, int32_t vgId, SQueryTableMsg* pQueryMsg, qi
assert
(
0
);
}
code
=
checkForQueryBuf
(
tableGroupInfo
.
numOfTables
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
// not enough query buffer, abort
goto
_over
;
}
(
*
pQInfo
)
=
createQInfoImpl
(
pQueryMsg
,
pGroupbyExpr
,
pExprs
,
&
tableGroupInfo
,
pTagColumnInfo
,
isSTableQuery
);
pExprs
=
NULL
;
pGroupbyExpr
=
NULL
;
...
...
@@ -7037,6 +7136,48 @@ static void buildTagQueryResult(SQInfo* pQInfo) {
setQueryStatus
(
pQuery
,
QUERY_COMPLETED
);
}
static
int64_t
getQuerySupportBufSize
(
size_t
numOfTables
)
{
size_t
s1
=
sizeof
(
STableQueryInfo
);
size_t
s2
=
sizeof
(
SHashNode
);
// size_t s3 = sizeof(STableCheckInfo); buffer consumption in tsdb
return
(
int64_t
)((
s1
+
s2
)
*
1
.
5
*
numOfTables
);
}
int32_t
checkForQueryBuf
(
size_t
numOfTables
)
{
int64_t
t
=
getQuerySupportBufSize
(
numOfTables
);
if
(
tsQueryBufferSize
<
0
)
{
return
TSDB_CODE_SUCCESS
;
}
else
if
(
tsQueryBufferSize
>
0
)
{
while
(
1
)
{
int64_t
s
=
tsQueryBufferSize
;
int64_t
remain
=
s
-
t
;
if
(
remain
>=
0
)
{
if
(
atomic_val_compare_exchange_64
(
&
tsQueryBufferSize
,
s
,
remain
)
==
s
)
{
return
TSDB_CODE_SUCCESS
;
}
}
else
{
return
TSDB_CODE_QRY_NOT_ENOUGH_BUFFER
;
}
}
}
// disable query processing if the value of tsQueryBufferSize is zero.
return
TSDB_CODE_QRY_NOT_ENOUGH_BUFFER
;
}
void
releaseQueryBuf
(
size_t
numOfTables
)
{
if
(
tsQueryBufferSize
<=
0
)
{
return
;
}
int64_t
t
=
getQuerySupportBufSize
(
numOfTables
);
// restore value is not enough buffer available
atomic_add_fetch_64
(
&
tsQueryBufferSize
,
t
);
}
void
*
qGetResultRetrieveMsg
(
qinfo_t
qinfo
)
{
SQInfo
*
pQInfo
=
(
SQInfo
*
)
qinfo
;
assert
(
pQInfo
!=
NULL
);
...
...
src/query/src/qTsbuf.c
浏览文件 @
6c009cd0
...
...
@@ -403,7 +403,7 @@ void tsBufAppend(STSBuf* pTSBuf, int32_t vnodeId, tVariant* tag, const char* pDa
}
else
{
expandBuffer
(
ptsData
,
len
);
}
tVariantAssign
(
&
pTSBuf
->
block
.
tag
,
tag
);
memcpy
(
ptsData
->
rawBuf
+
ptsData
->
len
,
pData
,
(
size_t
)
len
);
...
...
@@ -561,6 +561,19 @@ static void tsBufGetBlock(STSBuf* pTSBuf, int32_t vnodeIndex, int32_t blockIndex
pCur
->
tsIndex
=
(
pCur
->
order
==
TSDB_ORDER_ASC
)
?
0
:
pBlock
->
numOfElem
-
1
;
}
static
int32_t
doUpdateVnodeInfo
(
STSBuf
*
pTSBuf
,
int64_t
offset
,
STSVnodeBlockInfo
*
pVInfo
)
{
if
(
offset
<
0
||
offset
>=
getDataStartOffset
())
{
return
-
1
;
}
if
(
fseek
(
pTSBuf
->
f
,
(
int32_t
)
offset
,
SEEK_SET
)
!=
0
)
{
return
-
1
;
}
fwrite
(
pVInfo
,
sizeof
(
STSVnodeBlockInfo
),
1
,
pTSBuf
->
f
);
return
0
;
}
STSVnodeBlockInfo
*
tsBufGetVnodeBlockInfo
(
STSBuf
*
pTSBuf
,
int32_t
vnodeId
)
{
int32_t
j
=
tsBufFindVnodeIndexFromId
(
pTSBuf
->
pData
,
pTSBuf
->
numOfVnodes
,
vnodeId
);
if
(
j
==
-
1
)
{
...
...
@@ -649,7 +662,7 @@ bool tsBufNextPos(STSBuf* pTSBuf) {
return
false
;
}
int32_t
blockIndex
=
pCur
->
order
==
TSDB_ORDER_ASC
?
0
:
pBlockInfo
->
numOfBlocks
-
1
;
int32_t
blockIndex
=
(
pCur
->
order
==
TSDB_ORDER_ASC
)
?
0
:
(
pBlockInfo
->
numOfBlocks
-
1
)
;
tsBufGetBlock
(
pTSBuf
,
pCur
->
vgroupIndex
+
step
,
blockIndex
);
break
;
...
...
@@ -675,8 +688,7 @@ void tsBufResetPos(STSBuf* pTSBuf) {
}
STSElem
tsBufGetElem
(
STSBuf
*
pTSBuf
)
{
STSElem
elem1
=
{.
vnode
=
-
1
};
STSElem
elem1
=
{.
vnode
=
-
1
};
if
(
pTSBuf
==
NULL
)
{
return
elem1
;
}
...
...
@@ -690,7 +702,7 @@ STSElem tsBufGetElem(STSBuf* pTSBuf) {
elem1
.
vnode
=
pTSBuf
->
pData
[
pCur
->
vgroupIndex
].
info
.
vnode
;
elem1
.
ts
=
*
(
TSKEY
*
)(
pTSBuf
->
tsData
.
rawBuf
+
pCur
->
tsIndex
*
TSDB_KEYSIZE
);
tVariantAssign
(
&
elem1
.
tag
,
&
pBlock
->
tag
)
;
elem1
.
tag
=
&
pBlock
->
tag
;
return
elem1
;
}
...
...
@@ -702,7 +714,7 @@ STSElem tsBufGetElem(STSBuf* pTSBuf) {
* @param vnodeId
* @return
*/
int32_t
tsBufMerge
(
STSBuf
*
pDestBuf
,
const
STSBuf
*
pSrcBuf
,
int32_t
vnodeId
)
{
int32_t
tsBufMerge
(
STSBuf
*
pDestBuf
,
const
STSBuf
*
pSrcBuf
)
{
if
(
pDestBuf
==
NULL
||
pSrcBuf
==
NULL
||
pSrcBuf
->
numOfVnodes
<=
0
)
{
return
0
;
}
...
...
@@ -712,14 +724,13 @@ int32_t tsBufMerge(STSBuf* pDestBuf, const STSBuf* pSrcBuf, int32_t vnodeId) {
}
// src can only have one vnode index
if
(
pSrcBuf
->
numOfVnodes
>
1
)
{
return
-
1
;
}
assert
(
pSrcBuf
->
numOfVnodes
==
1
);
// there are data in buffer, flush to disk first
tsBufFlush
(
pDestBuf
);
// compared with the last vnode id
int32_t
vnodeId
=
tsBufGetLastVnodeInfo
((
STSBuf
*
)
pSrcBuf
)
->
info
.
vnode
;
if
(
vnodeId
!=
tsBufGetLastVnodeInfo
(
pDestBuf
)
->
info
.
vnode
)
{
int32_t
oldSize
=
pDestBuf
->
numOfVnodes
;
int32_t
newSize
=
oldSize
+
pSrcBuf
->
numOfVnodes
;
...
...
@@ -791,14 +802,14 @@ int32_t tsBufMerge(STSBuf* pDestBuf, const STSBuf* pSrcBuf, int32_t vnodeId) {
return
0
;
}
STSBuf
*
tsBufCreateFromCompBlocks
(
const
char
*
pData
,
int32_t
numOfBlocks
,
int32_t
len
,
int32_t
order
)
{
STSBuf
*
tsBufCreateFromCompBlocks
(
const
char
*
pData
,
int32_t
numOfBlocks
,
int32_t
len
,
int32_t
order
,
int32_t
vnodeId
)
{
STSBuf
*
pTSBuf
=
tsBufCreate
(
true
,
order
);
STSVnodeBlockInfo
*
pBlockInfo
=
&
(
addOneVnodeInfo
(
pTSBuf
,
0
)
->
info
);
pBlockInfo
->
numOfBlocks
=
numOfBlocks
;
pBlockInfo
->
compLen
=
len
;
pBlockInfo
->
offset
=
getDataStartOffset
();
pBlockInfo
->
vnode
=
0
;
pBlockInfo
->
vnode
=
vnodeId
;
// update prev vnode length info in file
TSBufUpdateVnodeInfo
(
pTSBuf
,
pTSBuf
->
numOfVnodes
-
1
,
pBlockInfo
);
...
...
@@ -902,8 +913,8 @@ void tsBufDisplay(STSBuf* pTSBuf) {
while
(
tsBufNextPos
(
pTSBuf
))
{
STSElem
elem
=
tsBufGetElem
(
pTSBuf
);
if
(
elem
.
tag
.
nType
==
TSDB_DATA_TYPE_BIGINT
)
{
printf
(
"%d-%"
PRId64
"-%"
PRId64
"
\n
"
,
elem
.
vnode
,
elem
.
tag
.
i64Key
,
elem
.
ts
);
if
(
elem
.
tag
->
nType
==
TSDB_DATA_TYPE_BIGINT
)
{
printf
(
"%d-%"
PRId64
"-%"
PRId64
"
\n
"
,
elem
.
vnode
,
elem
.
tag
->
i64Key
,
elem
.
ts
);
}
}
...
...
@@ -915,19 +926,6 @@ static int32_t getDataStartOffset() {
return
sizeof
(
STSBufFileHeader
)
+
TS_COMP_FILE_VNODE_MAX
*
sizeof
(
STSVnodeBlockInfo
);
}
static
int32_t
doUpdateVnodeInfo
(
STSBuf
*
pTSBuf
,
int64_t
offset
,
STSVnodeBlockInfo
*
pVInfo
)
{
if
(
offset
<
0
||
offset
>=
getDataStartOffset
())
{
return
-
1
;
}
if
(
fseek
(
pTSBuf
->
f
,
(
int32_t
)
offset
,
SEEK_SET
)
!=
0
)
{
return
-
1
;
}
fwrite
(
pVInfo
,
sizeof
(
STSVnodeBlockInfo
),
1
,
pTSBuf
->
f
);
return
0
;
}
// update prev vnode length info in file
static
void
TSBufUpdateVnodeInfo
(
STSBuf
*
pTSBuf
,
int32_t
index
,
STSVnodeBlockInfo
*
pBlockInfo
)
{
int32_t
offset
=
sizeof
(
STSBufFileHeader
)
+
index
*
sizeof
(
STSVnodeBlockInfo
);
...
...
@@ -969,3 +967,29 @@ static STSBuf* allocResForTSBuf(STSBuf* pTSBuf) {
pTSBuf
->
fileSize
+=
getDataStartOffset
();
return
pTSBuf
;
}
int32_t
tsBufGetNumOfVnodes
(
STSBuf
*
pTSBuf
)
{
if
(
pTSBuf
==
NULL
)
{
return
0
;
}
return
pTSBuf
->
numOfVnodes
;
}
void
tsBufGetVnodeIdList
(
STSBuf
*
pTSBuf
,
int32_t
*
num
,
int32_t
**
vnodeId
)
{
int32_t
size
=
tsBufGetNumOfVnodes
(
pTSBuf
);
if
(
num
!=
NULL
)
{
*
num
=
size
;
}
*
vnodeId
=
NULL
;
if
(
size
==
0
)
{
return
;
}
(
*
vnodeId
)
=
malloc
(
tsBufGetNumOfVnodes
(
pTSBuf
)
*
sizeof
(
int32_t
));
for
(
int32_t
i
=
0
;
i
<
size
;
++
i
)
{
(
*
vnodeId
)[
i
]
=
pTSBuf
->
pData
[
i
].
info
.
vnode
;
}
}
\ No newline at end of file
src/query/tests/tsBufTest.cpp
浏览文件 @
6c009cd0
...
...
@@ -304,7 +304,7 @@ void TSTraverse() {
int32_t
totalOutput
=
10
;
while
(
1
)
{
STSElem
elem
=
tsBufGetElem
(
pTSBuf
);
printf
(
"%d-%"
PRIu64
"-%"
PRIu64
"
\n
"
,
elem
.
vnode
,
elem
.
tag
.
i64Key
,
elem
.
ts
);
printf
(
"%d-%"
PRIu64
"-%"
PRIu64
"
\n
"
,
elem
.
vnode
,
elem
.
tag
->
i64Key
,
elem
.
ts
);
if
(
!
tsBufNextPos
(
pTSBuf
))
{
break
;
...
...
@@ -352,7 +352,7 @@ void TSTraverse() {
totalOutput
=
10
;
while
(
1
)
{
STSElem
elem
=
tsBufGetElem
(
pTSBuf
);
printf
(
"%d-%"
PRIu64
"-%"
PRIu64
"
\n
"
,
elem
.
vnode
,
elem
.
tag
.
i64Key
,
elem
.
ts
);
printf
(
"%d-%"
PRIu64
"-%"
PRIu64
"
\n
"
,
elem
.
vnode
,
elem
.
tag
->
i64Key
,
elem
.
ts
);
if
(
!
tsBufNextPos
(
pTSBuf
))
{
break
;
...
...
@@ -416,8 +416,8 @@ void mergeDiffVnodeBufferTest() {
int64_t
*
list
=
createTsList
(
num
,
start
,
step
);
t
.
i64Key
=
i
;
tsBufAppend
(
pTSBuf1
,
0
,
&
t
,
(
const
char
*
)
list
,
num
*
sizeof
(
int64_t
));
tsBufAppend
(
pTSBuf2
,
0
,
&
t
,
(
const
char
*
)
list
,
num
*
sizeof
(
int64_t
));
tsBufAppend
(
pTSBuf1
,
1
,
&
t
,
(
const
char
*
)
list
,
num
*
sizeof
(
int64_t
));
tsBufAppend
(
pTSBuf2
,
9
,
&
t
,
(
const
char
*
)
list
,
num
*
sizeof
(
int64_t
));
free
(
list
);
...
...
@@ -426,7 +426,7 @@ void mergeDiffVnodeBufferTest() {
tsBufFlush
(
pTSBuf2
);
tsBufMerge
(
pTSBuf1
,
pTSBuf2
,
9
);
tsBufMerge
(
pTSBuf1
,
pTSBuf2
);
EXPECT_EQ
(
pTSBuf1
->
numOfVnodes
,
2
);
EXPECT_EQ
(
pTSBuf1
->
numOfTotal
,
numOfTags
*
2
*
num
);
...
...
@@ -459,8 +459,6 @@ void mergeIdenticalVnodeBufferTest() {
start
+=
step
*
num
;
}
for
(
int32_t
i
=
numOfTags
;
i
<
numOfTags
*
2
;
++
i
)
{
int64_t
*
list
=
createTsList
(
num
,
start
,
step
);
...
...
@@ -473,7 +471,7 @@ void mergeIdenticalVnodeBufferTest() {
tsBufFlush
(
pTSBuf2
);
tsBufMerge
(
pTSBuf1
,
pTSBuf2
,
12
);
tsBufMerge
(
pTSBuf1
,
pTSBuf2
);
EXPECT_EQ
(
pTSBuf1
->
numOfVnodes
,
1
);
EXPECT_EQ
(
pTSBuf1
->
numOfTotal
,
numOfTags
*
2
*
num
);
...
...
@@ -482,7 +480,7 @@ void mergeIdenticalVnodeBufferTest() {
STSElem
elem
=
tsBufGetElem
(
pTSBuf1
);
EXPECT_EQ
(
elem
.
vnode
,
12
);
printf
(
"%d-%"
PRIu64
"-%"
PRIu64
"
\n
"
,
elem
.
vnode
,
elem
.
tag
.
i64Key
,
elem
.
ts
);
printf
(
"%d-%"
PRIu64
"-%"
PRIu64
"
\n
"
,
elem
.
vnode
,
elem
.
tag
->
i64Key
,
elem
.
ts
);
}
tsBufDestroy
(
pTSBuf1
);
...
...
src/util/inc/tconfig.h
浏览文件 @
6c009cd0
...
...
@@ -53,7 +53,7 @@ enum {
TAOS_CFG_UTYPE_NONE
,
TAOS_CFG_UTYPE_PERCENT
,
TAOS_CFG_UTYPE_GB
,
TAOS_CFG_UTYPE_M
b
,
TAOS_CFG_UTYPE_M
B
,
TAOS_CFG_UTYPE_BYTE
,
TAOS_CFG_UTYPE_SECOND
,
TAOS_CFG_UTYPE_MS
...
...
src/util/src/tcache.c
浏览文件 @
6c009cd0
...
...
@@ -335,7 +335,7 @@ void *taosCacheTransfer(SCacheObj *pCacheObj, void **data) {
}
void
taosCacheRelease
(
SCacheObj
*
pCacheObj
,
void
**
data
,
bool
_remove
)
{
if
(
pCacheObj
==
NULL
||
taosHashGetSize
(
pCacheObj
->
pHashTable
)
+
pCacheObj
->
numOfElemsInTrash
==
0
)
{
if
(
pCacheObj
==
NULL
)
{
return
;
}
...
...
@@ -343,7 +343,12 @@ void taosCacheRelease(SCacheObj *pCacheObj, void **data, bool _remove) {
uError
(
"cache:%s, NULL data to release"
,
pCacheObj
->
name
);
return
;
}
// The operation of removal from hash table and addition to trashcan is not an atomic operation,
// therefore the check for the empty of both the hash table and the trashcan has a race condition.
// It happens when there is only one object in the cache, and two threads which has referenced this object
// start to free the it simultaneously [TD-1569].
size_t
offset
=
offsetof
(
SCacheDataNode
,
data
);
SCacheDataNode
*
pNode
=
(
SCacheDataNode
*
)((
char
*
)(
*
data
)
-
offset
);
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录