Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
e07afeee
T
TDengine
项目概览
taosdata
/
TDengine
1 年多 前同步成功
通知
1185
Star
22016
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
e07afeee
编写于
6月 01, 2023
作者:
H
Hongze Cheng
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
more code
上级
e99b33ed
变更
4
隐藏空白更改
内联
并排
Showing
4 changed file
with
187 addition
and
92 deletion
+187
-92
source/dnode/vnode/src/tsdb/dev/inc/tsdbDataFileRW.h
source/dnode/vnode/src/tsdb/dev/inc/tsdbDataFileRW.h
+2
-0
source/dnode/vnode/src/tsdb/dev/tsdbDataFileRW.c
source/dnode/vnode/src/tsdb/dev/tsdbDataFileRW.c
+14
-0
source/dnode/vnode/src/tsdb/dev/tsdbMerge.c
source/dnode/vnode/src/tsdb/dev/tsdbMerge.c
+162
-79
source/dnode/vnode/src/tsdb/dev/tsdbSttFileRW.c
source/dnode/vnode/src/tsdb/dev/tsdbSttFileRW.c
+9
-13
未找到文件。
source/dnode/vnode/src/tsdb/dev/inc/tsdbDataFileRW.h
浏览文件 @
e07afeee
...
...
@@ -54,7 +54,9 @@ typedef struct SDataFileWriterConfig {
int32_t
tsdbDataFileWriterOpen
(
const
SDataFileWriterConfig
*
config
,
SDataFileWriter
**
writer
);
int32_t
tsdbDataFileWriterClose
(
SDataFileWriter
**
writer
,
bool
abort
,
STFileOp
op
[
/*TSDB_FTYPE_MAX*/
]);
int32_t
tsdbDataFileWriteTSData
(
SDataFileWriter
*
writer
,
SRowInfo
*
row
);
int32_t
tsdbDataFileWriteTSDataBlock
(
SDataFileWriter
*
writer
,
SBlockData
*
bData
);
int32_t
tsdbDataFileFLushTSDataBlock
(
SDataFileWriter
*
writer
);
#ifdef __cplusplus
}
...
...
source/dnode/vnode/src/tsdb/dev/tsdbDataFileRW.c
浏览文件 @
e07afeee
...
...
@@ -440,6 +440,8 @@ _exit:
return
code
;
}
int32_t
tsdbDataFileWriteTSDataBlock
(
SDataFileWriter
*
writer
,
SBlockData
*
bData
)
{
if
(
bData
->
nRow
==
0
)
return
0
;
int32_t
code
=
0
;
int32_t
lino
=
0
;
int32_t
vid
=
TD_VID
(
writer
->
config
->
tsdb
->
pVnode
);
...
...
@@ -468,3 +470,15 @@ _exit:
}
return
code
;
}
int32_t
tsdbDataFileWriteTSData
(
SDataFileWriter
*
writer
,
SRowInfo
*
row
)
{
// TODO
ASSERT
(
0
);
return
0
;
}
int32_t
tsdbDataFileFLushTSDataBlock
(
SDataFileWriter
*
writer
)
{
// TODO
ASSERT
(
0
);
return
0
;
}
\ No newline at end of file
source/dnode/vnode/src/tsdb/dev/tsdbMerge.c
浏览文件 @
e07afeee
...
...
@@ -18,25 +18,29 @@
typedef
struct
{
STsdb
*
tsdb
;
TFileSetArray
*
fsetArr
;
int32_t
sttTrigger
;
int32_t
maxRow
;
int32_t
minRow
;
int32_t
szPage
;
int8_t
cmprAlg
;
int64_t
compactVersion
;
int64_t
cid
;
SSkmInfo
skmTb
[
1
];
int32_t
sttTrigger
;
int32_t
maxRow
;
int32_t
minRow
;
int32_t
szPage
;
int8_t
cmprAlg
;
int64_t
compactVersion
;
int64_t
cid
;
SSkmInfo
skmTb
[
1
];
SSkmInfo
skmRow
[
1
];
// context
struct
{
bool
opened
;
int64_t
now
;
STFileSet
*
fset
;
bool
toData
;
int32_t
level
;
SSttLvl
*
lvl
;
STFileObj
*
fobj
;
SRowInfo
*
row
;
SBlockData
bData
[
1
];
TABLEID
tbid
[
1
];
int32_t
bDataIdx
;
SBlockData
bData
[
2
];
}
ctx
[
1
];
TFileOpArray
fopArr
[
1
];
...
...
@@ -52,6 +56,7 @@ typedef struct {
}
SMerger
;
static
int32_t
tsdbMergerOpen
(
SMerger
*
merger
)
{
merger
->
ctx
->
now
=
taosGetTimestampMs
();
merger
->
maxRow
=
merger
->
tsdb
->
pVnode
->
config
.
tsdbCfg
.
maxRows
;
merger
->
minRow
=
merger
->
tsdb
->
pVnode
->
config
.
tsdbCfg
.
minRows
;
merger
->
szPage
=
merger
->
tsdb
->
pVnode
->
config
.
tsdbPageSize
;
...
...
@@ -63,52 +68,98 @@ static int32_t tsdbMergerOpen(SMerger *merger) {
}
static
int32_t
tsdbMergerClose
(
SMerger
*
merger
)
{
int32_t
code
=
0
;
int32_t
lino
=
0
;
SVnode
*
pVnode
=
merger
->
tsdb
->
pVnode
;
int32_t
vid
=
TD_VID
(
pVnode
);
STFileSystem
*
fs
=
merger
->
tsdb
->
pFS
;
int32_t
code
=
0
;
int32_t
lino
=
0
;
SVnode
*
pVnode
=
merger
->
tsdb
->
pVnode
;
// edit file system
code
=
tsdbFSEditBegin
(
fs
,
merger
->
fopArr
,
TSDB_FEDIT_MERGE
);
code
=
tsdbFSEditBegin
(
merger
->
tsdb
->
pFS
,
merger
->
fopArr
,
TSDB_FEDIT_MERGE
);
TSDB_CHECK_CODE
(
code
,
lino
,
_exit
);
code
=
tsdbFSEditCommit
(
fs
);
code
=
tsdbFSEditCommit
(
merger
->
tsdb
->
pFS
);
TSDB_CHECK_CODE
(
code
,
lino
,
_exit
);
ASSERT
(
merger
->
dataWriter
==
NULL
);
ASSERT
(
merger
->
sttWriter
==
NULL
);
ASSERT
(
merger
->
iterMerger
==
NULL
);
ASSERT
(
TARRAY2_SIZE
(
merger
->
iterArr
)
==
0
);
ASSERT
(
TARRAY2_SIZE
(
merger
->
sttReaderArr
)
==
0
);
// clear the merge
TARRAY2_FREE
(
merger
->
iterArr
);
TARRAY2_FREE
(
merger
->
sttReaderArr
);
TARRAY2_FREE
(
merger
->
fopArr
);
tBlockDataDestroy
(
merger
->
ctx
->
bData
);
for
(
int32_t
i
=
0
;
i
<
ARRAY_SIZE
(
merger
->
ctx
->
bData
);
i
++
)
{
tBlockDataDestroy
(
merger
->
ctx
->
bData
+
i
);
}
tDestroyTSchema
(
merger
->
skmTb
->
pTSchema
);
tDestroyTSchema
(
merger
->
skmRow
->
pTSchema
);
_exit:
if
(
code
)
{
TSDB_ERROR_LOG
(
vid
,
lino
,
code
);
TSDB_ERROR_LOG
(
TD_VID
(
pVnode
)
,
lino
,
code
);
}
return
0
;
return
code
;
}
static
int32_t
tsdbMergeToDataTableEnd
(
SMerger
*
merger
)
{
if
(
merger
->
ctx
->
bData
->
nRow
==
0
)
return
0
;
if
(
merger
->
ctx
->
bData
[
0
].
nRow
+
merger
->
ctx
->
bData
[
1
].
nRow
==
0
)
return
0
;
int32_t
code
=
0
;
int32_t
lino
=
0
;
int32_t
vid
=
TD_VID
(
merger
->
tsdb
->
pVnode
);
int32_t
cidx
=
merger
->
ctx
->
bDataIdx
;
int32_t
pidx
=
(
cidx
+
1
)
%
2
;
if
(
merger
->
ctx
->
bData
[
pidx
].
nRow
>
0
)
{
ASSERT
(
merger
->
ctx
->
bData
[
pidx
].
nRow
==
merger
->
maxRow
);
int32_t
numRow
=
(
merger
->
ctx
->
bData
[
pidx
].
nRow
+
merger
->
ctx
->
bData
[
cidx
].
nRow
)
/
2
;
SRowInfo
row
[
1
]
=
{{
.
suid
=
merger
->
ctx
->
tbid
->
suid
,
.
uid
=
merger
->
ctx
->
tbid
->
uid
,
.
row
=
tsdbRowFromBlockData
(
merger
->
ctx
->
bData
+
pidx
,
0
),
}};
for
(
int32_t
i
=
0
;
i
<
numRow
;
i
++
)
{
row
->
row
.
iRow
=
i
;
code
=
tsdbDataFileWriteTSData
(
merger
->
dataWriter
,
row
);
TSDB_CHECK_CODE
(
code
,
lino
,
_exit
);
}
if
(
merger
->
ctx
->
bData
->
nRow
<
merger
->
minRow
)
{
code
=
tsdbSttFileWriteTSDataBlock
(
merger
->
sttWriter
,
merger
->
ctx
->
bData
);
code
=
tsdbDataFileFLushTSDataBlock
(
merger
->
dataWriter
);
TSDB_CHECK_CODE
(
code
,
lino
,
_exit
);
for
(
int32_t
i
=
numRow
;
i
<
merger
->
ctx
->
bData
[
pidx
].
nRow
;
i
++
)
{
row
->
row
.
iRow
=
i
;
code
=
tsdbDataFileWriteTSData
(
merger
->
dataWriter
,
row
);
TSDB_CHECK_CODE
(
code
,
lino
,
_exit
);
}
row
->
row
=
tsdbRowFromBlockData
(
merger
->
ctx
->
bData
+
cidx
,
0
);
for
(
int32_t
i
=
0
;
i
<
merger
->
ctx
->
bData
[
cidx
].
nRow
;
i
++
)
{
row
->
row
.
iRow
=
i
;
code
=
tsdbDataFileWriteTSData
(
merger
->
dataWriter
,
row
);
TSDB_CHECK_CODE
(
code
,
lino
,
_exit
);
}
}
else
{
code
=
tsdbDataFileWriteTSDataBlock
(
merger
->
dataWriter
,
merger
->
ctx
->
bData
);
TSDB_CHECK_CODE
(
code
,
lino
,
_exit
);
if
(
merger
->
ctx
->
bData
[
cidx
].
nRow
<
merger
->
minRow
)
{
code
=
tsdbSttFileWriteTSDataBlock
(
merger
->
sttWriter
,
merger
->
ctx
->
bData
+
cidx
);
TSDB_CHECK_CODE
(
code
,
lino
,
_exit
);
}
else
{
code
=
tsdbDataFileWriteTSDataBlock
(
merger
->
dataWriter
,
merger
->
ctx
->
bData
+
cidx
);
TSDB_CHECK_CODE
(
code
,
lino
,
_exit
);
}
}
for
(
int32_t
i
=
0
;
i
<
ARRAY_SIZE
(
merger
->
ctx
->
bData
);
i
++
)
{
tBlockDataReset
(
merger
->
ctx
->
bData
+
i
);
}
tBlockDataClear
(
merger
->
ctx
->
bData
);
_exit:
if
(
code
)
{
TSDB_ERROR_LOG
(
vid
,
lino
,
code
);
TSDB_ERROR_LOG
(
TD_VID
(
merger
->
tsdb
->
pVnode
)
,
lino
,
code
);
}
return
code
;
}
...
...
@@ -116,43 +167,62 @@ _exit:
static
int32_t
tsdbMergeToDataTableBegin
(
SMerger
*
merger
)
{
int32_t
code
=
0
;
int32_t
lino
=
0
;
int32_t
vid
=
TD_VID
(
merger
->
tsdb
->
pVnode
);
code
=
tsdbUpdateSkmTb
(
merger
->
tsdb
,
(
const
TABLEID
*
)
merger
->
ctx
->
row
,
merger
->
skmTb
);
code
=
tsdbUpdateSkmTb
(
merger
->
tsdb
,
merger
->
ctx
->
tbid
,
merger
->
skmTb
);
TSDB_CHECK_CODE
(
code
,
lino
,
_exit
);
code
=
tBlockDataInit
(
merger
->
ctx
->
bData
,
(
TABLEID
*
)
merger
->
ctx
->
row
,
merger
->
skmTb
->
pTSchema
,
NULL
,
0
);
TSDB_CHECK_CODE
(
code
,
lino
,
_exit
);
for
(
int32_t
i
=
0
;
i
<
ARRAY_SIZE
(
merger
->
ctx
->
bData
);
i
++
)
{
code
=
tBlockDataInit
(
merger
->
ctx
->
bData
,
merger
->
ctx
->
tbid
,
merger
->
skmTb
->
pTSchema
,
NULL
,
0
);
TSDB_CHECK_CODE
(
code
,
lino
,
_exit
);
}
_exit:
if
(
code
)
{
TSDB_ERROR_LOG
(
vid
,
lino
,
code
);
TSDB_ERROR_LOG
(
TD_VID
(
merger
->
tsdb
->
pVnode
)
,
lino
,
code
);
}
return
code
;
}
static
int32_t
tsdbMergeToData
(
SMerger
*
merger
)
{
static
int32_t
tsdbMergeToData
Level
(
SMerger
*
merger
)
{
int32_t
code
=
0
;
int32_t
lino
=
0
;
int32_t
vid
=
TD_VID
(
merger
->
tsdb
->
pVnode
);
while
((
merger
->
ctx
->
row
=
tsdbIterMergerGet
(
merger
->
iterMerger
))
)
{
if
(
merger
->
ctx
->
row
->
uid
!=
merger
->
ctx
->
bData
->
uid
)
{
for
(
SRowInfo
*
row
;
(
row
=
tsdbIterMergerGet
(
merger
->
iterMerger
))
!=
NULL
;
)
{
if
(
row
->
uid
!=
merger
->
ctx
->
tbid
->
uid
)
{
code
=
tsdbMergeToDataTableEnd
(
merger
);
TSDB_CHECK_CODE
(
code
,
lino
,
_exit
);
merger
->
ctx
->
tbid
->
suid
=
row
->
suid
;
merger
->
ctx
->
tbid
->
uid
=
row
->
uid
;
code
=
tsdbMergeToDataTableBegin
(
merger
);
TSDB_CHECK_CODE
(
code
,
lino
,
_exit
);
}
code
=
tBlockDataAppendRow
(
merger
->
ctx
->
bData
,
&
merger
->
ctx
->
row
->
row
,
NULL
,
merger
->
ctx
->
row
->
uid
);
TSDB_CHECK_CODE
(
code
,
lino
,
_exit
);
TSDBKEY
key
[
1
]
=
{
TSDBROW_KEY
(
&
row
->
row
)};
if
(
merger
->
ctx
->
bData
->
nRow
>=
merger
->
maxRow
)
{
code
=
tsdbDataFileWriteTSDataBlock
(
merger
->
dataWriter
,
merger
->
ctx
->
bData
);
if
(
key
->
version
<=
merger
->
compactVersion
//
&&
merger
->
ctx
->
bData
[
merger
->
ctx
->
bDataIdx
].
nRow
>
0
//
&&
merger
->
ctx
->
bData
[
merger
->
ctx
->
bDataIdx
].
aTSKEY
[
merger
->
ctx
->
bData
[
merger
->
ctx
->
bDataIdx
].
nRow
-
1
]
==
key
->
ts
)
{
// update
code
=
tBlockDataUpdateRow
(
merger
->
ctx
->
bData
+
merger
->
ctx
->
bDataIdx
,
&
row
->
row
,
NULL
);
TSDB_CHECK_CODE
(
code
,
lino
,
_exit
);
}
else
{
if
(
merger
->
ctx
->
bData
[
merger
->
ctx
->
bDataIdx
].
nRow
>=
merger
->
maxRow
)
{
int32_t
idx
=
(
merger
->
ctx
->
bDataIdx
+
1
)
%
2
;
code
=
tsdbDataFileWriteTSDataBlock
(
merger
->
dataWriter
,
merger
->
ctx
->
bData
+
idx
);
TSDB_CHECK_CODE
(
code
,
lino
,
_exit
);
tBlockDataClear
(
merger
->
ctx
->
bData
+
idx
);
// switch to next bData
merger
->
ctx
->
bDataIdx
=
idx
;
}
tBlockDataReset
(
merger
->
ctx
->
bData
);
code
=
tBlockDataAppendRow
(
merger
->
ctx
->
bData
+
merger
->
ctx
->
bDataIdx
,
&
row
->
row
,
NULL
,
row
->
uid
);
TSDB_CHECK_CODE
(
code
,
lino
,
_exit
);
}
code
=
tsdbIterMergerNext
(
merger
->
iterMerger
);
...
...
@@ -164,7 +234,7 @@ static int32_t tsdbMergeToData(SMerger *merger) {
_exit:
if
(
code
)
{
TSDB_ERROR_LOG
(
vid
,
lino
,
code
);
TSDB_ERROR_LOG
(
TD_VID
(
merger
->
tsdb
->
pVnode
)
,
lino
,
code
);
}
return
code
;
}
...
...
@@ -193,7 +263,6 @@ _exit:
static
int32_t
tsdbMergeFileSetBeginOpenReader
(
SMerger
*
merger
)
{
int32_t
code
=
0
;
int32_t
lino
=
0
;
int32_t
vid
=
TD_VID
(
merger
->
tsdb
->
pVnode
);
merger
->
ctx
->
toData
=
true
;
merger
->
ctx
->
level
=
0
;
...
...
@@ -213,7 +282,7 @@ static int32_t tsdbMergeFileSetBeginOpenReader(SMerger *merger) {
}
else
{
merger
->
ctx
->
level
++
;
// add
th
e operation
// add
remov
e operation
STFileOp
op
=
{
.
optype
=
TSDB_FOP_REMOVE
,
.
fid
=
merger
->
ctx
->
fset
->
fid
,
...
...
@@ -239,7 +308,7 @@ static int32_t tsdbMergeFileSetBeginOpenReader(SMerger *merger) {
_exit:
if
(
code
)
{
TSDB_ERROR_LOG
(
vid
,
lino
,
code
);
TSDB_ERROR_LOG
(
TD_VID
(
merger
->
tsdb
->
pVnode
)
,
lino
,
code
);
}
return
code
;
}
...
...
@@ -288,12 +357,8 @@ static int32_t tsdbMergeFileSetBeginOpenWriter(SMerger *merger) {
int32_t
lino
=
0
;
int32_t
vid
=
TD_VID
(
merger
->
tsdb
->
pVnode
);
SDiskID
did
=
{
.
level
=
0
,
.
id
=
0
,
};
// TODO
if
(
merger
->
ctx
->
lvl
)
{
// to existing level
if
(
merger
->
ctx
->
lvl
)
{
// to existing level
SSttFileWriterConfig
config
[
1
]
=
{{
.
tsdb
=
merger
->
tsdb
,
.
maxRow
=
merger
->
maxRow
,
...
...
@@ -304,7 +369,15 @@ static int32_t tsdbMergeFileSetBeginOpenWriter(SMerger *merger) {
}};
code
=
tsdbSttFileWriterOpen
(
config
,
&
merger
->
sttWriter
);
TSDB_CHECK_CODE
(
code
,
lino
,
_exit
);
}
else
{
// to new level
}
else
{
SDiskID
did
[
1
];
int32_t
level
=
tsdbFidLevel
(
merger
->
ctx
->
fset
->
fid
,
&
merger
->
tsdb
->
keepCfg
,
merger
->
ctx
->
now
);
if
(
tfsAllocDisk
(
merger
->
tsdb
->
pVnode
->
pTfs
,
level
,
did
)
<
0
)
{
code
=
TSDB_CODE_FS_NO_VALID_DISK
;
TSDB_CHECK_CODE
(
code
,
lino
,
_exit
);
}
// to new level
SSttFileWriterConfig
config
[
1
]
=
{{
.
tsdb
=
merger
->
tsdb
,
.
maxRow
=
merger
->
maxRow
,
...
...
@@ -314,7 +387,7 @@ static int32_t tsdbMergeFileSetBeginOpenWriter(SMerger *merger) {
.
file
=
{
.
type
=
TSDB_FTYPE_STT
,
.
did
=
did
,
.
did
=
did
[
0
]
,
.
fid
=
merger
->
ctx
->
fset
->
fid
,
.
cid
=
merger
->
cid
,
.
size
=
0
,
...
...
@@ -328,8 +401,14 @@ static int32_t tsdbMergeFileSetBeginOpenWriter(SMerger *merger) {
TSDB_CHECK_CODE
(
code
,
lino
,
_exit
);
}
if
(
merger
->
ctx
->
toData
)
{
// TODO
tBlockDataReset
(
merger
->
ctx
->
bData
);
if
(
merger
->
ctx
->
toData
)
{
// TODO
SDiskID
did
[
1
];
int32_t
level
=
tsdbFidLevel
(
merger
->
ctx
->
fset
->
fid
,
&
merger
->
tsdb
->
keepCfg
,
merger
->
ctx
->
now
);
if
(
tfsAllocDisk
(
merger
->
tsdb
->
pVnode
->
pTfs
,
level
,
did
)
<
0
)
{
code
=
TSDB_CODE_FS_NO_VALID_DISK
;
TSDB_CHECK_CODE
(
code
,
lino
,
_exit
);
}
SDataFileWriterConfig
config
=
{
.
tsdb
=
merger
->
tsdb
,
...
...
@@ -339,7 +418,7 @@ static int32_t tsdbMergeFileSetBeginOpenWriter(SMerger *merger) {
[
0
]
=
{
.
type
=
TSDB_FTYPE_HEAD
,
.
did
=
did
,
.
did
=
did
[
0
]
,
.
fid
=
merger
->
ctx
->
fset
->
fid
,
.
cid
=
merger
->
cid
,
.
size
=
0
,
...
...
@@ -347,7 +426,7 @@ static int32_t tsdbMergeFileSetBeginOpenWriter(SMerger *merger) {
[
1
]
=
{
.
type
=
TSDB_FTYPE_DATA
,
.
did
=
did
,
.
did
=
did
[
0
]
,
.
fid
=
merger
->
ctx
->
fset
->
fid
,
.
cid
=
merger
->
cid
,
.
size
=
0
,
...
...
@@ -355,7 +434,7 @@ static int32_t tsdbMergeFileSetBeginOpenWriter(SMerger *merger) {
[
2
]
=
{
.
type
=
TSDB_FTYPE_SMA
,
.
did
=
did
,
.
did
=
did
[
0
]
,
.
fid
=
merger
->
ctx
->
fset
->
fid
,
.
cid
=
merger
->
cid
,
.
size
=
0
,
...
...
@@ -363,7 +442,7 @@ static int32_t tsdbMergeFileSetBeginOpenWriter(SMerger *merger) {
[
3
]
=
{
.
type
=
TSDB_FTYPE_TOMB
,
.
did
=
did
,
.
did
=
did
[
0
]
,
.
fid
=
merger
->
ctx
->
fset
->
fid
,
.
cid
=
merger
->
cid
,
.
size
=
0
,
...
...
@@ -384,7 +463,6 @@ _exit:
static
int32_t
tsdbMergeFileSetBegin
(
SMerger
*
merger
)
{
int32_t
code
=
0
;
int32_t
lino
=
0
;
int32_t
vid
=
TD_VID
(
merger
->
tsdb
->
pVnode
);
ASSERT
(
TARRAY2_SIZE
(
merger
->
sttReaderArr
)
==
0
);
ASSERT
(
TARRAY2_SIZE
(
merger
->
iterArr
)
==
0
);
...
...
@@ -392,6 +470,13 @@ static int32_t tsdbMergeFileSetBegin(SMerger *merger) {
ASSERT
(
merger
->
sttWriter
==
NULL
);
ASSERT
(
merger
->
dataWriter
==
NULL
);
merger
->
ctx
->
tbid
->
suid
=
0
;
merger
->
ctx
->
tbid
->
uid
=
0
;
merger
->
ctx
->
bDataIdx
=
0
;
for
(
int32_t
i
=
0
;
i
<
ARRAY_SIZE
(
merger
->
ctx
->
bData
);
++
i
)
{
tBlockDataReset
(
merger
->
ctx
->
bData
+
i
);
}
// open reader
code
=
tsdbMergeFileSetBeginOpenReader
(
merger
);
TSDB_CHECK_CODE
(
code
,
lino
,
_exit
);
...
...
@@ -406,7 +491,7 @@ static int32_t tsdbMergeFileSetBegin(SMerger *merger) {
_exit:
if
(
code
)
{
TSDB_ERROR_LOG
(
vid
,
lino
,
code
);
TSDB_ERROR_LOG
(
TD_VID
(
merger
->
tsdb
->
pVnode
)
,
lino
,
code
);
}
return
code
;
}
...
...
@@ -416,9 +501,18 @@ static int32_t tsdbMergeFileSetEndCloseWriter(SMerger *merger) {
int32_t
lino
=
0
;
int32_t
vid
=
TD_VID
(
merger
->
tsdb
->
pVnode
);
STFileOp
op
[
1
];
STFileOp
op
[
TSDB_FTYPE_MAX
];
code
=
tsdbSttFileWriterClose
(
&
merger
->
sttWriter
,
0
,
op
);
TSDB_CHECK_CODE
(
code
,
lino
,
_exit
);
if
(
op
->
optype
!=
TSDB_FOP_NONE
)
{
code
=
TARRAY2_APPEND_PTR
(
merger
->
fopArr
,
op
);
TSDB_CHECK_CODE
(
code
,
lino
,
_exit
);
}
if
(
merger
->
ctx
->
toData
)
{
// TODO
code
=
tsdbDataFileWriterClose
(
&
merger
->
dataWriter
,
0
,
op
);
TSDB_CHECK_CODE
(
code
,
lino
,
_exit
);
...
...
@@ -428,14 +522,6 @@ static int32_t tsdbMergeFileSetEndCloseWriter(SMerger *merger) {
}
}
code
=
tsdbSttFileWriterClose
(
&
merger
->
sttWriter
,
0
,
op
);
TSDB_CHECK_CODE
(
code
,
lino
,
_exit
);
if
(
op
->
optype
!=
TSDB_FOP_NONE
)
{
code
=
TARRAY2_APPEND_PTR
(
merger
->
fopArr
,
op
);
TSDB_CHECK_CODE
(
code
,
lino
,
_exit
);
}
_exit:
if
(
code
)
{
TSDB_ERROR_LOG
(
vid
,
lino
,
code
);
...
...
@@ -485,7 +571,7 @@ static int32_t tsdbMergeFileSet(SMerger *merger, STFileSet *fset) {
// do merge
if
(
merger
->
ctx
->
toData
)
{
code
=
tsdbMergeToData
(
merger
);
code
=
tsdbMergeToData
Level
(
merger
);
TSDB_CHECK_CODE
(
code
,
lino
,
_exit
);
}
else
{
code
=
tsdbMergeToUpperLevel
(
merger
);
...
...
@@ -507,16 +593,13 @@ _exit:
static
int32_t
tsdbDoMerge
(
SMerger
*
merger
)
{
int32_t
code
=
0
;
int32_t
lino
=
0
;
int32_t
vid
=
TD_VID
(
merger
->
tsdb
->
pVnode
);
STFileSet
*
fset
;
SSttLvl
*
lvl
;
STFileObj
*
fobj
;
TARRAY2_FOREACH
(
merger
->
fsetArr
,
fset
)
{
lvl
=
TARRAY2_SIZE
(
fset
->
lvlArr
)
>
0
?
TARRAY2_FIRST
(
fset
->
lvlArr
)
:
NULL
;
SSttLvl
*
lvl
=
TARRAY2_SIZE
(
fset
->
lvlArr
)
>
0
?
TARRAY2_FIRST
(
fset
->
lvlArr
)
:
NULL
;
if
(
!
lvl
||
lvl
->
level
!=
0
||
TARRAY2_SIZE
(
lvl
->
fobjArr
)
==
0
)
continue
;
fobj
=
TARRAY2_FIRST
(
lvl
->
fobjArr
);
STFileObj
*
fobj
=
TARRAY2_FIRST
(
lvl
->
fobjArr
);
if
(
fobj
->
f
->
stt
->
nseg
<
merger
->
sttTrigger
)
continue
;
if
(
!
merger
->
ctx
->
opened
)
{
...
...
@@ -535,9 +618,9 @@ static int32_t tsdbDoMerge(SMerger *merger) {
_exit:
if
(
code
)
{
TSDB_ERROR_LOG
(
vid
,
lino
,
code
);
TSDB_ERROR_LOG
(
TD_VID
(
merger
->
tsdb
->
pVnode
)
,
lino
,
code
);
}
else
{
tsdbDebug
(
"vgId:%d %s done"
,
vid
,
__func__
);
tsdbDebug
(
"vgId:%d %s done"
,
TD_VID
(
merger
->
tsdb
->
pVnode
)
,
__func__
);
}
return
code
;
}
...
...
source/dnode/vnode/src/tsdb/dev/tsdbSttFileRW.c
浏览文件 @
e07afeee
...
...
@@ -775,19 +775,15 @@ int32_t tsdbSttFileWriteTSData(SSttFileWriter *writer, SRowInfo *row) {
}
// row to col conversion
if
(
key
->
version
<=
writer
->
config
->
compactVersion
)
{
if
(
writer
->
bData
->
nRow
>
0
//
&&
(
writer
->
bData
->
uid
//
?
writer
->
bData
->
uid
:
writer
->
bData
->
aUid
[
writer
->
bData
->
nRow
-
1
])
==
row
->
uid
//
&&
writer
->
bData
->
aTSKEY
[
writer
->
bData
->
nRow
-
1
]
==
key
->
ts
//
)
{
code
=
tBlockDataUpdateRow
(
writer
->
bData
,
&
row
->
row
,
writer
->
config
->
skmRow
->
pTSchema
);
TSDB_CHECK_CODE
(
code
,
lino
,
_exit
);
}
else
{
code
=
tBlockDataAppendRow
(
writer
->
bData
,
&
row
->
row
,
writer
->
config
->
skmRow
->
pTSchema
,
row
->
uid
);
TSDB_CHECK_CODE
(
code
,
lino
,
_exit
);
}
if
(
key
->
version
<=
writer
->
config
->
compactVersion
//
&&
writer
->
bData
->
nRow
>
0
//
&&
(
writer
->
bData
->
uid
//
?
writer
->
bData
->
uid
//
:
writer
->
bData
->
aUid
[
writer
->
bData
->
nRow
-
1
])
==
row
->
uid
//
&&
writer
->
bData
->
aTSKEY
[
writer
->
bData
->
nRow
-
1
]
==
key
->
ts
//
)
{
code
=
tBlockDataUpdateRow
(
writer
->
bData
,
&
row
->
row
,
writer
->
config
->
skmRow
->
pTSchema
);
TSDB_CHECK_CODE
(
code
,
lino
,
_exit
);
}
else
{
if
(
writer
->
bData
->
nRow
>=
writer
->
config
->
maxRow
)
{
code
=
tsdbSttFileDoWriteTSDataBlock
(
writer
);
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录