Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
a885d6e3
T
TDengine
项目概览
taosdata
/
TDengine
大约 2 年 前同步成功
通知
1192
Star
22018
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
a885d6e3
编写于
2月 07, 2023
作者:
H
Hongze Cheng
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
more code
上级
09628401
变更
1
显示空白变更内容
内联
并排
Showing
1 changed file
with
392 addition
and
159 deletion
+392
-159
source/dnode/vnode/src/tsdb/tsdbCompact.c
source/dnode/vnode/src/tsdb/tsdbCompact.c
+392
-159
未找到文件。
source/dnode/vnode/src/tsdb/tsdbCompact.c
浏览文件 @
a885d6e3
...
...
@@ -14,6 +14,7 @@
*/
#include "tsdb.h"
#if 0
#define TSDB_ITER_TYPE_MEM 0x0
...
...
@@ -53,41 +54,6 @@ typedef struct STsdbDataIter {
#define TSDB_DATA_ITER_FROM_RBTN(N) ((STsdbDataIter *)((char *)N - offsetof(STsdbDataIter, n)))
typedef struct {
STsdb *pTsdb;
int64_t cid;
int8_t cmprAlg;
int32_t maxRows;
int32_t minRows;
STsdbFS fs;
int32_t fid;
SDFileSet *pDFileSet;
// Tombstone
SDelFReader *pDelFReader;
SArray *aDelIdx; // SArray<SDelIdx>
SArray *aDelData; // SArray<SDelData>
SArray *aSkyLine; // SArray<TSDBKEY>
TSDBKEY *aTSDBKEY;
int32_t iKey;
TSDBKEY sKey;
// Reader
SDataFReader *pReader;
STsdbDataIter *iterList; // list of iterators
SRBTree rtree;
STsdbDataIter *pIter;
SBlockData bData;
SSkmInfo tbSkm;
// Writer
SDataFWriter *pWriter;
SArray *aBlockIdx; // SArray<SBlockIdx>
SMapData mDataBlk; // SMapData<SDataBlk>
SArray *aSttBlk; // SArray<SSttBlk>
TABLEID tableId;
} STsdbCompactor;
#define TSDB_FLG_DEEP_COMPACT 0x1
// ITER =========================
...
...
@@ -306,57 +272,6 @@ _exit:
}
// COMPACT =========================
static int32_t tsdbBeginCompact(STsdb *pTsdb, STsdbCompactor *pCompactor) {
int32_t code = 0;
int32_t lino = 0;
pCompactor->pTsdb = pTsdb;
pCompactor->cid = 0; // TODO
pCompactor->cmprAlg = pTsdb->pVnode->config.tsdbCfg.compression;
pCompactor->maxRows = pTsdb->pVnode->config.tsdbCfg.maxRows;
pCompactor->minRows = pTsdb->pVnode->config.tsdbCfg.minRows;
code = tsdbFSCopy(pTsdb, &pCompactor->fs);
TSDB_CHECK_CODE(code, lino, _exit);
pCompactor->fid = INT32_MIN;
code = tBlockDataCreate(&pCompactor->bData);
TSDB_CHECK_CODE(code, lino, _exit);
// tombstone
if (pCompactor->fs.pDelFile) {
code = tsdbDelFReaderOpen(&pCompactor->pDelFReader, pCompactor->fs.pDelFile, pTsdb);
TSDB_CHECK_CODE(code, lino, _exit);
pCompactor->aDelIdx = taosArrayInit(0, sizeof(SDelIdx));
if (pCompactor->aDelIdx == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
TSDB_CHECK_CODE(code, lino, _exit);
}
pCompactor->aDelData = taosArrayInit(0, sizeof(SDelData));
if (pCompactor->aDelData == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
TSDB_CHECK_CODE(code, lino, _exit);
}
pCompactor->aSkyLine = taosArrayInit(0, sizeof(TSDBKEY));
if (pCompactor->aSkyLine == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
TSDB_CHECK_CODE(code, lino, _exit);
}
code = tsdbReadDelIdx(pCompactor->pDelFReader, pCompactor->aDelIdx);
TSDB_CHECK_CODE(code, lino, _exit);
}
_exit:
if (code) {
tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, lino, tstrerror(code));
}
return code;
}
static void tsdbEndCompact(STsdbCompactor *pCompactor) {
taosArrayDestroy(pCompactor->aSkyLine);
...
...
@@ -835,101 +750,311 @@ _exit:
return code;
}
static int32_t tsdbCompactFileSet(STsdbCompactor *pCompactor) {
#endif
extern
int32_t
tsdbUpdateTableSchema
(
SMeta
*
pMeta
,
int64_t
suid
,
int64_t
uid
,
SSkmInfo
*
pSkmInfo
);
extern
int32_t
tsdbWriteDataBlock
(
SDataFWriter
*
pWriter
,
SBlockData
*
pBlockData
,
SMapData
*
mDataBlk
,
int8_t
cmprAlg
);
extern
int32_t
tsdbWriteSttBlock
(
SDataFWriter
*
pWriter
,
SBlockData
*
pBlockData
,
SArray
*
aSttBlk
,
int8_t
cmprAlg
);
typedef
struct
{
STsdb
*
pTsdb
;
int64_t
commitID
;
int8_t
cmprAlg
;
int32_t
maxRows
;
int32_t
minRows
;
STsdbFS
fs
;
int32_t
fid
;
TABLEID
tbid
;
SSkmInfo
tbSkm
;
// Tombstone
SDelFReader
*
pDelFReader
;
SArray
*
aDelIdx
;
// SArray<SDelIdx>
SArray
*
aDelData
;
// SArray<SDelData>
SArray
*
aSkyLine
;
// SArray<TSDBKEY>
TSDBKEY
*
aTSDBKEY
;
int32_t
iKey
;
TSDBKEY
sKey
;
// Reader
SDataFReader
*
pReader
;
STsdbDataIter2
*
iterList
;
// list of iterators
STsdbDataIter2
*
pIter
;
SRBTree
rbt
;
// Writer
SDataFWriter
*
pWriter
;
SArray
*
aBlockIdx
;
// SArray<SBlockIdx>
SMapData
mDataBlk
;
// SMapData<SDataBlk>
SArray
*
aSttBlk
;
// SArray<SSttBlk>
SBlockData
bData
;
SBlockData
sData
;
}
STsdbCompactor
;
static
int32_t
tsdbCompactWriteTableDataStart
(
STsdbCompactor
*
pCompactor
,
TABLEID
*
pId
)
{
int32_t
code
=
0
;
int32_t
lino
=
0
;
// open compactor
code = tsdbOpenCompactor(pCompactor);
pCompactor
->
tbid
=
*
pId
;
// TODO
// update table schema if necessary (TODO)
code
=
tsdbUpdateTableSchema
(
pCompactor
->
pTsdb
->
pVnode
->
pMeta
,
pId
->
suid
,
pId
->
uid
,
&
pCompactor
->
tbSkm
);
TSDB_CHECK_CODE
(
code
,
lino
,
_exit
);
// do compact
SRowInfo *pRowInfo;
STSchema *pTSchema;
int64_t nRow = 0;
tMapDataReset
(
&
pCompactor
->
mDataBlk
);
code = t
sdbCompactGetRow(pCompactor, &pRowInfo, &pTSchema
);
code
=
t
BlockDataInit
(
&
pCompactor
->
bData
,
pId
,
pCompactor
->
tbSkm
.
pTSchema
,
NULL
,
0
);
TSDB_CHECK_CODE
(
code
,
lino
,
_exit
);
if (pRowInfo && (code = tBlockDataInit(&pCompactor->bData, (TABLEID *)pRowInfo, pTSchema, NULL, 0))) {
if
(
!
TABLE_SAME_SCHEMA
(
pCompactor
->
tbid
.
suid
,
pCompactor
->
tbid
.
uid
,
pId
->
suid
,
pId
->
uid
))
{
if
(
pCompactor
->
sData
.
nRow
>
0
)
{
code
=
tsdbWriteSttBlock
(
pCompactor
->
pWriter
,
&
pCompactor
->
sData
,
pCompactor
->
aSttBlk
,
pCompactor
->
cmprAlg
);
TSDB_CHECK_CODE
(
code
,
lino
,
_exit
);
pCompactor->tableId.suid = pRowInfo->suid;
pCompactor->tableId.uid = pRowInfo->uid;
}
while (pRowInfo) {
// if suid changed
if (pCompactor->tableId.suid != pRowInfo->suid) {
code = tsdbCompactWriteBlockData(pCompactor);
code
=
tBlockDataInit
(
&
pCompactor
->
sData
,
pId
/* TODO */
,
pCompactor
->
tbSkm
.
pTSchema
,
NULL
,
0
);
TSDB_CHECK_CODE
(
code
,
lino
,
_exit
);
}
code = tsdbCompactWriteDataBlk(pCompactor);
_exit:
if
(
code
)
{
tsdbError
(
"vgId:%d %s failed at line %d since %s"
,
TD_VID
(
pCompactor
->
pTsdb
->
pVnode
),
__func__
,
lino
,
tstrerror
(
code
));
}
else
{
tsdbDebug
(
"vgId:%d %s done, suid:%"
PRId64
" uid:%"
PRId64
,
TD_VID
(
pCompactor
->
pTsdb
->
pVnode
),
__func__
,
pId
->
suid
,
pId
->
uid
);
}
return
code
;
}
static
int32_t
tsdbCompactWriteTableDataEnd
(
STsdbCompactor
*
pCompactor
)
{
int32_t
code
=
0
;
int32_t
lino
=
0
;
if
(
pCompactor
->
bData
.
nRow
>
0
)
{
if
(
pCompactor
->
bData
.
nRow
<
pCompactor
->
minRows
)
{
for
(
int32_t
iRow
=
0
;
iRow
<
pCompactor
->
bData
.
nRow
;
iRow
++
)
{
code
=
tBlockDataAppendRow
(
&
pCompactor
->
sData
,
&
tsdbRowFromBlockData
(
&
pCompactor
->
bData
,
iRow
),
NULL
,
pCompactor
->
tbid
.
uid
);
TSDB_CHECK_CODE
(
code
,
lino
,
_exit
);
if
(
pCompactor
->
sData
.
nRow
>=
pCompactor
->
maxRows
)
{
code
=
tsdbWriteSttBlock
(
pCompactor
->
pWriter
,
&
pCompactor
->
sData
,
pCompactor
->
aSttBlk
,
pCompactor
->
cmprAlg
);
TSDB_CHECK_CODE
(
code
,
lino
,
_exit
);
}
}
tBlockDataClear
(
&
pCompactor
->
bData
);
}
else
{
code
=
tsdbWriteDataBlock
(
pCompactor
->
pWriter
,
&
pCompactor
->
bData
,
&
pCompactor
->
mDataBlk
,
pCompactor
->
cmprAlg
);
TSDB_CHECK_CODE
(
code
,
lino
,
_exit
);
}
}
if
(
pCompactor
->
mDataBlk
.
nItem
)
{
SBlockIdx
*
pBlockIdx
=
(
SBlockIdx
*
)
taosArrayReserve
(
pCompactor
->
aBlockIdx
,
1
);
if
(
pBlockIdx
==
NULL
)
{
code
=
TSDB_CODE_OUT_OF_MEMORY
;
TSDB_CHECK_CODE
(
code
,
lino
,
_exit
);
}
pBlockIdx
->
suid
=
pCompactor
->
tbid
.
suid
;
pBlockIdx
->
uid
=
pCompactor
->
tbid
.
uid
;
code = tBlockDataInit(&pCompactor->bData, (TABLEID *)pRowInfo, pTSchema, NULL, 0
);
code
=
tsdbWriteDataBlk
(
pCompactor
->
pWriter
,
&
pCompactor
->
mDataBlk
,
pBlockIdx
);
TSDB_CHECK_CODE
(
code
,
lino
,
_exit
);
}
pCompactor->tableId.suid = pRowInfo->suid;
pCompactor->tableId.uid = pRowInfo->uid;
_exit:
if
(
code
)
{
tsdbError
(
"vgId:%d %s failed at line %d since %s"
,
TD_VID
(
pCompactor
->
pTsdb
->
pVnode
),
__func__
,
lino
,
tstrerror
(
code
));
}
else
{
tsdbDebug
(
"vgId:%d %s done, suid:%"
PRId64
" uid:%"
PRId64
,
TD_VID
(
pCompactor
->
pTsdb
->
pVnode
),
__func__
,
pCompactor
->
tbid
.
suid
,
pCompactor
->
tbid
.
uid
);
}
return
code
;
}
static
int32_t
tsdbCompactWriteTableData
(
STsdbCompactor
*
pCompactor
,
SRowInfo
*
pRowInfo
)
{
int32_t
code
=
0
;
int32_t
lino
=
0
;
SRowInfo
rInfo
;
if
(
pRowInfo
==
NULL
)
{
rInfo
.
suid
=
INT64_MAX
;
rInfo
.
uid
=
INT64_MAX
;
// rInfo.row = TSDBORW_V;
pRowInfo
=
&
rInfo
;
}
// if uid changed
if (pCompactor->tableId.uid != pRowInfo->uid) {
// if need to write the block data
bool init = false;
if (pCompactor->bData.suid == 0) {
code = tsdbCompactWriteBlockData(pCompactor);
if
(
pRowInfo
->
uid
!=
pCompactor
->
tbid
.
uid
)
{
if
(
pCompactor
->
tbid
.
uid
)
{
code
=
tsdbCompactWriteTableDataEnd
(
pCompactor
);
TSDB_CHECK_CODE
(
code
,
lino
,
_exit
);
init = true;
} else if (pCompactor->bData.uid && pCompactor->bData.nRow >= pCompactor->minRows) {
code = tsdbCompactWriteBlockData(pCompactor
);
}
code
=
tsdbCompactWriteTableDataStart
(
pCompactor
,
(
TABLEID
*
)
pRowInfo
);
TSDB_CHECK_CODE
(
code
,
lino
,
_exit
);
}
// write SDataBlk
code = tsdbCompactWriteDataBlk(pCompactor);
code
=
tBlockDataAppendRow
(
&
pCompactor
->
bData
,
&
pRowInfo
->
row
,
NULL
,
pRowInfo
->
uid
);
TSDB_CHECK_CODE
(
code
,
lino
,
_exit
);
// init block data if need
if (init && (code = tBlockDataInit(&pCompactor->bData, (TABLEID *)pRowInfo, pTSchema, NULL, 0))) {
if
(
pCompactor
->
bData
.
nRow
>=
pCompactor
->
maxRows
)
{
code
=
tsdbWriteDataBlock
(
pCompactor
->
pWriter
,
&
pCompactor
->
bData
,
&
pCompactor
->
mDataBlk
,
pCompactor
->
cmprAlg
);
TSDB_CHECK_CODE
(
code
,
lino
,
_exit
);
}
pCompactor->tableId.suid = pRowInfo->suid;
pCompactor->tableId.uid = pRowInfo->uid;
_exit:
if
(
code
)
{
tsdbError
(
"vgId:%d %s failed at line %d since %s"
,
TD_VID
(
pCompactor
->
pTsdb
->
pVnode
),
__func__
,
lino
,
tstrerror
(
code
));
}
else
if
(
pRowInfo
)
{
tsdbTrace
(
"vgId:%d %s done, suid:%"
PRId64
" uid:%"
PRId64
" ts:%"
PRId64
" version:%"
PRId64
,
TD_VID
(
pCompactor
->
pTsdb
->
pVnode
),
__func__
,
pRowInfo
->
suid
,
pRowInfo
->
uid
,
TSDBROW_TS
(
&
pRowInfo
->
row
),
TSDBROW_VERSION
(
&
pRowInfo
->
row
));
}
return
code
;
}
// if append/merge the row causes nRow exceed maxRows
if (tBlockDataTryUpsertRow(&pCompactor->bData, &pRowInfo->row, pRowInfo->uid) > pCompactor->maxRows) {
code = tsdbCompactWriteBlockData(pCompactor);
static
int32_t
tsdbCompactNextRow
(
STsdbCompactor
*
pCompactor
,
SRowInfo
**
ppRowInfo
)
{
int32_t
code
=
0
;
int32_t
lino
=
0
;
if
(
pCompactor
->
pIter
)
{
code
=
tsdbDataIterNext2
(
pCompactor
->
pIter
,
NULL
/* TODO */
);
TSDB_CHECK_CODE
(
code
,
lino
,
_exit
);
if
(
pCompactor
->
pIter
->
rowInfo
.
suid
==
0
&&
pCompactor
->
pIter
->
rowInfo
.
uid
==
0
)
{
pCompactor
->
pIter
=
NULL
;
}
else
{
SRBTreeNode
*
pNode
=
tRBTreeMin
(
&
pCompactor
->
rbt
);
if
(
pNode
)
{
int32_t
c
=
tsdbDataIterCmprFn
(
&
pCompactor
->
pIter
->
rbtn
,
pNode
);
if
(
c
>
0
)
{
tRBTreePut
(
&
pCompactor
->
rbt
,
&
pCompactor
->
pIter
->
rbtn
);
pCompactor
->
pIter
=
NULL
;
}
else
if
(
c
==
0
)
{
ASSERT
(
0
);
}
}
}
}
if
(
pCompactor
->
pIter
==
NULL
)
{
SRBTreeNode
*
pNode
=
tRBTreeMin
(
&
pCompactor
->
rbt
);
if
(
pNode
)
{
tRBTreeDrop
(
&
pCompactor
->
rbt
,
pNode
);
pCompactor
->
pIter
=
TSDB_RBTN_TO_DATA_ITER
(
pNode
);
}
}
if
(
ppRowInfo
)
{
if
(
pCompactor
->
pIter
)
{
*
ppRowInfo
=
&
pCompactor
->
pIter
->
rowInfo
;
}
else
{
*
ppRowInfo
=
NULL
;
}
}
_exit:
if
(
code
)
{
tsdbError
(
"vgId:%d %s failed at line %d since %s"
,
TD_VID
(
pCompactor
->
pTsdb
->
pVnode
),
__func__
,
lino
,
tstrerror
(
code
));
}
return
code
;
}
static
int32_t
tsdbCompactFileSetStart
(
STsdbCompactor
*
pCompactor
,
SDFileSet
*
pSet
)
{
int32_t
code
=
0
;
int32_t
lino
=
0
;
pCompactor
->
fid
=
pSet
->
fid
;
pCompactor
->
tbid
=
(
TABLEID
){
0
};
// append/merge the row
code = tBlockDataUpsertRow(&pCompactor->bData, &pRowInfo->row, pTSchema, pRowInfo->uid
);
/* reader */
code
=
tsdbDataFReaderOpen
(
&
pCompactor
->
pReader
,
pCompactor
->
pTsdb
,
pSet
);
TSDB_CHECK_CODE
(
code
,
lino
,
_exit
);
// iter to the next row
code = tsdbCompactNextRow(pCompactor);
code
=
tsdbOpenDataFileDataIter
(
pCompactor
->
pReader
,
&
pCompactor
->
pIter
);
TSDB_CHECK_CODE
(
code
,
lino
,
_exit
);
code = tsdbCompactGetRow(pCompactor, &pRowInfo, &pTSchema);
if
(
pCompactor
->
pIter
)
{
pCompactor
->
pIter
->
next
=
pCompactor
->
iterList
;
pCompactor
->
iterList
=
pCompactor
->
pIter
;
}
for
(
int32_t
iStt
=
0
;
iStt
<
pSet
->
nSttF
;
iStt
++
)
{
code
=
tsdbOpenSttFileDataIter
(
pCompactor
->
pReader
,
iStt
,
&
pCompactor
->
pIter
);
TSDB_CHECK_CODE
(
code
,
lino
,
_exit
);
if
(
pCompactor
->
pIter
)
{
pCompactor
->
pIter
->
next
=
pCompactor
->
iterList
;
pCompactor
->
iterList
=
pCompactor
->
pIter
;
}
}
// handle remain data
code = tsdbCompactWriteBlockData(pCompactor);
pCompactor
->
pIter
=
NULL
;
tRBTreeCreate
(
&
pCompactor
->
rbt
,
tsdbDataIterCmprFn
);
/* writer */
code
=
tsdbDataFWriterOpen
(
&
pCompactor
->
pWriter
,
pCompactor
->
pTsdb
,
&
(
SDFileSet
){.
fid
=
pCompactor
->
fid
,
.
diskId
=
pSet
->
diskId
,
.
pHeadF
=
&
(
SHeadFile
){.
commitID
=
pCompactor
->
commitID
},
.
pDataF
=
&
(
SDataFile
){.
commitID
=
pCompactor
->
commitID
},
.
pSmaF
=
&
(
SSmaFile
){.
commitID
=
pCompactor
->
commitID
},
.
nSttF
=
1
,
.
aSttF
=
{
&
(
SSttFile
){.
commitID
=
pCompactor
->
commitID
}}});
TSDB_CHECK_CODE
(
code
,
lino
,
_exit
);
code = tsdbCompactWriteDataBlk(pCompactor);
if
(
pCompactor
->
aBlockIdx
)
{
taosArrayClear
(
pCompactor
->
aBlockIdx
);
}
else
if
((
pCompactor
->
aBlockIdx
=
taosArrayInit
(
0
,
sizeof
(
SBlockIdx
)))
==
NULL
)
{
code
=
TSDB_CODE_OUT_OF_MEMORY
;
TSDB_CHECK_CODE
(
code
,
lino
,
_exit
);
}
code = tsdbWriteBlockIdx(pCompactor->pWriter, pCompactor->aBlockIdx);
tMapDataReset
(
&
pCompactor
->
mDataBlk
);
if
(
pCompactor
->
aSttBlk
)
{
taosArrayClear
(
pCompactor
->
aSttBlk
);
}
else
if
((
pCompactor
->
aSttBlk
=
taosArrayInit
(
0
,
sizeof
(
SSttBlk
)))
==
NULL
)
{
code
=
TSDB_CODE_OUT_OF_MEMORY
;
TSDB_CHECK_CODE
(
code
,
lino
,
_exit
);
}
tBlockDataReset
(
&
pCompactor
->
bData
);
tBlockDataReset
(
&
pCompactor
->
sData
);
_exit:
if
(
code
)
{
tsdbError
(
"vgId:%d %s failed at line %d since %s, fid:%d"
,
TD_VID
(
pCompactor
->
pTsdb
->
pVnode
),
__func__
,
lino
,
tstrerror
(
code
),
pCompactor
->
fid
);
}
else
{
tsdbInfo
(
"vgId:%d %s done, fid:%d"
,
TD_VID
(
pCompactor
->
pTsdb
->
pVnode
),
__func__
,
pCompactor
->
fid
);
}
return
code
;
}
static
int32_t
tsdbCompactFileSetEnd
(
STsdbCompactor
*
pCompactor
)
{
int32_t
code
=
0
;
int32_t
lino
=
0
;
/* finish remaining data (TODO) */
/* update files */
code
=
tsdbWriteSttBlk
(
pCompactor
->
pWriter
,
pCompactor
->
aSttBlk
);
TSDB_CHECK_CODE
(
code
,
lino
,
_exit
);
code
=
tsdbWriteBlockIdx
(
pCompactor
->
pWriter
,
pCompactor
->
aBlockIdx
);
TSDB_CHECK_CODE
(
code
,
lino
,
_exit
);
code
=
tsdbUpdateDFileSetHeader
(
pCompactor
->
pWriter
);
TSDB_CHECK_CODE
(
code
,
lino
,
_exit
);
...
...
@@ -939,47 +1064,155 @@ static int32_t tsdbCompactFileSet(STsdbCompactor *pCompactor) {
code
=
tsdbDataFWriterClose
(
&
pCompactor
->
pWriter
,
1
);
TSDB_CHECK_CODE
(
code
,
lino
,
_exit
);
code
=
tsdbDataFReaderClose
(
&
pCompactor
->
pReader
);
TSDB_CHECK_CODE
(
code
,
lino
,
_exit
);
/* do clear */
while
((
pCompactor
->
pIter
=
pCompactor
->
iterList
)
!=
NULL
)
{
pCompactor
->
iterList
=
pCompactor
->
pIter
->
next
;
tsdbCloseDataIter2
(
pCompactor
->
pIter
);
}
tBlockDataReset
(
&
pCompactor
->
bData
);
tBlockDataReset
(
&
pCompactor
->
sData
);
_exit:
if
(
code
)
{
tsdbError
(
"vgId:%d %s failed at line %d since %s, fid:%d"
,
TD_VID
(
pCompactor
->
pTsdb
->
pVnode
),
__func__
,
lino
,
tstrerror
(
code
),
pCompactor
->
fid
);
}
else
{
tsdbInfo
(
"vgId:%d %s done, fid:%d"
,
TD_VID
(
pCompactor
->
pTsdb
->
pVnode
),
__func__
,
pCompactor
->
fid
);
}
return
code
;
}
static
int32_t
tsdbCompactFileSet
(
STsdbCompactor
*
pCompactor
,
SDFileSet
*
pSet
)
{
int32_t
code
=
0
;
int32_t
lino
=
0
;
// start compact
code
=
tsdbCompactFileSetStart
(
pCompactor
,
pSet
);
TSDB_CHECK_CODE
(
code
,
lino
,
_exit
);
// do compact
SRowInfo
*
pRowInfo
;
for
(;;)
{
code
=
tsdbCompactNextRow
(
pCompactor
,
&
pRowInfo
);
TSDB_CHECK_CODE
(
code
,
lino
,
_exit
);
code
=
tsdbCompactWriteTableData
(
pCompactor
,
pRowInfo
);
TSDB_CHECK_CODE
(
code
,
lino
,
_exit
);
if
(
pRowInfo
==
NULL
)
break
;
}
// end compact
code
=
tsdbCompactFileSetEnd
(
pCompactor
);
TSDB_CHECK_CODE
(
code
,
lino
,
_exit
);
_exit:
// close compactor
tsdbCloseCompactor(pCompactor);
return
code
;
}
static
int32_t
tsdbBeginCompact
(
STsdb
*
pTsdb
,
STsdbCompactor
*
pCompactor
)
{
int32_t
code
=
0
;
int32_t
lino
=
0
;
pCompactor
->
pTsdb
=
pTsdb
;
pCompactor
->
commitID
=
0
;
// TODO
pCompactor
->
cmprAlg
=
pTsdb
->
pVnode
->
config
.
tsdbCfg
.
compression
;
pCompactor
->
maxRows
=
pTsdb
->
pVnode
->
config
.
tsdbCfg
.
maxRows
;
pCompactor
->
minRows
=
pTsdb
->
pVnode
->
config
.
tsdbCfg
.
minRows
;
pCompactor
->
fid
=
INT32_MIN
;
code
=
tsdbFSCopy
(
pTsdb
,
&
pCompactor
->
fs
);
TSDB_CHECK_CODE
(
code
,
lino
,
_exit
);
/* tombstone (TODO ) */
#if 0
if (pCompactor->fs.pDelFile) {
code = tsdbDelFReaderOpen(&pCompactor->pDelFReader, pCompactor->fs.pDelFile, pTsdb);
TSDB_CHECK_CODE(code, lino, _exit);
pCompactor->aDelIdx = taosArrayInit(0, sizeof(SDelIdx));
if (pCompactor->aDelIdx == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
TSDB_CHECK_CODE(code, lino, _exit);
}
pCompactor->aDelData = taosArrayInit(0, sizeof(SDelData));
if (pCompactor->aDelData == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
TSDB_CHECK_CODE(code, lino, _exit);
}
pCompactor->aSkyLine = taosArrayInit(0, sizeof(TSDBKEY));
if (pCompactor->aSkyLine == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
TSDB_CHECK_CODE(code, lino, _exit);
}
code = tsdbReadDelIdx(pCompactor->pDelFReader, pCompactor->aDelIdx);
TSDB_CHECK_CODE(code, lino, _exit);
}
#endif
/* reader */
/* writer */
code
=
tBlockDataCreate
(
&
pCompactor
->
bData
);
TSDB_CHECK_CODE
(
code
,
lino
,
_exit
);
code
=
tBlockDataCreate
(
&
pCompactor
->
sData
);
TSDB_CHECK_CODE
(
code
,
lino
,
_exit
);
_exit:
if
(
code
)
{
tsdbError
(
"vgId:%d %s failed at line %d since %s, commit ID:%"
PRId64
,
TD_VID
(
pTsdb
->
pVnode
),
__func__
,
lino
,
tstrerror
(
code
),
pCompactor
->
commitID
);
}
else
{
tsdbInfo
(
"vgId:%d %s done, commit ID:%"
PRId64
,
TD_VID
(
pTsdb
->
pVnode
),
__func__
,
pCompactor
->
commitID
);
}
return
code
;
}
int32_t
tsdbCompact
(
STsdb
*
pTsdb
,
int32_t
flag
)
{
int32_t
code
=
0
;
int32_t
lino
=
0
;
#if 0
STsdbCompactor
*
pCompactor
=
&
(
STsdbCompactor
){
0
};
// begin compact
code
=
tsdbBeginCompact
(
pTsdb
,
pCompactor
);
TSDB_CHECK_CODE
(
code
,
lino
,
_exit
);
//
d
o compact each file set
//
loop t
o compact each file set
while
(
true
)
{
pCompactor->pDFile
Set = (SDFileSet *)taosArraySearch(pCompactor->fs.aDFileSet, &(SDFileSet){.fid = pCompactor->fid},
SDFileSet
*
p
Set
=
(
SDFileSet
*
)
taosArraySearch
(
pCompactor
->
fs
.
aDFileSet
,
&
(
SDFileSet
){.
fid
=
pCompactor
->
fid
},
tDFileSetCmprFn
,
TD_GT
);
if (pCompactor->pDFileSet == NULL) break;
if
(
pSet
==
NULL
)
{
pCompactor
->
fid
=
INT32_MAX
;
break
;
}
pCompactor->fid = pCompactor->pDFileSet->fid;
code = tsdbCompactFileSet(pCompactor);
code
=
tsdbCompactFileSet
(
pCompactor
,
pSet
);
TSDB_CHECK_CODE
(
code
,
lino
,
_exit
);
}
#if 0
code = tsdbFSUpsertDelFile(&pCompactor->fs, NULL);
TSDB_CHECK_CODE(code, lino, _exit);
_exit:
// commit/abort compact
if (code) {
tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, lino, tstrerror(code));
tsdbAbortCompact(pCompactor);
} else {
tsdbCommitCompact(pCompactor);
}
tsdbEndCompact(pCompactor);
#endif
_exit:
// // commit/abort compact
// if (code) {
// tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, lino, tstrerror(code));
// tsdbAbortCompact(pCompactor);
// } else {
// tsdbCommitCompact(pCompactor);
// }
// tsdbEndCompact(pCompactor);
return
code
;
}
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录