Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
4cb4c638
TDengine
项目概览
taosdata
/
TDengine
大约 1 年 前同步成功
通知
1184
Star
22015
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
体验新版 GitCode,发现更多精彩内容 >>
提交
4cb4c638
编写于
5月 26, 2023
作者:
H
Hongze Cheng
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
more code
上级
f5286359
变更
6
隐藏空白更改
内联
并排
Showing
6 changed file
with
313 addition
and
424 deletion
+313
-424
include/util/tarray2.h
include/util/tarray2.h
+2
-0
source/dnode/vnode/src/tsdb/dev/inc/tsdbFSet.h
source/dnode/vnode/src/tsdb/dev/inc/tsdbFSet.h
+1
-1
source/dnode/vnode/src/tsdb/dev/inc/tsdbSttFReaderWriter.h
source/dnode/vnode/src/tsdb/dev/inc/tsdbSttFReaderWriter.h
+11
-8
source/dnode/vnode/src/tsdb/dev/tsdbCommit.c
source/dnode/vnode/src/tsdb/dev/tsdbCommit.c
+6
-6
source/dnode/vnode/src/tsdb/dev/tsdbMerge.c
source/dnode/vnode/src/tsdb/dev/tsdbMerge.c
+6
-6
source/dnode/vnode/src/tsdb/dev/tsdbSttFReaderWriter.c
source/dnode/vnode/src/tsdb/dev/tsdbSttFReaderWriter.c
+287
-403
未找到文件。
include/util/tarray2.h
浏览文件 @
4cb4c638
...
...
@@ -43,6 +43,8 @@ typedef void (*TArray2Cb)(void *);
#define TARRAY2_INITIALIZER \
{ 0, 0, NULL }
#define TARRAY2_SIZE(a) ((a)->size)
#define TARRAY2_CAPACITY(a) ((a)->capacity)
#define TARRAY2_DATA(a) ((a)->data)
#define TARRAY2_GET(a, i) ((a)->data[i])
#define TARRAY2_GET_PTR(a, i) (&((a)->data[i]))
#define TARRAY2_FIRST(a) ((a)->data[0])
...
...
source/dnode/vnode/src/tsdb/dev/inc/tsdbFSet.h
浏览文件 @
4cb4c638
...
...
@@ -57,8 +57,8 @@ SSttLvl *tsdbTFileSetGetLvl(STFileSet *fset, int32_t level);
bool
tsdbTFileSetIsEmpty
(
const
STFileSet
*
fset
);
struct
STFileOp
{
int32_t
fid
;
tsdb_fop_t
optype
;
int32_t
fid
;
STFile
of
;
// old file state
STFile
nf
;
// new file state
};
...
...
source/dnode/vnode/src/tsdb/dev/inc/tsdbSttFReaderWriter.h
浏览文件 @
4cb4c638
...
...
@@ -22,6 +22,8 @@
extern
"C"
{
#endif
typedef
TARRAY2
(
SSttBlk
)
TSttBlkArray
;
// SSttFileReader ==========================================
typedef
struct
SSttFSegReader
SSttFSegReader
;
typedef
struct
SSttFileReader
SSttFileReader
;
...
...
@@ -54,19 +56,20 @@ struct SSttFileReaderConfig {
typedef
struct
SSttFileWriter
SSttFileWriter
;
typedef
struct
SSttFileWriterConfig
SSttFileWriterConfig
;
int32_t
tsdbSttFWriterOpen
(
const
SSttFileWriterConfig
*
config
,
SSttFileWriter
**
ppW
riter
);
int32_t
tsdbSttFWriterClose
(
SSttFileWriter
**
ppWriter
,
int8_t
abort
,
struct
STFileOp
*
op
);
int32_t
tsdbSttFWriteTSData
(
SSttFileWriter
*
pW
riter
,
SRowInfo
*
pRowInfo
);
int32_t
tsdbSttFWriteTSDataBlock
(
SSttFileWriter
*
pW
riter
,
SBlockData
*
pBlockData
);
int32_t
tsdbSttFWriteDLData
(
SSttFileWriter
*
pW
riter
,
TABLEID
*
tbid
,
SDelData
*
pDelData
);
int32_t
tsdbSttFWriterOpen
(
const
SSttFileWriterConfig
*
config
,
SSttFileWriter
**
w
riter
);
int32_t
tsdbSttFWriterClose
(
SSttFileWriter
**
writer
,
int8_t
abort
,
STFileOp
*
op
);
int32_t
tsdbSttFWriteTSData
(
SSttFileWriter
*
w
riter
,
SRowInfo
*
pRowInfo
);
int32_t
tsdbSttFWriteTSDataBlock
(
SSttFileWriter
*
w
riter
,
SBlockData
*
pBlockData
);
int32_t
tsdbSttFWriteDLData
(
SSttFileWriter
*
w
riter
,
TABLEID
*
tbid
,
SDelData
*
pDelData
);
struct
SSttFileWriterConfig
{
STsdb
*
pT
sdb
;
STsdb
*
t
sdb
;
int32_t
maxRow
;
int32_t
szPage
;
int8_t
cmprAlg
;
SSkmInfo
*
pSkmTb
;
SSkmInfo
*
pSkmRow
;
int64_t
compVer
;
// compact version
SSkmInfo
*
skmTb
;
SSkmInfo
*
skmRow
;
uint8_t
**
aBuf
;
STFile
file
;
};
...
...
source/dnode/vnode/src/tsdb/dev/tsdbCommit.c
浏览文件 @
4cb4c638
...
...
@@ -58,12 +58,12 @@ static int32_t open_writer_with_new_stt(SCommitter *pCommitter) {
TSDB_CHECK_CODE
(
code
,
lino
,
_exit
);
}
config
.
pT
sdb
=
pTsdb
;
config
.
t
sdb
=
pTsdb
;
config
.
maxRow
=
pCommitter
->
maxRow
;
config
.
szPage
=
pVnode
->
config
.
tsdbPageSize
;
config
.
cmprAlg
=
pCommitter
->
cmprAlg
;
config
.
pS
kmTb
=
NULL
;
config
.
pS
kmRow
=
NULL
;
config
.
s
kmTb
=
NULL
;
config
.
s
kmRow
=
NULL
;
config
.
aBuf
=
NULL
;
config
.
file
.
type
=
TSDB_FTYPE_STT
;
config
.
file
.
did
=
did
;
...
...
@@ -93,12 +93,12 @@ static int32_t open_writer_with_exist_stt(SCommitter *pCommitter, const STFile *
SSttFileWriterConfig
config
=
{
//
.
pT
sdb
=
pTsdb
,
.
t
sdb
=
pTsdb
,
.
maxRow
=
pCommitter
->
maxRow
,
.
szPage
=
pVnode
->
config
.
tsdbPageSize
,
.
cmprAlg
=
pCommitter
->
cmprAlg
,
.
pS
kmTb
=
NULL
,
.
pS
kmRow
=
NULL
,
.
s
kmTb
=
NULL
,
.
s
kmRow
=
NULL
,
.
aBuf
=
NULL
,
.
file
=
*
pFile
//
};
...
...
source/dnode/vnode/src/tsdb/dev/tsdbMerge.c
浏览文件 @
4cb4c638
...
...
@@ -222,12 +222,12 @@ static int32_t tsdbMergeFileSetBegin(SMerger *merger) {
// open stt file writer
if
(
lvl
)
{
SSttFileWriterConfig
config
=
{
.
pT
sdb
=
merger
->
tsdb
,
.
t
sdb
=
merger
->
tsdb
,
.
maxRow
=
merger
->
maxRow
,
.
szPage
=
merger
->
szPage
,
.
cmprAlg
=
merger
->
cmprAlg
,
.
pS
kmTb
=
&
merger
->
skmTb
,
.
pS
kmRow
=
&
merger
->
skmRow
,
.
s
kmTb
=
&
merger
->
skmTb
,
.
s
kmRow
=
&
merger
->
skmRow
,
.
aBuf
=
merger
->
aBuf
,
.
file
=
fobj
->
f
,
};
...
...
@@ -235,12 +235,12 @@ static int32_t tsdbMergeFileSetBegin(SMerger *merger) {
TSDB_CHECK_CODE
(
code
,
lino
,
_exit
);
}
else
{
SSttFileWriterConfig
config
=
{
.
pT
sdb
=
merger
->
tsdb
,
.
t
sdb
=
merger
->
tsdb
,
.
maxRow
=
merger
->
maxRow
,
.
szPage
=
merger
->
szPage
,
.
cmprAlg
=
merger
->
cmprAlg
,
.
pS
kmTb
=
&
merger
->
skmTb
,
.
pS
kmRow
=
&
merger
->
skmRow
,
.
s
kmTb
=
&
merger
->
skmTb
,
.
s
kmRow
=
&
merger
->
skmRow
,
.
aBuf
=
merger
->
aBuf
,
.
file
=
(
STFile
){
...
...
source/dnode/vnode/src/tsdb/dev/tsdbSttFReaderWriter.c
浏览文件 @
4cb4c638
...
...
@@ -98,190 +98,157 @@ int32_t tsdbSttFSegReadSttBlock(SSttFSegReader *pSegReader, const void *pBlock)
// SSttFWriter ============================================================
struct
SSttFileWriter
{
SSttFileWriterConfig
config
;
struct
{
bool
opened
;
}
ctx
;
// file
STFile
tF
ile
;
STFile
f
ile
;
// data
SFSttFooter
footer
;
SBlockData
bData
;
SDelBlock
dData
;
STbStatisBlock
sData
;
SArray
*
aSttBlk
;
// SArray<SSttBlk>
SArray
*
aDelBlk
;
// SArray<SDelBlk>
SArray
*
aStatisBlk
;
// SArray<STbStatisBlk>
TARRAY2
(
SSttBlk
)
sttBlkArray
;
TARRAY2
(
SDelBlk
)
delBlkArray
;
TARRAY2
(
STbStatisBlk
)
statisBlkArray
;
void
*
bloomFilter
;
// TODO
SFSttFooter
footer
;
SBlockData
bData
[
1
];
SDelBlock
dData
[
1
];
STbStatisBlock
sData
[
1
];
// helper data
SSkmInfo
skmTb
;
SSkmInfo
skmRow
;
int32_t
aBufSize
[
5
];
uint8_t
*
aBuf
[
5
];
STsdbFD
*
pF
d
;
STsdbFD
*
f
d
;
};
static
int32_t
write_timeseries_block
(
SSttFileWriter
*
pWriter
)
{
static
int32_t
tsdbSttFileDoWriteTSDataBlock
(
SSttFileWriter
*
writer
)
{
if
(
writer
->
bData
->
nRow
==
0
)
return
0
;
int32_t
code
=
0
;
int32_t
lino
;
int32_t
lino
=
0
;
SSttBlk
sttBlk
[
1
];
sttBlk
->
suid
=
writer
->
bData
->
suid
;
sttBlk
->
minUid
=
writer
->
bData
->
uid
?
writer
->
bData
->
uid
:
writer
->
bData
->
aUid
[
0
];
sttBlk
->
maxUid
=
writer
->
bData
->
uid
?
writer
->
bData
->
uid
:
writer
->
bData
->
aUid
[
writer
->
bData
->
nRow
-
1
];
sttBlk
->
minKey
=
sttBlk
->
maxKey
=
writer
->
bData
->
aTSKEY
[
0
];
sttBlk
->
minVer
=
sttBlk
->
maxVer
=
writer
->
bData
->
aVersion
[
0
];
sttBlk
->
nRow
=
writer
->
bData
->
nRow
;
for
(
int32_t
iRow
=
1
;
iRow
<
writer
->
bData
->
nRow
;
iRow
++
)
{
if
(
sttBlk
->
minKey
>
writer
->
bData
->
aTSKEY
[
iRow
])
sttBlk
->
minKey
=
writer
->
bData
->
aTSKEY
[
iRow
];
if
(
sttBlk
->
maxKey
<
writer
->
bData
->
aTSKEY
[
iRow
])
sttBlk
->
maxKey
=
writer
->
bData
->
aTSKEY
[
iRow
];
if
(
sttBlk
->
minVer
>
writer
->
bData
->
aVersion
[
iRow
])
sttBlk
->
minVer
=
writer
->
bData
->
aVersion
[
iRow
];
if
(
sttBlk
->
maxVer
<
writer
->
bData
->
aVersion
[
iRow
])
sttBlk
->
maxVer
=
writer
->
bData
->
aVersion
[
iRow
];
}
code
=
tCmprBlockData
(
writer
->
bData
,
writer
->
config
.
cmprAlg
,
NULL
,
NULL
,
writer
->
config
.
aBuf
,
writer
->
aBufSize
);
TSDB_CHECK_CODE
(
code
,
lino
,
_exit
);
SBlockData
*
pBData
=
&
pWriter
->
bData
;
SSttBlk
*
pSttBlk
=
(
SSttBlk
*
)
taosArrayReserve
(
pWriter
->
aSttBlk
,
1
);
if
(
pSttBlk
==
NULL
)
{
code
=
TSDB_CODE_OUT_OF_MEMORY
;
TSDB_CHECK_CODE
(
code
,
lino
,
_exit
);
}
sttBlk
->
bInfo
.
offset
=
writer
->
file
.
size
;
sttBlk
->
bInfo
.
szKey
=
writer
->
aBufSize
[
2
]
+
writer
->
aBufSize
[
3
];
sttBlk
->
bInfo
.
szBlock
=
writer
->
aBufSize
[
0
]
+
writer
->
aBufSize
[
1
]
+
sttBlk
->
bInfo
.
szKey
;
pSttBlk
->
suid
=
pBData
->
suid
;
pSttBlk
->
minUid
=
pBData
->
uid
?
pBData
->
uid
:
pBData
->
aUid
[
0
];
pSttBlk
->
maxUid
=
pBData
->
uid
?
pBData
->
uid
:
pBData
->
aUid
[
pBData
->
nRow
-
1
];
pSttBlk
->
minKey
=
pSttBlk
->
maxKey
=
pBData
->
aTSKEY
[
0
];
pSttBlk
->
minVer
=
pSttBlk
->
maxVer
=
pBData
->
aVersion
[
0
];
pSttBlk
->
nRow
=
pBData
->
nRow
;
for
(
int32_t
iRow
=
1
;
iRow
<
pBData
->
nRow
;
iRow
++
)
{
if
(
pSttBlk
->
minKey
>
pBData
->
aTSKEY
[
iRow
])
pSttBlk
->
minKey
=
pBData
->
aTSKEY
[
iRow
];
if
(
pSttBlk
->
maxKey
<
pBData
->
aTSKEY
[
iRow
])
pSttBlk
->
maxKey
=
pBData
->
aTSKEY
[
iRow
];
if
(
pSttBlk
->
minVer
>
pBData
->
aVersion
[
iRow
])
pSttBlk
->
minVer
=
pBData
->
aVersion
[
iRow
];
if
(
pSttBlk
->
maxVer
<
pBData
->
aVersion
[
iRow
])
pSttBlk
->
maxVer
=
pBData
->
aVersion
[
iRow
];
}
TSDB_CHECK_CODE
(
//
code
=
tCmprBlockData
(
//
pBData
,
//
pWriter
->
config
.
cmprAlg
,
//
NULL
,
//
NULL
,
//
pWriter
->
config
.
aBuf
,
//
pWriter
->
aBufSize
),
//
lino
,
//
_exit
);
pSttBlk
->
bInfo
.
offset
=
pWriter
->
tFile
.
size
;
pSttBlk
->
bInfo
.
szKey
=
pWriter
->
aBufSize
[
2
]
+
pWriter
->
aBufSize
[
3
];
pSttBlk
->
bInfo
.
szBlock
=
pWriter
->
aBufSize
[
0
]
+
pWriter
->
aBufSize
[
1
]
+
pSttBlk
->
bInfo
.
szKey
;
for
(
int32_t
iBuf
=
3
;
iBuf
>=
0
;
iBuf
--
)
{
if
(
pWriter
->
aBufSize
[
iBuf
])
{
TSDB_CHECK_CODE
(
//
code
=
tsdbWriteFile
(
//
pWriter
->
pFd
,
//
pWriter
->
tFile
.
size
,
//
pWriter
->
config
.
aBuf
[
iBuf
],
//
pWriter
->
aBufSize
[
iBuf
]),
//
lino
,
//
_exit
);
pWriter
->
tFile
.
size
+=
pWriter
->
aBufSize
[
iBuf
];
for
(
int32_t
i
=
3
;
i
>=
0
;
i
--
)
{
if
(
writer
->
aBufSize
[
i
])
{
code
=
tsdbWriteFile
(
writer
->
fd
,
writer
->
file
.
size
,
writer
->
config
.
aBuf
[
i
],
writer
->
aBufSize
[
i
]);
TSDB_CHECK_CODE
(
code
,
lino
,
_exit
);
writer
->
file
.
size
+=
writer
->
aBufSize
[
i
];
}
}
tBlockDataClear
(
writer
->
bData
);
tBlockDataClear
(
pBData
);
code
=
TARRAY2_APPEND_P
(
&
writer
->
sttBlkArray
,
sttBlk
);
TSDB_CHECK_CODE
(
code
,
lino
,
_exit
);
_exit:
if
(
code
)
{
tsdbError
(
//
"vgId:%d %s failed at line %d since %s"
,
//
TD_VID
(
pWriter
->
config
.
pTsdb
->
pVnode
),
//
__func__
,
//
lino
,
//
tstrerror
(
code
));
}
else
{
// tsdbTrace();
tsdbError
(
"vgId:%d %s failed at line %d since %s"
,
TD_VID
(
writer
->
config
.
tsdb
->
pVnode
),
__func__
,
lino
,
tstrerror
(
code
));
}
return
code
;
}
static
int32_t
write_statistics_block
(
SSttFileWriter
*
pWriter
)
{
static
int32_t
tsdbSttFileDoWriteStatisBlock
(
SSttFileWriter
*
writer
)
{
if
(
writer
->
sData
->
nRow
==
0
)
return
0
;
int32_t
code
=
0
;
int32_t
lino
;
int32_t
lino
=
0
;
STbStatisBlk
*
pStatisBlk
=
(
STbStatisBlk
*
)
taosArrayReserve
(
pWriter
->
aStatisBlk
,
1
);
if
(
pStatisBlk
==
NULL
)
{
TSDB_CHECK_CODE
(
code
=
TSDB_CODE_OUT_OF_MEMORY
,
lino
,
_exit
);
}
STbStatisBlk
statisBlk
[
1
];
pStatisBlk
->
nRow
=
pWriter
->
sData
.
nRow
;
pStatisBlk
->
minTid
.
suid
=
pWriter
->
sData
.
aData
[
0
][
0
];
pStatisBlk
->
minTid
.
uid
=
pWriter
->
sData
.
aData
[
1
][
0
];
pStatisBlk
->
maxTid
.
suid
=
pWriter
->
sData
.
aData
[
0
][
pWriter
->
sData
.
nRow
-
1
];
pStatisBlk
->
maxTid
.
uid
=
pWriter
->
sData
.
aData
[
1
][
pWriter
->
sData
.
nRow
-
1
];
pStatisBlk
->
minVer
=
pStatisBlk
->
maxVer
=
pStatisBlk
->
maxVer
=
pWriter
->
sData
.
aData
[
2
][
0
];
for
(
int32_t
iRow
=
1
;
iRow
<
pWriter
->
sData
.
nRow
;
iRow
++
)
{
if
(
pStatisBlk
->
minVer
>
pWriter
->
sData
.
aData
[
2
][
iRow
])
pStatisBlk
->
minVer
=
pWriter
->
sData
.
aData
[
2
][
iRow
];
if
(
pStatisBlk
->
maxVer
<
pWriter
->
sData
.
aData
[
2
][
iRow
])
pStatisBlk
->
maxVer
=
pWriter
->
sData
.
aData
[
2
][
iRow
];
statisBlk
->
nRow
=
writer
->
sData
->
nRow
;
statisBlk
->
minTid
.
suid
=
writer
->
sData
->
aData
[
0
][
0
];
statisBlk
->
minTid
.
uid
=
writer
->
sData
->
aData
[
1
][
0
];
statisBlk
->
maxTid
.
suid
=
writer
->
sData
->
aData
[
0
][
writer
->
sData
->
nRow
-
1
];
statisBlk
->
maxTid
.
uid
=
writer
->
sData
->
aData
[
1
][
writer
->
sData
->
nRow
-
1
];
statisBlk
->
minVer
=
statisBlk
->
maxVer
=
statisBlk
->
maxVer
=
writer
->
sData
->
aData
[
2
][
0
];
for
(
int32_t
iRow
=
1
;
iRow
<
writer
->
sData
->
nRow
;
iRow
++
)
{
if
(
statisBlk
->
minVer
>
writer
->
sData
->
aData
[
2
][
iRow
])
statisBlk
->
minVer
=
writer
->
sData
->
aData
[
2
][
iRow
];
if
(
statisBlk
->
maxVer
<
writer
->
sData
->
aData
[
2
][
iRow
])
statisBlk
->
maxVer
=
writer
->
sData
->
aData
[
2
][
iRow
];
}
pStatisBlk
->
dp
.
offset
=
pWriter
->
tF
ile
.
size
;
pS
tatisBlk
->
dp
.
size
=
0
;
statisBlk
->
dp
.
offset
=
writer
->
f
ile
.
size
;
s
tatisBlk
->
dp
.
size
=
0
;
// TODO: add compression here
int64_t
tsize
=
sizeof
(
int64_t
)
*
pWriter
->
sData
.
nRow
;
for
(
int32_t
i
=
0
;
i
<
ARRAY_SIZE
(
pWriter
->
sData
.
aData
);
i
++
)
{
TSDB_CHECK_CODE
(
//
code
=
tsdbWriteFile
(
//
pWriter
->
pFd
,
//
pWriter
->
tFile
.
size
,
//
(
const
uint8_t
*
)
pWriter
->
sData
.
aData
[
i
],
//
tsize
),
//
lino
,
//
_exit
);
pStatisBlk
->
dp
.
size
+=
tsize
;
pWriter
->
tFile
.
size
+=
tsize
;
int64_t
tsize
=
sizeof
(
int64_t
)
*
writer
->
sData
->
nRow
;
for
(
int32_t
i
=
0
;
i
<
ARRAY_SIZE
(
writer
->
sData
->
aData
);
i
++
)
{
code
=
tsdbWriteFile
(
writer
->
fd
,
writer
->
file
.
size
,
(
const
uint8_t
*
)
writer
->
sData
->
aData
[
i
],
tsize
);
TSDB_CHECK_CODE
(
code
,
lino
,
_exit
);
statisBlk
->
dp
.
size
+=
tsize
;
writer
->
file
.
size
+=
tsize
;
}
tTbStatisBlockClear
(
writer
->
sData
);
tTbStatisBlockClear
(
&
pWriter
->
sData
);
code
=
TARRAY2_APPEND_P
(
&
writer
->
statisBlkArray
,
statisBlk
);
TSDB_CHECK_CODE
(
code
,
lino
,
_exit
);
_exit:
if
(
code
)
{
tsdbError
(
//
"vgId:%d %s failed at line %d since %s"
,
//
TD_VID
(
pWriter
->
config
.
pTsdb
->
pVnode
),
//
__func__
,
//
lino
,
//
tstrerror
(
code
));
}
else
{
// tsdbTrace();
tsdbError
(
"vgId:%d %s failed at line %d since %s"
,
TD_VID
(
writer
->
config
.
tsdb
->
pVnode
),
__func__
,
lino
,
tstrerror
(
code
));
}
return
code
;
}
static
int32_t
write_delete_block
(
SSttFileWriter
*
pWriter
)
{
static
int32_t
tsdbSttFileDoWriteDelBlock
(
SSttFileWriter
*
writer
)
{
if
(
writer
->
dData
->
nRow
==
0
)
return
0
;
int32_t
code
=
0
;
int32_t
lino
;
ASSERTS
(
0
,
"TODO: Not implemented yet"
);
SDelBlk
*
pDelBlk
=
taosArrayReserve
(
pWriter
->
aDelBlk
,
1
);
if
(
pDelBlk
==
NULL
)
{
code
=
TSDB_CODE_OUT_OF_MEMORY
;
TSDB_CHECK_CODE
(
code
,
lino
,
_exit
);
}
SDelBlk
delBlk
[
1
];
pDelBlk
->
nRow
=
pWriter
->
sData
.
nRow
;
pDelBlk
->
minTid
.
suid
=
pWriter
->
sData
.
aData
[
0
][
0
];
pDelBlk
->
minTid
.
uid
=
pWriter
->
sData
.
aData
[
1
][
0
];
pDelBlk
->
maxTid
.
suid
=
pWriter
->
sData
.
aData
[
0
][
pWriter
->
sData
.
nRow
-
1
];
pDelBlk
->
maxTid
.
uid
=
pWriter
->
sData
.
aData
[
1
][
pWriter
->
sData
.
nRow
-
1
];
pDelBlk
->
minVer
=
pDelBlk
->
maxVer
=
pDelBlk
->
maxVer
=
pWriter
->
sData
.
aData
[
2
][
0
];
for
(
int32_t
iRow
=
1
;
iRow
<
pWriter
->
sData
.
nRow
;
iRow
++
)
{
if
(
pDelBlk
->
minVer
>
pWriter
->
sData
.
aData
[
2
][
iRow
])
pDelBlk
->
minVer
=
pWriter
->
sData
.
aData
[
2
][
iRow
];
if
(
pDelBlk
->
maxVer
<
pWriter
->
sData
.
aData
[
2
][
iRow
])
pDelBlk
->
maxVer
=
pWriter
->
sData
.
aData
[
2
][
iRow
];
delBlk
->
nRow
=
writer
->
sData
->
nRow
;
delBlk
->
minTid
.
suid
=
writer
->
sData
->
aData
[
0
][
0
];
delBlk
->
minTid
.
uid
=
writer
->
sData
->
aData
[
1
][
0
];
delBlk
->
maxTid
.
suid
=
writer
->
sData
->
aData
[
0
][
writer
->
sData
->
nRow
-
1
];
delBlk
->
maxTid
.
uid
=
writer
->
sData
->
aData
[
1
][
writer
->
sData
->
nRow
-
1
];
delBlk
->
minVer
=
delBlk
->
maxVer
=
delBlk
->
maxVer
=
writer
->
sData
->
aData
[
2
][
0
];
for
(
int32_t
iRow
=
1
;
iRow
<
writer
->
sData
->
nRow
;
iRow
++
)
{
if
(
delBlk
->
minVer
>
writer
->
sData
->
aData
[
2
][
iRow
])
delBlk
->
minVer
=
writer
->
sData
->
aData
[
2
][
iRow
];
if
(
delBlk
->
maxVer
<
writer
->
sData
->
aData
[
2
][
iRow
])
delBlk
->
maxVer
=
writer
->
sData
->
aData
[
2
][
iRow
];
}
pDelBlk
->
dp
.
offset
=
pWriter
->
tF
ile
.
size
;
pD
elBlk
->
dp
.
size
=
0
;
// TODO
delBlk
->
dp
.
offset
=
writer
->
f
ile
.
size
;
d
elBlk
->
dp
.
size
=
0
;
// TODO
int64_t
tsize
=
sizeof
(
int64_t
)
*
pWriter
->
dData
.
nRow
;
for
(
int32_t
i
=
0
;
i
<
ARRAY_SIZE
(
pWriter
->
dData
.
aData
);
i
++
)
{
code
=
tsdbWriteFile
(
pWriter
->
pFd
,
pWriter
->
tFile
.
size
,
(
const
uint8_t
*
)
pWriter
->
dData
.
aData
[
i
],
tsize
);
int64_t
tsize
=
sizeof
(
int64_t
)
*
writer
->
dData
->
nRow
;
for
(
int32_t
i
=
0
;
i
<
ARRAY_SIZE
(
writer
->
dData
->
aData
);
i
++
)
{
code
=
tsdbWriteFile
(
writer
->
fd
,
writer
->
file
.
size
,
(
const
uint8_t
*
)
writer
->
dData
->
aData
[
i
],
tsize
);
TSDB_CHECK_CODE
(
code
,
lino
,
_exit
);
pD
elBlk
->
dp
.
size
+=
tsize
;
pWriter
->
tF
ile
.
size
+=
tsize
;
d
elBlk
->
dp
.
size
+=
tsize
;
writer
->
f
ile
.
size
+=
tsize
;
}
tDelBlockDestroy
(
writer
->
dData
);
tDelBlockDestroy
(
&
pWriter
->
dData
);
code
=
TARRAY2_APPEND_P
(
&
writer
->
delBlkArray
,
delBlk
);
TSDB_CHECK_CODE
(
code
,
lino
,
_exit
);
_exit:
if
(
code
)
{
tsdbError
(
"vgId:%d %s failed at line %d since %s"
,
TD_VID
(
pWriter
->
config
.
pT
sdb
->
pVnode
),
__func__
,
lino
,
tsdbError
(
"vgId:%d %s failed at line %d since %s"
,
TD_VID
(
writer
->
config
.
t
sdb
->
pVnode
),
__func__
,
lino
,
tstrerror
(
code
));
}
else
{
// tsdbTrace();
...
...
@@ -289,423 +256,340 @@ _exit:
return
code
;
}
static
int32_t
write_stt_blk
(
SSttFileWriter
*
pW
riter
)
{
static
int32_t
tsdbSttFileDoWriteSttBlk
(
SSttFileWriter
*
w
riter
)
{
int32_t
code
=
0
;
int32_t
lino
;
pWriter
->
footer
.
dict
[
1
].
offset
=
pWriter
->
tF
ile
.
size
;
pWriter
->
footer
.
dict
[
1
].
size
=
sizeof
(
SSttBlk
)
*
taosArrayGetSize
(
pWriter
->
aSttBlk
);
writer
->
footer
.
dict
[
1
].
offset
=
writer
->
f
ile
.
size
;
writer
->
footer
.
dict
[
1
].
size
=
sizeof
(
SSttBlk
)
*
TARRAY2_SIZE
(
&
writer
->
sttBlkArray
);
if
(
pWriter
->
footer
.
dict
[
1
].
size
)
{
TSDB_CHECK_CODE
(
//
code
=
tsdbWriteFile
(
//
pWriter
->
pFd
,
//
pWriter
->
tFile
.
size
,
//
TARRAY_DATA
(
pWriter
->
aSttBlk
),
//
pWriter
->
footer
.
dict
[
1
].
size
),
//
lino
,
//
_exit
);
if
(
writer
->
footer
.
dict
[
1
].
size
)
{
code
=
tsdbWriteFile
(
writer
->
fd
,
writer
->
file
.
size
,
(
const
uint8_t
*
)
TARRAY2_DATA
(
&
writer
->
sttBlkArray
),
writer
->
footer
.
dict
[
1
].
size
);
TSDB_CHECK_CODE
(
code
,
lino
,
_exit
);
pWriter
->
tFile
.
size
+=
pW
riter
->
footer
.
dict
[
1
].
size
;
writer
->
file
.
size
+=
w
riter
->
footer
.
dict
[
1
].
size
;
}
_exit:
if
(
code
)
{
tsdbError
(
//
"vgId:%d %s failed at line %d since %s"
,
//
TD_VID
(
pWriter
->
config
.
pTsdb
->
pVnode
),
//
__func__
,
//
lino
,
//
tstrerror
(
code
));
tsdbError
(
"vgId:%d %s failed at line %d since %s"
,
TD_VID
(
writer
->
config
.
tsdb
->
pVnode
),
__func__
,
lino
,
tstrerror
(
code
));
}
return
code
;
}
static
int32_t
write_statistics_blk
(
SSttFileWriter
*
pW
riter
)
{
static
int32_t
tsdbSttFileDoWriteStatisBlk
(
SSttFileWriter
*
w
riter
)
{
int32_t
code
=
0
;
int32_t
lino
;
pWriter
->
footer
.
dict
[
2
].
offset
=
pWriter
->
tFile
.
size
;
pWriter
->
footer
.
dict
[
2
].
size
=
sizeof
(
STbStatisBlock
)
*
taosArrayGetSize
(
pWriter
->
aStatisBlk
);
if
(
pWriter
->
footer
.
dict
[
2
].
size
)
{
TSDB_CHECK_CODE
(
//
code
=
tsdbWriteFile
(
//
pWriter
->
pFd
,
//
pWriter
->
tFile
.
size
,
//
TARRAY_DATA
(
pWriter
->
aStatisBlk
),
//
pWriter
->
footer
.
dict
[
2
].
size
),
//
lino
,
//
_exit
);
writer
->
footer
.
dict
[
2
].
offset
=
writer
->
file
.
size
;
writer
->
footer
.
dict
[
2
].
size
=
sizeof
(
STbStatisBlock
)
*
TARRAY2_SIZE
(
&
writer
->
statisBlkArray
);
pWriter
->
tFile
.
size
+=
pWriter
->
footer
.
dict
[
2
].
size
;
if
(
writer
->
footer
.
dict
[
2
].
size
)
{
code
=
tsdbWriteFile
(
writer
->
fd
,
writer
->
file
.
size
,
(
const
uint8_t
*
)
TARRAY2_DATA
(
&
writer
->
statisBlkArray
),
writer
->
footer
.
dict
[
2
].
size
);
TSDB_CHECK_CODE
(
code
,
lino
,
_exit
);
writer
->
file
.
size
+=
writer
->
footer
.
dict
[
2
].
size
;
}
_exit:
if
(
code
)
{
tsdbError
(
//
"vgId:%d %s failed at line %d since %s"
,
//
TD_VID
(
pWriter
->
config
.
pTsdb
->
pVnode
),
//
__func__
,
//
lino
,
//
tstrerror
(
code
));
tsdbError
(
"vgId:%d %s failed at line %d since %s"
,
TD_VID
(
writer
->
config
.
tsdb
->
pVnode
),
__func__
,
lino
,
tstrerror
(
code
));
}
return
code
;
}
static
int32_t
write_del_blk
(
SSttFileWriter
*
pW
riter
)
{
static
int32_t
tsdbSttFileDoWriteDelBlk
(
SSttFileWriter
*
w
riter
)
{
int32_t
code
=
0
;
int32_t
lino
;
pWriter
->
footer
.
dict
[
3
].
offset
=
pWriter
->
tFile
.
size
;
pWriter
->
footer
.
dict
[
3
].
size
=
sizeof
(
SDelBlk
)
*
taosArrayGetSize
(
pWriter
->
aDelBlk
);
if
(
pWriter
->
footer
.
dict
[
3
].
size
)
{
TSDB_CHECK_CODE
(
//
code
=
tsdbWriteFile
(
//
pWriter
->
pFd
,
//
pWriter
->
tFile
.
size
,
//
TARRAY_DATA
(
pWriter
->
aDelBlk
),
//
pWriter
->
footer
.
dict
[
3
].
size
),
//
lino
,
//
_exit
);
writer
->
footer
.
dict
[
3
].
offset
=
writer
->
file
.
size
;
writer
->
footer
.
dict
[
3
].
size
=
sizeof
(
SDelBlk
)
*
TARRAY2_SIZE
(
&
writer
->
delBlkArray
);
pWriter
->
tFile
.
size
+=
pWriter
->
footer
.
dict
[
3
].
size
;
if
(
writer
->
footer
.
dict
[
3
].
size
)
{
code
=
tsdbWriteFile
(
writer
->
fd
,
writer
->
file
.
size
,
(
const
uint8_t
*
)
TARRAY2_DATA
(
&
writer
->
delBlkArray
),
writer
->
footer
.
dict
[
3
].
size
);
TSDB_CHECK_CODE
(
code
,
lino
,
_exit
);
writer
->
file
.
size
+=
writer
->
footer
.
dict
[
3
].
size
;
}
_exit:
if
(
code
)
{
tsdbError
(
//
"vgId:%d %s failed at line %d since %s"
,
//
TD_VID
(
pWriter
->
config
.
pTsdb
->
pVnode
),
//
__func__
,
//
lino
,
//
tstrerror
(
code
));
tsdbError
(
"vgId:%d %s failed at line %d since %s"
,
TD_VID
(
writer
->
config
.
tsdb
->
pVnode
),
__func__
,
lino
,
tstrerror
(
code
));
}
return
code
;
}
static
int32_t
write_file_footer
(
SSttFileWriter
*
pWriter
)
{
int32_t
code
=
tsdbWriteFile
(
//
pWriter
->
pFd
,
//
pWriter
->
tFile
.
size
,
//
(
const
uint8_t
*
)
&
pWriter
->
footer
,
//
sizeof
(
pWriter
->
footer
));
pWriter
->
tFile
.
size
+=
sizeof
(
pWriter
->
footer
);
static
int32_t
tsdbSttFileDoWriteFooter
(
SSttFileWriter
*
writer
)
{
int32_t
code
=
tsdbWriteFile
(
writer
->
fd
,
writer
->
file
.
size
,
(
const
uint8_t
*
)
&
writer
->
footer
,
sizeof
(
writer
->
footer
));
writer
->
file
.
size
+=
sizeof
(
writer
->
footer
);
return
code
;
}
static
int32_t
write_file_header
(
SSttFileWriter
*
pWriter
)
{
// TODO
return
0
;
}
static
int32_t
create_stt_fwriter
(
const
SSttFileWriterConfig
*
pConf
,
SSttFileWriter
**
ppWriter
)
{
static
int32_t
tsdbSttFWriterDoOpen
(
SSttFileWriter
*
writer
)
{
int32_t
code
=
0
;
int32_t
lino
=
0
;
int32_t
vid
=
TD_VID
(
writer
->
config
.
tsdb
->
pVnode
);
// alloc
if
(((
ppWriter
[
0
]
=
taosMemoryCalloc
(
1
,
sizeof
(
*
ppWriter
[
0
])))
==
NULL
)
//
||
((
ppWriter
[
0
]
->
aSttBlk
=
taosArrayInit
(
64
,
sizeof
(
SSttBlk
)))
==
NULL
)
//
||
((
ppWriter
[
0
]
->
aDelBlk
=
taosArrayInit
(
64
,
sizeof
(
SDelBlk
)))
==
NULL
)
//
||
((
ppWriter
[
0
]
->
aStatisBlk
=
taosArrayInit
(
64
,
sizeof
(
STbStatisBlock
)))
==
NULL
)
//
)
{
code
=
TSDB_CODE_OUT_OF_MEMORY
;
goto
_exit
;
}
// set
writer
->
file
=
writer
->
config
.
file
;
writer
->
file
.
stt
.
nseg
++
;
if
(
!
writer
->
config
.
skmTb
)
writer
->
config
.
skmTb
=
&
writer
->
skmTb
;
if
(
!
writer
->
config
.
skmRow
)
writer
->
config
.
skmRow
=
&
writer
->
skmRow
;
if
(
!
writer
->
config
.
aBuf
)
writer
->
config
.
aBuf
=
writer
->
aBuf
;
if
((
code
=
tBlockDataCreate
(
&
ppWriter
[
0
]
->
bData
))
//
||
(
code
=
tDelBlockCreate
(
&
ppWriter
[
0
]
->
dData
,
pConf
->
maxRow
))
//
||
(
code
=
tTbStatisBlockCreate
(
&
ppWriter
[
0
]
->
sData
,
pConf
->
maxRow
))
//
)
{
goto
_exit
;
}
// open file
int32_t
flag
;
char
fname
[
TSDB_FILENAME_LEN
];
// init
ppWriter
[
0
]
->
config
=
pConf
[
0
];
ppWriter
[
0
]
->
tFile
=
pConf
->
file
;
ppWriter
[
0
]
->
footer
.
prevFooter
=
ppWriter
[
0
]
->
tFile
.
size
;
if
(
pConf
->
pSkmTb
==
NULL
)
{
ppWriter
[
0
]
->
config
.
pSkmTb
=
&
ppWriter
[
0
]
->
skmTb
;
}
if
(
pConf
->
pSkmRow
==
NULL
)
{
ppWriter
[
0
]
->
config
.
pSkmRow
=
&
ppWriter
[
0
]
->
skmRow
;
if
(
writer
->
file
.
size
)
{
flag
=
TD_FILE_READ
|
TD_FILE_WRITE
;
}
else
{
flag
=
TD_FILE_READ
|
TD_FILE_WRITE
|
TD_FILE_CREATE
|
TD_FILE_TRUNC
;
}
if
(
pConf
->
aBuf
==
NULL
)
{
ppWriter
[
0
]
->
config
.
aBuf
=
ppWriter
[
0
]
->
aBuf
;
tsdbTFileName
(
writer
->
config
.
tsdb
,
&
writer
->
file
,
fname
);
code
=
tsdbOpenFile
(
fname
,
writer
->
config
.
szPage
,
flag
,
&
writer
->
fd
);
TSDB_CHECK_CODE
(
code
,
lino
,
_exit
);
if
(
!
writer
->
file
.
size
)
{
uint8_t
hdr
[
TSDB_FHDR_SIZE
]
=
{
0
};
code
=
tsdbWriteFile
(
writer
->
fd
,
0
,
hdr
,
sizeof
(
hdr
));
TSDB_CHECK_CODE
(
code
,
lino
,
_exit
);
writer
->
file
.
size
+=
sizeof
(
hdr
);
}
_exit:
if
(
code
&&
ppWriter
[
0
])
{
tTbStatisBlockDestroy
(
&
ppWriter
[
0
]
->
sData
);
tDelBlockDestroy
(
&
ppWriter
[
0
]
->
dData
);
tBlockDataDestroy
(
&
ppWriter
[
0
]
->
bData
);
taosArrayDestroy
(
ppWriter
[
0
]
->
aStatisBlk
);
taosArrayDestroy
(
ppWriter
[
0
]
->
aDelBlk
);
taosArrayDestroy
(
ppWriter
[
0
]
->
aSttBlk
);
taosMemoryFree
(
ppWriter
[
0
]);
ppWriter
[
0
]
=
NULL
;
if
(
code
)
{
tsdbError
(
"vgId:%d %s failed at line %d since %s"
,
vid
,
__func__
,
lino
,
tstrerror
(
code
));
}
else
{
writer
->
ctx
.
opened
=
true
;
}
return
code
;
return
0
;
}
static
int32_t
destroy_stt_fwriter
(
SSttFileWriter
*
pWriter
)
{
if
(
pWriter
)
{
for
(
int32_t
i
=
0
;
i
<
ARRAY_SIZE
(
pWriter
->
aBuf
);
i
++
)
{
tFree
(
pWriter
->
aBuf
[
i
]);
}
tDestroyTSchema
(
pWriter
->
skmRow
.
pTSchema
);
tDestroyTSchema
(
pWriter
->
skmTb
.
pTSchema
);
static
void
tsdbSttFWriterDoClose
(
SSttFileWriter
*
pWriter
)
{
// TODO: do clear the struct
}
tTbStatisBlockDestroy
(
&
pWriter
->
sData
);
tDelBlockDestroy
(
&
pWriter
->
dData
);
tBlockDataDestroy
(
&
pWriter
->
bData
)
;
int32_t
tsdbSttFWriterOpen
(
const
SSttFileWriterConfig
*
config
,
SSttFileWriter
**
writer
)
{
writer
[
0
]
=
taosMemoryMalloc
(
sizeof
(
*
writer
[
0
])
);
if
(
writer
[
0
]
==
NULL
)
return
TSDB_CODE_OUT_OF_MEMORY
;
taosArrayDestroy
(
pWriter
->
aStatisBlk
);
taosArrayDestroy
(
pWriter
->
aDelBlk
);
taosArrayDestroy
(
pWriter
->
aSttBlk
);
taosMemoryFree
(
pWriter
);
}
writer
[
0
]
->
config
=
config
[
0
];
writer
[
0
]
->
ctx
.
opened
=
false
;
return
0
;
}
static
int32_t
open_stt_fwriter
(
SSttFileWriter
*
pWriter
)
{
int32_t
code
=
0
;
int32_t
lino
=
0
;
int32_t
vid
=
TD_VID
(
pWriter
->
config
.
pTsdb
->
pVnode
);
char
fname
[
TSDB_FILENAME_LEN
];
uint8_t
hdr
[
TSDB_FHDR_SIZE
]
=
{
0
};
static
int32_t
tsdbSttFileDoWriteBloomFilter
(
SSttFileWriter
*
writer
)
{
// TODO
return
0
;
}
static
int32_t
tsdbSttFileDoUpdateHeader
(
SSttFileWriter
*
writer
)
{
// TODO
return
0
;
}
static
int32_t
tsdbSttFWriterCloseCommit
(
SSttFileWriter
*
writer
,
STFileOp
*
op
)
{
int32_t
lino
;
int32_t
code
;
int32_t
vid
=
TD_VID
(
writer
->
config
.
tsdb
->
pVnode
);
int32_t
flag
=
TD_FILE_READ
|
TD_FILE_WRITE
;
if
(
pWriter
->
tFile
.
size
==
0
)
{
flag
|=
(
TD_FILE_CREATE
|
TD_FILE_TRUNC
);
}
code
=
tsdbSttFileDoWriteTSDataBlock
(
writer
);
TSDB_CHECK_CODE
(
code
,
lino
,
_exit
);
tsdbTFileName
(
pWriter
->
config
.
pTsdb
,
&
pWriter
->
config
.
file
,
fname
);
code
=
tsdbOpenFile
(
fname
,
pWriter
->
config
.
szPage
,
flag
,
&
pWriter
->
pFd
);
code
=
tsdbSttFileDoWriteStatisBlock
(
writer
);
TSDB_CHECK_CODE
(
code
,
lino
,
_exit
);
if
(
pWriter
->
tFile
.
size
==
0
)
{
code
=
tsdbWriteFile
(
pWriter
->
pFd
,
0
,
hdr
,
sizeof
(
hdr
));
TSDB_CHECK_CODE
(
code
,
lino
,
_exit
);
code
=
tsdbSttFileDoWriteDelBlock
(
writer
);
TSDB_CHECK_CODE
(
code
,
lino
,
_exit
);
pWriter
->
tFile
.
size
+=
sizeof
(
hd
r
);
}
code
=
tsdbSttFileDoWriteSttBlk
(
write
r
);
TSDB_CHECK_CODE
(
code
,
lino
,
_exit
);
_exit:
if
(
code
)
{
if
(
pWriter
->
pFd
)
tsdbCloseFile
(
&
pWriter
->
pFd
);
tsdbError
(
"vgId:%d %s failed at line %d since %s"
,
vid
,
__func__
,
lino
,
tstrerror
(
code
));
}
else
{
tsdbDebug
(
"vgId:%d %s done, fname:%s size:%"
PRId64
,
vid
,
__func__
,
""
/*pWriter->config.file.fname*/
,
pWriter
->
config
.
file
.
size
);
}
return
code
;
}
code
=
tsdbSttFileDoWriteStatisBlk
(
writer
);
TSDB_CHECK_CODE
(
code
,
lino
,
_exit
);
static
int32_t
close_stt_fwriter
(
SSttFileWriter
*
pWriter
)
{
tsdbCloseFile
(
&
pWriter
->
pFd
);
return
0
;
}
code
=
tsdbSttFileDoWriteDelBlk
(
writer
);
TSDB_CHECK_CODE
(
code
,
lino
,
_exit
);
int32_t
tsdbSttFWriterOpen
(
const
SSttFileWriterConfig
*
pConf
,
SSttFileWriter
**
ppWriter
)
{
int32_t
code
=
0
;
int32_t
lino
=
0
;
int32_t
vid
=
TD_VID
(
pConf
->
pTsdb
->
pVnode
);
code
=
tsdbSttFileDoWriteBloomFilter
(
writer
);
TSDB_CHECK_CODE
(
code
,
lino
,
_exit
);
code
=
tsdbSttFileDoWriteFooter
(
writer
);
TSDB_CHECK_CODE
(
code
,
lino
,
_exit
);
code
=
create_stt_fwriter
(
pConf
,
ppW
riter
);
code
=
tsdbSttFileDoUpdateHeader
(
w
riter
);
TSDB_CHECK_CODE
(
code
,
lino
,
_exit
);
code
=
open_stt_fwriter
(
ppWriter
[
0
]
);
code
=
tsdbFsyncFile
(
writer
->
fd
);
TSDB_CHECK_CODE
(
code
,
lino
,
_exit
);
code
=
tsdbCloseFile
(
&
writer
->
fd
);
TSDB_CHECK_CODE
(
code
,
lino
,
_exit
);
ASSERT
(
writer
->
config
.
file
.
size
>
writer
->
file
.
size
);
op
->
optype
=
writer
->
config
.
file
.
size
?
TSDB_FOP_MODIFY
:
TSDB_FOP_CREATE
;
op
->
fid
=
writer
->
config
.
file
.
fid
;
op
->
of
=
writer
->
config
.
file
;
op
->
nf
=
writer
->
file
;
_exit:
if
(
code
)
{
tsdbError
(
"vgId:%d %s failed at line %d since %s"
,
vid
,
__func__
,
lino
,
tstrerror
(
code
));
if
(
ppWriter
[
0
])
{
destroy_stt_fwriter
(
ppWriter
[
0
]);
ppWriter
[
0
]
=
NULL
;
}
}
return
code
;
}
int32_t
tsdbSttFWriterClose
(
SSttFileWriter
**
ppWriter
,
int8_t
abort
,
struct
STFileOp
*
op
)
{
int32_t
vgId
=
TD_VID
(
ppWriter
[
0
]
->
config
.
pTsdb
->
pVnode
);
static
int32_t
tsdbSttFWriterCloseAbort
(
SSttFileWriter
*
writer
)
{
// TODO
ASSERT
(
0
);
return
0
;
}
int32_t
tsdbSttFWriterClose
(
SSttFileWriter
**
writer
,
int8_t
abort
,
STFileOp
*
op
)
{
int32_t
code
=
0
;
int32_t
lino
=
0
;
int32_t
vid
=
TD_VID
(
writer
[
0
]
->
config
.
tsdb
->
pVnode
);
if
(
!
abort
)
{
ppWriter
[
0
]
->
tFile
.
stt
.
nseg
++
;
if
(
ppWriter
[
0
]
->
bData
.
nRow
>
0
)
{
code
=
write_timeseries_block
(
ppWriter
[
0
]);
if
(
!
writer
[
0
]
->
ctx
.
opened
)
{
op
->
optype
=
TSDB_FOP_NONE
;
}
else
{
if
(
!
abort
)
{
code
=
tsdbSttFWriterCloseCommit
(
writer
[
0
],
op
);
TSDB_CHECK_CODE
(
code
,
lino
,
_exit
);
}
if
(
ppWriter
[
0
]
->
sData
.
nRow
>
0
)
{
code
=
write_statistics_block
(
ppWriter
[
0
]);
}
else
{
code
=
tsdbSttFWriterCloseAbort
(
writer
[
0
]);
TSDB_CHECK_CODE
(
code
,
lino
,
_exit
);
}
if
(
ppWriter
[
0
]
->
dData
.
nRow
>
0
)
{
code
=
write_delete_block
(
ppWriter
[
0
]);
TSDB_CHECK_CODE
(
code
,
lino
,
_exit
);
}
code
=
write_stt_blk
(
ppWriter
[
0
]);
TSDB_CHECK_CODE
(
code
,
lino
,
_exit
);
code
=
write_statistics_blk
(
ppWriter
[
0
]);
TSDB_CHECK_CODE
(
code
,
lino
,
_exit
);
code
=
write_del_blk
(
ppWriter
[
0
]);
TSDB_CHECK_CODE
(
code
,
lino
,
_exit
);
code
=
write_file_footer
(
ppWriter
[
0
]);
TSDB_CHECK_CODE
(
code
,
lino
,
_exit
);
code
=
write_file_header
(
ppWriter
[
0
]);
TSDB_CHECK_CODE
(
code
,
lino
,
_exit
);
code
=
tsdbFsyncFile
(
ppWriter
[
0
]
->
pFd
);
TSDB_CHECK_CODE
(
code
,
lino
,
_exit
);
if
(
op
)
{
STFile
*
f
=
&
ppWriter
[
0
]
->
config
.
file
;
op
->
fid
=
f
->
fid
;
if
(
f
->
size
==
0
)
{
op
->
optype
=
TSDB_FOP_CREATE
;
}
else
{
op
->
optype
=
TSDB_FOP_MODIFY
;
}
op
->
of
=
f
[
0
];
op
->
nf
=
ppWriter
[
0
]
->
tFile
;
}
tsdbSttFWriterDoClose
(
writer
[
0
]);
}
code
=
close_stt_fwriter
(
ppWriter
[
0
]);
TSDB_CHECK_CODE
(
code
,
lino
,
_exit
);
destroy_stt_fwriter
(
ppWriter
[
0
]);
ppWriter
[
0
]
=
NULL
;
taosMemoryFree
(
writer
[
0
]);
writer
[
0
]
=
NULL
;
_exit:
if
(
code
)
{
tsdbError
(
//
"vgId:%d %s failed at line %d since %s"
,
//
vgId
,
//
__func__
,
//
lino
,
//
tstrerror
(
code
));
}
else
{
tsdbError
(
"vgId:%d %s failed at line %d since %s"
,
vid
,
__func__
,
lino
,
tstrerror
(
code
));
}
return
code
;
}
int32_t
tsdbSttFWriteTSData
(
SSttFileWriter
*
pWriter
,
SRowInfo
*
pRowInfo
)
{
int32_t
tsdbSttFWriteTSData
(
SSttFileWriter
*
writer
,
SRowInfo
*
row
)
{
int32_t
code
=
0
;
int32_t
lino
;
int32_t
lino
=
0
;
TABLEID
*
tbid
=
(
TABLEID
*
)
pRowInfo
;
TSDBROW
*
pRow
=
&
pRowInfo
->
row
;
TSDBKEY
key
=
TSDBROW_KEY
(
pRow
);
if
(
!
writer
->
ctx
.
opened
)
{
code
=
tsdbSttFWriterDoOpen
(
writer
);
TSDB_CHECK_CODE
(
code
,
lino
,
_exit
);
}
if
(
!
TABLE_SAME_SCHEMA
(
pWriter
->
bData
.
suid
,
pWriter
->
bData
.
uid
,
tbid
->
suid
,
tbid
->
uid
))
{
if
(
pWriter
->
bData
.
nRow
>
0
)
{
code
=
write_timeseries_block
(
pWriter
);
TABLEID
*
tbid
=
(
TABLEID
*
)
row
;
TSDBROW
*
pRow
=
&
row
->
row
;
TSDBKEY
key
=
TSDBROW_KEY
(
pRow
);
if
(
!
TABLE_SAME_SCHEMA
(
writer
->
bData
[
0
].
suid
,
writer
->
bData
[
0
].
uid
,
tbid
->
suid
,
tbid
->
uid
))
{
if
(
writer
->
bData
[
0
].
nRow
>
0
)
{
code
=
tsdbSttFileDoWriteTSDataBlock
(
writer
);
TSDB_CHECK_CODE
(
code
,
lino
,
_exit
);
}
if
(
pWriter
->
sData
.
nRow
>=
pW
riter
->
config
.
maxRow
)
{
code
=
write_statistics_block
(
pW
riter
);
if
(
writer
->
sData
[
0
].
nRow
>=
w
riter
->
config
.
maxRow
)
{
code
=
tsdbSttFileDoWriteStatisBlock
(
w
riter
);
TSDB_CHECK_CODE
(
code
,
lino
,
_exit
);
}
pWriter
->
sData
.
aData
[
0
][
pWriter
->
sData
.
nRow
]
=
tbid
->
suid
;
// suid
pWriter
->
sData
.
aData
[
1
][
pWriter
->
sData
.
nRow
]
=
tbid
->
uid
;
// uid
pWriter
->
sData
.
aData
[
2
][
pWriter
->
sData
.
nRow
]
=
key
.
ts
;
// skey
pWriter
->
sData
.
aData
[
3
][
pWriter
->
sData
.
nRow
]
=
key
.
version
;
// sver
pWriter
->
sData
.
aData
[
4
][
pWriter
->
sData
.
nRow
]
=
key
.
ts
;
// ekey
pWriter
->
sData
.
aData
[
5
][
pWriter
->
sData
.
nRow
]
=
key
.
version
;
// ever
pWriter
->
sData
.
aData
[
6
][
pWriter
->
sData
.
nRow
]
=
1
;
// count
pWriter
->
sData
.
nRow
++
;
writer
->
sData
[
0
].
aData
[
0
][
writer
->
sData
[
0
]
.
nRow
]
=
tbid
->
suid
;
// suid
writer
->
sData
[
0
].
aData
[
1
][
writer
->
sData
[
0
]
.
nRow
]
=
tbid
->
uid
;
// uid
writer
->
sData
[
0
].
aData
[
2
][
writer
->
sData
[
0
]
.
nRow
]
=
key
.
ts
;
// skey
writer
->
sData
[
0
].
aData
[
3
][
writer
->
sData
[
0
]
.
nRow
]
=
key
.
version
;
// sver
writer
->
sData
[
0
].
aData
[
4
][
writer
->
sData
[
0
]
.
nRow
]
=
key
.
ts
;
// ekey
writer
->
sData
[
0
].
aData
[
5
][
writer
->
sData
[
0
]
.
nRow
]
=
key
.
version
;
// ever
writer
->
sData
[
0
].
aData
[
6
][
writer
->
sData
[
0
]
.
nRow
]
=
1
;
// count
writer
->
sData
[
0
]
.
nRow
++
;
code
=
tsdbUpdateSkmTb
(
pWriter
->
config
.
pTsdb
,
tbid
,
pWriter
->
config
.
pS
kmTb
);
code
=
tsdbUpdateSkmTb
(
writer
->
config
.
tsdb
,
tbid
,
writer
->
config
.
s
kmTb
);
TSDB_CHECK_CODE
(
code
,
lino
,
_exit
);
TABLEID
id
=
{
.
suid
=
tbid
->
suid
,
.
uid
=
tbid
->
uid
?
0
:
tbid
->
uid
,
};
code
=
tBlockDataInit
(
&
pWriter
->
bData
,
&
id
,
pWriter
->
config
.
pS
kmTb
->
pTSchema
,
NULL
,
0
);
code
=
tBlockDataInit
(
&
writer
->
bData
[
0
],
&
id
,
writer
->
config
.
s
kmTb
->
pTSchema
,
NULL
,
0
);
TSDB_CHECK_CODE
(
code
,
lino
,
_exit
);
}
if
(
pRowInfo
->
row
.
type
==
TSDBROW_ROW_FMT
)
{
code
=
tsdbUpdateSkmRow
(
pWriter
->
config
.
pTsdb
,
tbid
,
TSDBROW_SVERSION
(
pRow
),
pWriter
->
config
.
pS
kmRow
);
if
(
row
->
row
.
type
==
TSDBROW_ROW_FMT
)
{
code
=
tsdbUpdateSkmRow
(
writer
->
config
.
tsdb
,
tbid
,
TSDBROW_SVERSION
(
pRow
),
writer
->
config
.
s
kmRow
);
TSDB_CHECK_CODE
(
code
,
lino
,
_exit
);
}
code
=
tBlockDataAppendRow
(
&
pWriter
->
bData
,
pRow
,
pWriter
->
config
.
pS
kmRow
->
pTSchema
,
tbid
->
uid
);
code
=
tBlockDataAppendRow
(
&
writer
->
bData
[
0
],
pRow
,
writer
->
config
.
s
kmRow
->
pTSchema
,
tbid
->
uid
);
TSDB_CHECK_CODE
(
code
,
lino
,
_exit
);
if
(
pWriter
->
bData
.
nRow
>=
pW
riter
->
config
.
maxRow
)
{
code
=
write_timeseries_block
(
pW
riter
);
if
(
writer
->
bData
[
0
].
nRow
>=
w
riter
->
config
.
maxRow
)
{
code
=
tsdbSttFileDoWriteTSDataBlock
(
w
riter
);
TSDB_CHECK_CODE
(
code
,
lino
,
_exit
);
}
if
(
key
.
ts
>
pWriter
->
sData
.
aData
[
4
][
pWriter
->
sData
.
nRow
-
1
])
{
pWriter
->
sData
.
aData
[
4
][
pWriter
->
sData
.
nRow
-
1
]
=
key
.
ts
;
// ekey
pWriter
->
sData
.
aData
[
5
][
pWriter
->
sData
.
nRow
-
1
]
=
key
.
version
;
// ever
pWriter
->
sData
.
aData
[
6
][
pWriter
->
sData
.
nRow
-
1
]
++
;
// count
}
else
if
(
key
.
ts
==
pWriter
->
sData
.
aData
[
4
][
pWriter
->
sData
.
nRow
-
1
])
{
pWriter
->
sData
.
aData
[
4
][
pWriter
->
sData
.
nRow
-
1
]
=
key
.
ts
;
// ekey
pWriter
->
sData
.
aData
[
5
][
pWriter
->
sData
.
nRow
-
1
]
=
key
.
version
;
// ever
if
(
key
.
ts
>
writer
->
sData
[
0
].
aData
[
4
][
writer
->
sData
[
0
]
.
nRow
-
1
])
{
writer
->
sData
[
0
].
aData
[
4
][
writer
->
sData
[
0
]
.
nRow
-
1
]
=
key
.
ts
;
// ekey
writer
->
sData
[
0
].
aData
[
5
][
writer
->
sData
[
0
]
.
nRow
-
1
]
=
key
.
version
;
// ever
writer
->
sData
[
0
].
aData
[
6
][
writer
->
sData
[
0
]
.
nRow
-
1
]
++
;
// count
}
else
if
(
key
.
ts
==
writer
->
sData
[
0
].
aData
[
4
][
writer
->
sData
[
0
]
.
nRow
-
1
])
{
writer
->
sData
[
0
].
aData
[
4
][
writer
->
sData
[
0
]
.
nRow
-
1
]
=
key
.
ts
;
// ekey
writer
->
sData
[
0
].
aData
[
5
][
writer
->
sData
[
0
]
.
nRow
-
1
]
=
key
.
version
;
// ever
}
else
{
ASSERTS
(
0
,
"timestamp should be in ascending order"
);
}
_exit:
if
(
code
)
{
tsdbError
(
"vgId:%d %s failed at line %d since %s"
,
TD_VID
(
pWriter
->
config
.
pT
sdb
->
pVnode
),
__func__
,
lino
,
tsdbError
(
"vgId:%d %s failed at line %d since %s"
,
TD_VID
(
writer
->
config
.
t
sdb
->
pVnode
),
__func__
,
lino
,
tstrerror
(
code
));
}
return
code
;
}
int32_t
tsdbSttFWriteTSDataBlock
(
SSttFileWriter
*
pWriter
,
SBlockData
*
pBlockD
ata
)
{
int32_t
tsdbSttFWriteTSDataBlock
(
SSttFileWriter
*
writer
,
SBlockData
*
bd
ata
)
{
int32_t
code
=
0
;
int32_t
lino
=
0
;
SRowInfo
rowInfo
;
rowInfo
.
suid
=
pBlockD
ata
->
suid
;
for
(
int32_t
i
=
0
;
i
<
pBlockD
ata
->
nRow
;
i
++
)
{
rowInfo
.
uid
=
pBlockData
->
uid
?
pBlockData
->
uid
:
pBlockD
ata
->
aUid
[
i
];
rowInfo
.
row
=
tsdbRowFromBlockData
(
pBlockD
ata
,
i
);
rowInfo
.
suid
=
bd
ata
->
suid
;
for
(
int32_t
i
=
0
;
i
<
bd
ata
->
nRow
;
i
++
)
{
rowInfo
.
uid
=
bdata
->
uid
?
bdata
->
uid
:
bd
ata
->
aUid
[
i
];
rowInfo
.
row
=
tsdbRowFromBlockData
(
bd
ata
,
i
);
code
=
tsdbSttFWriteTSData
(
pW
riter
,
&
rowInfo
);
code
=
tsdbSttFWriteTSData
(
w
riter
,
&
rowInfo
);
TSDB_CHECK_CODE
(
code
,
lino
,
_exit
);
}
_exit:
if
(
code
)
{
tsdbError
(
"vgId:%d %s failed at line %d since %s"
,
TD_VID
(
pWriter
->
config
.
pT
sdb
->
pVnode
),
__func__
,
lino
,
tsdbError
(
"vgId:%d %s failed at line %d since %s"
,
TD_VID
(
writer
->
config
.
t
sdb
->
pVnode
),
__func__
,
lino
,
tstrerror
(
code
));
}
return
0
;
}
int32_t
tsdbSttFWriteDLData
(
SSttFileWriter
*
pW
riter
,
TABLEID
*
tbid
,
SDelData
*
pDelData
)
{
int32_t
tsdbSttFWriteDLData
(
SSttFileWriter
*
w
riter
,
TABLEID
*
tbid
,
SDelData
*
pDelData
)
{
ASSERTS
(
0
,
"TODO: Not implemented yet"
);
pWriter
->
dData
.
aData
[
0
][
pWriter
->
dData
.
nRow
]
=
tbid
->
suid
;
// suid
pWriter
->
dData
.
aData
[
1
][
pWriter
->
dData
.
nRow
]
=
tbid
->
uid
;
// uid
pWriter
->
dData
.
aData
[
2
][
pWriter
->
dData
.
nRow
]
=
pDelData
->
version
;
// version
pWriter
->
dData
.
aData
[
3
][
pWriter
->
dData
.
nRow
]
=
pDelData
->
sKey
;
// skey
pWriter
->
dData
.
aData
[
4
][
pWriter
->
dData
.
nRow
]
=
pDelData
->
eKey
;
// ekey
pWriter
->
dData
.
nRow
++
;
int32_t
code
;
if
(
!
writer
->
ctx
.
opened
)
{
code
=
tsdbSttFWriterDoOpen
(
writer
);
return
code
;
}
writer
->
dData
[
0
].
aData
[
0
][
writer
->
dData
[
0
].
nRow
]
=
tbid
->
suid
;
// suid
writer
->
dData
[
0
].
aData
[
1
][
writer
->
dData
[
0
].
nRow
]
=
tbid
->
uid
;
// uid
writer
->
dData
[
0
].
aData
[
2
][
writer
->
dData
[
0
].
nRow
]
=
pDelData
->
version
;
// version
writer
->
dData
[
0
].
aData
[
3
][
writer
->
dData
[
0
].
nRow
]
=
pDelData
->
sKey
;
// skey
writer
->
dData
[
0
].
aData
[
4
][
writer
->
dData
[
0
].
nRow
]
=
pDelData
->
eKey
;
// ekey
writer
->
dData
[
0
].
nRow
++
;
if
(
pWriter
->
dData
.
nRow
>=
pW
riter
->
config
.
maxRow
)
{
return
write_delete_block
(
pW
riter
);
if
(
writer
->
dData
[
0
].
nRow
>=
w
riter
->
config
.
maxRow
)
{
return
tsdbSttFileDoWriteDelBlock
(
w
riter
);
}
else
{
return
0
;
}
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录