Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
cded09bf
TDengine
项目概览
taosdata
/
TDengine
1 年多 前同步成功
通知
1185
Star
22016
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
cded09bf
编写于
5月 19, 2022
作者:
H
Haojun Liao
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
enh(query): limit the rsp ssdatablock size.
上级
f636806c
变更
3
隐藏空白更改
内联
并排
Showing
3 changed file
with
171 addition
and
41 deletion
+171
-41
source/dnode/vnode/src/tsdb/tsdbRead.c
source/dnode/vnode/src/tsdb/tsdbRead.c
+72
-31
source/libs/executor/src/executorimpl.c
source/libs/executor/src/executorimpl.c
+20
-10
tests/script/tsim/testsuit.sim
tests/script/tsim/testsuit.sim
+79
-0
未找到文件。
source/dnode/vnode/src/tsdb/tsdbRead.c
浏览文件 @
cded09bf
...
...
@@ -425,6 +425,12 @@ static STsdbReadHandle* tsdbQueryTablesImpl(SVnode* pVnode, SQueryTableDataCond*
rowLen
+=
pCond
->
colList
[
i
].
bytes
;
}
// make sure the output SSDataBlock size be less than 2MB.
int32_t
TWOMB
=
2
*
1024
*
1024
;
if
(
pReadHandle
->
outputCapacity
*
rowLen
>
TWOMB
)
{
pReadHandle
->
outputCapacity
=
TWOMB
/
rowLen
;
}
// allocate buffer in order to load data blocks from file
pReadHandle
->
suppInfo
.
pstatis
=
taosMemoryCalloc
(
pCond
->
numOfCols
,
sizeof
(
SColumnDataAgg
));
if
(
pReadHandle
->
suppInfo
.
pstatis
==
NULL
)
{
...
...
@@ -1302,20 +1308,22 @@ static int32_t handleDataMergeIfNeeded(STsdbReadHandle* pTsdbReadHandle, SBlock*
if
((
ascScan
&&
(
key
!=
TSKEY_INITIAL_VAL
&&
key
<=
binfo
.
window
.
ekey
))
||
(
!
ascScan
&&
(
key
!=
TSKEY_INITIAL_VAL
&&
key
>=
binfo
.
window
.
skey
)))
{
if
((
ascScan
&&
(
key
!=
TSKEY_INITIAL_VAL
&&
key
<
binfo
.
window
.
skey
))
||
(
!
ascScan
&&
(
key
!=
TSKEY_INITIAL_VAL
&&
key
>
binfo
.
window
.
ekey
)))
{
bool
cacheDataInFileBlockHole
=
(
ascScan
&&
(
key
!=
TSKEY_INITIAL_VAL
&&
key
<
binfo
.
window
.
skey
))
||
(
!
ascScan
&&
(
key
!=
TSKEY_INITIAL_VAL
&&
key
>
binfo
.
window
.
ekey
));
if
(
cacheDataInFileBlockHole
)
{
// do not load file block into buffer
int32_t
step
=
ascScan
?
1
:
-
1
;
TSKEY
maxKey
=
ASCENDING_TRAVERSE
(
pTsdbReadHandle
->
order
)
?
(
binfo
.
window
.
skey
-
step
)
:
(
binfo
.
window
.
ekey
-
step
);
TSKEY
maxKey
=
ascScan
?
(
binfo
.
window
.
skey
-
step
)
:
(
binfo
.
window
.
ekey
-
step
);
cur
->
rows
=
tsdbReadRowsFromCache
(
pCheckInfo
,
maxKey
,
pTsdbReadHandle
->
outputCapacity
,
&
cur
->
win
,
pTsdbReadHandle
);
pTsdbReadHandle
->
realNumOfRows
=
cur
->
rows
;
// update the last key value
pCheckInfo
->
lastKey
=
cur
->
win
.
ekey
+
step
;
if
(
!
ASCENDING_TRAVERSE
(
pTsdbReadHandle
->
order
))
{
if
(
!
ascScan
)
{
TSWAP
(
cur
->
win
.
skey
,
cur
->
win
.
ekey
);
}
...
...
@@ -1334,18 +1342,16 @@ static int32_t handleDataMergeIfNeeded(STsdbReadHandle* pTsdbReadHandle, SBlock*
/*
* no data in cache, only load data from file
* during the query processing, data in cache will not be checked anymore.
*
* Here the buffer is not enough, so only part of file block can be loaded into memory buffer
*/
assert
(
pTsdbReadHandle
->
outputCapacity
>=
binfo
.
rows
);
int32_t
endPos
=
getEndPosInDataBlock
(
pTsdbReadHandle
,
&
binfo
);
if
((
cur
->
pos
==
0
&&
endPos
==
binfo
.
rows
-
1
&&
ascScan
)
||
(
cur
->
pos
==
(
binfo
.
rows
-
1
)
&&
endPos
==
0
&&
(
!
ascScan
))
)
{
bool
wholeBlockReturned
=
((
abs
(
cur
->
pos
-
endPos
)
+
1
)
==
binfo
.
rows
);
if
(
wholeBlockReturned
)
{
pTsdbReadHandle
->
realNumOfRows
=
binfo
.
rows
;
cur
->
rows
=
binfo
.
rows
;
cur
->
win
=
binfo
.
window
;
cur
->
win
=
binfo
.
window
;
cur
->
mixBlock
=
false
;
cur
->
blockCompleted
=
true
;
...
...
@@ -1356,12 +1362,24 @@ static int32_t handleDataMergeIfNeeded(STsdbReadHandle* pTsdbReadHandle, SBlock*
cur
->
lastKey
=
binfo
.
window
.
skey
-
1
;
cur
->
pos
=
-
1
;
}
}
else
{
// partially copy to dest buffer
}
else
{
// partially copy to dest buffer
// make sure to only load once
bool
firstTimeExtract
=
((
cur
->
pos
==
0
&&
ascScan
)
||
(
cur
->
pos
==
binfo
.
rows
-
1
&&
(
!
ascScan
)));
if
(
pTsdbReadHandle
->
outputCapacity
<
binfo
.
rows
&&
firstTimeExtract
)
{
code
=
doLoadFileDataBlock
(
pTsdbReadHandle
,
pBlock
,
pCheckInfo
,
cur
->
slot
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
return
code
;
}
}
copyAllRemainRowsFromFileBlock
(
pTsdbReadHandle
,
pCheckInfo
,
&
binfo
,
endPos
);
cur
->
mixBlock
=
true
;
}
assert
(
cur
->
blockCompleted
);
if
(
pTsdbReadHandle
->
outputCapacity
>=
binfo
.
rows
)
{
ASSERT
(
cur
->
blockCompleted
);
}
if
(
cur
->
rows
==
binfo
.
rows
)
{
tsdbDebug
(
"%p whole file block qualified, brange:%"
PRId64
"-%"
PRId64
", rows:%d, lastKey:%"
PRId64
", %s"
,
pTsdbReadHandle
,
cur
->
win
.
skey
,
cur
->
win
.
ekey
,
cur
->
rows
,
cur
->
lastKey
,
pTsdbReadHandle
->
idStr
);
...
...
@@ -1858,15 +1876,14 @@ static void copyAllRemainRowsFromFileBlock(STsdbReadHandle* pTsdbReadHandle, STa
SDataCols
*
pCols
=
pTsdbReadHandle
->
rhelper
.
pDCols
[
0
];
TSKEY
*
tsArray
=
pCols
->
cols
[
0
].
pData
;
int32_t
step
=
ASCENDING_TRAVERSE
(
pTsdbReadHandle
->
order
)
?
1
:
-
1
;
int32_t
numOfCols
=
(
int32_t
)(
QH_GET_NUM_OF_COLS
(
pTsdbReadHandle
));
bool
ascScan
=
ASCENDING_TRAVERSE
(
pTsdbReadHandle
->
order
);
int32_t
pos
=
cur
->
pos
;
int32_t
step
=
ascScan
?
1
:
-
1
;
int32_t
start
=
cur
->
pos
;
int32_t
end
=
endPos
;
if
(
!
ASCENDING_TRAVERSE
(
pTsdbReadHandle
->
order
)
)
{
if
(
!
ascScan
)
{
TSWAP
(
start
,
end
);
}
...
...
@@ -1876,11 +1893,11 @@ static void copyAllRemainRowsFromFileBlock(STsdbReadHandle* pTsdbReadHandle, STa
// the time window should always be ascending order: skey <= ekey
cur
->
win
=
(
STimeWindow
){.
skey
=
tsArray
[
start
],
.
ekey
=
tsArray
[
end
]};
cur
->
mixBlock
=
(
numOfRows
!=
pBlockInfo
->
rows
);
cur
->
lastKey
=
tsArray
[
endPos
]
+
step
;
cur
->
blockCompleted
=
true
;
cur
->
lastKey
=
tsArray
[
endPos
]
+
step
;
cur
->
blockCompleted
=
(
ascScan
?
(
endPos
==
pBlockInfo
->
rows
-
1
)
:
(
endPos
==
0
))
;
// The value of pos may be -1 or pBlockInfo->rows, and it is invalid in both cases.
pos
=
endPos
+
step
;
int32_t
pos
=
endPos
+
step
;
updateInfoAfterMerge
(
pTsdbReadHandle
,
pCheckInfo
,
numOfRows
,
pos
);
doCheckGeneratedBlockRange
(
pTsdbReadHandle
);
...
...
@@ -1892,20 +1909,44 @@ static void copyAllRemainRowsFromFileBlock(STsdbReadHandle* pTsdbReadHandle, STa
int32_t
getEndPosInDataBlock
(
STsdbReadHandle
*
pTsdbReadHandle
,
SDataBlockInfo
*
pBlockInfo
)
{
// NOTE: reverse the order to find the end position in data block
int32_t
endPos
=
-
1
;
int32_t
order
=
ASCENDING_TRAVERSE
(
pTsdbReadHandle
->
order
)
?
TSDB_ORDER_DESC
:
TSDB_ORDER_ASC
;
bool
ascScan
=
ASCENDING_TRAVERSE
(
pTsdbReadHandle
->
order
);
int32_t
order
=
ascScan
?
TSDB_ORDER_DESC
:
TSDB_ORDER_ASC
;
SQueryFilePos
*
cur
=
&
pTsdbReadHandle
->
cur
;
SDataCols
*
pCols
=
pTsdbReadHandle
->
rhelper
.
pDCols
[
0
];
if
(
ASCENDING_TRAVERSE
(
pTsdbReadHandle
->
order
)
&&
pTsdbReadHandle
->
window
.
ekey
>=
pBlockInfo
->
window
.
ekey
)
{
endPos
=
pBlockInfo
->
rows
-
1
;
cur
->
mixBlock
=
(
cur
->
pos
!=
0
);
}
else
if
(
!
ASCENDING_TRAVERSE
(
pTsdbReadHandle
->
order
)
&&
pTsdbReadHandle
->
window
.
ekey
<=
pBlockInfo
->
window
.
skey
)
{
endPos
=
0
;
cur
->
mixBlock
=
(
cur
->
pos
!=
pBlockInfo
->
rows
-
1
);
if
(
pTsdbReadHandle
->
outputCapacity
>=
pBlockInfo
->
rows
)
{
if
(
ascScan
&&
pTsdbReadHandle
->
window
.
ekey
>=
pBlockInfo
->
window
.
ekey
)
{
endPos
=
pBlockInfo
->
rows
-
1
;
cur
->
mixBlock
=
(
cur
->
pos
!=
0
);
}
else
if
((
!
ascScan
)
&&
pTsdbReadHandle
->
window
.
ekey
<=
pBlockInfo
->
window
.
skey
)
{
endPos
=
0
;
cur
->
mixBlock
=
(
cur
->
pos
!=
pBlockInfo
->
rows
-
1
);
}
else
{
assert
(
pCols
->
numOfRows
>
0
);
endPos
=
doBinarySearchKey
(
pCols
->
cols
[
0
].
pData
,
pCols
->
numOfRows
,
pTsdbReadHandle
->
window
.
ekey
,
order
);
cur
->
mixBlock
=
true
;
}
}
else
{
assert
(
pCols
->
numOfRows
>
0
);
endPos
=
doBinarySearchKey
(
pCols
->
cols
[
0
].
pData
,
pCols
->
numOfRows
,
pTsdbReadHandle
->
window
.
ekey
,
order
);
if
(
ascScan
&&
pTsdbReadHandle
->
window
.
ekey
>=
pBlockInfo
->
window
.
ekey
)
{
endPos
=
MIN
(
cur
->
pos
+
pTsdbReadHandle
->
outputCapacity
-
1
,
pBlockInfo
->
rows
-
1
);
}
else
if
((
!
ascScan
)
&&
pTsdbReadHandle
->
window
.
ekey
<=
pBlockInfo
->
window
.
skey
)
{
endPos
=
MAX
(
cur
->
pos
-
pTsdbReadHandle
->
outputCapacity
+
1
,
0
);
}
else
{
ASSERT
(
pCols
->
numOfRows
>
0
);
endPos
=
doBinarySearchKey
(
pCols
->
cols
[
0
].
pData
,
pCols
->
numOfRows
,
pTsdbReadHandle
->
window
.
ekey
,
order
);
// current data is more than the capacity
int32_t
size
=
abs
(
cur
->
pos
-
endPos
)
+
1
;
if
(
size
>
pTsdbReadHandle
->
outputCapacity
)
{
int32_t
delta
=
size
-
pTsdbReadHandle
->
outputCapacity
;
if
(
ascScan
)
{
endPos
-=
delta
;
}
else
{
endPos
+=
delta
;
}
}
}
cur
->
mixBlock
=
true
;
}
...
...
@@ -2369,7 +2410,7 @@ static int32_t createDataBlocksInfo(STsdbReadHandle* pTsdbReadHandle, int32_t nu
static
int32_t
getFirstFileDataBlock
(
STsdbReadHandle
*
pTsdbReadHandle
,
bool
*
exists
);
static
int32_t
getDataBlock
Rv
(
STsdbReadHandle
*
pTsdbReadHandle
,
STableBlockInfo
*
pNext
,
bool
*
exists
)
{
static
int32_t
getDataBlock
(
STsdbReadHandle
*
pTsdbReadHandle
,
STableBlockInfo
*
pNext
,
bool
*
exists
)
{
int32_t
step
=
ASCENDING_TRAVERSE
(
pTsdbReadHandle
->
order
)
?
1
:
-
1
;
SQueryFilePos
*
cur
=
&
pTsdbReadHandle
->
cur
;
...
...
@@ -2478,7 +2519,7 @@ static int32_t getFirstFileDataBlock(STsdbReadHandle* pTsdbReadHandle, bool* exi
cur
->
fid
=
pTsdbReadHandle
->
pFileGroup
->
fid
;
STableBlockInfo
*
pBlockInfo
=
&
pTsdbReadHandle
->
pDataBlockInfo
[
cur
->
slot
];
return
getDataBlock
Rv
(
pTsdbReadHandle
,
pBlockInfo
,
exists
);
return
getDataBlock
(
pTsdbReadHandle
,
pBlockInfo
,
exists
);
}
static
bool
isEndFileDataBlock
(
SQueryFilePos
*
cur
,
int32_t
numOfBlocks
,
bool
ascTrav
)
{
...
...
@@ -2643,7 +2684,7 @@ static int32_t getDataBlocksInFiles(STsdbReadHandle* pTsdbReadHandle, bool* exis
}
else
{
moveToNextDataBlockInCurrentFile
(
pTsdbReadHandle
);
STableBlockInfo
*
pNext
=
&
pTsdbReadHandle
->
pDataBlockInfo
[
cur
->
slot
];
return
getDataBlock
Rv
(
pTsdbReadHandle
,
pNext
,
exists
);
return
getDataBlock
(
pTsdbReadHandle
,
pNext
,
exists
);
}
}
}
...
...
source/libs/executor/src/executorimpl.c
浏览文件 @
cded09bf
...
...
@@ -3546,11 +3546,12 @@ _error:
int32_t
getTableScanInfo
(
SOperatorInfo
*
pOperator
,
int32_t
*
order
,
int32_t
*
scanFlag
)
{
// todo add more information about exchange operation
if
(
pOperator
->
operatorType
==
QUERY_NODE_PHYSICAL_PLAN_EXCHANGE
)
{
int32_t
type
=
pOperator
->
operatorType
;
if
(
type
==
QUERY_NODE_PHYSICAL_PLAN_EXCHANGE
||
type
==
QUERY_NODE_PHYSICAL_PLAN_SYSTABLE_SCAN
)
{
*
order
=
TSDB_ORDER_ASC
;
*
scanFlag
=
MAIN_SCAN
;
return
TSDB_CODE_SUCCESS
;
}
else
if
(
pOperator
->
operatorT
ype
==
QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN
)
{
}
else
if
(
t
ype
==
QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN
)
{
STableScanInfo
*
pTableScanInfo
=
pOperator
->
info
;
*
order
=
pTableScanInfo
->
cond
.
order
;
*
scanFlag
=
pTableScanInfo
->
scanFlag
;
...
...
@@ -3910,6 +3911,9 @@ static SSDataBlock* doProjectOperation(SOperatorInfo* pOperator) {
// the pDataBlock are always the same one, no need to call this again
int32_t
code
=
getTableScanInfo
(
pOperator
->
pDownstream
[
0
],
&
order
,
&
scanFlag
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
longjmp
(
pTaskInfo
->
env
,
code
);
}
setInputDataBlock
(
pOperator
,
pInfo
->
pCtx
,
pBlock
,
order
,
scanFlag
,
false
);
blockDataEnsureCapacity
(
pInfo
->
pRes
,
pInfo
->
pRes
->
info
.
rows
+
pBlock
->
info
.
rows
);
...
...
@@ -4311,23 +4315,29 @@ SOperatorInfo* createProjectOperatorInfo(SOperatorInfo* downstream, SExprInfo* p
int32_t
numOfRows
=
4096
;
size_t
keyBufSize
=
sizeof
(
int64_t
)
+
sizeof
(
int64_t
)
+
POINTER_BYTES
;
// Make sure the size of SSDataBlock will never exceed the size of 2MB.
int32_t
TWOMB
=
2
*
1024
*
1024
;
if
(
numOfRows
*
pResBlock
->
info
.
rowSize
>
TWOMB
)
{
numOfRows
=
TWOMB
/
pResBlock
->
info
.
rowSize
;
}
initResultSizeInfo
(
pOperator
,
numOfRows
);
initAggInfo
(
&
pInfo
->
binfo
,
&
pInfo
->
aggSup
,
pExprInfo
,
numOfCols
,
pResBlock
,
keyBufSize
,
pTaskInfo
->
id
.
str
);
setFunctionResultOutput
(
&
pInfo
->
binfo
,
&
pInfo
->
aggSup
,
MAIN_SCAN
,
numOfCols
,
pTaskInfo
);
pInfo
->
pPseudoColInfo
=
setRowTsColumnOutputInfo
(
pInfo
->
binfo
.
pCtx
,
numOfCols
);
pOperator
->
name
=
"ProjectOperator"
;
pInfo
->
pPseudoColInfo
=
setRowTsColumnOutputInfo
(
pInfo
->
binfo
.
pCtx
,
numOfCols
);
pOperator
->
name
=
"ProjectOperator"
;
pOperator
->
operatorType
=
QUERY_NODE_PHYSICAL_PLAN_PROJECT
;
pOperator
->
blocking
=
false
;
pOperator
->
status
=
OP_NOT_OPENED
;
pOperator
->
info
=
pInfo
;
pOperator
->
pExpr
=
pExprInfo
;
pOperator
->
numOfExprs
=
num
;
pOperator
->
blocking
=
false
;
pOperator
->
status
=
OP_NOT_OPENED
;
pOperator
->
info
=
pInfo
;
pOperator
->
pExpr
=
pExprInfo
;
pOperator
->
numOfExprs
=
num
;
pOperator
->
pTaskInfo
=
pTaskInfo
;
pOperator
->
fpSet
=
createOperatorFpSet
(
operatorDummyOpenFn
,
doProjectOperation
,
NULL
,
NULL
,
destroyProjectOperatorInfo
,
NULL
,
NULL
,
NULL
);
pOperator
->
pTaskInfo
=
pTaskInfo
;
int32_t
code
=
appendDownstream
(
pOperator
,
&
downstream
,
1
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
goto
_error
;
...
...
tests/script/tsim/testsuit.sim
0 → 100644
浏览文件 @
cded09bf
#run user/pass_alter.sim
#run user/basic1.sim
#run user/privilege2.sim
#run user/user_len.sim
#run user/privilege1.sim
#run user/pass_len.sim
#run tstream/basic1.sim
#run tstream/basic0.sim
#run table/basic1.sim
#run trans/create_db.sim
#run stable/alter1.sim
#run stable/vnode3.sim
#run stable/metrics.sim
#run stable/show.sim
#run stable/values.sim
#run stable/dnode3.sim
#run stable/refcount.sim
#run stable/disk.sim
#run db/basic1.sim
#run db/basic3.sim
#run db/basic7.sim
#run db/basic6.sim
#run db/create_all_options.sim
#run db/basic2.sim
#run db/error1.sim
#run db/taosdlog.sim
#run db/alter_option.sim
#run mnode/basic1.sim
#run parser/fourArithmetic-basic.sim
#run parser/groupby-basic.sim
#run snode/basic1.sim
#run query/time_process.sim
#run query/stddev.sim
#run query/interval-offset.sim
#run query/charScalarFunction.sim
#run query/complex_select.sim
#run query/explain.sim
#run query/crash_sql.sim
#run query/diff.sim
#run query/complex_limit.sim
#run query/complex_having.sim
#run query/udf.sim
#run query/complex_group.sim
#run query/interval.sim
#run query/session.sim
print ========> dead lock failed when 2 rows in outputCapacity
run query/scalarFunction.sim
run query/scalarNull.sim
run query/complex_where.sim
run tmq/basic1.sim
run tmq/basic4.sim
run tmq/basic1Of2Cons.sim
run tmq/prepareBasicEnv-1vgrp.sim
run tmq/topic.sim
run tmq/basic4Of2Cons.sim
run tmq/prepareBasicEnv-4vgrp.sim
run tmq/basic3.sim
run tmq/basic2Of2Cons.sim
run tmq/basic2.sim
run tmq/basic3Of2Cons.sim
run tmq/basic2Of2ConsOverlap.sim
run tmq/clearConsume.sim
run qnode/basic1.sim
run dnode/basic1.sim
run show/basic.sim
run insert/basic1.sim
run insert/basic0.sim
run insert/backquote.sim
run insert/null.sim
run sync/oneReplica1VgElectWithInsert.sim
run sync/threeReplica1VgElect.sim
run sync/oneReplica1VgElect.sim
run sync/insertDataByRunBack.sim
run sync/threeReplica1VgElectWihtInsert.sim
run sma/tsmaCreateInsertData.sim
run sma/rsmaCreateInsertQuery.sim
run valgrind/checkError.sim
run bnode/basic1.sim
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录