Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
9a4f6abe
TDengine
项目概览
taosdata
/
TDengine
1 年多 前同步成功
通知
1185
Star
22016
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
9a4f6abe
编写于
6月 12, 2023
作者:
H
Hongze Cheng
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
more code
上级
21c0eb0c
变更
3
隐藏空白更改
内联
并排
Showing
3 changed file
with
298 addition
and
64 deletion
+298
-64
source/dnode/vnode/src/tsdb/dev/tsdbCommit.c
source/dnode/vnode/src/tsdb/dev/tsdbCommit.c
+296
-61
source/dnode/vnode/src/tsdb/dev/tsdbFS.c
source/dnode/vnode/src/tsdb/dev/tsdbFS.c
+1
-1
source/dnode/vnode/src/tsdb/dev/tsdbMerge.c
source/dnode/vnode/src/tsdb/dev/tsdbMerge.c
+1
-2
未找到文件。
source/dnode/vnode/src/tsdb/dev/tsdbCommit.c
浏览文件 @
9a4f6abe
...
...
@@ -19,6 +19,10 @@
typedef
struct
{
STsdb
*
tsdb
;
TFileSetArray
*
fsetArr
;
TFileOpArray
fopArray
[
1
];
SSkmInfo
skmTb
[
1
];
SSkmInfo
skmRow
[
1
];
int32_t
minutes
;
int8_t
precision
;
...
...
@@ -41,13 +45,15 @@ typedef struct {
TABLEID
tbid
[
1
];
}
ctx
[
1
];
TFileOpArray
fopArray
[
1
]
;
TTsdbIterArray
iterArray
[
1
];
SIterMerger
*
iterMerger
;
SSttFileReader
*
sttReader
;
TTsdbIterArray
iterArray
[
1
];
SIterMerger
*
iterMerger
;
// writer
SSttFileWriter
*
sttWriter
;
SBlockData
blockData
[
2
];
int32_t
blockDataIdx
;
SDataFileWriter
*
dataWriter
;
SSttFileWriter
*
sttWriter
;
}
SCommitter2
;
static
int32_t
tsdbCommitOpenNewSttWriter
(
SCommitter2
*
committer
)
{
...
...
@@ -63,7 +69,7 @@ static int32_t tsdbCommitOpenNewSttWriter(SCommitter2 *committer) {
SSttFileWriterConfig
config
[
1
]
=
{{
.
tsdb
=
committer
->
tsdb
,
.
maxRow
=
committer
->
maxRow
,
.
szPage
=
committer
->
tsdb
->
pVnode
->
config
.
tsdbPageSiz
e
,
.
szPage
=
committer
->
szPag
e
,
.
cmprAlg
=
committer
->
cmprAlg
,
.
compactVersion
=
committer
->
compactVersion
,
.
file
=
...
...
@@ -116,6 +122,16 @@ static int32_t tsdbCommitOpenWriter(SCommitter2 *committer) {
int32_t
code
=
0
;
int32_t
lino
=
0
;
// if (committer->sttTrigger == 1) {
// SDataFileWriterConfig config = {
// // TODO
// };
// code = tsdbDataFileWriterOpen(&config, &committer->dataWriter);
// TSDB_CHECK_CODE(code, lino, _exit);
// // TODO
// }
// stt writer
if
(
!
committer
->
ctx
->
fset
)
{
return
tsdbCommitOpenNewSttWriter
(
committer
);
...
...
@@ -133,11 +149,10 @@ static int32_t tsdbCommitOpenWriter(SCommitter2 *committer) {
return
tsdbCommitOpenExistSttWriter
(
committer
,
fobj
->
f
);
}
// data writer
if
(
0
)
{
// TODO
_exit:
if
(
code
)
{
TSDB_ERROR_LOG
(
TD_VID
(
committer
->
tsdb
->
pVnode
),
lino
,
code
);
}
return
code
;
}
...
...
@@ -148,25 +163,21 @@ static int32_t tsdbCommitWriteDelData(SCommitter2 *committer, int64_t suid, int6
return
code
;
}
static
int32_t
tsdbCommitTSData
(
SCommitter2
*
committer
)
{
int32_t
code
=
0
;
int32_t
lino
=
0
;
int64_t
nRow
=
0
;
int32_t
vid
=
TD_VID
(
committer
->
tsdb
->
pVnode
);
SRowInfo
*
row
;
static
int32_t
tsdbCommitTSDataOpenIterMerger
(
SCommitter2
*
committer
)
{
int32_t
code
=
0
;
int32_t
lino
=
0
;
if
(
committer
->
tsdb
->
imem
->
nRow
==
0
)
goto
_exit
;
ASSERT
(
TARRAY2_SIZE
(
committer
->
iterArray
)
==
0
);
ASSERT
(
committer
->
iterMerger
==
NULL
);
// open iter and iter merger
STsdbIter
*
iter
;
STsdbIterConfig
config
[
1
]
=
{{
.
type
=
TSDB_ITER_TYPE_MEMT
,
.
memt
=
committer
->
tsdb
->
imem
,
.
from
=
{{
.
ts
=
committer
->
ctx
->
minKey
,
.
version
=
VERSION_MIN
,
}},
}};
STsdbIterConfig
config
[
1
];
// memtable iter
config
->
type
=
TSDB_ITER_TYPE_MEMT
;
config
->
memt
=
committer
->
tsdb
->
imem
;
config
->
from
->
ts
=
committer
->
ctx
->
minKey
;
config
->
from
->
version
=
VERSION_MIN
;
code
=
tsdbIterOpen
(
config
,
&
iter
);
TSDB_CHECK_CODE
(
code
,
lino
,
_exit
);
...
...
@@ -174,18 +185,219 @@ static int32_t tsdbCommitTSData(SCommitter2 *committer) {
code
=
TARRAY2_APPEND
(
committer
->
iterArray
,
iter
);
TSDB_CHECK_CODE
(
code
,
lino
,
_exit
);
// stt file iter
if
(
committer
->
sttReader
)
{
const
TSttSegReaderArray
*
readerArray
;
tsdbSttFileReaderGetSegReader
(
committer
->
sttReader
,
&
readerArray
);
SSttSegReader
*
segReader
;
TARRAY2_FOREACH
(
readerArray
,
segReader
)
{
config
->
type
=
TSDB_ITER_TYPE_STT
;
config
->
sttReader
=
segReader
;
}
code
=
tsdbIterOpen
(
config
,
&
iter
);
TSDB_CHECK_CODE
(
code
,
lino
,
_exit
);
code
=
TARRAY2_APPEND
(
committer
->
iterArray
,
iter
);
TSDB_CHECK_CODE
(
code
,
lino
,
_exit
);
}
// open iter merger
code
=
tsdbIterMergerOpen
(
committer
->
iterArray
,
&
committer
->
iterMerger
,
false
);
TSDB_CHECK_CODE
(
code
,
lino
,
_exit
);
// loop iter
while
((
row
=
tsdbIterMergerGetData
(
committer
->
iterMerger
))
!=
NULL
)
{
_exit:
if
(
code
)
{
TSDB_ERROR_LOG
(
TD_VID
(
committer
->
tsdb
->
pVnode
),
lino
,
code
);
}
return
code
;
}
static
int32_t
tsdbCommitTSDataCloseIterMerger
(
SCommitter2
*
committer
)
{
int32_t
code
=
0
;
int32_t
lino
=
0
;
tsdbIterMergerClose
(
&
committer
->
iterMerger
);
TARRAY2_CLEAR
(
committer
->
iterArray
,
tsdbIterClose
);
_exit:
if
(
code
)
{
TSDB_ERROR_LOG
(
TD_VID
(
committer
->
tsdb
->
pVnode
),
lino
,
code
);
}
return
code
;
}
static
int32_t
tsdbCommitTSDataToDataTableBegin
(
SCommitter2
*
committer
,
const
TABLEID
*
tbid
)
{
int32_t
code
=
0
;
int32_t
lino
=
0
;
committer
->
ctx
->
tbid
->
suid
=
tbid
->
suid
;
committer
->
ctx
->
tbid
->
uid
=
tbid
->
uid
;
code
=
tsdbUpdateSkmTb
(
committer
->
tsdb
,
committer
->
ctx
->
tbid
,
committer
->
skmTb
);
TSDB_CHECK_CODE
(
code
,
lino
,
_exit
);
committer
->
blockDataIdx
=
0
;
for
(
int32_t
i
=
0
;
i
<
ARRAY_SIZE
(
committer
->
blockData
);
i
++
)
{
code
=
tBlockDataInit
(
&
committer
->
blockData
[
i
],
committer
->
ctx
->
tbid
,
committer
->
skmTb
->
pTSchema
,
NULL
,
0
);
TSDB_CHECK_CODE
(
code
,
lino
,
_exit
);
}
_exit:
if
(
code
)
{
TSDB_ERROR_LOG
(
TD_VID
(
committer
->
tsdb
->
pVnode
),
lino
,
code
);
}
return
code
;
}
static
int32_t
tsdbCommitTSDataToDataTableEnd
(
SCommitter2
*
committer
)
{
if
(
committer
->
ctx
->
tbid
->
uid
==
0
)
return
0
;
int32_t
code
=
0
;
int32_t
lino
=
0
;
int32_t
cidx
=
committer
->
blockDataIdx
;
int32_t
pidx
=
((
cidx
+
1
)
&
1
);
int32_t
numRow
=
(
committer
->
blockData
[
cidx
].
nRow
+
committer
->
blockData
[
pidx
].
nRow
)
/
2
;
if
(
committer
->
blockData
[
pidx
].
nRow
>
0
&&
numRow
>=
committer
->
minRow
)
{
ASSERT
(
committer
->
blockData
[
pidx
].
nRow
==
committer
->
maxRow
);
SRowInfo
row
[
1
]
=
{{
.
suid
=
committer
->
ctx
->
tbid
->
suid
,
.
uid
=
committer
->
ctx
->
tbid
->
uid
,
.
row
=
tsdbRowFromBlockData
(
committer
->
blockData
+
pidx
,
0
),
}};
for
(
int32_t
i
=
0
;
i
<
numRow
;
i
++
)
{
row
->
row
.
iRow
=
i
;
code
=
tsdbDataFileWriteRow
(
committer
->
dataWriter
,
row
);
TSDB_CHECK_CODE
(
code
,
lino
,
_exit
);
}
code
=
tsdbDataFileFlush
(
committer
->
dataWriter
);
TSDB_CHECK_CODE
(
code
,
lino
,
_exit
);
for
(
int32_t
i
=
numRow
;
i
<
committer
->
blockData
[
pidx
].
nRow
;
i
++
)
{
row
->
row
.
iRow
=
i
;
code
=
tsdbDataFileWriteRow
(
committer
->
dataWriter
,
row
);
TSDB_CHECK_CODE
(
code
,
lino
,
_exit
);
}
row
->
row
=
tsdbRowFromBlockData
(
committer
->
blockData
+
cidx
,
0
);
for
(
int32_t
i
=
0
;
i
<
committer
->
blockData
[
cidx
].
nRow
;
i
++
)
{
row
->
row
.
iRow
=
i
;
code
=
tsdbDataFileWriteRow
(
committer
->
dataWriter
,
row
);
TSDB_CHECK_CODE
(
code
,
lino
,
_exit
);
}
}
else
{
if
(
committer
->
blockData
[
pidx
].
nRow
>
0
)
{
code
=
tsdbDataFileWriteBlockData
(
committer
->
dataWriter
,
committer
->
blockData
+
cidx
);
TSDB_CHECK_CODE
(
code
,
lino
,
_exit
);
}
if
(
committer
->
blockData
[
cidx
].
nRow
<
committer
->
minRow
)
{
code
=
tsdbSttFileWriteBlockData
(
committer
->
sttWriter
,
committer
->
blockData
+
cidx
);
TSDB_CHECK_CODE
(
code
,
lino
,
_exit
);
}
else
{
code
=
tsdbDataFileWriteBlockData
(
committer
->
dataWriter
,
committer
->
blockData
+
cidx
);
TSDB_CHECK_CODE
(
code
,
lino
,
_exit
);
}
}
for
(
int32_t
i
=
0
;
i
<
ARRAY_SIZE
(
committer
->
blockData
);
i
++
)
{
tBlockDataReset
(
&
committer
->
blockData
[
i
]);
}
_exit:
if
(
code
)
{
TSDB_ERROR_LOG
(
TD_VID
(
committer
->
tsdb
->
pVnode
),
lino
,
code
);
}
return
code
;
}
static
int32_t
tsdbCommitTSDataToData
(
SCommitter2
*
committer
)
{
int32_t
code
=
0
;
int32_t
lino
=
0
;
SMetaInfo
info
;
for
(
SRowInfo
*
row
;
(
row
=
tsdbIterMergerGetData
(
committer
->
iterMerger
))
!=
NULL
;)
{
if
(
row
->
uid
!=
committer
->
ctx
->
tbid
->
uid
)
{
// end last table write
code
=
tsdbCommitTSDataToDataTableEnd
(
committer
);
TSDB_CHECK_CODE
(
code
,
lino
,
_exit
);
// Ignore table of obsolescence
if
(
metaGetInfo
(
committer
->
tsdb
->
pVnode
->
pMeta
,
row
->
uid
,
&
info
,
NULL
)
!=
0
)
{
code
=
tsdbIterMergerSkipTableData
(
committer
->
iterMerger
,
(
TABLEID
*
)
row
);
TSDB_CHECK_CODE
(
code
,
lino
,
_exit
);
continue
;
}
code
=
tsdbCommitTSDataToDataTableBegin
(
committer
,
(
TABLEID
*
)
row
);
TSDB_CHECK_CODE
(
code
,
lino
,
_exit
);
}
if
(
row
->
row
.
type
==
TSDBROW_ROW_FMT
)
{
code
=
tsdbUpdateSkmRow
(
committer
->
tsdb
,
committer
->
ctx
->
tbid
,
TSDBROW_SVERSION
(
&
row
->
row
),
committer
->
skmRow
);
TSDB_CHECK_CODE
(
code
,
lino
,
_exit
);
}
TSDBKEY
key
=
TSDBROW_KEY
(
&
row
->
row
);
if
(
key
.
version
<=
committer
->
compactVersion
//
&&
committer
->
blockData
[
committer
->
blockDataIdx
].
nRow
>
0
//
&&
key
.
ts
==
committer
->
blockData
[
committer
->
blockDataIdx
]
.
aTSKEY
[
committer
->
blockData
[
committer
->
blockDataIdx
].
nRow
-
1
])
{
code
=
tBlockDataUpdateRow
(
committer
->
blockData
+
committer
->
blockDataIdx
,
&
row
->
row
,
committer
->
skmRow
->
pTSchema
);
TSDB_CHECK_CODE
(
code
,
lino
,
_exit
);
}
else
{
if
(
committer
->
blockData
[
committer
->
blockDataIdx
].
nRow
>=
committer
->
maxRow
)
{
int32_t
idx
=
((
committer
->
blockDataIdx
+
1
)
&
1
);
if
(
committer
->
blockData
[
idx
].
nRow
>=
committer
->
maxRow
)
{
code
=
tsdbDataFileWriteBlockData
(
committer
->
dataWriter
,
committer
->
blockData
+
idx
);
TSDB_CHECK_CODE
(
code
,
lino
,
_exit
);
tBlockDataClear
(
committer
->
blockData
+
idx
);
}
committer
->
blockDataIdx
=
idx
;
}
code
=
tBlockDataAppendRow
(
&
committer
->
blockData
[
committer
->
blockDataIdx
],
&
row
->
row
,
committer
->
skmRow
->
pTSchema
,
row
->
uid
);
TSDB_CHECK_CODE
(
code
,
lino
,
_exit
);
}
code
=
tsdbIterMergerNext
(
committer
->
iterMerger
);
TSDB_CHECK_CODE
(
code
,
lino
,
_exit
);
}
code
=
tsdbCommitTSDataToDataTableEnd
(
committer
);
TSDB_CHECK_CODE
(
code
,
lino
,
_exit
);
_exit:
if
(
code
)
{
TSDB_ERROR_LOG
(
TD_VID
(
committer
->
tsdb
->
pVnode
),
lino
,
code
);
}
return
code
;
}
static
int32_t
tsdbCommitTSDataToStt
(
SCommitter2
*
committer
)
{
int32_t
code
=
0
;
int32_t
lino
=
0
;
ASSERT
(
committer
->
sttReader
==
NULL
);
SMetaInfo
info
;
for
(
SRowInfo
*
row
;
(
row
=
tsdbIterMergerGetData
(
committer
->
iterMerger
))
!=
NULL
;)
{
if
(
row
->
uid
!=
committer
->
ctx
->
tbid
->
uid
)
{
committer
->
ctx
->
tbid
->
suid
=
row
->
suid
;
committer
->
ctx
->
tbid
->
uid
=
row
->
uid
;
// Ignore table of obsolescence
SMetaInfo
info
[
1
];
if
(
metaGetInfo
(
committer
->
tsdb
->
pVnode
->
pMeta
,
row
->
uid
,
info
,
NULL
)
!=
0
)
{
if
(
metaGetInfo
(
committer
->
tsdb
->
pVnode
->
pMeta
,
row
->
uid
,
&
info
,
NULL
)
!=
0
)
{
code
=
tsdbIterMergerSkipTableData
(
committer
->
iterMerger
,
committer
->
ctx
->
tbid
);
TSDB_CHECK_CODE
(
code
,
lino
,
_exit
);
continue
;
...
...
@@ -208,9 +420,37 @@ static int32_t tsdbCommitTSData(SCommitter2 *committer) {
_exit:
if
(
code
)
{
TSDB_ERROR_LOG
(
vid
,
lino
,
code
);
TSDB_ERROR_LOG
(
TD_VID
(
committer
->
tsdb
->
pVnode
),
lino
,
code
);
}
return
code
;
}
static
int32_t
tsdbCommitTSData
(
SCommitter2
*
committer
)
{
int32_t
code
=
0
;
int32_t
lino
=
0
;
if
(
committer
->
tsdb
->
imem
->
nRow
==
0
)
goto
_exit
;
// open iter and iter merger
code
=
tsdbCommitTSDataOpenIterMerger
(
committer
);
TSDB_CHECK_CODE
(
code
,
lino
,
_exit
);
// loop iter
if
(
committer
->
sttTrigger
==
1
)
{
code
=
tsdbCommitTSDataToData
(
committer
);
TSDB_CHECK_CODE
(
code
,
lino
,
_exit
);
}
else
{
tsdbDebug
(
"vgId:%d %s done, fid:%d nRow:%"
PRId64
,
vid
,
__func__
,
committer
->
ctx
->
fid
,
nRow
);
code
=
tsdbCommitTSDataToStt
(
committer
);
TSDB_CHECK_CODE
(
code
,
lino
,
_exit
);
}
// close iter and iter merger
code
=
tsdbCommitTSDataCloseIterMerger
(
committer
);
TSDB_CHECK_CODE
(
code
,
lino
,
_exit
);
_exit:
if
(
code
)
{
TSDB_ERROR_LOG
(
TD_VID
(
committer
->
tsdb
->
pVnode
),
lino
,
code
);
}
return
code
;
}
...
...
@@ -254,11 +494,6 @@ static int32_t tsdbCommitDelData(SCommitter2 *committer) {
record
->
ekey
=
delData
->
eKey
;
}
if
(
!
committer
->
sttWriter
)
{
code
=
tsdbCommitOpenWriter
(
committer
);
TSDB_CHECK_CODE
(
code
,
lino
,
_exit
);
}
code
=
tsdbSttFileWriteTombRecord
(
committer
->
sttWriter
,
record
);
TSDB_CHECK_CODE
(
code
,
lino
,
_exit
);
}
...
...
@@ -275,7 +510,6 @@ static int32_t tsdbCommitFileSetBegin(SCommitter2 *committer) {
int32_t
code
=
0
;
int32_t
lino
=
0
;
STsdb
*
tsdb
=
committer
->
tsdb
;
int32_t
vid
=
TD_VID
(
tsdb
->
pVnode
);
committer
->
ctx
->
fid
=
tsdbKeyFid
(
committer
->
ctx
->
nextKey
,
committer
->
minutes
,
committer
->
precision
);
committer
->
ctx
->
expLevel
=
tsdbFidLevel
(
committer
->
ctx
->
fid
,
&
tsdb
->
keepCfg
,
committer
->
ctx
->
now
);
...
...
@@ -300,10 +534,10 @@ static int32_t tsdbCommitFileSetBegin(SCommitter2 *committer) {
_exit:
if
(
code
)
{
TSDB_ERROR_LOG
(
vid
,
lino
,
code
);
TSDB_ERROR_LOG
(
TD_VID
(
tsdb
->
pVnode
)
,
lino
,
code
);
}
else
{
tsdbDebug
(
"vgId:%d %s done, fid:%d minKey:%"
PRId64
" maxKey:%"
PRId64
" expLevel:%d"
,
vid
,
__func__
,
committer
->
ctx
->
fid
,
committer
->
ctx
->
minKey
,
committer
->
ctx
->
maxKey
,
committer
->
ctx
->
expLevel
);
tsdbDebug
(
"vgId:%d %s done, fid:%d minKey:%"
PRId64
" maxKey:%"
PRId64
" expLevel:%d"
,
TD_VID
(
tsdb
->
pVnode
)
,
__func__
,
committer
->
ctx
->
fid
,
committer
->
ctx
->
minKey
,
committer
->
ctx
->
maxKey
,
committer
->
ctx
->
expLevel
);
}
return
0
;
}
...
...
@@ -312,6 +546,11 @@ static int32_t tsdbCommitFileSetEnd(SCommitter2 *committer) {
int32_t
code
=
0
;
int32_t
lino
=
0
;
if
(
committer
->
dataWriter
)
{
code
=
tsdbDataFileWriterClose
(
&
committer
->
dataWriter
,
0
,
committer
->
fopArray
);
TSDB_CHECK_CODE
(
code
,
lino
,
_exit
);
}
code
=
tsdbSttFileWriterClose
(
&
committer
->
sttWriter
,
0
,
committer
->
fopArray
);
TSDB_CHECK_CODE
(
code
,
lino
,
_exit
);
...
...
@@ -330,7 +569,6 @@ _exit:
static
int32_t
tsdbCommitFileSet
(
SCommitter2
*
committer
)
{
int32_t
code
=
0
;
int32_t
lino
=
0
;
int32_t
vid
=
TD_VID
(
committer
->
tsdb
->
pVnode
);
// fset commit start
code
=
tsdbCommitFileSetBegin
(
committer
);
...
...
@@ -349,9 +587,9 @@ static int32_t tsdbCommitFileSet(SCommitter2 *committer) {
_exit:
if
(
code
)
{
TSDB_ERROR_LOG
(
vid
,
lino
,
code
);
TSDB_ERROR_LOG
(
TD_VID
(
committer
->
tsdb
->
pVnode
)
,
lino
,
code
);
}
else
{
tsdbDebug
(
"vgId:%d %s done, fid:%d"
,
vid
,
__func__
,
committer
->
ctx
->
fid
);
tsdbDebug
(
"vgId:%d %s done, fid:%d"
,
TD_VID
(
committer
->
tsdb
->
pVnode
)
,
__func__
,
committer
->
ctx
->
fid
);
}
return
code
;
}
...
...
@@ -360,9 +598,8 @@ static int32_t tsdbOpenCommitter(STsdb *tsdb, SCommitInfo *info, SCommitter2 *co
int32_t
code
=
0
;
int32_t
lino
=
0
;
SMemTable
*
mem
=
tsdb
->
imem
;
memset
(
committer
,
0
,
sizeof
(
committer
[
0
]));
committer
->
tsdb
=
tsdb
;
code
=
tsdbFSCreateCopySnapshot
(
tsdb
->
pFS
,
&
committer
->
fsetArr
);
TSDB_CHECK_CODE
(
code
,
lino
,
_exit
);
...
...
@@ -376,11 +613,11 @@ static int32_t tsdbOpenCommitter(STsdb *tsdb, SCommitInfo *info, SCommitter2 *co
committer
->
compactVersion
=
INT64_MAX
;
committer
->
ctx
->
cid
=
tsdbFSAllocEid
(
tsdb
->
pFS
);
committer
->
ctx
->
now
=
taosGetTimestampSec
();
TARRAY2_INIT
(
committer
->
fopArray
);
committer
->
ctx
->
nextKey
=
tsdb
->
imem
->
minKey
;
if
(
mem
->
nDel
>
0
)
{
SRBTreeIter
iter
[
1
]
=
{
tRBTreeIterCreate
(
mem
->
tbDataTree
,
1
)};
if
(
tsdb
->
imem
->
nDel
>
0
)
{
SRBTreeIter
iter
[
1
]
=
{
tRBTreeIterCreate
(
tsdb
->
imem
->
tbDataTree
,
1
)};
for
(
SRBTreeNode
*
node
=
tRBTreeIterNext
(
iter
);
node
;
node
=
tRBTreeIterNext
(
iter
))
{
STbData
*
tbData
=
TCONTAINER_OF
(
node
,
STbData
,
rbtn
);
...
...
@@ -404,7 +641,6 @@ _exit:
static
int32_t
tsdbCloseCommitter
(
SCommitter2
*
committer
,
int32_t
eno
)
{
int32_t
code
=
0
;
int32_t
lino
=
0
;
int32_t
vid
=
TD_VID
(
committer
->
tsdb
->
pVnode
);
if
(
eno
==
0
)
{
code
=
tsdbFSEditBegin
(
committer
->
tsdb
->
pFS
,
committer
->
fopArray
,
TSDB_FEDIT_COMMIT
);
...
...
@@ -423,10 +659,10 @@ static int32_t tsdbCloseCommitter(SCommitter2 *committer, int32_t eno) {
_exit:
if
(
code
)
{
tsdbError
(
"vgId:%d %s failed at line %d since %s, eid:%"
PRId64
,
vid
,
__func__
,
lino
,
tstrerror
(
code
)
,
committer
->
ctx
->
cid
);
tsdbError
(
"vgId:%d %s failed at line %d since %s, eid:%"
PRId64
,
TD_VID
(
committer
->
tsdb
->
pVnode
),
__func__
,
lino
,
tstrerror
(
code
),
committer
->
ctx
->
cid
);
}
else
{
tsdbDebug
(
"vgId:%d %s done, eid:%"
PRId64
,
vid
,
__func__
,
committer
->
ctx
->
cid
);
tsdbDebug
(
"vgId:%d %s done, eid:%"
PRId64
,
TD_VID
(
committer
->
tsdb
->
pVnode
)
,
__func__
,
committer
->
ctx
->
cid
);
}
return
code
;
}
...
...
@@ -443,14 +679,14 @@ int32_t tsdbPreCommit(STsdb *tsdb) {
int32_t
tsdbCommitBegin
(
STsdb
*
tsdb
,
SCommitInfo
*
info
)
{
if
(
!
tsdb
)
return
0
;
int32_t
code
=
0
;
int32_t
lino
=
0
;
int32_t
vid
=
TD_VID
(
tsdb
->
pVnode
);
int32_t
code
=
0
;
int32_t
lino
=
0
;
SMemTable
*
imem
=
tsdb
->
imem
;
int64_t
nRow
=
imem
->
nRow
;
int64_t
nDel
=
imem
->
nDel
;
if
(
!
nRow
&&
!
nDel
)
{
if
(
nRow
==
0
&&
nDel
==
0
)
{
taosThreadRwlockWrlock
(
&
tsdb
->
rwLock
);
tsdb
->
imem
=
NULL
;
taosThreadRwlockUnlock
(
&
tsdb
->
rwLock
);
...
...
@@ -472,9 +708,9 @@ int32_t tsdbCommitBegin(STsdb *tsdb, SCommitInfo *info) {
_exit:
if
(
code
)
{
TSDB_ERROR_LOG
(
vid
,
lino
,
code
);
TSDB_ERROR_LOG
(
TD_VID
(
tsdb
->
pVnode
)
,
lino
,
code
);
}
else
{
tsdbInfo
(
"vgId:%d %s done, nRow:%"
PRId64
" nDel:%"
PRId64
,
vid
,
__func__
,
nRow
,
nDel
);
tsdbInfo
(
"vgId:%d %s done, nRow:%"
PRId64
" nDel:%"
PRId64
,
TD_VID
(
tsdb
->
pVnode
)
,
__func__
,
nRow
,
nDel
);
}
return
code
;
}
...
...
@@ -508,7 +744,6 @@ _exit:
int32_t
tsdbCommitAbort
(
STsdb
*
pTsdb
)
{
int32_t
code
=
0
;
int32_t
lino
=
0
;
int32_t
vid
=
TD_VID
(
pTsdb
->
pVnode
);
if
(
pTsdb
->
imem
==
NULL
)
goto
_exit
;
...
...
@@ -517,9 +752,9 @@ int32_t tsdbCommitAbort(STsdb *pTsdb) {
_exit:
if
(
code
)
{
tsdbError
(
"vgId:%d, %s failed at line %d since %s"
,
vid
,
__func__
,
lino
,
tstrerror
(
code
));
tsdbError
(
"vgId:%d, %s failed at line %d since %s"
,
TD_VID
(
pTsdb
->
pVnode
)
,
__func__
,
lino
,
tstrerror
(
code
));
}
else
{
tsdbInfo
(
"vgId:%d %s done"
,
vid
,
__func__
);
tsdbInfo
(
"vgId:%d %s done"
,
TD_VID
(
pTsdb
->
pVnode
)
,
__func__
);
}
return
code
;
}
\ No newline at end of file
source/dnode/vnode/src/tsdb/dev/tsdbFS.c
浏览文件 @
9a4f6abe
...
...
@@ -600,7 +600,7 @@ int32_t tsdbFSEditCommit(STFileSystem *fs) {
}
// check if need to merge
if
(
fs
->
mergeTaskOn
==
false
)
{
if
(
fs
->
tsdb
->
pVnode
->
config
.
sttTrigger
>
1
&&
fs
->
mergeTaskOn
==
false
)
{
STFileSet
*
fset
;
TARRAY2_FOREACH_REVERSE
(
fs
->
fSetArr
,
fset
)
{
if
(
TARRAY2_SIZE
(
fset
->
lvlArr
)
==
0
)
continue
;
...
...
source/dnode/vnode/src/tsdb/dev/tsdbMerge.c
浏览文件 @
9a4f6abe
...
...
@@ -570,7 +570,6 @@ static int32_t tsdbMergeFileSetEndCloseReader(SMerger *merger) {
static
int32_t
tsdbMergeFileSetEnd
(
SMerger
*
merger
)
{
int32_t
code
=
0
;
int32_t
lino
=
0
;
int32_t
vid
=
TD_VID
(
merger
->
tsdb
->
pVnode
);
code
=
tsdbMergeFileSetEndCloseWriter
(
merger
);
TSDB_CHECK_CODE
(
code
,
lino
,
_exit
);
...
...
@@ -583,7 +582,7 @@ static int32_t tsdbMergeFileSetEnd(SMerger *merger) {
_exit:
if
(
code
)
{
TSDB_ERROR_LOG
(
vid
,
lino
,
code
);
TSDB_ERROR_LOG
(
TD_VID
(
merger
->
tsdb
->
pVnode
)
,
lino
,
code
);
}
return
code
;
}
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录