Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
2cf70ab0
T
TDengine
项目概览
taosdata
/
TDengine
1 年多 前同步成功
通知
1185
Star
22016
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
2cf70ab0
编写于
5月 27, 2023
作者:
H
Hongze Cheng
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
more code
上级
880359dd
变更
2
隐藏空白更改
内联
并排
Showing
2 changed file
with
93 addition
and
92 deletion
+93
-92
source/dnode/vnode/src/tsdb/dev/inc/tsdbSttFReaderWriter.h
source/dnode/vnode/src/tsdb/dev/inc/tsdbSttFReaderWriter.h
+2
-2
source/dnode/vnode/src/tsdb/dev/tsdbSttFReaderWriter.c
source/dnode/vnode/src/tsdb/dev/tsdbSttFReaderWriter.c
+91
-90
未找到文件。
source/dnode/vnode/src/tsdb/dev/inc/tsdbSttFReaderWriter.h
浏览文件 @
2cf70ab0
...
...
@@ -51,8 +51,8 @@ int32_t tsdbSttFReadStatisBlock(SSttSegReader *reader, const STbStatisBlk *stati
struct
SSttFileReaderConfig
{
STsdb
*
tsdb
;
SSkmInfo
*
pS
kmTb
;
SSkmInfo
*
pS
kmRow
;
SSkmInfo
*
s
kmTb
;
SSkmInfo
*
s
kmRow
;
uint8_t
**
aBuf
;
int32_t
szPage
;
STFile
file
;
...
...
source/dnode/vnode/src/tsdb/dev/tsdbSttFReaderWriter.c
浏览文件 @
2cf70ab0
...
...
@@ -17,13 +17,15 @@
typedef
struct
{
int64_t
prevFooter
;
SFDataPtr
dict
[
4
];
// 0:bloom filter, 1:SSttBlk, 2:STbStatisBlk, 3:SDelBlk
uint8_t
reserved
[
24
];
}
SFSttFooter
;
SFDataPtr
sttBlkPtr
[
1
];
SFDataPtr
delBlkPtr
[
1
];
SFDataPtr
statisBlkPtr
[
1
];
SFDataPtr
rsrvd
[
2
];
}
SSttFooter
;
// SSttFReader ============================================================
struct
SSttFileReader
{
SSttFileReaderConfig
config
;
SSttFileReaderConfig
config
[
1
]
;
TSttSegReaderArray
segReaderArray
;
STsdbFD
*
fd
;
};
...
...
@@ -38,7 +40,7 @@ struct SSttSegReader {
bool
statisBlkLoaded
;
}
ctx
;
S
FSttFooter
footer
;
S
SttFooter
footer
[
1
]
;
void
*
bloomFilter
;
TSttBlkArray
sttBlkArray
;
TDelBlkArray
delBlkArray
;
...
...
@@ -49,7 +51,7 @@ struct SSttSegReader {
static
int32_t
tsdbSttSegReaderOpen
(
SSttFileReader
*
reader
,
int64_t
offset
,
SSttSegReader
**
segReader
)
{
int32_t
code
=
0
;
int32_t
lino
=
0
;
int32_t
vid
=
TD_VID
(
reader
->
config
.
tsdb
->
pVnode
);
int32_t
vid
=
TD_VID
(
reader
->
config
->
tsdb
->
pVnode
);
ASSERT
(
offset
>=
TSDB_FHDR_SIZE
);
...
...
@@ -97,7 +99,7 @@ int32_t tsdbSttFReaderOpen(const SSttFileReaderConfig *config, SSttFileReader **
reader
[
0
]
=
taosMemoryCalloc
(
1
,
sizeof
(
*
reader
[
0
]));
if
(
reader
[
0
]
==
NULL
)
return
TSDB_CODE_OUT_OF_MEMORY
;
reader
[
0
]
->
config
=
config
[
0
];
reader
[
0
]
->
config
[
0
]
=
config
[
0
];
TARRAY2_INIT
(
&
reader
[
0
]
->
segReaderArray
);
// open file
...
...
@@ -111,13 +113,13 @@ int32_t tsdbSttFReaderOpen(const SSttFileReaderConfig *config, SSttFileReader **
while
(
size
>
0
)
{
SSttSegReader
*
segReader
;
code
=
tsdbSttSegReaderOpen
(
reader
[
0
],
size
-
sizeof
(
S
F
SttFooter
),
&
segReader
);
code
=
tsdbSttSegReaderOpen
(
reader
[
0
],
size
-
sizeof
(
SSttFooter
),
&
segReader
);
TSDB_CHECK_CODE
(
code
,
lino
,
_exit
);
code
=
TARRAY2_APPEND
(
&
reader
[
0
]
->
segReaderArray
,
segReader
);
TSDB_CHECK_CODE
(
code
,
lino
,
_exit
);
size
=
segReader
->
footer
.
prevFooter
;
size
=
segReader
->
footer
->
prevFooter
;
}
ASSERT
(
TARRAY2_SIZE
(
&
reader
[
0
]
->
segReaderArray
)
==
config
->
file
.
stt
.
nseg
);
...
...
@@ -152,15 +154,15 @@ int32_t tsdbSttFReadBloomFilter(SSttSegReader *reader, const void *pFilter) {
int32_t
tsdbSttFReadStatisBlk
(
SSttSegReader
*
reader
,
const
TStatisBlkArray
**
statisBlkArray
)
{
if
(
!
reader
->
ctx
.
statisBlkLoaded
)
{
SFDataPtr
fptr
=
reader
->
footer
.
dict
[
2
];
if
(
fptr
.
size
>
0
)
{
ASSERT
(
fptr
.
size
%
sizeof
(
STbStatisBlk
)
==
0
);
if
(
reader
->
footer
->
statisBlkPtr
->
size
>
0
)
{
ASSERT
(
reader
->
footer
->
statisBlkPtr
->
size
%
sizeof
(
STbStatisBlk
)
==
0
);
int32_t
size
=
fptr
.
size
/
sizeof
(
STbStatisBlk
);
void
*
data
=
taosMemoryMalloc
(
fptr
.
size
);
int32_t
size
=
reader
->
footer
->
statisBlkPtr
->
size
/
sizeof
(
STbStatisBlk
);
void
*
data
=
taosMemoryMalloc
(
reader
->
footer
->
statisBlkPtr
->
size
);
if
(
!
data
)
return
TSDB_CODE_OUT_OF_MEMORY
;
int32_t
code
=
tsdbReadFile
(
reader
->
reader
->
fd
,
fptr
.
offset
,
data
,
fptr
.
size
);
int32_t
code
=
tsdbReadFile
(
reader
->
reader
->
fd
,
reader
->
footer
->
statisBlkPtr
->
offset
,
data
,
reader
->
footer
->
statisBlkPtr
->
size
);
if
(
code
)
return
code
;
TARRAY2_INIT_EX
(
&
reader
->
statisBlkArray
,
size
,
size
,
data
);
...
...
@@ -177,15 +179,15 @@ int32_t tsdbSttFReadStatisBlk(SSttSegReader *reader, const TStatisBlkArray **sta
int32_t
tsdbSttFReadDelBlk
(
SSttSegReader
*
reader
,
const
TDelBlkArray
**
delBlkArray
)
{
if
(
!
reader
->
ctx
.
delBlkLoaded
)
{
SFDataPtr
fptr
=
reader
->
footer
.
dict
[
3
];
if
(
fptr
.
size
>
0
)
{
ASSERT
(
fptr
.
size
%
sizeof
(
SDelBlk
)
==
0
);
if
(
reader
->
footer
->
delBlkPtr
->
size
>
0
)
{
ASSERT
(
reader
->
footer
->
delBlkPtr
->
size
%
sizeof
(
SDelBlk
)
==
0
);
int32_t
size
=
fptr
.
size
/
sizeof
(
SDelBlk
);
void
*
data
=
taosMemoryMalloc
(
fptr
.
size
);
int32_t
size
=
reader
->
footer
->
delBlkPtr
->
size
/
sizeof
(
SDelBlk
);
void
*
data
=
taosMemoryMalloc
(
reader
->
footer
->
delBlkPtr
->
size
);
if
(
!
data
)
return
TSDB_CODE_OUT_OF_MEMORY
;
int32_t
code
=
tsdbReadFile
(
reader
->
reader
->
fd
,
fptr
.
offset
,
data
,
fptr
.
size
);
int32_t
code
=
tsdbReadFile
(
reader
->
reader
->
fd
,
reader
->
footer
->
delBlkPtr
->
offset
,
data
,
reader
->
footer
->
delBlkPtr
->
size
);
if
(
code
)
return
code
;
TARRAY2_INIT_EX
(
&
reader
->
delBlkArray
,
size
,
size
,
data
);
...
...
@@ -202,15 +204,15 @@ int32_t tsdbSttFReadDelBlk(SSttSegReader *reader, const TDelBlkArray **delBlkArr
int32_t
tsdbSttFReadSttBlk
(
SSttSegReader
*
reader
,
const
TSttBlkArray
**
sttBlkArray
)
{
if
(
!
reader
->
ctx
.
sttBlkLoaded
)
{
SFDataPtr
fptr
=
reader
->
footer
.
dict
[
1
];
if
(
fptr
.
size
>
0
)
{
ASSERT
(
fptr
.
size
%
sizeof
(
SSttBlk
)
==
0
);
if
(
reader
->
footer
->
sttBlkPtr
->
size
>
0
)
{
ASSERT
(
reader
->
footer
->
sttBlkPtr
->
size
%
sizeof
(
SSttBlk
)
==
0
);
int32_t
size
=
fptr
.
size
/
sizeof
(
SSttBlk
);
void
*
data
=
taosMemoryMalloc
(
fptr
.
size
);
int32_t
size
=
reader
->
footer
->
sttBlkPtr
->
size
/
sizeof
(
SSttBlk
);
void
*
data
=
taosMemoryMalloc
(
reader
->
footer
->
sttBlkPtr
->
size
);
if
(
!
data
)
return
TSDB_CODE_OUT_OF_MEMORY
;
int32_t
code
=
tsdbReadFile
(
reader
->
reader
->
fd
,
fptr
.
offset
,
data
,
fptr
.
size
);
int32_t
code
=
tsdbReadFile
(
reader
->
reader
->
fd
,
reader
->
footer
->
sttBlkPtr
->
offset
,
data
,
reader
->
footer
->
sttBlkPtr
->
size
);
if
(
code
)
return
code
;
TARRAY2_INIT_EX
(
&
reader
->
sttBlkArray
,
size
,
size
,
data
);
...
...
@@ -245,18 +247,17 @@ int32_t tsdbSttFReadStatisBlock(SSttSegReader *reader, const STbStatisBlk *stati
// SSttFWriter ============================================================
struct
SSttFileWriter
{
SSttFileWriterConfig
config
;
SSttFileWriterConfig
config
[
1
]
;
struct
{
bool
opened
;
}
ctx
;
// file
STFile
file
;
// data
TSttBlkArray
sttBlkArray
;
TDelBlkArray
delBlkArray
;
TStatisBlkArray
statisBlkArray
;
void
*
bloomFilter
;
// TODO
SFSttFooter
footer
;
TSttBlkArray
sttBlkArray
[
1
];
TDelBlkArray
delBlkArray
[
1
];
TStatisBlkArray
statisBlkArray
[
1
];
SSttFooter
footer
[
1
];
SBlockData
bData
[
1
];
SDelBlock
dData
[
1
];
STbStatisBlock
sData
[
1
];
...
...
@@ -288,7 +289,7 @@ static int32_t tsdbSttFileDoWriteTSDataBlock(SSttFileWriter *writer) {
if
(
sttBlk
->
maxVer
<
writer
->
bData
->
aVersion
[
iRow
])
sttBlk
->
maxVer
=
writer
->
bData
->
aVersion
[
iRow
];
}
code
=
tCmprBlockData
(
writer
->
bData
,
writer
->
config
.
cmprAlg
,
NULL
,
NULL
,
writer
->
config
.
aBuf
,
writer
->
aBufSize
);
code
=
tCmprBlockData
(
writer
->
bData
,
writer
->
config
->
cmprAlg
,
NULL
,
NULL
,
writer
->
config
->
aBuf
,
writer
->
aBufSize
);
TSDB_CHECK_CODE
(
code
,
lino
,
_exit
);
sttBlk
->
bInfo
.
offset
=
writer
->
file
.
size
;
...
...
@@ -297,19 +298,19 @@ static int32_t tsdbSttFileDoWriteTSDataBlock(SSttFileWriter *writer) {
for
(
int32_t
i
=
3
;
i
>=
0
;
i
--
)
{
if
(
writer
->
aBufSize
[
i
])
{
code
=
tsdbWriteFile
(
writer
->
fd
,
writer
->
file
.
size
,
writer
->
config
.
aBuf
[
i
],
writer
->
aBufSize
[
i
]);
code
=
tsdbWriteFile
(
writer
->
fd
,
writer
->
file
.
size
,
writer
->
config
->
aBuf
[
i
],
writer
->
aBufSize
[
i
]);
TSDB_CHECK_CODE
(
code
,
lino
,
_exit
);
writer
->
file
.
size
+=
writer
->
aBufSize
[
i
];
}
}
tBlockDataClear
(
writer
->
bData
);
code
=
TARRAY2_APPEND_P
(
&
writer
->
sttBlkArray
,
sttBlk
);
code
=
TARRAY2_APPEND_P
(
writer
->
sttBlkArray
,
sttBlk
);
TSDB_CHECK_CODE
(
code
,
lino
,
_exit
);
_exit:
if
(
code
)
{
tsdbError
(
"vgId:%d %s failed at line %d since %s"
,
TD_VID
(
writer
->
config
.
tsdb
->
pVnode
),
__func__
,
lino
,
tsdbError
(
"vgId:%d %s failed at line %d since %s"
,
TD_VID
(
writer
->
config
->
tsdb
->
pVnode
),
__func__
,
lino
,
tstrerror
(
code
));
}
return
code
;
...
...
@@ -348,12 +349,12 @@ static int32_t tsdbSttFileDoWriteStatisBlock(SSttFileWriter *writer) {
}
tTbStatisBlockClear
(
writer
->
sData
);
code
=
TARRAY2_APPEND_P
(
&
writer
->
statisBlkArray
,
statisBlk
);
code
=
TARRAY2_APPEND_P
(
writer
->
statisBlkArray
,
statisBlk
);
TSDB_CHECK_CODE
(
code
,
lino
,
_exit
);
_exit:
if
(
code
)
{
tsdbError
(
"vgId:%d %s failed at line %d since %s"
,
TD_VID
(
writer
->
config
.
tsdb
->
pVnode
),
__func__
,
lino
,
tsdbError
(
"vgId:%d %s failed at line %d since %s"
,
TD_VID
(
writer
->
config
->
tsdb
->
pVnode
),
__func__
,
lino
,
tstrerror
(
code
));
}
return
code
;
...
...
@@ -391,12 +392,12 @@ static int32_t tsdbSttFileDoWriteDelBlock(SSttFileWriter *writer) {
}
tDelBlockDestroy
(
writer
->
dData
);
code
=
TARRAY2_APPEND_P
(
&
writer
->
delBlkArray
,
delBlk
);
code
=
TARRAY2_APPEND_P
(
writer
->
delBlkArray
,
delBlk
);
TSDB_CHECK_CODE
(
code
,
lino
,
_exit
);
_exit:
if
(
code
)
{
tsdbError
(
"vgId:%d %s failed at line %d since %s"
,
TD_VID
(
writer
->
config
.
tsdb
->
pVnode
),
__func__
,
lino
,
tsdbError
(
"vgId:%d %s failed at line %d since %s"
,
TD_VID
(
writer
->
config
->
tsdb
->
pVnode
),
__func__
,
lino
,
tstrerror
(
code
));
}
else
{
// tsdbTrace();
...
...
@@ -408,20 +409,20 @@ static int32_t tsdbSttFileDoWriteSttBlk(SSttFileWriter *writer) {
int32_t
code
=
0
;
int32_t
lino
;
writer
->
footer
.
dict
[
1
].
offset
=
writer
->
file
.
size
;
writer
->
footer
.
dict
[
1
].
size
=
sizeof
(
SSttBlk
)
*
TARRAY2_SIZE
(
&
writer
->
sttBlkArray
);
writer
->
footer
->
sttBlkPtr
->
offset
=
writer
->
file
.
size
;
writer
->
footer
->
sttBlkPtr
->
size
=
sizeof
(
SSttBlk
)
*
TARRAY2_SIZE
(
writer
->
sttBlkArray
);
if
(
writer
->
footer
.
dict
[
1
].
size
)
{
code
=
tsdbWriteFile
(
writer
->
fd
,
writer
->
file
.
size
,
(
const
uint8_t
*
)
TARRAY2_DATA
(
&
writer
->
sttBlkArray
),
writer
->
footer
.
dict
[
1
].
size
);
if
(
writer
->
footer
->
sttBlkPtr
->
size
)
{
code
=
tsdbWriteFile
(
writer
->
fd
,
writer
->
file
.
size
,
(
const
uint8_t
*
)
TARRAY2_DATA
(
writer
->
sttBlkArray
),
writer
->
footer
->
sttBlkPtr
->
size
);
TSDB_CHECK_CODE
(
code
,
lino
,
_exit
);
writer
->
file
.
size
+=
writer
->
footer
.
dict
[
1
].
size
;
writer
->
file
.
size
+=
writer
->
footer
->
sttBlkPtr
->
size
;
}
_exit:
if
(
code
)
{
tsdbError
(
"vgId:%d %s failed at line %d since %s"
,
TD_VID
(
writer
->
config
.
tsdb
->
pVnode
),
__func__
,
lino
,
tsdbError
(
"vgId:%d %s failed at line %d since %s"
,
TD_VID
(
writer
->
config
->
tsdb
->
pVnode
),
__func__
,
lino
,
tstrerror
(
code
));
}
return
code
;
...
...
@@ -431,19 +432,19 @@ static int32_t tsdbSttFileDoWriteStatisBlk(SSttFileWriter *writer) {
int32_t
code
=
0
;
int32_t
lino
;
writer
->
footer
.
dict
[
2
].
offset
=
writer
->
file
.
size
;
writer
->
footer
.
dict
[
2
].
size
=
sizeof
(
STbStatisBlock
)
*
TARRAY2_SIZE
(
&
writer
->
statisBlkArray
);
writer
->
footer
->
statisBlkPtr
->
offset
=
writer
->
file
.
size
;
writer
->
footer
->
statisBlkPtr
->
size
=
sizeof
(
STbStatisBlock
)
*
TARRAY2_SIZE
(
writer
->
statisBlkArray
);
if
(
writer
->
footer
.
dict
[
2
].
size
)
{
code
=
tsdbWriteFile
(
writer
->
fd
,
writer
->
file
.
size
,
(
const
uint8_t
*
)
TARRAY2_DATA
(
&
writer
->
statisBlkArray
),
writer
->
footer
.
dict
[
2
].
size
);
if
(
writer
->
footer
->
statisBlkPtr
->
size
)
{
code
=
tsdbWriteFile
(
writer
->
fd
,
writer
->
file
.
size
,
(
const
uint8_t
*
)
TARRAY2_DATA
(
writer
->
statisBlkArray
),
writer
->
footer
->
statisBlkPtr
->
size
);
TSDB_CHECK_CODE
(
code
,
lino
,
_exit
);
writer
->
file
.
size
+=
writer
->
footer
.
dict
[
2
].
size
;
writer
->
file
.
size
+=
writer
->
footer
->
statisBlkPtr
->
size
;
}
_exit:
if
(
code
)
{
tsdbError
(
"vgId:%d %s failed at line %d since %s"
,
TD_VID
(
writer
->
config
.
tsdb
->
pVnode
),
__func__
,
lino
,
tsdbError
(
"vgId:%d %s failed at line %d since %s"
,
TD_VID
(
writer
->
config
->
tsdb
->
pVnode
),
__func__
,
lino
,
tstrerror
(
code
));
}
return
code
;
...
...
@@ -453,19 +454,19 @@ static int32_t tsdbSttFileDoWriteDelBlk(SSttFileWriter *writer) {
int32_t
code
=
0
;
int32_t
lino
;
writer
->
footer
.
dict
[
3
].
offset
=
writer
->
file
.
size
;
writer
->
footer
.
dict
[
3
].
size
=
sizeof
(
SDelBlk
)
*
TARRAY2_SIZE
(
&
writer
->
delBlkArray
);
writer
->
footer
->
delBlkPtr
->
offset
=
writer
->
file
.
size
;
writer
->
footer
->
delBlkPtr
->
size
=
sizeof
(
SDelBlk
)
*
TARRAY2_SIZE
(
writer
->
delBlkArray
);
if
(
writer
->
footer
.
dict
[
3
].
size
)
{
code
=
tsdbWriteFile
(
writer
->
fd
,
writer
->
file
.
size
,
(
const
uint8_t
*
)
TARRAY2_DATA
(
&
writer
->
delBlkArray
),
writer
->
footer
.
dict
[
3
].
size
);
if
(
writer
->
footer
->
delBlkPtr
->
size
)
{
code
=
tsdbWriteFile
(
writer
->
fd
,
writer
->
file
.
size
,
(
const
uint8_t
*
)
TARRAY2_DATA
(
writer
->
delBlkArray
),
writer
->
footer
->
delBlkPtr
->
size
);
TSDB_CHECK_CODE
(
code
,
lino
,
_exit
);
writer
->
file
.
size
+=
writer
->
footer
.
dict
[
3
].
size
;
writer
->
file
.
size
+=
writer
->
footer
->
delBlkPtr
->
size
;
}
_exit:
if
(
code
)
{
tsdbError
(
"vgId:%d %s failed at line %d since %s"
,
TD_VID
(
writer
->
config
.
tsdb
->
pVnode
),
__func__
,
lino
,
tsdbError
(
"vgId:%d %s failed at line %d since %s"
,
TD_VID
(
writer
->
config
->
tsdb
->
pVnode
),
__func__
,
lino
,
tstrerror
(
code
));
}
return
code
;
...
...
@@ -480,14 +481,14 @@ static int32_t tsdbSttFileDoWriteFooter(SSttFileWriter *writer) {
static
int32_t
tsdbSttFWriterDoOpen
(
SSttFileWriter
*
writer
)
{
int32_t
code
=
0
;
int32_t
lino
=
0
;
int32_t
vid
=
TD_VID
(
writer
->
config
.
tsdb
->
pVnode
);
int32_t
vid
=
TD_VID
(
writer
->
config
->
tsdb
->
pVnode
);
// set
writer
->
file
=
writer
->
config
.
file
;
writer
->
file
=
writer
->
config
->
file
;
writer
->
file
.
stt
.
nseg
++
;
if
(
!
writer
->
config
.
skmTb
)
writer
->
config
.
skmTb
=
&
writer
->
skmTb
;
if
(
!
writer
->
config
.
skmRow
)
writer
->
config
.
skmRow
=
&
writer
->
skmRow
;
if
(
!
writer
->
config
.
aBuf
)
writer
->
config
.
aBuf
=
writer
->
aBuf
;
if
(
!
writer
->
config
->
skmTb
)
writer
->
config
->
skmTb
=
&
writer
->
skmTb
;
if
(
!
writer
->
config
->
skmRow
)
writer
->
config
->
skmRow
=
&
writer
->
skmRow
;
if
(
!
writer
->
config
->
aBuf
)
writer
->
config
->
aBuf
=
writer
->
aBuf
;
// open file
int32_t
flag
;
...
...
@@ -499,8 +500,8 @@ static int32_t tsdbSttFWriterDoOpen(SSttFileWriter *writer) {
flag
=
TD_FILE_READ
|
TD_FILE_WRITE
|
TD_FILE_CREATE
|
TD_FILE_TRUNC
;
}
tsdbTFileName
(
writer
->
config
.
tsdb
,
&
writer
->
file
,
fname
);
code
=
tsdbOpenFile
(
fname
,
writer
->
config
.
szPage
,
flag
,
&
writer
->
fd
);
tsdbTFileName
(
writer
->
config
->
tsdb
,
&
writer
->
file
,
fname
);
code
=
tsdbOpenFile
(
fname
,
writer
->
config
->
szPage
,
flag
,
&
writer
->
fd
);
TSDB_CHECK_CODE
(
code
,
lino
,
_exit
);
if
(
!
writer
->
file
.
size
)
{
...
...
@@ -537,7 +538,7 @@ static int32_t tsdbSttFileDoUpdateHeader(SSttFileWriter *writer) {
static
int32_t
tsdbSttFWriterCloseCommit
(
SSttFileWriter
*
writer
,
STFileOp
*
op
)
{
int32_t
lino
;
int32_t
code
;
int32_t
vid
=
TD_VID
(
writer
->
config
.
tsdb
->
pVnode
);
int32_t
vid
=
TD_VID
(
writer
->
config
->
tsdb
->
pVnode
);
code
=
tsdbSttFileDoWriteTSDataBlock
(
writer
);
TSDB_CHECK_CODE
(
code
,
lino
,
_exit
);
...
...
@@ -571,10 +572,10 @@ static int32_t tsdbSttFWriterCloseCommit(SSttFileWriter *writer, STFileOp *op) {
tsdbCloseFile
(
&
writer
->
fd
);
ASSERT
(
writer
->
config
.
file
.
size
>
writer
->
file
.
size
);
op
->
optype
=
writer
->
config
.
file
.
size
?
TSDB_FOP_MODIFY
:
TSDB_FOP_CREATE
;
op
->
fid
=
writer
->
config
.
file
.
fid
;
op
->
of
=
writer
->
config
.
file
;
ASSERT
(
writer
->
config
->
file
.
size
>
writer
->
file
.
size
);
op
->
optype
=
writer
->
config
->
file
.
size
?
TSDB_FOP_MODIFY
:
TSDB_FOP_CREATE
;
op
->
fid
=
writer
->
config
->
file
.
fid
;
op
->
of
=
writer
->
config
->
file
;
op
->
nf
=
writer
->
file
;
_exit:
...
...
@@ -587,11 +588,11 @@ _exit:
static
int32_t
tsdbSttFWriterCloseAbort
(
SSttFileWriter
*
writer
)
{
char
fname
[
TSDB_FILENAME_LEN
];
tsdbTFileName
(
writer
->
config
.
tsdb
,
&
writer
->
config
.
file
,
fname
);
if
(
writer
->
config
.
file
.
size
)
{
// truncate the file to the original size
ASSERT
(
writer
->
config
.
file
.
size
<=
writer
->
file
.
size
);
if
(
writer
->
config
.
file
.
size
<
writer
->
file
.
size
)
{
taosFtruncateFile
(
writer
->
fd
->
pFD
,
writer
->
config
.
file
.
size
);
tsdbTFileName
(
writer
->
config
->
tsdb
,
&
writer
->
config
->
file
,
fname
);
if
(
writer
->
config
->
file
.
size
)
{
// truncate the file to the original size
ASSERT
(
writer
->
config
->
file
.
size
<=
writer
->
file
.
size
);
if
(
writer
->
config
->
file
.
size
<
writer
->
file
.
size
)
{
taosFtruncateFile
(
writer
->
fd
->
pFD
,
writer
->
config
->
file
.
size
);
tsdbCloseFile
(
&
writer
->
fd
);
}
}
else
{
// remove the file
...
...
@@ -606,7 +607,7 @@ int32_t tsdbSttFWriterOpen(const SSttFileWriterConfig *config, SSttFileWriter **
writer
[
0
]
=
taosMemoryMalloc
(
sizeof
(
*
writer
[
0
]));
if
(
writer
[
0
]
==
NULL
)
return
TSDB_CODE_OUT_OF_MEMORY
;
writer
[
0
]
->
config
=
config
[
0
];
writer
[
0
]
->
config
[
0
]
=
config
[
0
];
writer
[
0
]
->
ctx
.
opened
=
false
;
return
0
;
}
...
...
@@ -614,7 +615,7 @@ int32_t tsdbSttFWriterOpen(const SSttFileWriterConfig *config, SSttFileWriter **
int32_t
tsdbSttFWriterClose
(
SSttFileWriter
**
writer
,
int8_t
abort
,
STFileOp
*
op
)
{
int32_t
code
=
0
;
int32_t
lino
=
0
;
int32_t
vid
=
TD_VID
(
writer
[
0
]
->
config
.
tsdb
->
pVnode
);
int32_t
vid
=
TD_VID
(
writer
[
0
]
->
config
->
tsdb
->
pVnode
);
if
(
!
writer
[
0
]
->
ctx
.
opened
)
{
op
->
optype
=
TSDB_FOP_NONE
;
...
...
@@ -656,7 +657,7 @@ int32_t tsdbSttFWriteTSData(SSttFileWriter *writer, SRowInfo *row) {
TSDB_CHECK_CODE
(
code
,
lino
,
_exit
);
}
if
(
writer
->
sData
[
0
].
nRow
>=
writer
->
config
.
maxRow
)
{
if
(
writer
->
sData
[
0
].
nRow
>=
writer
->
config
->
maxRow
)
{
code
=
tsdbSttFileDoWriteStatisBlock
(
writer
);
TSDB_CHECK_CODE
(
code
,
lino
,
_exit
);
}
...
...
@@ -670,26 +671,26 @@ int32_t tsdbSttFWriteTSData(SSttFileWriter *writer, SRowInfo *row) {
writer
->
sData
[
0
].
aData
[
6
][
writer
->
sData
[
0
].
nRow
]
=
1
;
// count
writer
->
sData
[
0
].
nRow
++
;
code
=
tsdbUpdateSkmTb
(
writer
->
config
.
tsdb
,
tbid
,
writer
->
config
.
skmTb
);
code
=
tsdbUpdateSkmTb
(
writer
->
config
->
tsdb
,
tbid
,
writer
->
config
->
skmTb
);
TSDB_CHECK_CODE
(
code
,
lino
,
_exit
);
TABLEID
id
=
{
.
suid
=
tbid
->
suid
,
.
uid
=
tbid
->
uid
?
0
:
tbid
->
uid
,
};
code
=
tBlockDataInit
(
&
writer
->
bData
[
0
],
&
id
,
writer
->
config
.
skmTb
->
pTSchema
,
NULL
,
0
);
code
=
tBlockDataInit
(
&
writer
->
bData
[
0
],
&
id
,
writer
->
config
->
skmTb
->
pTSchema
,
NULL
,
0
);
TSDB_CHECK_CODE
(
code
,
lino
,
_exit
);
}
if
(
row
->
row
.
type
==
TSDBROW_ROW_FMT
)
{
code
=
tsdbUpdateSkmRow
(
writer
->
config
.
tsdb
,
tbid
,
TSDBROW_SVERSION
(
pRow
),
writer
->
config
.
skmRow
);
code
=
tsdbUpdateSkmRow
(
writer
->
config
->
tsdb
,
tbid
,
TSDBROW_SVERSION
(
pRow
),
writer
->
config
->
skmRow
);
TSDB_CHECK_CODE
(
code
,
lino
,
_exit
);
}
code
=
tBlockDataAppendRow
(
&
writer
->
bData
[
0
],
pRow
,
writer
->
config
.
skmRow
->
pTSchema
,
tbid
->
uid
);
code
=
tBlockDataAppendRow
(
&
writer
->
bData
[
0
],
pRow
,
writer
->
config
->
skmRow
->
pTSchema
,
tbid
->
uid
);
TSDB_CHECK_CODE
(
code
,
lino
,
_exit
);
if
(
writer
->
bData
[
0
].
nRow
>=
writer
->
config
.
maxRow
)
{
if
(
writer
->
bData
[
0
].
nRow
>=
writer
->
config
->
maxRow
)
{
code
=
tsdbSttFileDoWriteTSDataBlock
(
writer
);
TSDB_CHECK_CODE
(
code
,
lino
,
_exit
);
}
...
...
@@ -707,7 +708,7 @@ int32_t tsdbSttFWriteTSData(SSttFileWriter *writer, SRowInfo *row) {
_exit:
if
(
code
)
{
tsdbError
(
"vgId:%d %s failed at line %d since %s"
,
TD_VID
(
writer
->
config
.
tsdb
->
pVnode
),
__func__
,
lino
,
tsdbError
(
"vgId:%d %s failed at line %d since %s"
,
TD_VID
(
writer
->
config
->
tsdb
->
pVnode
),
__func__
,
lino
,
tstrerror
(
code
));
}
return
code
;
...
...
@@ -729,7 +730,7 @@ int32_t tsdbSttFWriteTSDataBlock(SSttFileWriter *writer, SBlockData *bdata) {
_exit:
if
(
code
)
{
tsdbError
(
"vgId:%d %s failed at line %d since %s"
,
TD_VID
(
writer
->
config
.
tsdb
->
pVnode
),
__func__
,
lino
,
tsdbError
(
"vgId:%d %s failed at line %d since %s"
,
TD_VID
(
writer
->
config
->
tsdb
->
pVnode
),
__func__
,
lino
,
tstrerror
(
code
));
}
return
0
;
...
...
@@ -751,7 +752,7 @@ int32_t tsdbSttFWriteDLData(SSttFileWriter *writer, TABLEID *tbid, SDelData *pDe
writer
->
dData
[
0
].
aData
[
4
][
writer
->
dData
[
0
].
nRow
]
=
pDelData
->
eKey
;
// ekey
writer
->
dData
[
0
].
nRow
++
;
if
(
writer
->
dData
[
0
].
nRow
>=
writer
->
config
.
maxRow
)
{
if
(
writer
->
dData
[
0
].
nRow
>=
writer
->
config
->
maxRow
)
{
return
tsdbSttFileDoWriteDelBlock
(
writer
);
}
else
{
return
0
;
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录