Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
720f3b24
T
TDengine
项目概览
taosdata
/
TDengine
1 年多 前同步成功
通知
1185
Star
22016
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
720f3b24
编写于
9月 02, 2022
作者:
H
Haojun Liao
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
refactor: do some internal refactor.
上级
52c5992c
变更
3
隐藏空白更改
内联
并排
Showing
3 changed file
with
121 addition
and
85 deletion
+121
-85
source/dnode/vnode/src/inc/tsdb.h
source/dnode/vnode/src/inc/tsdb.h
+1
-1
source/dnode/vnode/src/tsdb/tsdbMergeTree.c
source/dnode/vnode/src/tsdb/tsdbMergeTree.c
+20
-2
source/dnode/vnode/src/tsdb/tsdbRead.c
source/dnode/vnode/src/tsdb/tsdbRead.c
+100
-82
未找到文件。
source/dnode/vnode/src/inc/tsdb.h
浏览文件 @
720f3b24
...
...
@@ -639,7 +639,7 @@ typedef struct SMergeTree {
struct
SLDataIter
*
pIter
;
}
SMergeTree
;
void
tMergeTreeOpen
(
SMergeTree
*
pMTree
,
int8_t
backward
,
SDataFReader
*
pFReader
,
uint64_t
uid
,
STimeWindow
*
pTimeWindow
,
SVersionRange
*
pVerRange
);
int32_t
tMergeTreeOpen
(
SMergeTree
*
pMTree
,
int8_t
backward
,
SDataFReader
*
pFReader
,
uint64_t
uid
,
STimeWindow
*
pTimeWindow
,
SVersionRange
*
pVerRange
);
void
tMergeTreeAddIter
(
SMergeTree
*
pMTree
,
struct
SLDataIter
*
pIter
);
bool
tMergeTreeNext
(
SMergeTree
*
pMTree
);
TSDBROW
tMergeTreeGetRow
(
SMergeTree
*
pMTree
);
...
...
source/dnode/vnode/src/tsdb/tsdbMergeTree.c
浏览文件 @
720f3b24
...
...
@@ -36,6 +36,10 @@ int32_t tLDataIterOpen(struct SLDataIter **pIter, SDataFReader *pReader, int32_t
STimeWindow
*
pTimeWindow
,
SVersionRange
*
pRange
)
{
int32_t
code
=
0
;
*
pIter
=
taosMemoryCalloc
(
1
,
sizeof
(
SLDataIter
));
if
(
*
pIter
==
NULL
)
{
code
=
TSDB_CODE_OUT_OF_MEMORY
;
goto
_exit
;
}
(
*
pIter
)
->
uid
=
uid
;
(
*
pIter
)
->
timeWindow
=
*
pTimeWindow
;
...
...
@@ -271,16 +275,24 @@ static FORCE_INLINE int32_t tLDataIterCmprFn(const void *p1, const void *p2) {
}
}
void
tMergeTreeOpen
(
SMergeTree
*
pMTree
,
int8_t
backward
,
SDataFReader
*
pFReader
,
uint64_t
uid
,
STimeWindow
*
pTimeWindow
,
SVersionRange
*
pVerRange
)
{
int32_t
tMergeTreeOpen
(
SMergeTree
*
pMTree
,
int8_t
backward
,
SDataFReader
*
pFReader
,
uint64_t
uid
,
STimeWindow
*
pTimeWindow
,
SVersionRange
*
pVerRange
)
{
pMTree
->
backward
=
backward
;
pMTree
->
pIter
=
NULL
;
pMTree
->
pIterList
=
taosArrayInit
(
4
,
POINTER_BYTES
);
if
(
pMTree
->
pIterList
==
NULL
)
{
return
TSDB_CODE_OUT_OF_MEMORY
;
}
tRBTreeCreate
(
&
pMTree
->
rbt
,
tLDataIterCmprFn
);
int32_t
code
=
TSDB_CODE_OUT_OF_MEMORY
;
struct
SLDataIter
*
pIterList
[
TSDB_DEFAULT_LAST_FILE
]
=
{
0
};
for
(
int32_t
i
=
0
;
i
<
pFReader
->
pSet
->
nLastF
;
++
i
)
{
// open all last file
/*int32_t code = */
tLDataIterOpen
(
&
pIterList
[
i
],
pFReader
,
i
,
pMTree
->
backward
,
uid
,
pTimeWindow
,
pVerRange
);
code
=
tLDataIterOpen
(
&
pIterList
[
i
],
pFReader
,
i
,
pMTree
->
backward
,
uid
,
pTimeWindow
,
pVerRange
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
goto
_end
;
}
bool
hasVal
=
tLDataIterNextRow
(
pIterList
[
i
]);
if
(
hasVal
)
{
taosArrayPush
(
pMTree
->
pIterList
,
&
pIterList
[
i
]);
...
...
@@ -289,6 +301,12 @@ void tMergeTreeOpen(SMergeTree *pMTree, int8_t backward, SDataFReader* pFReader,
tLDataIterClose
(
pIterList
[
i
]);
}
}
return
code
;
_end:
tMergeTreeClose
(
pMTree
);
return
code
;
}
void
tMergeTreeAddIter
(
SMergeTree
*
pMTree
,
SLDataIter
*
pIter
)
{
tRBTreePut
(
&
pMTree
->
rbt
,
(
SRBTreeNode
*
)
pIter
);
}
...
...
source/dnode/vnode/src/tsdb/tsdbRead.c
浏览文件 @
720f3b24
...
...
@@ -1748,6 +1748,70 @@ static int32_t doMergeThreeLevelRows(STsdbReader* pReader, STableBlockScanInfo*
}
#endif
static
int32_t
initMemDataIterator
(
STableBlockScanInfo
*
pBlockScanInfo
,
STsdbReader
*
pReader
)
{
if
(
pBlockScanInfo
->
iterInit
)
{
return
TSDB_CODE_SUCCESS
;
}
int32_t
code
=
TSDB_CODE_SUCCESS
;
TSDBKEY
startKey
=
{
0
};
if
(
ASCENDING_TRAVERSE
(
pReader
->
order
))
{
startKey
=
(
TSDBKEY
){.
ts
=
pReader
->
window
.
skey
,
.
version
=
pReader
->
verRange
.
minVer
};
}
else
{
startKey
=
(
TSDBKEY
){.
ts
=
pReader
->
window
.
ekey
,
.
version
=
pReader
->
verRange
.
maxVer
};
}
int32_t
backward
=
(
!
ASCENDING_TRAVERSE
(
pReader
->
order
));
STbData
*
d
=
NULL
;
if
(
pReader
->
pReadSnap
->
pMem
!=
NULL
)
{
d
=
tsdbGetTbDataFromMemTable
(
pReader
->
pReadSnap
->
pMem
,
pReader
->
suid
,
pBlockScanInfo
->
uid
);
if
(
d
!=
NULL
)
{
code
=
tsdbTbDataIterCreate
(
d
,
&
startKey
,
backward
,
&
pBlockScanInfo
->
iter
.
iter
);
if
(
code
==
TSDB_CODE_SUCCESS
)
{
pBlockScanInfo
->
iter
.
hasVal
=
(
tsdbTbDataIterGet
(
pBlockScanInfo
->
iter
.
iter
)
!=
NULL
);
tsdbDebug
(
"%p uid:%"
PRId64
", check data in mem from skey:%"
PRId64
", order:%d, ts range in buf:%"
PRId64
"-%"
PRId64
" %s"
,
pReader
,
pBlockScanInfo
->
uid
,
startKey
.
ts
,
pReader
->
order
,
d
->
minKey
,
d
->
maxKey
,
pReader
->
idStr
);
}
else
{
tsdbError
(
"%p uid:%"
PRId64
", failed to create iterator for imem, code:%s, %s"
,
pReader
,
pBlockScanInfo
->
uid
,
tstrerror
(
code
),
pReader
->
idStr
);
return
code
;
}
}
}
else
{
tsdbDebug
(
"%p uid:%"
PRId64
", no data in mem, %s"
,
pReader
,
pBlockScanInfo
->
uid
,
pReader
->
idStr
);
}
STbData
*
di
=
NULL
;
if
(
pReader
->
pReadSnap
->
pIMem
!=
NULL
)
{
di
=
tsdbGetTbDataFromMemTable
(
pReader
->
pReadSnap
->
pIMem
,
pReader
->
suid
,
pBlockScanInfo
->
uid
);
if
(
di
!=
NULL
)
{
code
=
tsdbTbDataIterCreate
(
di
,
&
startKey
,
backward
,
&
pBlockScanInfo
->
iiter
.
iter
);
if
(
code
==
TSDB_CODE_SUCCESS
)
{
pBlockScanInfo
->
iiter
.
hasVal
=
(
tsdbTbDataIterGet
(
pBlockScanInfo
->
iiter
.
iter
)
!=
NULL
);
tsdbDebug
(
"%p uid:%"
PRId64
", check data in imem from skey:%"
PRId64
", order:%d, ts range in buf:%"
PRId64
"-%"
PRId64
" %s"
,
pReader
,
pBlockScanInfo
->
uid
,
startKey
.
ts
,
pReader
->
order
,
di
->
minKey
,
di
->
maxKey
,
pReader
->
idStr
);
}
else
{
tsdbError
(
"%p uid:%"
PRId64
", failed to create iterator for mem, code:%s, %s"
,
pReader
,
pBlockScanInfo
->
uid
,
tstrerror
(
code
),
pReader
->
idStr
);
return
code
;
}
}
}
else
{
tsdbDebug
(
"%p uid:%"
PRId64
", no data in imem, %s"
,
pReader
,
pBlockScanInfo
->
uid
,
pReader
->
idStr
);
}
initDelSkylineIterator
(
pBlockScanInfo
,
pReader
,
d
,
di
);
pBlockScanInfo
->
iterInit
=
true
;
return
TSDB_CODE_SUCCESS
;
}
static
bool
isValidFileBlockRow
(
SBlockData
*
pBlockData
,
SFileBlockDumpInfo
*
pDumpInfo
,
STableBlockScanInfo
*
pBlockScanInfo
,
STsdbReader
*
pReader
)
{
// it is an multi-table data block
...
...
@@ -1779,25 +1843,20 @@ static bool isValidFileBlockRow(SBlockData* pBlockData, SFileBlockDumpInfo* pDum
static
bool
outOfTimeWindow
(
int64_t
ts
,
STimeWindow
*
pWindow
)
{
return
(
ts
>
pWindow
->
ekey
)
||
(
ts
<
pWindow
->
skey
);
}
static
bool
initLastBlockReader
(
SLastBlockReader
*
pLastBlockReader
,
uint64_t
uid
,
SDataFReader
*
pFReader
)
{
// the last block reader has been initialized for this table.
if
(
pLastBlockReader
->
uid
==
uid
)
{
return
true
;
}
static
bool
nextRowFromLastBlocks
(
SLastBlockReader
*
pLastBlockReader
,
STableBlockScanInfo
*
pBlockScanInfo
)
{
while
(
1
)
{
bool
hasVal
=
tMergeTreeNext
(
&
pLastBlockReader
->
mergeTree
);
if
(
!
hasVal
)
{
return
false
;
}
if
(
pLastBlockReader
->
uid
!=
0
)
{
tMergeTreeClose
(
&
pLastBlockReader
->
mergeTree
);
TSDBROW
row
=
tMergeTreeGetRow
(
&
pLastBlockReader
->
mergeTree
);
TSDBKEY
k
=
TSDBROW_KEY
(
&
row
);
if
(
!
hasBeenDropped
(
pBlockScanInfo
->
delSkyline
,
&
pBlockScanInfo
->
lastBlockDelIndex
,
&
k
,
pLastBlockReader
->
order
))
{
return
true
;
}
}
pLastBlockReader
->
uid
=
uid
;
/*int32_t code = */
tMergeTreeOpen
(
&
pLastBlockReader
->
mergeTree
,
(
pLastBlockReader
->
order
==
TSDB_ORDER_DESC
),
pFReader
,
uid
,
&
pLastBlockReader
->
window
,
&
pLastBlockReader
->
verRange
);
return
tMergeTreeNext
(
&
pLastBlockReader
->
mergeTree
);
}
static
bool
nextRowInLastBlock
(
SLastBlockReader
*
pLastBlockReader
,
STableBlockScanInfo
*
pBlockScanInfo
)
{
return
tMergeTreeNext
(
&
pLastBlockReader
->
mergeTree
);
#if 0
*(pLastBlockReader->rowIndex) += step;
...
...
@@ -1854,6 +1913,29 @@ static bool nextRowInLastBlock(SLastBlockReader* pLastBlockReader, STableBlockSc
#endif
}
static
bool
initLastBlockReader
(
SLastBlockReader
*
pLastBlockReader
,
STableBlockScanInfo
*
pBlockScanInfo
,
STsdbReader
*
pReader
)
{
// the last block reader has been initialized for this table.
if
(
pLastBlockReader
->
uid
==
pBlockScanInfo
->
uid
)
{
return
true
;
}
if
(
pLastBlockReader
->
uid
!=
0
)
{
tMergeTreeClose
(
&
pLastBlockReader
->
mergeTree
);
}
initMemDataIterator
(
pBlockScanInfo
,
pReader
);
pLastBlockReader
->
uid
=
pBlockScanInfo
->
uid
;
int32_t
code
=
tMergeTreeOpen
(
&
pLastBlockReader
->
mergeTree
,
(
pLastBlockReader
->
order
==
TSDB_ORDER_DESC
),
pReader
->
pFileReader
,
pBlockScanInfo
->
uid
,
&
pLastBlockReader
->
window
,
&
pLastBlockReader
->
verRange
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
return
false
;
}
return
nextRowFromLastBlocks
(
pLastBlockReader
,
pBlockScanInfo
);
}
static
int64_t
getCurrentKeyInLastBlock
(
SLastBlockReader
*
pLastBlockReader
)
{
TSDBROW
row
=
tMergeTreeGetRow
(
&
pLastBlockReader
->
mergeTree
);
TSDBKEY
key
=
TSDBROW_KEY
(
&
row
);
...
...
@@ -1998,70 +2080,6 @@ static int32_t buildComposedDataBlock(STsdbReader* pReader) {
void
setComposedBlockFlag
(
STsdbReader
*
pReader
,
bool
composed
)
{
pReader
->
status
.
composedDataBlock
=
composed
;
}
static
int32_t
initMemDataIterator
(
STableBlockScanInfo
*
pBlockScanInfo
,
STsdbReader
*
pReader
)
{
if
(
pBlockScanInfo
->
iterInit
)
{
return
TSDB_CODE_SUCCESS
;
}
int32_t
code
=
TSDB_CODE_SUCCESS
;
TSDBKEY
startKey
=
{
0
};
if
(
ASCENDING_TRAVERSE
(
pReader
->
order
))
{
startKey
=
(
TSDBKEY
){.
ts
=
pReader
->
window
.
skey
,
.
version
=
pReader
->
verRange
.
minVer
};
}
else
{
startKey
=
(
TSDBKEY
){.
ts
=
pReader
->
window
.
ekey
,
.
version
=
pReader
->
verRange
.
maxVer
};
}
int32_t
backward
=
(
!
ASCENDING_TRAVERSE
(
pReader
->
order
));
STbData
*
d
=
NULL
;
if
(
pReader
->
pReadSnap
->
pMem
!=
NULL
)
{
d
=
tsdbGetTbDataFromMemTable
(
pReader
->
pReadSnap
->
pMem
,
pReader
->
suid
,
pBlockScanInfo
->
uid
);
if
(
d
!=
NULL
)
{
code
=
tsdbTbDataIterCreate
(
d
,
&
startKey
,
backward
,
&
pBlockScanInfo
->
iter
.
iter
);
if
(
code
==
TSDB_CODE_SUCCESS
)
{
pBlockScanInfo
->
iter
.
hasVal
=
(
tsdbTbDataIterGet
(
pBlockScanInfo
->
iter
.
iter
)
!=
NULL
);
tsdbDebug
(
"%p uid:%"
PRId64
", check data in mem from skey:%"
PRId64
", order:%d, ts range in buf:%"
PRId64
"-%"
PRId64
" %s"
,
pReader
,
pBlockScanInfo
->
uid
,
startKey
.
ts
,
pReader
->
order
,
d
->
minKey
,
d
->
maxKey
,
pReader
->
idStr
);
}
else
{
tsdbError
(
"%p uid:%"
PRId64
", failed to create iterator for imem, code:%s, %s"
,
pReader
,
pBlockScanInfo
->
uid
,
tstrerror
(
code
),
pReader
->
idStr
);
return
code
;
}
}
}
else
{
tsdbDebug
(
"%p uid:%"
PRId64
", no data in mem, %s"
,
pReader
,
pBlockScanInfo
->
uid
,
pReader
->
idStr
);
}
STbData
*
di
=
NULL
;
if
(
pReader
->
pReadSnap
->
pIMem
!=
NULL
)
{
di
=
tsdbGetTbDataFromMemTable
(
pReader
->
pReadSnap
->
pIMem
,
pReader
->
suid
,
pBlockScanInfo
->
uid
);
if
(
di
!=
NULL
)
{
code
=
tsdbTbDataIterCreate
(
di
,
&
startKey
,
backward
,
&
pBlockScanInfo
->
iiter
.
iter
);
if
(
code
==
TSDB_CODE_SUCCESS
)
{
pBlockScanInfo
->
iiter
.
hasVal
=
(
tsdbTbDataIterGet
(
pBlockScanInfo
->
iiter
.
iter
)
!=
NULL
);
tsdbDebug
(
"%p uid:%"
PRId64
", check data in imem from skey:%"
PRId64
", order:%d, ts range in buf:%"
PRId64
"-%"
PRId64
" %s"
,
pReader
,
pBlockScanInfo
->
uid
,
startKey
.
ts
,
pReader
->
order
,
di
->
minKey
,
di
->
maxKey
,
pReader
->
idStr
);
}
else
{
tsdbError
(
"%p uid:%"
PRId64
", failed to create iterator for mem, code:%s, %s"
,
pReader
,
pBlockScanInfo
->
uid
,
tstrerror
(
code
),
pReader
->
idStr
);
return
code
;
}
}
}
else
{
tsdbDebug
(
"%p uid:%"
PRId64
", no data in imem, %s"
,
pReader
,
pBlockScanInfo
->
uid
,
pReader
->
idStr
);
}
initDelSkylineIterator
(
pBlockScanInfo
,
pReader
,
d
,
di
);
pBlockScanInfo
->
iterInit
=
true
;
return
TSDB_CODE_SUCCESS
;
}
int32_t
initDelSkylineIterator
(
STableBlockScanInfo
*
pBlockScanInfo
,
STsdbReader
*
pReader
,
STbData
*
pMemTbData
,
STbData
*
piMemTbData
)
{
if
(
pBlockScanInfo
->
delSkyline
!=
NULL
)
{
...
...
@@ -2296,7 +2314,7 @@ static int32_t doLoadLastBlockSequentially(STsdbReader* pReader) {
while
(
1
)
{
// load the last data block of current table
STableBlockScanInfo
*
pScanInfo
=
pStatus
->
pTableIter
;
bool
hasVal
=
initLastBlockReader
(
pLastBlockReader
,
pScanInfo
->
uid
,
pReader
->
pFile
Reader
);
bool
hasVal
=
initLastBlockReader
(
pLastBlockReader
,
pScanInfo
,
p
Reader
);
if
(
!
hasVal
)
{
bool
hasNexTable
=
moveToNextTable
(
pOrderedCheckInfo
,
pStatus
);
if
(
!
hasNexTable
)
{
...
...
@@ -2879,7 +2897,7 @@ int32_t doMergeRowsInFileBlocks(SBlockData* pBlockData, STableBlockScanInfo* pSc
int32_t
doMergeRowsInLastBlock
(
SLastBlockReader
*
pLastBlockReader
,
STableBlockScanInfo
*
pScanInfo
,
int64_t
ts
,
SRowMerger
*
pMerger
)
{
while
(
nextRow
InLastBlock
(
pLastBlockReader
,
pScanInfo
))
{
while
(
nextRow
FromLastBlocks
(
pLastBlockReader
,
pScanInfo
))
{
int64_t
next1
=
getCurrentKeyInLastBlock
(
pLastBlockReader
);
if
(
next1
==
ts
)
{
TSDBROW
fRow1
=
tMergeTreeGetRow
(
&
pLastBlockReader
->
mergeTree
);
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录