Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
慢慢CG
TDengine
提交
7dc31f63
T
TDengine
项目概览
慢慢CG
/
TDengine
与 Fork 源项目一致
Fork自
taosdata / TDengine
通知
1
Star
0
Fork
0
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
0
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
0
Issue
0
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
未验证
提交
7dc31f63
编写于
6月 09, 2021
作者:
S
Shengliang Guan
提交者:
GitHub
6月 09, 2021
浏览文件
操作
浏览文件
下载
差异文件
Merge pull request #6392 from taosdata/feature/TD-4556
[TD-4556]add compact flag in STsdbRepo
上级
e9c8813e
60068b12
变更
4
隐藏空白更改
内联
并排
Showing
4 changed file
with
513 addition
and
4 deletion
+513
-4
src/inc/tsdb.h
src/inc/tsdb.h
+1
-1
src/tsdb/inc/tsdbint.h
src/tsdb/inc/tsdbint.h
+1
-0
src/tsdb/src/tsdbCompact.c
src/tsdb/src/tsdbCompact.c
+508
-3
src/tsdb/src/tsdbMain.c
src/tsdb/src/tsdbMain.c
+3
-0
未找到文件。
src/inc/tsdb.h
浏览文件 @
7dc31f63
...
@@ -94,7 +94,7 @@ STsdbRepo *tsdbOpenRepo(STsdbCfg *pCfg, STsdbAppH *pAppH);
...
@@ -94,7 +94,7 @@ STsdbRepo *tsdbOpenRepo(STsdbCfg *pCfg, STsdbAppH *pAppH);
int
tsdbCloseRepo
(
STsdbRepo
*
repo
,
int
toCommit
);
int
tsdbCloseRepo
(
STsdbRepo
*
repo
,
int
toCommit
);
int32_t
tsdbConfigRepo
(
STsdbRepo
*
repo
,
STsdbCfg
*
pCfg
);
int32_t
tsdbConfigRepo
(
STsdbRepo
*
repo
,
STsdbCfg
*
pCfg
);
int
tsdbGetState
(
STsdbRepo
*
repo
);
int
tsdbGetState
(
STsdbRepo
*
repo
);
bool
tsdbInCompact
(
STsdbRepo
*
repo
);
// --------- TSDB TABLE DEFINITION
// --------- TSDB TABLE DEFINITION
typedef
struct
{
typedef
struct
{
uint64_t
uid
;
// the unique table ID
uint64_t
uid
;
// the unique table ID
...
...
src/tsdb/inc/tsdbint.h
浏览文件 @
7dc31f63
...
@@ -92,6 +92,7 @@ struct STsdbRepo {
...
@@ -92,6 +92,7 @@ struct STsdbRepo {
pthread_mutex_t
mutex
;
pthread_mutex_t
mutex
;
bool
repoLocked
;
bool
repoLocked
;
int32_t
code
;
// Commit code
int32_t
code
;
// Commit code
bool
inCompact
;
// is in compact process?
};
};
#define REPO_ID(r) (r)->config.tsdbId
#define REPO_ID(r) (r)->config.tsdbId
...
...
src/tsdb/src/tsdbCompact.c
浏览文件 @
7dc31f63
...
@@ -12,11 +12,516 @@
...
@@ -12,11 +12,516 @@
* You should have received a copy of the GNU Affero General Public License
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
*/
#include "tsdb.h"
#include "tsdb
int
.h"
#ifndef _TSDB_PLUGINS
#ifndef _TSDB_PLUGINS
int
tsdbCompact
(
STsdbRepo
*
pRepo
)
{
return
0
;
}
typedef
struct
{
void
*
tsdbCompactImpl
(
STsdbRepo
*
pRepo
)
{
return
NULL
;
}
STable
*
pTable
;
SBlockIdx
*
pBlkIdx
;
SBlockIdx
bindex
;
SBlockInfo
*
pInfo
;
}
STableCompactH
;
typedef
struct
{
SRtn
rtn
;
SFSIter
fsIter
;
SArray
*
tbArray
;
// table array to cache table obj and block indexes
SReadH
readh
;
SDFileSet
wSet
;
SArray
*
aBlkIdx
;
SArray
*
aSupBlk
;
SDataCols
*
pDataCols
;
}
SCompactH
;
#define TSDB_COMPACT_WSET(pComph) (&((pComph)->wSet))
#define TSDB_COMPACT_REPO(pComph) TSDB_READ_REPO(&((pComph)->readh))
#define TSDB_COMPACT_HEAD_FILE(pComph) TSDB_DFILE_IN_SET(TSDB_COMPACT_WSET(pComph), TSDB_FILE_HEAD)
#define TSDB_COMPACT_DATA_FILE(pComph) TSDB_DFILE_IN_SET(TSDB_COMPACT_WSET(pComph), TSDB_FILE_DATA)
#define TSDB_COMPACT_LAST_FILE(pComph) TSDB_DFILE_IN_SET(TSDB_COMPACT_WSET(pComph), TSDB_FILE_LAST)
#define TSDB_COMPACT_BUF(pComph) TSDB_READ_BUF(&((pComph)->readh))
#define TSDB_COMPACT_COMP_BUF(pComph) TSDB_READ_COMP_BUF(&((pComph)->readh))
static
int
tsdbAsyncCompact
(
STsdbRepo
*
pRepo
);
static
void
tsdbStartCompact
(
STsdbRepo
*
pRepo
);
static
void
tsdbEndCompact
(
STsdbRepo
*
pRepo
,
int
eno
);
static
int
tsdbCompactMeta
(
STsdbRepo
*
pRepo
);
static
int
tsdbCompactTSData
(
STsdbRepo
*
pRepo
);
static
int
tsdbCompactFSet
(
SCompactH
*
pComph
,
SDFileSet
*
pSet
);
static
bool
tsdbShouldCompact
(
SCompactH
*
pComph
);
static
int
tsdbInitCompactH
(
SCompactH
*
pComph
,
STsdbRepo
*
pRepo
);
static
void
tsdbDestroyCompactH
(
SCompactH
*
pComph
);
static
int
tsdbInitCompTbArray
(
SCompactH
*
pComph
);
static
void
tsdbDestroyCompTbArray
(
SCompactH
*
pComph
);
static
int
tsdbCacheFSetIndex
(
SCompactH
*
pComph
);
static
int
tsdbCompactFSetInit
(
SCompactH
*
pComph
,
SDFileSet
*
pSet
);
static
void
tsdbCompactFSetEnd
(
SCompactH
*
pComph
);
static
int
tsdbCompactFSetImpl
(
SCompactH
*
pComph
);
static
int
tsdbWriteBlockToRightFile
(
SCompactH
*
pComph
,
STable
*
pTable
,
SDataCols
*
pDataCols
,
void
**
ppBuf
,
void
**
ppCBuf
);
int
tsdbCompact
(
STsdbRepo
*
pRepo
)
{
return
tsdbAsyncCompact
(
pRepo
);
}
void
*
tsdbCompactImpl
(
STsdbRepo
*
pRepo
)
{
// Check if there are files in TSDB FS to compact
if
(
REPO_FS
(
pRepo
)
->
cstatus
->
pmf
==
NULL
)
{
tsdbInfo
(
"vgId:%d no file to compact in FS"
,
REPO_ID
(
pRepo
));
return
NULL
;
}
tsdbStartCompact
(
pRepo
);
if
(
tsdbCompactMeta
(
pRepo
)
<
0
)
{
tsdbError
(
"vgId:%d failed to compact META data since %s"
,
REPO_ID
(
pRepo
),
tstrerror
(
terrno
));
goto
_err
;
}
if
(
tsdbCompactTSData
(
pRepo
)
<
0
)
{
tsdbError
(
"vgId:%d failed to compact TS data since %s"
,
REPO_ID
(
pRepo
),
tstrerror
(
terrno
));
goto
_err
;
}
tsdbEndCompact
(
pRepo
,
TSDB_CODE_SUCCESS
);
return
NULL
;
_err:
pRepo
->
code
=
terrno
;
tsdbEndCompact
(
pRepo
,
terrno
);
return
NULL
;
}
static
int
tsdbAsyncCompact
(
STsdbRepo
*
pRepo
)
{
tsem_wait
(
&
(
pRepo
->
readyToCommit
));
return
tsdbScheduleCommit
(
pRepo
,
COMPACT_REQ
);
}
static
void
tsdbStartCompact
(
STsdbRepo
*
pRepo
)
{
ASSERT
(
!
pRepo
->
inCompact
);
tsdbInfo
(
"vgId:%d start to compact!"
,
REPO_ID
(
pRepo
));
tsdbStartFSTxn
(
pRepo
,
0
,
0
);
pRepo
->
code
=
TSDB_CODE_SUCCESS
;
pRepo
->
inCompact
=
true
;
}
static
void
tsdbEndCompact
(
STsdbRepo
*
pRepo
,
int
eno
)
{
if
(
eno
!=
TSDB_CODE_SUCCESS
)
{
tsdbEndFSTxnWithError
(
REPO_FS
(
pRepo
));
}
else
{
tsdbEndFSTxn
(
pRepo
);
}
pRepo
->
inCompact
=
false
;
tsdbInfo
(
"vgId:%d compact over, %s"
,
REPO_ID
(
pRepo
),
(
eno
==
TSDB_CODE_SUCCESS
)
?
"succeed"
:
"failed"
);
tsem_post
(
&
(
pRepo
->
readyToCommit
));
}
static
int
tsdbCompactMeta
(
STsdbRepo
*
pRepo
)
{
STsdbFS
*
pfs
=
REPO_FS
(
pRepo
);
tsdbUpdateMFile
(
pfs
,
pfs
->
cstatus
->
pmf
);
return
0
;
}
static
int
tsdbCompactTSData
(
STsdbRepo
*
pRepo
)
{
SCompactH
compactH
;
SDFileSet
*
pSet
=
NULL
;
tsdbDebug
(
"vgId:%d start to compact TS data"
,
REPO_ID
(
pRepo
));
// If no file, just return 0;
if
(
taosArrayGetSize
(
REPO_FS
(
pRepo
)
->
cstatus
->
df
)
<=
0
)
{
tsdbDebug
(
"vgId:%d no TS data file to compact, compact over"
,
REPO_ID
(
pRepo
));
return
0
;
}
if
(
tsdbInitCompactH
(
&
compactH
,
pRepo
)
<
0
)
{
return
-
1
;
}
while
((
pSet
=
tsdbFSIterNext
(
&
(
compactH
.
fsIter
))))
{
// Remove those expired files
if
(
pSet
->
fid
<
compactH
.
rtn
.
minFid
)
{
tsdbInfo
(
"vgId:%d FSET %d on level %d disk id %d expires, remove it"
,
REPO_ID
(
pRepo
),
pSet
->
fid
,
TSDB_FSET_LEVEL
(
pSet
),
TSDB_FSET_ID
(
pSet
));
continue
;
}
if
(
TSDB_FSET_LEVEL
(
pSet
)
==
TFS_MAX_LEVEL
)
{
tsdbDebug
(
"vgId:%d FSET %d on level %d, should not compact"
,
REPO_ID
(
pRepo
),
pSet
->
fid
,
TFS_MAX_LEVEL
);
tsdbUpdateDFileSet
(
REPO_FS
(
pRepo
),
pSet
);
continue
;
}
if
(
tsdbCompactFSet
(
&
compactH
,
pSet
)
<
0
)
{
tsdbDestroyCompactH
(
&
compactH
);
tsdbError
(
"vgId:%d failed to compact FSET %d since %s"
,
REPO_ID
(
pRepo
),
pSet
->
fid
,
tstrerror
(
terrno
));
return
-
1
;
}
}
tsdbDestroyCompactH
(
&
compactH
);
tsdbDebug
(
"vgId:%d compact TS data over"
,
REPO_ID
(
pRepo
));
return
0
;
}
static
int
tsdbCompactFSet
(
SCompactH
*
pComph
,
SDFileSet
*
pSet
)
{
STsdbRepo
*
pRepo
=
TSDB_COMPACT_REPO
(
pComph
);
SDiskID
did
;
tsdbDebug
(
"vgId:%d start to compact FSET %d on level %d id %d"
,
REPO_ID
(
pRepo
),
pSet
->
fid
,
TSDB_FSET_LEVEL
(
pSet
),
TSDB_FSET_ID
(
pSet
));
if
(
tsdbCompactFSetInit
(
pComph
,
pSet
)
<
0
)
{
return
-
1
;
}
if
(
!
tsdbShouldCompact
(
pComph
))
{
tsdbDebug
(
"vgId:%d no need to compact FSET %d"
,
REPO_ID
(
pRepo
),
pSet
->
fid
);
if
(
tsdbApplyRtnOnFSet
(
TSDB_COMPACT_REPO
(
pComph
),
pSet
,
&
(
pComph
->
rtn
))
<
0
)
{
tsdbCompactFSetEnd
(
pComph
);
return
-
1
;
}
}
else
{
// Create new fset as compacted fset
tfsAllocDisk
(
tsdbGetFidLevel
(
pSet
->
fid
,
&
(
pComph
->
rtn
)),
&
(
did
.
level
),
&
(
did
.
id
));
if
(
did
.
level
==
TFS_UNDECIDED_LEVEL
)
{
terrno
=
TSDB_CODE_TDB_NO_AVAIL_DISK
;
tsdbError
(
"vgId:%d failed to compact FSET %d since %s"
,
REPO_ID
(
pRepo
),
pSet
->
fid
,
tstrerror
(
terrno
));
tsdbCompactFSetEnd
(
pComph
);
return
-
1
;
}
tsdbInitDFileSet
(
TSDB_COMPACT_WSET
(
pComph
),
did
,
REPO_ID
(
pRepo
),
TSDB_FSET_FID
(
pSet
),
FS_TXN_VERSION
(
REPO_FS
(
pRepo
)));
if
(
tsdbCreateDFileSet
(
TSDB_COMPACT_WSET
(
pComph
),
true
)
<
0
)
{
tsdbError
(
"vgId:%d failed to compact FSET %d since %s"
,
REPO_ID
(
pRepo
),
pSet
->
fid
,
tstrerror
(
terrno
));
tsdbCompactFSetEnd
(
pComph
);
return
-
1
;
}
if
(
tsdbCompactFSetImpl
(
pComph
)
<
0
)
{
tsdbCloseDFileSet
(
TSDB_COMPACT_WSET
(
pComph
));
tsdbRemoveDFileSet
(
TSDB_COMPACT_WSET
(
pComph
));
tsdbCompactFSetEnd
(
pComph
);
return
-
1
;
}
tsdbCloseDFileSet
(
TSDB_COMPACT_WSET
(
pComph
));
tsdbUpdateDFileSet
(
REPO_FS
(
pRepo
),
TSDB_COMPACT_WSET
(
pComph
));
tsdbDebug
(
"vgId:%d FSET %d compact over"
,
REPO_ID
(
pRepo
),
pSet
->
fid
);
}
tsdbCompactFSetEnd
(
pComph
);
return
0
;
}
static
bool
tsdbShouldCompact
(
SCompactH
*
pComph
)
{
STsdbRepo
*
pRepo
=
TSDB_COMPACT_REPO
(
pComph
);
STsdbCfg
*
pCfg
=
REPO_CFG
(
pRepo
);
SReadH
*
pReadh
=
&
(
pComph
->
readh
);
STableCompactH
*
pTh
;
SBlock
*
pBlock
;
int
defaultRows
=
TSDB_DEFAULT_BLOCK_ROWS
(
pCfg
->
maxRowsPerFileBlock
);
SDFile
*
pDataF
=
TSDB_READ_DATA_FILE
(
pReadh
);
SDFile
*
pLastF
=
TSDB_READ_LAST_FILE
(
pReadh
);
int
tblocks
=
0
;
// total blocks
int
nSubBlocks
=
0
;
// # of blocks with sub-blocks
int
nSmallBlocks
=
0
;
// # of blocks with rows < defaultRows
int64_t
tsize
=
0
;
for
(
size_t
i
=
0
;
i
<
taosArrayGetSize
(
pComph
->
tbArray
);
i
++
)
{
pTh
=
(
STableCompactH
*
)
taosArrayGet
(
pComph
->
tbArray
,
i
);
if
(
pTh
->
pTable
==
NULL
||
pTh
->
pBlkIdx
==
NULL
)
continue
;
for
(
size_t
bidx
=
0
;
bidx
<
pTh
->
pBlkIdx
->
numOfBlocks
;
bidx
++
)
{
tblocks
++
;
pBlock
=
pTh
->
pInfo
->
blocks
+
bidx
;
if
(
pBlock
->
numOfRows
<
defaultRows
)
{
nSmallBlocks
++
;
}
if
(
pBlock
->
numOfSubBlocks
>
1
)
{
nSubBlocks
++
;
for
(
int
k
=
0
;
k
<
pBlock
->
numOfSubBlocks
;
k
++
)
{
SBlock
*
iBlock
=
((
SBlock
*
)
POINTER_SHIFT
(
pTh
->
pInfo
,
pBlock
->
offset
))
+
k
;
tsize
=
tsize
+
iBlock
->
len
;
}
}
else
if
(
pBlock
->
numOfSubBlocks
==
1
)
{
tsize
+=
pBlock
->
len
;
}
else
{
ASSERT
(
0
);
}
}
}
return
(((
nSubBlocks
*
1
.
0
/
tblocks
)
>
0
.
33
)
||
((
nSmallBlocks
*
1
.
0
/
tblocks
)
>
0
.
33
)
||
(
tsize
*
1
.
0
/
(
pDataF
->
info
.
size
+
pLastF
->
info
.
size
-
2
*
TSDB_FILE_HEAD_SIZE
)
<
0
.
85
));
}
static
int
tsdbInitCompactH
(
SCompactH
*
pComph
,
STsdbRepo
*
pRepo
)
{
STsdbCfg
*
pCfg
=
REPO_CFG
(
pRepo
);
memset
(
pComph
,
0
,
sizeof
(
*
pComph
));
TSDB_FSET_SET_CLOSED
(
TSDB_COMPACT_WSET
(
pComph
));
tsdbGetRtnSnap
(
pRepo
,
&
(
pComph
->
rtn
));
tsdbFSIterInit
(
&
(
pComph
->
fsIter
),
REPO_FS
(
pRepo
),
TSDB_FS_ITER_FORWARD
);
if
(
tsdbInitReadH
(
&
(
pComph
->
readh
),
pRepo
)
<
0
)
{
return
-
1
;
}
if
(
tsdbInitCompTbArray
(
pComph
)
<
0
)
{
tsdbDestroyCompactH
(
pComph
);
return
-
1
;
}
pComph
->
aBlkIdx
=
taosArrayInit
(
1024
,
sizeof
(
SBlockIdx
));
if
(
pComph
->
aBlkIdx
==
NULL
)
{
terrno
=
TSDB_CODE_TDB_OUT_OF_MEMORY
;
tsdbDestroyCompactH
(
pComph
);
return
-
1
;
}
pComph
->
aSupBlk
=
taosArrayInit
(
1024
,
sizeof
(
SBlock
));
if
(
pComph
->
aSupBlk
==
NULL
)
{
terrno
=
TSDB_CODE_TDB_OUT_OF_MEMORY
;
tsdbDestroyCompactH
(
pComph
);
return
-
1
;
}
pComph
->
pDataCols
=
tdNewDataCols
(
0
,
0
,
pCfg
->
maxRowsPerFileBlock
);
if
(
pComph
->
pDataCols
==
NULL
)
{
terrno
=
TSDB_CODE_TDB_OUT_OF_MEMORY
;
tsdbDestroyCompactH
(
pComph
);
return
-
1
;
}
return
0
;
}
static
void
tsdbDestroyCompactH
(
SCompactH
*
pComph
)
{
pComph
->
pDataCols
=
tdFreeDataCols
(
pComph
->
pDataCols
);
pComph
->
aSupBlk
=
taosArrayDestroy
(
pComph
->
aSupBlk
);
pComph
->
aBlkIdx
=
taosArrayDestroy
(
pComph
->
aBlkIdx
);
tsdbDestroyCompTbArray
(
pComph
);
tsdbDestroyReadH
(
&
(
pComph
->
readh
));
tsdbCloseDFileSet
(
TSDB_COMPACT_WSET
(
pComph
));
}
static
int
tsdbInitCompTbArray
(
SCompactH
*
pComph
)
{
// Init pComp->tbArray
STsdbRepo
*
pRepo
=
TSDB_COMPACT_REPO
(
pComph
);
STsdbMeta
*
pMeta
=
pRepo
->
tsdbMeta
;
if
(
tsdbRLockRepoMeta
(
pRepo
)
<
0
)
return
-
1
;
pComph
->
tbArray
=
taosArrayInit
(
pMeta
->
maxTables
,
sizeof
(
STableCompactH
));
if
(
pComph
->
tbArray
==
NULL
)
{
terrno
=
TSDB_CODE_TDB_OUT_OF_MEMORY
;
tsdbUnlockRepoMeta
(
pRepo
);
return
-
1
;
}
// Note here must start from 0
for
(
int
i
=
0
;
i
<
pMeta
->
maxTables
;
i
++
)
{
STableCompactH
ch
=
{
0
};
if
(
pMeta
->
tables
[
i
]
!=
NULL
)
{
tsdbRefTable
(
pMeta
->
tables
[
i
]);
ch
.
pTable
=
pMeta
->
tables
[
i
];
}
if
(
taosArrayPush
(
pComph
->
tbArray
,
&
ch
)
==
NULL
)
{
terrno
=
TSDB_CODE_TDB_OUT_OF_MEMORY
;
tsdbUnlockRepoMeta
(
pRepo
);
return
-
1
;
}
}
if
(
tsdbUnlockRepoMeta
(
pRepo
)
<
0
)
return
-
1
;
return
0
;
}
static
void
tsdbDestroyCompTbArray
(
SCompactH
*
pComph
)
{
STableCompactH
*
pTh
;
if
(
pComph
->
tbArray
==
NULL
)
return
;
for
(
size_t
i
=
0
;
i
<
taosArrayGetSize
(
pComph
->
tbArray
);
i
++
)
{
pTh
=
(
STableCompactH
*
)
taosArrayGet
(
pComph
->
tbArray
,
i
);
if
(
pTh
->
pTable
)
{
tsdbUnRefTable
(
pTh
->
pTable
);
}
pTh
->
pInfo
=
taosTZfree
(
pTh
->
pInfo
);
}
pComph
->
tbArray
=
taosArrayDestroy
(
pComph
->
tbArray
);
}
static
int
tsdbCacheFSetIndex
(
SCompactH
*
pComph
)
{
SReadH
*
pReadH
=
&
(
pComph
->
readh
);
if
(
tsdbLoadBlockIdx
(
pReadH
)
<
0
)
{
return
-
1
;
}
for
(
int
tid
=
1
;
tid
<
taosArrayGetSize
(
pComph
->
tbArray
);
tid
++
)
{
STableCompactH
*
pTh
=
(
STableCompactH
*
)
taosArrayGet
(
pComph
->
tbArray
,
tid
);
pTh
->
pBlkIdx
=
NULL
;
if
(
pTh
->
pTable
==
NULL
)
continue
;
if
(
tsdbSetReadTable
(
pReadH
,
pTh
->
pTable
)
<
0
)
{
return
-
1
;
}
if
(
pReadH
->
pBlkIdx
==
NULL
)
continue
;
pTh
->
bindex
=
*
(
pReadH
->
pBlkIdx
);
pTh
->
pBlkIdx
=
&
(
pTh
->
bindex
);
if
(
tsdbMakeRoom
((
void
**
)(
&
(
pTh
->
pInfo
)),
pTh
->
pBlkIdx
->
len
)
<
0
)
{
return
-
1
;
}
if
(
tsdbLoadBlockInfo
(
pReadH
,
(
void
*
)(
pTh
->
pInfo
))
<
0
)
{
return
-
1
;
}
}
return
0
;
}
static
int
tsdbCompactFSetInit
(
SCompactH
*
pComph
,
SDFileSet
*
pSet
)
{
taosArrayClear
(
pComph
->
aBlkIdx
);
taosArrayClear
(
pComph
->
aSupBlk
);
if
(
tsdbSetAndOpenReadFSet
(
&
(
pComph
->
readh
),
pSet
)
<
0
)
{
return
-
1
;
}
if
(
tsdbCacheFSetIndex
(
pComph
)
<
0
)
{
tsdbCloseAndUnsetFSet
(
&
(
pComph
->
readh
));
return
-
1
;
}
return
0
;
}
static
void
tsdbCompactFSetEnd
(
SCompactH
*
pComph
)
{
tsdbCloseAndUnsetFSet
(
&
(
pComph
->
readh
));
}
static
int
tsdbCompactFSetImpl
(
SCompactH
*
pComph
)
{
STsdbRepo
*
pRepo
=
TSDB_COMPACT_REPO
(
pComph
);
STsdbCfg
*
pCfg
=
REPO_CFG
(
pRepo
);
SReadH
*
pReadh
=
&
(
pComph
->
readh
);
SBlockIdx
blkIdx
;
void
**
ppBuf
=
&
(
TSDB_COMPACT_BUF
(
pComph
));
void
**
ppCBuf
=
&
(
TSDB_COMPACT_COMP_BUF
(
pComph
));
int
defaultRows
=
TSDB_DEFAULT_BLOCK_ROWS
(
pCfg
->
maxRowsPerFileBlock
);
taosArrayClear
(
pComph
->
aBlkIdx
);
for
(
int
tid
=
1
;
tid
<
taosArrayGetSize
(
pComph
->
tbArray
);
tid
++
)
{
STableCompactH
*
pTh
=
(
STableCompactH
*
)
taosArrayGet
(
pComph
->
tbArray
,
tid
);
STSchema
*
pSchema
;
if
(
pTh
->
pTable
==
NULL
||
pTh
->
pBlkIdx
==
NULL
)
continue
;
pSchema
=
tsdbGetTableSchemaImpl
(
pTh
->
pTable
,
true
,
true
,
-
1
);
taosArrayClear
(
pComph
->
aSupBlk
);
if
((
tdInitDataCols
(
pComph
->
pDataCols
,
pSchema
)
<
0
)
||
(
tdInitDataCols
(
pReadh
->
pDCols
[
0
],
pSchema
)
<
0
)
||
(
tdInitDataCols
(
pReadh
->
pDCols
[
1
],
pSchema
)
<
0
))
{
terrno
=
TSDB_CODE_TDB_OUT_OF_MEMORY
;
return
-
1
;
}
tdFreeSchema
(
pSchema
);
// Loop to compact each block data
for
(
int
i
=
0
;
i
<
pTh
->
pBlkIdx
->
numOfBlocks
;
i
++
)
{
SBlock
*
pBlock
=
pTh
->
pInfo
->
blocks
+
i
;
// Load the block data
if
(
tsdbLoadBlockData
(
pReadh
,
pBlock
,
pTh
->
pInfo
)
<
0
)
{
return
-
1
;
}
// Merge pComph->pDataCols and pReadh->pDCols[0] and write data to file
if
(
pComph
->
pDataCols
->
numOfRows
==
0
&&
pBlock
->
numOfRows
>=
defaultRows
)
{
if
(
tsdbWriteBlockToRightFile
(
pComph
,
pTh
->
pTable
,
pReadh
->
pDCols
[
0
],
ppBuf
,
ppCBuf
)
<
0
)
{
return
-
1
;
}
}
else
{
int
ridx
=
0
;
while
(
true
)
{
if
(
pReadh
->
pDCols
[
0
]
->
numOfRows
-
ridx
==
0
)
break
;
int
rowsToMerge
=
MIN
(
pReadh
->
pDCols
[
0
]
->
numOfRows
-
ridx
,
defaultRows
-
pComph
->
pDataCols
->
numOfRows
);
tdMergeDataCols
(
pComph
->
pDataCols
,
pReadh
->
pDCols
[
0
],
rowsToMerge
,
&
ridx
);
if
(
pComph
->
pDataCols
->
numOfRows
<
defaultRows
)
{
break
;
}
if
(
tsdbWriteBlockToRightFile
(
pComph
,
pTh
->
pTable
,
pComph
->
pDataCols
,
ppBuf
,
ppCBuf
)
<
0
)
{
return
-
1
;
}
tdResetDataCols
(
pComph
->
pDataCols
);
}
}
}
if
(
pComph
->
pDataCols
->
numOfRows
>
0
&&
tsdbWriteBlockToRightFile
(
pComph
,
pTh
->
pTable
,
pComph
->
pDataCols
,
ppBuf
,
ppCBuf
)
<
0
)
{
return
-
1
;
}
if
(
tsdbWriteBlockInfoImpl
(
TSDB_COMPACT_HEAD_FILE
(
pComph
),
pTh
->
pTable
,
pComph
->
aSupBlk
,
NULL
,
ppBuf
,
&
blkIdx
)
<
0
)
{
return
-
1
;
}
if
((
blkIdx
.
numOfBlocks
>
0
)
&&
(
taosArrayPush
(
pComph
->
aBlkIdx
,
(
void
*
)(
&
blkIdx
))
==
NULL
))
{
terrno
=
TSDB_CODE_TDB_OUT_OF_MEMORY
;
return
-
1
;
}
}
if
(
tsdbWriteBlockIdx
(
TSDB_COMPACT_HEAD_FILE
(
pComph
),
pComph
->
aBlkIdx
,
ppBuf
)
<
0
)
{
return
-
1
;
}
return
0
;
}
static
int
tsdbWriteBlockToRightFile
(
SCompactH
*
pComph
,
STable
*
pTable
,
SDataCols
*
pDataCols
,
void
**
ppBuf
,
void
**
ppCBuf
)
{
STsdbRepo
*
pRepo
=
TSDB_COMPACT_REPO
(
pComph
);
STsdbCfg
*
pCfg
=
REPO_CFG
(
pRepo
);
SDFile
*
pDFile
;
bool
isLast
;
SBlock
block
;
ASSERT
(
pDataCols
->
numOfRows
>
0
);
if
(
pDataCols
->
numOfRows
<
pCfg
->
minRowsPerFileBlock
)
{
pDFile
=
TSDB_COMPACT_LAST_FILE
(
pComph
);
isLast
=
true
;
}
else
{
pDFile
=
TSDB_COMPACT_DATA_FILE
(
pComph
);
isLast
=
false
;
}
if
(
tsdbWriteBlockImpl
(
pRepo
,
pTable
,
pDFile
,
pDataCols
,
&
block
,
isLast
,
true
,
ppBuf
,
ppCBuf
)
<
0
)
{
return
-
1
;
}
if
(
taosArrayPush
(
pComph
->
aSupBlk
,
(
void
*
)(
&
block
))
==
NULL
)
{
terrno
=
TSDB_CODE_TDB_OUT_OF_MEMORY
;
return
-
1
;
}
return
0
;
}
#endif
#endif
\ No newline at end of file
src/tsdb/src/tsdbMain.c
浏览文件 @
7dc31f63
...
@@ -195,6 +195,8 @@ STsdbRepoInfo *tsdbGetStatus(STsdbRepo *pRepo) { return NULL; }
...
@@ -195,6 +195,8 @@ STsdbRepoInfo *tsdbGetStatus(STsdbRepo *pRepo) { return NULL; }
int
tsdbGetState
(
STsdbRepo
*
repo
)
{
return
repo
->
state
;
}
int
tsdbGetState
(
STsdbRepo
*
repo
)
{
return
repo
->
state
;
}
bool
tsdbInCompact
(
STsdbRepo
*
repo
)
{
return
repo
->
inCompact
;
}
void
tsdbReportStat
(
void
*
repo
,
int64_t
*
totalPoints
,
int64_t
*
totalStorage
,
int64_t
*
compStorage
)
{
void
tsdbReportStat
(
void
*
repo
,
int64_t
*
totalPoints
,
int64_t
*
totalStorage
,
int64_t
*
compStorage
)
{
ASSERT
(
repo
!=
NULL
);
ASSERT
(
repo
!=
NULL
);
STsdbRepo
*
pRepo
=
repo
;
STsdbRepo
*
pRepo
=
repo
;
...
@@ -533,6 +535,7 @@ static STsdbRepo *tsdbNewRepo(STsdbCfg *pCfg, STsdbAppH *pAppH) {
...
@@ -533,6 +535,7 @@ static STsdbRepo *tsdbNewRepo(STsdbCfg *pCfg, STsdbAppH *pAppH) {
pRepo
->
state
=
TSDB_STATE_OK
;
pRepo
->
state
=
TSDB_STATE_OK
;
pRepo
->
code
=
TSDB_CODE_SUCCESS
;
pRepo
->
code
=
TSDB_CODE_SUCCESS
;
pRepo
->
inCompact
=
false
;
pRepo
->
config
=
*
pCfg
;
pRepo
->
config
=
*
pCfg
;
if
(
pAppH
)
{
if
(
pAppH
)
{
pRepo
->
appH
=
*
pAppH
;
pRepo
->
appH
=
*
pAppH
;
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录