Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
49595291
TDengine
项目概览
taosdata
/
TDengine
大约 1 年 前同步成功
通知
1184
Star
22015
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
体验新版 GitCode,发现更多精彩内容 >>
提交
49595291
编写于
11月 03, 2022
作者:
H
Haojun Liao
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
fix(query): set the last key of each tablescan info
上级
7a14bc05
变更
3
显示空白变更内容
内联
并排
Showing
3 changed file
with
340 addition
and
324 deletion
+340
-324
source/dnode/vnode/src/tsdb/tsdbRead.c
source/dnode/vnode/src/tsdb/tsdbRead.c
+25
-20
source/libs/executor/src/executorimpl.c
source/libs/executor/src/executorimpl.c
+3
-2
tests/system-test/1-insert/delete_data.py
tests/system-test/1-insert/delete_data.py
+312
-302
未找到文件。
source/dnode/vnode/src/tsdb/tsdbRead.c
浏览文件 @
49595291
...
...
@@ -185,11 +185,11 @@ static int32_t doMergeRowsInLastBlock(SLastBlockReader* pLastBlockReader, STabl
SRowMerger
*
pMerger
,
SVersionRange
*
pVerRange
);
static
int32_t
doMergeRowsInBuf
(
SIterInfo
*
pIter
,
uint64_t
uid
,
int64_t
ts
,
SArray
*
pDelList
,
SRowMerger
*
pMerger
,
STsdbReader
*
pReader
);
static
int32_t
doAppendRowFromTSRow
(
SSDataBlock
*
pBlock
,
STsdbReader
*
pReader
,
STSRow
*
pTSRow
,
uint64_t
uid
);
static
int32_t
doAppendRowFromTSRow
(
SSDataBlock
*
pBlock
,
STsdbReader
*
pReader
,
STSRow
*
pTSRow
,
STableBlockScanInfo
*
pInfo
);
static
int32_t
doAppendRowFromFileBlock
(
SSDataBlock
*
pResBlock
,
STsdbReader
*
pReader
,
SBlockData
*
pBlockData
,
int32_t
rowIndex
);
static
void
setComposedBlockFlag
(
STsdbReader
*
pReader
,
bool
composed
);
static
bool
hasBeenDropped
(
const
SArray
*
pDelList
,
int32_t
*
index
,
TSDBKEY
*
pKey
,
int32_t
order
,
SVersionRange
*
pRange
);
static
bool
hasBeenDropped
(
const
SArray
*
pDelList
,
int32_t
*
index
,
TSDBKEY
*
pKey
,
int32_t
order
,
SVersionRange
*
p
Ver
Range
);
static
int32_t
doMergeMemTableMultiRows
(
TSDBROW
*
pRow
,
uint64_t
uid
,
SIterInfo
*
pIter
,
SArray
*
pDelList
,
STSRow
**
pTSRow
,
STsdbReader
*
pReader
,
bool
*
freeTSRow
);
...
...
@@ -208,7 +208,6 @@ static bool hasDataInLastBlock(SLastBlockReader* pLastBlockReader);
static
int32_t
doBuildDataBlock
(
STsdbReader
*
pReader
);
static
TSDBKEY
getCurrentKeyInBuf
(
STableBlockScanInfo
*
pScanInfo
,
STsdbReader
*
pReader
);
static
bool
hasDataInFileBlock
(
const
SBlockData
*
pBlockData
,
const
SFileBlockDumpInfo
*
pDumpInfo
);
static
bool
hasDataInLastBlock
(
SLastBlockReader
*
pLastBlockReader
);
static
bool
outOfTimeWindow
(
int64_t
ts
,
STimeWindow
*
pWindow
)
{
return
(
ts
>
pWindow
->
ekey
)
||
(
ts
<
pWindow
->
skey
);
}
...
...
@@ -1529,8 +1528,8 @@ static bool tryCopyDistinctRowFromFileBlock(STsdbReader* pReader, SBlockData* pB
// opt version
// 1. it is not a border point
// 2. the direct next point is not an duplicated timestamp
if
((
pDumpInfo
->
rowIndex
<
pDumpInfo
->
totalRows
-
1
&&
pReader
->
order
==
TSDB_ORDER_ASC
)
||
(
pDumpInfo
->
rowIndex
>
0
&&
pReader
->
order
==
TSDB_ORDER_DESC
))
{
bool
asc
=
(
pReader
->
order
==
TSDB_ORDER_ASC
);
if
((
pDumpInfo
->
rowIndex
<
pDumpInfo
->
totalRows
-
1
&&
asc
)
||
(
pDumpInfo
->
rowIndex
>
0
&&
(
!
asc
)
))
{
int32_t
step
=
pReader
->
order
==
TSDB_ORDER_ASC
?
1
:
-
1
;
int64_t
nextKey
=
pBlockData
->
aTSKEY
[
pDumpInfo
->
rowIndex
+
step
];
...
...
@@ -1749,7 +1748,7 @@ static int32_t doMergeBufAndFileRows(STsdbReader* pReader, STableBlockScanInfo*
return
code
;
}
doAppendRowFromTSRow
(
pReader
->
pResBlock
,
pReader
,
pTSRow
,
pBlockScanInfo
->
uid
);
doAppendRowFromTSRow
(
pReader
->
pResBlock
,
pReader
,
pTSRow
,
pBlockScanInfo
);
taosMemoryFree
(
pTSRow
);
tRowMergerClear
(
&
merge
);
...
...
@@ -1770,6 +1769,7 @@ static int32_t doMergeFileBlockAndLastBlock(SLastBlockReader* pLastBlockReader,
// only last block exists
if
((
!
mergeBlockData
)
||
(
tsLastBlock
!=
pBlockData
->
aTSKEY
[
pDumpInfo
->
rowIndex
]))
{
if
(
tryCopyDistinctRowFromSttBlock
(
&
fRow
,
pLastBlockReader
,
pBlockScanInfo
,
tsLastBlock
,
pReader
))
{
pBlockScanInfo
->
lastKey
=
tsLastBlock
;
return
TSDB_CODE_SUCCESS
;
}
else
{
int32_t
code
=
tRowMergerInit
(
&
merge
,
&
fRow
,
pReader
->
pSchema
);
...
...
@@ -1786,7 +1786,7 @@ static int32_t doMergeFileBlockAndLastBlock(SLastBlockReader* pLastBlockReader,
return
code
;
}
doAppendRowFromTSRow
(
pReader
->
pResBlock
,
pReader
,
pTSRow
,
pBlockScanInfo
->
uid
);
doAppendRowFromTSRow
(
pReader
->
pResBlock
,
pReader
,
pTSRow
,
pBlockScanInfo
);
taosMemoryFree
(
pTSRow
);
tRowMergerClear
(
&
merge
);
...
...
@@ -1810,7 +1810,7 @@ static int32_t doMergeFileBlockAndLastBlock(SLastBlockReader* pLastBlockReader,
return
code
;
}
doAppendRowFromTSRow
(
pReader
->
pResBlock
,
pReader
,
pTSRow
,
pBlockScanInfo
->
uid
);
doAppendRowFromTSRow
(
pReader
->
pResBlock
,
pReader
,
pTSRow
,
pBlockScanInfo
);
taosMemoryFree
(
pTSRow
);
tRowMergerClear
(
&
merge
);
...
...
@@ -1858,7 +1858,7 @@ static int32_t mergeFileBlockAndLastBlock(STsdbReader* pReader, SLastBlockReader
return
code
;
}
doAppendRowFromTSRow
(
pReader
->
pResBlock
,
pReader
,
pTSRow
,
pBlockScanInfo
->
uid
);
doAppendRowFromTSRow
(
pReader
->
pResBlock
,
pReader
,
pTSRow
,
pBlockScanInfo
);
taosMemoryFree
(
pTSRow
);
tRowMergerClear
(
&
merge
);
...
...
@@ -2082,7 +2082,7 @@ static int32_t doMergeMultiLevelRows(STsdbReader* pReader, STableBlockScanInfo*
return
code
;
}
doAppendRowFromTSRow
(
pReader
->
pResBlock
,
pReader
,
pTSRow
,
pBlockScanInfo
->
uid
);
doAppendRowFromTSRow
(
pReader
->
pResBlock
,
pReader
,
pTSRow
,
pBlockScanInfo
);
taosMemoryFree
(
pTSRow
);
tRowMergerClear
(
&
merge
);
...
...
@@ -2233,6 +2233,7 @@ int32_t mergeRowsInFileBlocks(SBlockData* pBlockData, STableBlockScanInfo* pBloc
STsdbReader
*
pReader
)
{
SFileBlockDumpInfo
*
pDumpInfo
=
&
pReader
->
status
.
fBlockDumpInfo
;
if
(
tryCopyDistinctRowFromFileBlock
(
pReader
,
pBlockData
,
key
,
pDumpInfo
))
{
pBlockScanInfo
->
lastKey
=
key
;
return
TSDB_CODE_SUCCESS
;
}
else
{
TSDBROW
fRow
=
tsdbRowFromBlockData
(
pBlockData
,
pDumpInfo
->
rowIndex
);
...
...
@@ -2251,7 +2252,7 @@ int32_t mergeRowsInFileBlocks(SBlockData* pBlockData, STableBlockScanInfo* pBloc
return
code
;
}
doAppendRowFromTSRow
(
pReader
->
pResBlock
,
pReader
,
pTSRow
,
pBlockScanInfo
->
uid
);
doAppendRowFromTSRow
(
pReader
->
pResBlock
,
pReader
,
pTSRow
,
pBlockScanInfo
);
taosMemoryFree
(
pTSRow
);
tRowMergerClear
(
&
merge
);
...
...
@@ -2299,29 +2300,32 @@ static int32_t buildComposedDataBlock(STsdbReader* pReader) {
SFileDataBlockInfo
*
pBlockInfo
=
getCurrentBlockInfo
(
&
pReader
->
status
.
blockIter
);
SLastBlockReader
*
pLastBlockReader
=
pReader
->
status
.
fileIter
.
pLastBlockReader
;
bool
asc
=
ASCENDING_TRAVERSE
(
pReader
->
order
);
int64_t
st
=
taosGetTimestampUs
();
int32_t
step
=
asc
?
1
:
-
1
;
STableBlockScanInfo
*
pBlockScanInfo
=
NULL
;
if
(
pBlockInfo
!=
NULL
)
{
pBlockScanInfo
=
*
(
STableBlockScanInfo
**
)
taosHashGet
(
pReader
->
status
.
pTableMap
,
&
pBlockInfo
->
uid
,
sizeof
(
pBlockInfo
->
uid
));
if
(
p
BlockScanInfo
==
NULL
)
{
void
*
p
=
taosHashGet
(
pReader
->
status
.
pTableMap
,
&
pBlockInfo
->
uid
,
sizeof
(
pBlockInfo
->
uid
));
if
(
p
==
NULL
)
{
code
=
TSDB_CODE_INVALID_PARA
;
tsdbError
(
"failed to locate the uid:%"
PRIu64
" in query table uid list, total tables:%d, %s"
,
pBlockInfo
->
uid
,
taosHashGetSize
(
pReader
->
status
.
pTableMap
),
pReader
->
idStr
);
goto
_end
;
}
pBlockScanInfo
=
*
(
STableBlockScanInfo
**
)
p
;
SDataBlk
*
pBlock
=
getCurrentBlock
(
&
pReader
->
status
.
blockIter
);
TSDBKEY
keyInBuf
=
getCurrentKeyInBuf
(
pBlockScanInfo
,
pReader
);
// it is a clean block, load it directly
if
(
isCleanFileDataBlock
(
pReader
,
pBlockInfo
,
pBlock
,
pBlockScanInfo
,
keyInBuf
,
pLastBlockReader
))
{
if
(
pReader
->
order
==
TSDB_ORDER_ASC
||
(
pReader
->
order
==
TSDB_ORDER_DESC
&&
(
!
hasDataInLastBlock
(
pLastBlockReader
))))
{
if
(
asc
||
((
!
asc
)
&&
(
!
hasDataInLastBlock
(
pLastBlockReader
))))
{
copyBlockDataToSDataBlock
(
pReader
,
pBlockScanInfo
);
// record the last key value
pBlockScanInfo
->
lastKey
=
ASCENDING_TRAVERSE
(
pReader
->
order
)
?
pBlock
->
maxKey
.
ts
:
pBlock
->
minKey
.
ts
;
pBlockScanInfo
->
lastKey
=
asc
?
pBlock
->
maxKey
.
ts
:
pBlock
->
minKey
.
ts
;
goto
_end
;
}
}
...
...
@@ -2331,7 +2335,6 @@ static int32_t buildComposedDataBlock(STsdbReader* pReader) {
SFileBlockDumpInfo
*
pDumpInfo
=
&
pReader
->
status
.
fBlockDumpInfo
;
SBlockData
*
pBlockData
=
&
pReader
->
status
.
fileBlockData
;
int32_t
step
=
ASCENDING_TRAVERSE
(
pReader
->
order
)
?
1
:
-
1
;
while
(
1
)
{
bool
hasBlockData
=
false
;
...
...
@@ -3220,7 +3223,6 @@ int32_t doMergeRowsInFileBlocks(SBlockData* pBlockData, STableBlockScanInfo* pSc
int32_t
doMergeRowsInLastBlock
(
SLastBlockReader
*
pLastBlockReader
,
STableBlockScanInfo
*
pScanInfo
,
int64_t
ts
,
SRowMerger
*
pMerger
,
SVersionRange
*
pVerRange
)
{
pScanInfo
->
lastKey
=
ts
;
while
(
nextRowFromLastBlocks
(
pLastBlockReader
,
pScanInfo
,
pVerRange
))
{
int64_t
next1
=
getCurrentKeyInLastBlock
(
pLastBlockReader
);
if
(
next1
==
ts
)
{
...
...
@@ -3413,9 +3415,10 @@ int32_t tsdbGetNextRowInMem(STableBlockScanInfo* pBlockScanInfo, STsdbReader* pR
return
TSDB_CODE_SUCCESS
;
}
int32_t
doAppendRowFromTSRow
(
SSDataBlock
*
pBlock
,
STsdbReader
*
pReader
,
STSRow
*
pTSRow
,
uint64_t
uid
)
{
int32_t
doAppendRowFromTSRow
(
SSDataBlock
*
pBlock
,
STsdbReader
*
pReader
,
STSRow
*
pTSRow
,
STableBlockScanInfo
*
pScanInfo
)
{
int32_t
numOfRows
=
pBlock
->
info
.
rows
;
int32_t
numOfCols
=
(
int32_t
)
taosArrayGetSize
(
pBlock
->
pDataBlock
);
int64_t
uid
=
pScanInfo
->
uid
;
SBlockLoadSuppInfo
*
pSupInfo
=
&
pReader
->
suppInfo
;
STSchema
*
pSchema
=
doGetSchemaForTSRow
(
pTSRow
->
sver
,
pReader
,
uid
);
...
...
@@ -3454,6 +3457,7 @@ int32_t doAppendRowFromTSRow(SSDataBlock* pBlock, STsdbReader* pReader, STSRow*
}
pBlock
->
info
.
rows
+=
1
;
pScanInfo
->
lastKey
=
pTSRow
->
ts
;
return
TSDB_CODE_SUCCESS
;
}
...
...
@@ -3517,7 +3521,8 @@ int32_t buildDataBlockFromBufImpl(STableBlockScanInfo* pBlockScanInfo, int64_t e
break
;
}
doAppendRowFromTSRow
(
pBlock
,
pReader
,
pTSRow
,
pBlockScanInfo
->
uid
);
doAppendRowFromTSRow
(
pBlock
,
pReader
,
pTSRow
,
pBlockScanInfo
);
if
(
freeTSRow
)
{
taosMemoryFree
(
pTSRow
);
}
...
...
source/libs/executor/src/executorimpl.c
浏览文件 @
49595291
...
...
@@ -2973,7 +2973,7 @@ SOperatorInfo* createAggregateOperatorInfo(SOperatorInfo* downstream, SAggPhysiN
return
pOperator
;
_error:
_error:
if
(
pInfo
!=
NULL
)
{
destroyAggOperatorInfo
(
pInfo
);
}
...
...
@@ -3189,11 +3189,12 @@ SOperatorInfo* createFillOperatorInfo(SOperatorInfo* downstream, SFillPhysiNode*
code
=
appendDownstream
(
pOperator
,
&
downstream
,
1
);
return
pOperator
;
_error:
_error:
if
(
pInfo
!=
NULL
)
{
destroyFillOperatorInfo
(
pInfo
);
}
pTaskInfo
->
code
=
code
;
taosMemoryFreeClear
(
pOperator
);
return
NULL
;
}
...
...
tests/system-test/1-insert/delete_data.py
浏览文件 @
49595291
...
...
@@ -24,14 +24,15 @@ from util.sqlset import TDSetSql
class
TDTestCase
:
def
init
(
self
,
conn
,
logSql
,
replicaVar
=
1
):
self
.
replicaVar
=
int
(
replicaVar
)
tdLog
.
debug
(
"start to execute %s"
%
__file__
)
tdSql
.
init
(
conn
.
cursor
(),
True
)
self
.
dbname
=
'db_test'
self
.
setsql
=
TDSetSql
()
self
.
stbname
=
'stb'
self
.
ntbname
=
'ntb'
self
.
rowNum
=
5
self
.
tbnum
=
2
self
.
rowNum
=
10
self
.
tbnum
=
3
self
.
ts
=
1537146000000
self
.
binary_str
=
'taosdata'
self
.
nchar_str
=
'涛思数据'
...
...
@@ -110,13 +111,22 @@ class TDTestCase:
tdSql
.
execute
(
f
'''insert into
{
tbname
}
values(
{
self
.
ts
+
i
}
,"
{
base_data
[
'binary'
]
}
")'''
)
elif
'nchar'
in
col_type
.
lower
():
tdSql
.
execute
(
f
'''insert into
{
tbname
}
values(
{
self
.
ts
+
i
}
,"
{
base_data
[
'nchar'
]
}
")'''
)
def
delete_all_data
(
self
,
tbname
,
col_type
,
row_num
,
base_data
,
dbname
,
tb_type
,
tb_num
=
1
):
def
delete_all_data
(
self
,
tbname
,
col_type
,
row_num
,
base_data
,
dbname
,
tb_type
,
tb_num
=
1
,
stbname
=
''
):
tdSql
.
query
(
f
'select count(*) from
{
tbname
}
'
)
tdSql
.
execute
(
f
'delete from
{
tbname
}
'
)
tdSql
.
execute
(
f
'flush database
{
dbname
}
'
)
tdSql
.
execute
(
'reset query cache'
)
tdSql
.
query
(
f
'select * from
{
tbname
}
'
)
tdSql
.
checkRows
(
0
)
if
tb_type
==
'ntb'
or
tb_type
==
'ctb'
:
if
tb_type
==
'ctb'
:
tdSql
.
query
(
f
'select count(*) from
{
stbname
}
'
)
if
tb_num
<=
1
:
if
len
(
tdSql
.
queryResult
)
!=
0
:
tdLog
.
exit
(
'delete case failure!'
)
else
:
tdSql
.
checkEqual
(
tdSql
.
queryResult
[
0
][
0
],(
tb_num
-
1
)
*
row_num
)
self
.
insert_base_data
(
col_type
,
tbname
,
row_num
,
base_data
)
elif
tb_type
==
'stb'
:
for
i
in
range
(
tb_num
):
...
...
@@ -265,7 +275,7 @@ class TDTestCase:
tdSql
.
execute
(
f
'create table
{
self
.
stbname
}
_
{
i
}
using
{
self
.
stbname
}
tags(1)'
)
self
.
insert_base_data
(
col_type
,
f
'
{
self
.
stbname
}
_
{
i
}
'
,
self
.
rowNum
,
self
.
base_data
)
self
.
delete_one_row
(
f
'
{
self
.
stbname
}
_
{
i
}
'
,
col_type
,
col_name
,
self
.
base_data
,
self
.
rowNum
,
self
.
dbname
,
'ctb'
)
self
.
delete_all_data
(
f
'
{
self
.
stbname
}
_
{
i
}
'
,
col_type
,
self
.
rowNum
,
self
.
base_data
,
self
.
dbname
,
'ctb'
)
self
.
delete_all_data
(
f
'
{
self
.
stbname
}
_
{
i
}
'
,
col_type
,
self
.
rowNum
,
self
.
base_data
,
self
.
dbname
,
'ctb'
,
i
+
1
,
self
.
stbname
)
self
.
delete_error
(
f
'
{
self
.
stbname
}
_
{
i
}
'
,
col_name
,
col_type
,
self
.
base_data
)
self
.
delete_rows
(
self
.
dbname
,
f
'
{
self
.
stbname
}
_
{
i
}
'
,
col_name
,
col_type
,
self
.
base_data
,
self
.
rowNum
,
'ctb'
)
for
func
in
[
'first'
,
'last'
]:
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录