Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
3736e4d7
TDengine
项目概览
taosdata
/
TDengine
1 年多 前同步成功
通知
1185
Star
22016
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
未验证
提交
3736e4d7
编写于
10月 18, 2021
作者:
H
Hongze Cheng
提交者:
GitHub
10月 18, 2021
浏览文件
操作
浏览文件
下载
差异文件
Merge pull request #8141 from taosdata/feature/TS-272
Feature/TS-272: Multi-level top-level pre-aggregate
上级
7f222561
dc106a24
变更
19
显示空白变更内容
内联
并排
Showing
19 changed file
with
891 addition
and
277 deletion
+891
-277
src/common/inc/tglobal.h
src/common/inc/tglobal.h
+1
-0
src/common/src/tglobal.c
src/common/src/tglobal.c
+1
-0
src/dnode/src/dnodeSystem.c
src/dnode/src/dnodeSystem.c
+2
-0
src/inc/taosdef.h
src/inc/taosdef.h
+5
-0
src/inc/taoserror.h
src/inc/taoserror.h
+1
-0
src/tsdb/inc/tsdbCommit.h
src/tsdb/inc/tsdbCommit.h
+2
-2
src/tsdb/inc/tsdbFS.h
src/tsdb/inc/tsdbFS.h
+23
-1
src/tsdb/inc/tsdbFile.h
src/tsdb/inc/tsdbFile.h
+55
-18
src/tsdb/inc/tsdbReadImpl.h
src/tsdb/inc/tsdbReadImpl.h
+130
-6
src/tsdb/src/tsdbCommit.c
src/tsdb/src/tsdbCommit.c
+129
-18
src/tsdb/src/tsdbCompact.c
src/tsdb/src/tsdbCompact.c
+21
-14
src/tsdb/src/tsdbFS.c
src/tsdb/src/tsdbFS.c
+191
-100
src/tsdb/src/tsdbFile.c
src/tsdb/src/tsdbFile.c
+57
-35
src/tsdb/src/tsdbMain.c
src/tsdb/src/tsdbMain.c
+6
-5
src/tsdb/src/tsdbRead.c
src/tsdb/src/tsdbRead.c
+9
-16
src/tsdb/src/tsdbReadImpl.c
src/tsdb/src/tsdbReadImpl.c
+246
-32
src/tsdb/src/tsdbSync.c
src/tsdb/src/tsdbSync.c
+7
-4
src/util/src/terror.c
src/util/src/terror.c
+2
-0
tests/script/unique/arbitrator/dn3_mn1_vnode_noCorruptFile_offline.sim
...unique/arbitrator/dn3_mn1_vnode_noCorruptFile_offline.sim
+3
-26
未找到文件。
src/common/inc/tglobal.h
浏览文件 @
3736e4d7
...
...
@@ -110,6 +110,7 @@ extern int8_t tsCacheLastRow;
//tsdb
extern
bool
tsdbForceKeepFile
;
extern
bool
tsdbForceCompactFile
;
// balance
extern
int8_t
tsEnableBalance
;
...
...
src/common/src/tglobal.c
浏览文件 @
3736e4d7
...
...
@@ -156,6 +156,7 @@ int32_t tsTsdbMetaCompactRatio = TSDB_META_COMPACT_RATIO;
// tsdb config
// For backward compatibility
bool
tsdbForceKeepFile
=
false
;
bool
tsdbForceCompactFile
=
false
;
// compact TSDB fileset forcibly
// balance
int8_t
tsEnableBalance
=
1
;
...
...
src/dnode/src/dnodeSystem.c
浏览文件 @
3736e4d7
...
...
@@ -42,6 +42,8 @@ int32_t main(int32_t argc, char *argv[]) {
}
}
else
if
(
strcmp
(
argv
[
i
],
"-C"
)
==
0
)
{
dump_config
=
1
;
}
else
if
(
strcmp
(
argv
[
i
],
"--force-compact-file"
)
==
0
)
{
tsdbForceCompactFile
=
true
;
}
else
if
(
strcmp
(
argv
[
i
],
"--force-keep-file"
)
==
0
)
{
tsdbForceKeepFile
=
true
;
}
else
if
(
strcmp
(
argv
[
i
],
"--compact-mnode-wal"
)
==
0
)
{
...
...
src/inc/taosdef.h
浏览文件 @
3736e4d7
...
...
@@ -453,6 +453,11 @@ typedef enum {
TD_ROW_PARTIAL_UPDATE
=
2
}
TDUpdateConfig
;
typedef
enum
{
TSDB_STATIS_OK
=
0
,
// statis part exist and load successfully
TSDB_STATIS_NONE
=
1
,
// statis part not exist
}
ETsdbStatisStatus
;
extern
char
*
qtypeStr
[];
#ifdef __cplusplus
...
...
src/inc/taoserror.h
浏览文件 @
3736e4d7
...
...
@@ -271,6 +271,7 @@ int32_t* taosGetErrno();
#define TSDB_CODE_TDB_MESSED_MSG TAOS_DEF_ERROR_CODE(0, 0x0614) //"TSDB messed message")
#define TSDB_CODE_TDB_IVLD_TAG_VAL TAOS_DEF_ERROR_CODE(0, 0x0615) //"TSDB invalid tag value")
#define TSDB_CODE_TDB_NO_CACHE_LAST_ROW TAOS_DEF_ERROR_CODE(0, 0x0616) //"TSDB no cache last row data")
#define TSDB_CODE_TDB_INCOMPLETE_DFILESET TAOS_DEF_ERROR_CODE(0, 0x0617) //"TSDB incomplete DFileSet")
// query
#define TSDB_CODE_QRY_INVALID_QHANDLE TAOS_DEF_ERROR_CODE(0, 0x0700) //"Invalid handle")
...
...
src/tsdb/inc/tsdbCommit.h
浏览文件 @
3736e4d7
...
...
@@ -38,8 +38,8 @@ void *tsdbCommitData(STsdbRepo *pRepo);
int
tsdbApplyRtnOnFSet
(
STsdbRepo
*
pRepo
,
SDFileSet
*
pSet
,
SRtn
*
pRtn
);
int
tsdbWriteBlockInfoImpl
(
SDFile
*
pHeadf
,
STable
*
pTable
,
SArray
*
pSupA
,
SArray
*
pSubA
,
void
**
ppBuf
,
SBlockIdx
*
pIdx
);
int
tsdbWriteBlockIdx
(
SDFile
*
pHeadf
,
SArray
*
pIdxA
,
void
**
ppBuf
);
int
tsdbWriteBlockImpl
(
STsdbRepo
*
pRepo
,
STable
*
pTable
,
SDFile
*
pDFile
,
SDataCols
*
pDataCols
,
SBlock
*
pBlock
,
bool
isLast
,
bool
isSuper
,
void
**
ppBuf
,
void
**
ppC
Buf
);
int
tsdbWriteBlockImpl
(
STsdbRepo
*
pRepo
,
STable
*
pTable
,
SDFile
*
pDFile
,
SDFile
*
pDFileAggr
,
SDataCols
*
pDataCols
,
SBlock
*
pBlock
,
bool
isLast
,
bool
isSuper
,
void
**
ppBuf
,
void
**
ppCBuf
,
void
**
ppEx
Buf
);
int
tsdbApplyRtn
(
STsdbRepo
*
pRepo
);
static
FORCE_INLINE
int
tsdbGetFidLevel
(
int
fid
,
SRtn
*
pRtn
)
{
...
...
src/tsdb/inc/tsdbFS.h
浏览文件 @
3736e4d7
...
...
@@ -16,7 +16,29 @@
#ifndef _TD_TSDB_FS_H_
#define _TD_TSDB_FS_H_
#define TSDB_FS_VERSION 0
/**
* 1. The fileset .head/.data/.last use the same fver 0 before 2021.10.10.
* 2. .head fver is 1 when extract aggregate block data from .data/.last file and save to separate .smad/.smal file
* since 2021.10.10
* // TODO update date and add release version.
*/
typedef
enum
{
TSDB_FS_VER_0
=
0
,
TSDB_FS_VER_1
,
}
ETsdbFsVer
;
#define TSDB_FVER_TYPE uint32_t
#define TSDB_LATEST_FVER TSDB_FS_VER_1 // latest version for DFile
#define TSDB_LATEST_SFS_VER TSDB_FS_VER_1 // latest version for 'current' file
static
FORCE_INLINE
uint32_t
tsdbGetDFSVersion
(
TSDB_FILE_T
fType
)
{
// latest version for DFile
switch
(
fType
)
{
case
TSDB_FILE_HEAD
:
return
TSDB_FS_VER_1
;
default:
return
TSDB_FS_VER_0
;
}
}
// ================== TSDB global config
extern
bool
tsdbForceKeepFile
;
...
...
src/tsdb/inc/tsdbFile.h
浏览文件 @
3736e4d7
...
...
@@ -37,8 +37,22 @@
#define TSDB_FILE_SET_STATE(tf, s) ((tf)->state = (s))
#define TSDB_FILE_IS_OK(tf) (TSDB_FILE_STATE(tf) == TSDB_FILE_STATE_OK)
#define TSDB_FILE_IS_BAD(tf) (TSDB_FILE_STATE(tf) == TSDB_FILE_STATE_BAD)
typedef
enum
{
TSDB_FILE_HEAD
=
0
,
TSDB_FILE_DATA
,
TSDB_FILE_LAST
,
TSDB_FILE_MAX
,
TSDB_FILE_META
}
TSDB_FILE_T
;
#define ASSERT_TSDB_FSET_NFILES_VALID(s) \
do { \
uint8_t nDFiles = tsdbGetNFiles(s); \
ASSERT((nDFiles >= TSDB_FILE_MIN) && (nDFiles <= TSDB_FILE_MAX)); \
} while (0)
typedef
enum
{
TSDB_FILE_HEAD
=
0
,
TSDB_FILE_DATA
,
TSDB_FILE_LAST
,
TSDB_FILE_SMAD
,
// sma for .data
TSDB_FILE_SMAL
,
// sma for .last
TSDB_FILE_MAX
,
TSDB_FILE_META
}
TSDB_FILE_T
;
#define TSDB_FILE_MIN 3U // min valid number of files in one DFileSet(.head/.data/.last)
// =============== SMFile
typedef
struct
{
...
...
@@ -166,6 +180,7 @@ typedef struct {
uint32_t
offset
;
uint64_t
size
;
uint64_t
tombSize
;
uint32_t
fver
;
}
SDFInfo
;
typedef
struct
{
...
...
@@ -178,8 +193,8 @@ typedef struct {
void
tsdbInitDFile
(
SDFile
*
pDFile
,
SDiskID
did
,
int
vid
,
int
fid
,
uint32_t
ver
,
TSDB_FILE_T
ftype
);
void
tsdbInitDFileEx
(
SDFile
*
pDFile
,
SDFile
*
pODFile
);
int
tsdbEncodeSDFile
(
void
**
buf
,
SDFile
*
pDFile
);
void
*
tsdbDecodeSDFile
(
void
*
buf
,
SDFile
*
pDFile
);
int
tsdbCreateDFile
(
SDFile
*
pDFile
,
bool
updateHeader
);
void
*
tsdbDecodeSDFile
(
void
*
buf
,
SDFile
*
pDFile
,
uint32_t
sfver
);
int
tsdbCreateDFile
(
SDFile
*
pDFile
,
bool
updateHeader
,
TSDB_FILE_T
ftype
);
int
tsdbUpdateDFileHeader
(
SDFile
*
pDFile
);
int
tsdbLoadDFileHeader
(
SDFile
*
pDFile
,
SDFInfo
*
pInfo
);
int
tsdbParseDFilename
(
const
char
*
fname
,
int
*
vid
,
int
*
fid
,
TSDB_FILE_T
*
ftype
,
uint32_t
*
version
);
...
...
@@ -285,9 +300,27 @@ static FORCE_INLINE int tsdbCopyDFile(SDFile* pSrc, SDFile* pDest) {
typedef
struct
{
int
fid
;
int
state
;
uint16_t
ver
;
// fset version
SDFile
files
[
TSDB_FILE_MAX
];
}
SDFileSet
;
typedef
enum
{
TSDB_FSET_VER_0
=
0
,
// .head/.data/.last
TSDB_FSET_VER_1
,
// .head/.data/.last/.smad/.smal
}
ETsdbFSetVer
;
#define TSDB_LATEST_FSET_VER TSDB_FSET_VER_1
// get nDFiles in SDFileSet
static
FORCE_INLINE
uint8_t
tsdbGetNFiles
(
SDFileSet
*
pSet
)
{
switch
(
pSet
->
ver
)
{
case
TSDB_FSET_VER_0
:
return
TSDB_FILE_MIN
;
case
TSDB_FSET_VER_1
:
default:
return
TSDB_FILE_MAX
;
}
}
#define TSDB_FSET_FID(s) ((s)->fid)
#define TSDB_DFILE_IN_SET(s, t) ((s)->files + (t))
#define TSDB_FSET_LEVEL(s) TSDB_FILE_LEVEL(TSDB_DFILE_IN_SET(s, 0))
...
...
@@ -300,15 +333,15 @@ typedef struct {
} while (0);
#define TSDB_FSET_FSYNC(s) \
do { \
for (TSDB_FILE_T ftype = TSDB_FILE_HEAD; ftype <
TSDB_FILE_MAX
; ftype++) { \
for (TSDB_FILE_T ftype = TSDB_FILE_HEAD; ftype <
tsdbGetNFiles(s)
; ftype++) { \
TSDB_FILE_FSYNC(TSDB_DFILE_IN_SET(s, ftype)); \
} \
} while (0);
void
tsdbInitDFileSet
(
SDFileSet
*
pSet
,
SDiskID
did
,
int
vid
,
int
fid
,
uint32_t
ver
);
void
tsdbInitDFileSet
(
SDFileSet
*
pSet
,
SDiskID
did
,
int
vid
,
int
fid
,
uint32_t
ver
,
uint16_t
fsetVer
);
void
tsdbInitDFileSetEx
(
SDFileSet
*
pSet
,
SDFileSet
*
pOSet
);
int
tsdbEncodeDFileSet
(
void
**
buf
,
SDFileSet
*
pSet
);
void
*
tsdbDecodeDFileSet
(
void
*
buf
,
SDFileSet
*
pSet
);
void
*
tsdbDecodeDFileSet
(
void
*
buf
,
SDFileSet
*
pSet
,
uint32_t
sfver
);
int
tsdbEncodeDFileSetEx
(
void
**
buf
,
SDFileSet
*
pSet
);
void
*
tsdbDecodeDFileSetEx
(
void
*
buf
,
SDFileSet
*
pSet
);
int
tsdbApplyDFileSetChange
(
SDFileSet
*
from
,
SDFileSet
*
to
);
...
...
@@ -317,13 +350,15 @@ int tsdbUpdateDFileSetHeader(SDFileSet* pSet);
int
tsdbScanAndTryFixDFileSet
(
STsdbRepo
*
pRepo
,
SDFileSet
*
pSet
);
static
FORCE_INLINE
void
tsdbCloseDFileSet
(
SDFileSet
*
pSet
)
{
for
(
TSDB_FILE_T
ftype
=
0
;
ftype
<
TSDB_FILE_MAX
;
ftype
++
)
{
ASSERT_TSDB_FSET_NFILES_VALID
(
pSet
);
for
(
TSDB_FILE_T
ftype
=
0
;
ftype
<
tsdbGetNFiles
(
pSet
);
ftype
++
)
{
tsdbCloseDFile
(
TSDB_DFILE_IN_SET
(
pSet
,
ftype
));
}
}
static
FORCE_INLINE
int
tsdbOpenDFileSet
(
SDFileSet
*
pSet
,
int
flags
)
{
for
(
TSDB_FILE_T
ftype
=
0
;
ftype
<
TSDB_FILE_MAX
;
ftype
++
)
{
ASSERT_TSDB_FSET_NFILES_VALID
(
pSet
);
for
(
TSDB_FILE_T
ftype
=
0
;
ftype
<
tsdbGetNFiles
(
pSet
);
ftype
++
)
{
if
(
tsdbOpenDFile
(
TSDB_DFILE_IN_SET
(
pSet
,
ftype
),
flags
)
<
0
)
{
tsdbCloseDFileSet
(
pSet
);
return
-
1
;
...
...
@@ -333,13 +368,15 @@ static FORCE_INLINE int tsdbOpenDFileSet(SDFileSet* pSet, int flags) {
}
static
FORCE_INLINE
void
tsdbRemoveDFileSet
(
SDFileSet
*
pSet
)
{
for
(
TSDB_FILE_T
ftype
=
0
;
ftype
<
TSDB_FILE_MAX
;
ftype
++
)
{
ASSERT_TSDB_FSET_NFILES_VALID
(
pSet
);
for
(
TSDB_FILE_T
ftype
=
0
;
ftype
<
tsdbGetNFiles
(
pSet
);
ftype
++
)
{
(
void
)
tsdbRemoveDFile
(
TSDB_DFILE_IN_SET
(
pSet
,
ftype
));
}
}
static
FORCE_INLINE
int
tsdbCopyDFileSet
(
SDFileSet
*
pSrc
,
SDFileSet
*
pDest
)
{
for
(
TSDB_FILE_T
ftype
=
0
;
ftype
<
TSDB_FILE_MAX
;
ftype
++
)
{
ASSERT_TSDB_FSET_NFILES_VALID
(
pSrc
);
for
(
TSDB_FILE_T
ftype
=
0
;
ftype
<
tsdbGetNFiles
(
pSrc
);
ftype
++
)
{
if
(
tsdbCopyDFile
(
TSDB_DFILE_IN_SET
(
pSrc
,
ftype
),
TSDB_DFILE_IN_SET
(
pDest
,
ftype
))
<
0
)
{
tsdbRemoveDFileSet
(
pDest
);
return
-
1
;
...
...
src/tsdb/inc/tsdbReadImpl.h
浏览文件 @
3736e4d7
...
...
@@ -35,6 +35,7 @@ typedef struct {
TSKEY
maxKey
;
}
SBlockIdx
;
#if 0
typedef struct {
int64_t last : 1;
int64_t offset : 63;
...
...
@@ -46,8 +47,55 @@ typedef struct {
int16_t numOfCols; // not including timestamp column
TSKEY keyFirst;
TSKEY keyLast;
}
SBlock
;
} SBlock;
#endif
/**
* keyLen; // key column length, keyOffset = offset+sizeof(SBlockData)+sizeof(SBlockCol)*numOfCols
* numOfCols; // not including timestamp column
*/
#define SBlockFieldsP0 \
int64_t last : 1; \
int64_t offset : 63; \
int32_t algorithm : 8; \
int32_t numOfRows : 24; \
int32_t len; \
int32_t keyLen; \
int16_t numOfSubBlocks; \
int16_t numOfCols; \
TSKEY keyFirst; \
TSKEY keyLast
/**
* aggrStat; // only valid when blkVer > 0. 0 - no aggr part in .data/.last/.smad/.smal, 1 - has aggr in .smad/.smal
* blkVer; // 0 - original block, 1 - block since importing .smad/.smal
* aggrOffset; // only valid when blkVer > 0 and aggrStat > 0
*/
#define SBlockFieldsP1 \
uint64_t aggrStat : 3; \
uint64_t blkVer : 5; \
uint64_t aggrOffset : 56; \
uint32_t aggrLen
typedef
struct
{
SBlockFieldsP0
;
}
SBlockV0
;
typedef
struct
{
SBlockFieldsP0
;
SBlockFieldsP1
;
}
SBlockV1
;
typedef
enum
{
TSDB_SBLK_VER_0
=
0
,
TSDB_SBLK_VER_1
,
}
ESBlockVer
;
#define SBlockVerLatest TSDB_SBLK_VER_1
#define SBlock SBlockV1 // latest SBlock definition
// lastest SBlockInfo definition
typedef
struct
{
int32_t
delimiter
;
// For recovery usage
int32_t
tid
;
...
...
@@ -68,7 +116,31 @@ typedef struct {
int16_t
numOfNull
;
uint8_t
offsetH
;
char
padding
[
1
];
}
SBlockCol
;
}
SBlockColV0
;
typedef
struct
{
int16_t
colId
;
uint8_t
offsetH
;
uint8_t
reserved
;
// reserved field, not used
int32_t
len
;
uint32_t
type
:
8
;
uint32_t
offset
:
24
;
// char padding[];
}
SBlockColV1
;
#define SBlockCol SBlockColV1 // latest SBlockCol definition
typedef
struct
{
int16_t
colId
;
int16_t
maxIndex
;
int16_t
minIndex
;
int16_t
numOfNull
;
int64_t
sum
;
int64_t
max
;
int64_t
min
;
}
SAggrBlkColV1
;
#define SAggrBlkCol SAggrBlkColV1 // latest SAggrBlkCol definition
// Code here just for back-ward compatibility
static
FORCE_INLINE
void
tsdbSetBlockColOffset
(
SBlockCol
*
pBlockCol
,
uint32_t
offset
)
{
...
...
@@ -88,6 +160,10 @@ typedef struct {
uint64_t
uid
;
// For recovery usage
SBlockCol
cols
[];
}
SBlockData
;
typedef
struct
{
int32_t
numOfCols
;
// For recovery usage
SAggrBlkCol
cols
[];
}
SAggrBlkData
;
struct
SReadH
{
STsdbRepo
*
pRepo
;
...
...
@@ -96,11 +172,13 @@ struct SReadH {
STable
*
pTable
;
// table to read
SBlockIdx
*
pBlkIdx
;
// current reading table SBlockIdx
int
cidx
;
SBlockInfo
*
pBlkInfo
;
SBlockInfo
*
pBlkInfo
;
// SBlockInfoV#
SBlockData
*
pBlkData
;
// Block info
SAggrBlkData
*
pAggrBlkData
;
// Aggregate Block info
SDataCols
*
pDCols
[
2
];
void
*
pBuf
;
// buffer
void
*
pCBuf
;
// compression buffer
void
*
pExBuf
;
// extra buffer
};
#define TSDB_READ_REPO(rh) ((rh)->pRepo)
...
...
@@ -110,10 +188,38 @@ struct SReadH {
#define TSDB_READ_HEAD_FILE(rh) TSDB_DFILE_IN_SET(TSDB_READ_FSET(rh), TSDB_FILE_HEAD)
#define TSDB_READ_DATA_FILE(rh) TSDB_DFILE_IN_SET(TSDB_READ_FSET(rh), TSDB_FILE_DATA)
#define TSDB_READ_LAST_FILE(rh) TSDB_DFILE_IN_SET(TSDB_READ_FSET(rh), TSDB_FILE_LAST)
#define TSDB_READ_SMAD_FILE(rh) TSDB_DFILE_IN_SET(TSDB_READ_FSET(rh), TSDB_FILE_SMAD)
#define TSDB_READ_SMAL_FILE(rh) TSDB_DFILE_IN_SET(TSDB_READ_FSET(rh), TSDB_FILE_SMAL)
#define TSDB_READ_BUF(rh) ((rh)->pBuf)
#define TSDB_READ_COMP_BUF(rh) ((rh)->pCBuf)
#define TSDB_READ_EXBUF(rh) ((rh)->pExBuf)
#define TSDB_BLOCK_STATIS_SIZE(ncols) (sizeof(SBlockData) + sizeof(SBlockCol) * (ncols) + sizeof(TSCKSUM))
#define TSDB_BLOCK_STATIS_SIZE(ncols, blkVer) \
(sizeof(SBlockData) + sizeof(SBlockColV##blkVer) * (ncols) + sizeof(TSCKSUM))
static
FORCE_INLINE
size_t
tsdbBlockStatisSize
(
int
nCols
,
uint32_t
blkVer
)
{
switch
(
blkVer
)
{
case
TSDB_SBLK_VER_0
:
return
TSDB_BLOCK_STATIS_SIZE
(
nCols
,
0
);
case
TSDB_SBLK_VER_1
:
default:
return
TSDB_BLOCK_STATIS_SIZE
(
nCols
,
1
);
}
}
#define TSDB_BLOCK_AGGR_SIZE(ncols, blkVer) \
(sizeof(SAggrBlkData) + sizeof(SAggrBlkColV##blkVer) * (ncols) + sizeof(TSCKSUM))
static
FORCE_INLINE
size_t
tsdbBlockAggrSize
(
int
nCols
,
uint32_t
blkVer
)
{
switch
(
blkVer
)
{
case
TSDB_SBLK_VER_0
:
ASSERT
(
false
);
return
0
;
case
TSDB_SBLK_VER_1
:
default:
return
TSDB_BLOCK_AGGR_SIZE
(
nCols
,
1
);
}
}
int
tsdbInitReadH
(
SReadH
*
pReadh
,
STsdbRepo
*
pRepo
);
void
tsdbDestroyReadH
(
SReadH
*
pReadh
);
...
...
@@ -121,13 +227,14 @@ int tsdbSetAndOpenReadFSet(SReadH *pReadh, SDFileSet *pSet);
void
tsdbCloseAndUnsetFSet
(
SReadH
*
pReadh
);
int
tsdbLoadBlockIdx
(
SReadH
*
pReadh
);
int
tsdbSetReadTable
(
SReadH
*
pReadh
,
STable
*
pTable
);
int
tsdbLoadBlockInfo
(
SReadH
*
pReadh
,
void
*
pTarget
);
int
tsdbLoadBlockInfo
(
SReadH
*
pReadh
,
void
*
*
pTarget
,
uint32_t
*
extendedLen
);
int
tsdbLoadBlockData
(
SReadH
*
pReadh
,
SBlock
*
pBlock
,
SBlockInfo
*
pBlockInfo
);
int
tsdbLoadBlockDataCols
(
SReadH
*
pReadh
,
SBlock
*
pBlock
,
SBlockInfo
*
pBlkInfo
,
int16_t
*
colIds
,
int
numOfColsIds
);
int
tsdbLoadBlockStatis
(
SReadH
*
pReadh
,
SBlock
*
pBlock
);
int
tsdbLoadBlockOffset
(
SReadH
*
pReadh
,
SBlock
*
pBlock
);
int
tsdbEncodeSBlockIdx
(
void
**
buf
,
SBlockIdx
*
pIdx
);
void
*
tsdbDecodeSBlockIdx
(
void
*
buf
,
SBlockIdx
*
pIdx
);
void
tsdbGetBlockStatis
(
SReadH
*
pReadh
,
SDataStatis
*
pStatis
,
int
numOfCols
);
void
tsdbGetBlockStatis
(
SReadH
*
pReadh
,
SDataStatis
*
pStatis
,
int
numOfCols
,
SBlock
*
pBlock
);
static
FORCE_INLINE
int
tsdbMakeRoom
(
void
**
ppBuf
,
size_t
size
)
{
void
*
pBuf
=
*
ppBuf
;
...
...
@@ -150,4 +257,21 @@ static FORCE_INLINE int tsdbMakeRoom(void **ppBuf, size_t size) {
return
0
;
}
static
FORCE_INLINE
SBlockCol
*
tsdbGetSBlockCol
(
SBlock
*
pBlock
,
SBlockCol
**
pDestBlkCol
,
SBlockCol
*
pBlkCols
,
int
colIdx
)
{
if
(
pBlock
->
blkVer
==
SBlockVerLatest
)
{
*
pDestBlkCol
=
pBlkCols
+
colIdx
;
return
*
pDestBlkCol
;
}
if
(
pBlock
->
blkVer
==
TSDB_SBLK_VER_0
)
{
SBlockColV0
*
pBlkCol
=
(
SBlockColV0
*
)
pBlkCols
+
colIdx
;
(
*
pDestBlkCol
)
->
colId
=
pBlkCol
->
colId
;
(
*
pDestBlkCol
)
->
len
=
pBlkCol
->
len
;
(
*
pDestBlkCol
)
->
type
=
pBlkCol
->
type
;
(
*
pDestBlkCol
)
->
offset
=
pBlkCol
->
offset
;
(
*
pDestBlkCol
)
->
offsetH
=
pBlkCol
->
offsetH
;
}
return
*
pDestBlkCol
;
}
#endif
/*_TD_TSDB_READ_IMPL_H_*/
src/tsdb/src/tsdbCommit.c
浏览文件 @
3736e4d7
...
...
@@ -51,8 +51,11 @@ typedef struct {
#define TSDB_COMMIT_HEAD_FILE(ch) TSDB_DFILE_IN_SET(TSDB_COMMIT_WRITE_FSET(ch), TSDB_FILE_HEAD)
#define TSDB_COMMIT_DATA_FILE(ch) TSDB_DFILE_IN_SET(TSDB_COMMIT_WRITE_FSET(ch), TSDB_FILE_DATA)
#define TSDB_COMMIT_LAST_FILE(ch) TSDB_DFILE_IN_SET(TSDB_COMMIT_WRITE_FSET(ch), TSDB_FILE_LAST)
#define TSDB_COMMIT_SMAD_FILE(ch) TSDB_DFILE_IN_SET(TSDB_COMMIT_WRITE_FSET(ch), TSDB_FILE_SMAD)
#define TSDB_COMMIT_SMAL_FILE(ch) TSDB_DFILE_IN_SET(TSDB_COMMIT_WRITE_FSET(ch), TSDB_FILE_SMAL)
#define TSDB_COMMIT_BUF(ch) TSDB_READ_BUF(&((ch)->readh))
#define TSDB_COMMIT_COMP_BUF(ch) TSDB_READ_COMP_BUF(&((ch)->readh))
#define TSDB_COMMIT_EXBUF(ch) TSDB_READ_EXBUF(&((ch)->readh))
#define TSDB_COMMIT_DEFAULT_ROWS(ch) TSDB_DEFAULT_BLOCK_ROWS(TSDB_COMMIT_REPO(ch)->config.maxRowsPerFileBlock)
#define TSDB_COMMIT_TXN_VERSION(ch) FS_TXN_VERSION(REPO_FS(TSDB_COMMIT_REPO(ch)))
...
...
@@ -136,7 +139,7 @@ int tsdbApplyRtnOnFSet(STsdbRepo *pRepo, SDFileSet *pSet, SRtn *pRtn) {
if
(
did
.
level
>
TSDB_FSET_LEVEL
(
pSet
))
{
// Need to move the FSET to higher level
tsdbInitDFileSet
(
&
nSet
,
did
,
REPO_ID
(
pRepo
),
pSet
->
fid
,
FS_TXN_VERSION
(
pfs
));
tsdbInitDFileSet
(
&
nSet
,
did
,
REPO_ID
(
pRepo
),
pSet
->
fid
,
FS_TXN_VERSION
(
pfs
)
,
pSet
->
ver
);
if
(
tsdbCopyDFileSet
(
pSet
,
&
nSet
)
<
0
)
{
tsdbError
(
"vgId:%d failed to copy FSET %d from level %d to level %d since %s"
,
REPO_ID
(
pRepo
),
pSet
->
fid
,
...
...
@@ -912,7 +915,7 @@ static int tsdbNextCommitFid(SCommitH *pCommith) {
}
else
{
int
tfid
=
(
int
)(
TSDB_KEY_FID
(
nextKey
,
pCfg
->
daysPerFile
,
pCfg
->
precision
));
if
(
fid
==
TSDB_IVLD_FID
||
fid
>
tfid
)
{
fid
=
tfid
;
fid
=
tfid
;
// find the least fid
}
}
}
...
...
@@ -946,7 +949,7 @@ static int tsdbCommitToTable(SCommitH *pCommith, int tid) {
SBlock
*
pBlock
;
if
(
pCommith
->
readh
.
pBlkIdx
)
{
if
(
tsdbLoadBlockInfo
(
&
(
pCommith
->
readh
),
NULL
)
<
0
)
{
if
(
tsdbLoadBlockInfo
(
&
(
pCommith
->
readh
),
NULL
,
NULL
)
<
0
)
{
TSDB_RUNLOCK_TABLE
(
pIter
->
pTable
);
return
-
1
;
}
...
...
@@ -1053,40 +1056,57 @@ static int tsdbComparKeyBlock(const void *arg1, const void *arg2) {
}
}
int
tsdbWriteBlockImpl
(
STsdbRepo
*
pRepo
,
STable
*
pTable
,
SDFile
*
pDFile
,
SD
ataCols
*
pDataCols
,
SBlock
*
pBlock
,
bool
isLast
,
bool
isSuper
,
void
**
ppBuf
,
void
**
ppC
Buf
)
{
int
tsdbWriteBlockImpl
(
STsdbRepo
*
pRepo
,
STable
*
pTable
,
SDFile
*
pDFile
,
SD
File
*
pDFileAggr
,
SDataCols
*
pDataCols
,
SBlock
*
pBlock
,
bool
isLast
,
bool
isSuper
,
void
**
ppBuf
,
void
**
ppCBuf
,
void
**
ppEx
Buf
)
{
STsdbCfg
*
pCfg
=
REPO_CFG
(
pRepo
);
SBlockData
*
pBlockData
;
int64_t
offset
=
0
;
SAggrBlkData
*
pAggrBlkData
=
NULL
;
int64_t
offset
=
0
,
offsetAggr
=
0
;
int
rowsToWrite
=
pDataCols
->
numOfRows
;
ASSERT
(
rowsToWrite
>
0
&&
rowsToWrite
<=
pCfg
->
maxRowsPerFileBlock
);
ASSERT
((
!
isLast
)
||
rowsToWrite
<
pCfg
->
minRowsPerFileBlock
);
// Make buffer space
if
(
tsdbMakeRoom
(
ppBuf
,
TSDB_BLOCK_STATIS_SIZE
(
pDataCols
->
numOfCols
))
<
0
)
{
if
(
tsdbMakeRoom
(
ppBuf
,
tsdbBlockStatisSize
(
pDataCols
->
numOfCols
,
SBlockVerLatest
))
<
0
)
{
return
-
1
;
}
pBlockData
=
(
SBlockData
*
)(
*
ppBuf
);
if
(
tsdbMakeRoom
(
ppExBuf
,
tsdbBlockAggrSize
(
pDataCols
->
numOfCols
,
SBlockVerLatest
))
<
0
)
{
return
-
1
;
}
pAggrBlkData
=
(
SAggrBlkData
*
)(
*
ppExBuf
);
// Get # of cols not all NULL(not including key column)
int
nColsNotAllNull
=
0
;
int
nAggrCols
=
0
;
for
(
int
ncol
=
1
;
ncol
<
pDataCols
->
numOfCols
;
ncol
++
)
{
// ncol from 1, we skip the timestamp column
SDataCol
*
pDataCol
=
pDataCols
->
cols
+
ncol
;
SBlockCol
*
pBlockCol
=
pBlockData
->
cols
+
nColsNotAllNull
;
SBlockCol
*
pBlockCol
=
pBlockData
->
cols
+
nColsNotAllNull
;
SAggrBlkCol
*
pAggrBlkCol
=
pAggrBlkData
->
cols
+
nColsNotAllNull
;
if
(
isAllRowsNull
(
pDataCol
))
{
// all data to commit are NULL, just ignore it
continue
;
}
memset
(
pBlockCol
,
0
,
sizeof
(
*
pBlockCol
));
memset
(
pAggrBlkCol
,
0
,
sizeof
(
*
pAggrBlkCol
));
pBlockCol
->
colId
=
pDataCol
->
colId
;
pBlockCol
->
type
=
pDataCol
->
type
;
pAggrBlkCol
->
colId
=
pDataCol
->
colId
;
if
(
tDataTypes
[
pDataCol
->
type
].
statisFunc
)
{
#if 0
(*tDataTypes[pDataCol->type].statisFunc)(pDataCol->pData, rowsToWrite, &(pBlockCol->min), &(pBlockCol->max),
&(pBlockCol->sum), &(pBlockCol->minIndex), &(pBlockCol->maxIndex),
&(pBlockCol->numOfNull));
#endif
(
*
tDataTypes
[
pDataCol
->
type
].
statisFunc
)(
pDataCol
->
pData
,
rowsToWrite
,
&
(
pAggrBlkCol
->
min
),
&
(
pAggrBlkCol
->
max
),
&
(
pAggrBlkCol
->
sum
),
&
(
pAggrBlkCol
->
minIndex
),
&
(
pAggrBlkCol
->
maxIndex
),
&
(
pAggrBlkCol
->
numOfNull
));
++
nAggrCols
;
}
nColsNotAllNull
++
;
}
...
...
@@ -1096,9 +1116,12 @@ int tsdbWriteBlockImpl(STsdbRepo *pRepo, STable *pTable, SDFile *pDFile, SDataCo
// Compress the data if neccessary
int
tcol
=
0
;
// counter of not all NULL and written columns
uint32_t
toffset
=
0
;
int32_t
tsize
=
TSDB_BLOCK_STATIS_SIZE
(
nColsNotAllNull
);
int32_t
tsize
=
(
int32_t
)
tsdbBlockStatisSize
(
nColsNotAllNull
,
SBlockVerLatest
);
int32_t
lsize
=
tsize
;
int32_t
keyLen
=
0
;
uint32_t
tsizeAggr
=
(
uint32_t
)
tsdbBlockAggrSize
(
nColsNotAllNull
,
SBlockVerLatest
);
for
(
int
ncol
=
0
;
ncol
<
pDataCols
->
numOfCols
;
ncol
++
)
{
// All not NULL columns finish
if
(
ncol
!=
0
&&
tcol
>=
nColsNotAllNull
)
break
;
...
...
@@ -1165,7 +1188,20 @@ int tsdbWriteBlockImpl(STsdbRepo *pRepo, STable *pTable, SDFile *pDFile, SDataCo
return
-
1
;
}
// Update pBlock membership vairables
uint32_t
aggrStatus
=
((
nAggrCols
>
0
)
&&
(
rowsToWrite
>
8
))
?
1
:
0
;
// TODO: How to make the decision?
if
(
aggrStatus
>
0
)
{
pAggrBlkData
->
numOfCols
=
nColsNotAllNull
;
taosCalcChecksumAppend
(
0
,
(
uint8_t
*
)
pAggrBlkData
,
tsizeAggr
);
tsdbUpdateDFileMagic
(
pDFileAggr
,
POINTER_SHIFT
(
pAggrBlkData
,
tsizeAggr
-
sizeof
(
TSCKSUM
)));
// Write the whole block to file
if
(
tsdbAppendDFile
(
pDFileAggr
,
(
void
*
)
pAggrBlkData
,
tsizeAggr
,
&
offsetAggr
)
<
tsizeAggr
)
{
return
-
1
;
}
}
// Update pBlock membership variables
pBlock
->
last
=
isLast
;
pBlock
->
offset
=
offset
;
pBlock
->
algorithm
=
pCfg
->
compression
;
...
...
@@ -1176,6 +1212,11 @@ int tsdbWriteBlockImpl(STsdbRepo *pRepo, STable *pTable, SDFile *pDFile, SDataCo
pBlock
->
numOfCols
=
nColsNotAllNull
;
pBlock
->
keyFirst
=
dataColsKeyFirst
(
pDataCols
);
pBlock
->
keyLast
=
dataColsKeyLast
(
pDataCols
);
// since blkVer1
pBlock
->
aggrStat
=
aggrStatus
;
pBlock
->
blkVer
=
SBlockVerLatest
;
pBlock
->
aggrOffset
=
(
uint64_t
)
offsetAggr
;
pBlock
->
aggrLen
=
tsizeAggr
;
tsdbDebug
(
"vgId:%d tid:%d a block of data is written to file %s, offset %"
PRId64
" numOfRows %d len %d numOfCols %"
PRId16
" keyFirst %"
PRId64
" keyLast %"
PRId64
,
...
...
@@ -1187,12 +1228,12 @@ int tsdbWriteBlockImpl(STsdbRepo *pRepo, STable *pTable, SDFile *pDFile, SDataCo
static
int
tsdbWriteBlock
(
SCommitH
*
pCommith
,
SDFile
*
pDFile
,
SDataCols
*
pDataCols
,
SBlock
*
pBlock
,
bool
isLast
,
bool
isSuper
)
{
return
tsdbWriteBlockImpl
(
TSDB_COMMIT_REPO
(
pCommith
),
TSDB_COMMIT_TABLE
(
pCommith
),
pDFile
,
pDataCols
,
pBlock
,
isLast
,
isSuper
,
(
void
**
)(
&
(
TSDB_COMMIT_BUF
(
pCommith
))),
(
void
**
)(
&
(
TSDB_COMMIT_COMP_BUF
(
pCommith
))));
return
tsdbWriteBlockImpl
(
TSDB_COMMIT_REPO
(
pCommith
),
TSDB_COMMIT_TABLE
(
pCommith
),
pDFile
,
isLast
?
TSDB_COMMIT_SMAL_FILE
(
pCommith
)
:
TSDB_COMMIT_SMAD_FILE
(
pCommith
),
pDataCols
,
pBlock
,
isLast
,
isSuper
,
(
void
**
)(
&
(
TSDB_COMMIT_BUF
(
pCommith
))),
(
void
**
)(
&
(
TSDB_COMMIT_COMP_BUF
(
pCommith
))),
(
void
**
)(
&
(
TSDB_COMMIT_EXBUF
(
pCommith
))));
}
static
int
tsdbWriteBlockInfo
(
SCommitH
*
pCommih
)
{
SDFile
*
pHeadf
=
TSDB_COMMIT_HEAD_FILE
(
pCommih
);
SBlockIdx
blkIdx
;
...
...
@@ -1521,7 +1562,7 @@ static int tsdbSetAndOpenCommitFile(SCommitH *pCommith, SDFileSet *pSet, int fid
// Set and open commit FSET
if
(
pSet
==
NULL
||
did
.
level
>
TSDB_FSET_LEVEL
(
pSet
))
{
// Create a new FSET to write data
tsdbInitDFileSet
(
pWSet
,
did
,
REPO_ID
(
pRepo
),
fid
,
FS_TXN_VERSION
(
REPO_FS
(
pRepo
)));
tsdbInitDFileSet
(
pWSet
,
did
,
REPO_ID
(
pRepo
),
fid
,
FS_TXN_VERSION
(
REPO_FS
(
pRepo
))
,
TSDB_LATEST_FSET_VER
);
if
(
tsdbCreateDFileSet
(
pWSet
,
true
)
<
0
)
{
tsdbError
(
"vgId:%d failed to create FSET %d at level %d disk id %d since %s"
,
REPO_ID
(
pRepo
),
...
...
@@ -1543,11 +1584,12 @@ static int tsdbSetAndOpenCommitFile(SCommitH *pCommith, SDFileSet *pSet, int fid
pCommith
->
wSet
.
fid
=
fid
;
pCommith
->
wSet
.
state
=
0
;
pCommith
->
wSet
.
ver
=
TSDB_LATEST_FSET_VER
;
// TSDB_FILE_HEAD
SDFile
*
pWHeadf
=
TSDB_COMMIT_HEAD_FILE
(
pCommith
);
tsdbInitDFile
(
pWHeadf
,
did
,
REPO_ID
(
pRepo
),
fid
,
FS_TXN_VERSION
(
REPO_FS
(
pRepo
)),
TSDB_FILE_HEAD
);
if
(
tsdbCreateDFile
(
pWHeadf
,
true
)
<
0
)
{
if
(
tsdbCreateDFile
(
pWHeadf
,
true
,
TSDB_FILE_HEAD
)
<
0
)
{
tsdbError
(
"vgId:%d failed to create file %s to commit since %s"
,
REPO_ID
(
pRepo
),
TSDB_FILE_FULL_NAME
(
pWHeadf
),
tstrerror
(
terrno
));
...
...
@@ -1596,7 +1638,7 @@ static int tsdbSetAndOpenCommitFile(SCommitH *pCommith, SDFileSet *pSet, int fid
tsdbInitDFile
(
pWLastf
,
did
,
REPO_ID
(
pRepo
),
fid
,
FS_TXN_VERSION
(
REPO_FS
(
pRepo
)),
TSDB_FILE_LAST
);
pCommith
->
isLFileSame
=
false
;
if
(
tsdbCreateDFile
(
pWLastf
,
true
)
<
0
)
{
if
(
tsdbCreateDFile
(
pWLastf
,
true
,
TSDB_FILE_LAST
)
<
0
)
{
tsdbError
(
"vgId:%d failed to create file %s to commit since %s"
,
REPO_ID
(
pRepo
),
TSDB_FILE_FULL_NAME
(
pWLastf
),
tstrerror
(
terrno
));
...
...
@@ -1608,6 +1650,75 @@ static int tsdbSetAndOpenCommitFile(SCommitH *pCommith, SDFileSet *pSet, int fid
}
}
}
// TSDB_FILE_SMAD
SDFile
*
pRSmadF
=
TSDB_READ_SMAD_FILE
(
&
(
pCommith
->
readh
));
SDFile
*
pWSmadF
=
TSDB_COMMIT_SMAD_FILE
(
pCommith
);
if
(
access
(
TSDB_FILE_FULL_NAME
(
pRSmadF
),
F_OK
)
!=
0
)
{
tsdbDebug
(
"vgId:%d create data file %s as not exist"
,
REPO_ID
(
pRepo
),
TSDB_FILE_FULL_NAME
(
pRSmadF
));
tsdbInitDFile
(
pWSmadF
,
did
,
REPO_ID
(
pRepo
),
fid
,
FS_TXN_VERSION
(
REPO_FS
(
pRepo
)),
TSDB_FILE_SMAD
);
if
(
tsdbCreateDFile
(
pWSmadF
,
true
,
TSDB_FILE_SMAD
)
<
0
)
{
tsdbError
(
"vgId:%d failed to create file %s to commit since %s"
,
REPO_ID
(
pRepo
),
TSDB_FILE_FULL_NAME
(
pWSmadF
),
tstrerror
(
terrno
));
tsdbCloseDFileSet
(
pWSet
);
(
void
)
tsdbRemoveDFile
(
pWHeadf
);
if
(
pCommith
->
isRFileSet
)
{
tsdbCloseAndUnsetFSet
(
&
(
pCommith
->
readh
));
return
-
1
;
}
}
}
else
{
tsdbInitDFileEx
(
pWSmadF
,
pRSmadF
);
if
(
tsdbOpenDFile
(
pWSmadF
,
O_RDWR
)
<
0
)
{
tsdbError
(
"vgId:%d failed to open file %s to commit since %s"
,
REPO_ID
(
pRepo
),
TSDB_FILE_FULL_NAME
(
pWSmadF
),
tstrerror
(
terrno
));
tsdbCloseDFileSet
(
pWSet
);
tsdbRemoveDFile
(
pWHeadf
);
if
(
pCommith
->
isRFileSet
)
{
tsdbCloseAndUnsetFSet
(
&
(
pCommith
->
readh
));
return
-
1
;
}
}
}
// TSDB_FILE_SMAL
ASSERT
(
tsdbGetNFiles
(
pWSet
)
>=
TSDB_FILE_SMAL
);
SDFile
*
pRSmalF
=
TSDB_READ_SMAL_FILE
(
&
(
pCommith
->
readh
));
SDFile
*
pWSmalF
=
TSDB_COMMIT_SMAL_FILE
(
pCommith
);
if
((
pCommith
->
isLFileSame
)
&&
access
(
TSDB_FILE_FULL_NAME
(
pRSmalF
),
F_OK
)
==
0
)
{
tsdbInitDFileEx
(
pWSmalF
,
pRSmalF
);
if
(
tsdbOpenDFile
(
pWSmalF
,
O_RDWR
)
<
0
)
{
tsdbError
(
"vgId:%d failed to open file %s to commit since %s"
,
REPO_ID
(
pRepo
),
TSDB_FILE_FULL_NAME
(
pWSmalF
),
tstrerror
(
terrno
));
tsdbCloseDFileSet
(
pWSet
);
tsdbRemoveDFile
(
pWHeadf
);
if
(
pCommith
->
isRFileSet
)
{
tsdbCloseAndUnsetFSet
(
&
(
pCommith
->
readh
));
return
-
1
;
}
}
}
else
{
tsdbDebug
(
"vgId:%d create data file %s as not exist"
,
REPO_ID
(
pRepo
),
TSDB_FILE_FULL_NAME
(
pRSmalF
));
tsdbInitDFile
(
pWSmalF
,
did
,
REPO_ID
(
pRepo
),
fid
,
FS_TXN_VERSION
(
REPO_FS
(
pRepo
)),
TSDB_FILE_SMAL
);
if
(
tsdbCreateDFile
(
pWSmalF
,
true
,
TSDB_FILE_SMAL
)
<
0
)
{
tsdbError
(
"vgId:%d failed to create file %s to commit since %s"
,
REPO_ID
(
pRepo
),
TSDB_FILE_FULL_NAME
(
pWSmalF
),
tstrerror
(
terrno
));
tsdbCloseDFileSet
(
pWSet
);
(
void
)
tsdbRemoveDFile
(
pWHeadf
);
if
(
pCommith
->
isRFileSet
)
{
tsdbCloseAndUnsetFSet
(
&
(
pCommith
->
readh
));
return
-
1
;
}
}
}
}
return
0
;
...
...
src/tsdb/src/tsdbCompact.c
浏览文件 @
3736e4d7
...
...
@@ -37,8 +37,11 @@ typedef struct {
#define TSDB_COMPACT_HEAD_FILE(pComph) TSDB_DFILE_IN_SET(TSDB_COMPACT_WSET(pComph), TSDB_FILE_HEAD)
#define TSDB_COMPACT_DATA_FILE(pComph) TSDB_DFILE_IN_SET(TSDB_COMPACT_WSET(pComph), TSDB_FILE_DATA)
#define TSDB_COMPACT_LAST_FILE(pComph) TSDB_DFILE_IN_SET(TSDB_COMPACT_WSET(pComph), TSDB_FILE_LAST)
#define TSDB_COMPACT_SMAD_FILE(pComph) TSDB_DFILE_IN_SET(TSDB_COMPACT_WSET(pComph), TSDB_FILE_SMAD)
#define TSDB_COMPACT_SMAL_FILE(pComph) TSDB_DFILE_IN_SET(TSDB_COMPACT_WSET(pComph), TSDB_FILE_SMAL)
#define TSDB_COMPACT_BUF(pComph) TSDB_READ_BUF(&((pComph)->readh))
#define TSDB_COMPACT_COMP_BUF(pComph) TSDB_READ_COMP_BUF(&((pComph)->readh))
#define TSDB_COMPACT_EXBUF(pComph) TSDB_READ_EXBUF(&((pComph)->readh))
static
int
tsdbAsyncCompact
(
STsdbRepo
*
pRepo
);
static
void
tsdbStartCompact
(
STsdbRepo
*
pRepo
);
...
...
@@ -56,7 +59,7 @@ static int tsdbCompactFSetInit(SCompactH *pComph, SDFileSet *pSet);
static
void
tsdbCompactFSetEnd
(
SCompactH
*
pComph
);
static
int
tsdbCompactFSetImpl
(
SCompactH
*
pComph
);
static
int
tsdbWriteBlockToRightFile
(
SCompactH
*
pComph
,
STable
*
pTable
,
SDataCols
*
pDataCols
,
void
**
ppBuf
,
void
**
ppCBuf
);
void
**
ppCBuf
,
void
**
ppExBuf
);
enum
{
TSDB_NO_COMPACT
,
TSDB_IN_COMPACT
,
TSDB_WAITING_COMPACT
};
int
tsdbCompact
(
STsdbRepo
*
pRepo
)
{
return
tsdbAsyncCompact
(
pRepo
);
}
...
...
@@ -194,7 +197,7 @@ static int tsdbCompactMeta(STsdbRepo *pRepo) {
}
tsdbInitDFileSet
(
TSDB_COMPACT_WSET
(
pComph
),
did
,
REPO_ID
(
pRepo
),
TSDB_FSET_FID
(
pSet
),
FS_TXN_VERSION
(
REPO_FS
(
pRepo
)));
FS_TXN_VERSION
(
REPO_FS
(
pRepo
))
,
TSDB_LATEST_FSET_VER
);
if
(
tsdbCreateDFileSet
(
TSDB_COMPACT_WSET
(
pComph
),
true
)
<
0
)
{
tsdbError
(
"vgId:%d failed to compact FSET %d since %s"
,
REPO_ID
(
pRepo
),
pSet
->
fid
,
tstrerror
(
terrno
));
tsdbCompactFSetEnd
(
pComph
);
...
...
@@ -218,6 +221,9 @@ static int tsdbCompactMeta(STsdbRepo *pRepo) {
}
static
bool
tsdbShouldCompact
(
SCompactH
*
pComph
)
{
if
(
tsdbForceCompactFile
)
{
return
true
;
}
STsdbRepo
*
pRepo
=
TSDB_COMPACT_REPO
(
pComph
);
STsdbCfg
*
pCfg
=
REPO_CFG
(
pRepo
);
SReadH
*
pReadh
=
&
(
pComph
->
readh
);
...
...
@@ -358,7 +364,8 @@ static int tsdbCompactMeta(STsdbRepo *pRepo) {
tsdbUnRefTable
(
pTh
->
pTable
);
}
pTh
->
pInfo
=
taosTZfree
(
pTh
->
pInfo
);
// pTh->pInfo = taosTZfree(pTh->pInfo);
tfree
(
pTh
->
pInfo
);
}
pComph
->
tbArray
=
taosArrayDestroy
(
pComph
->
tbArray
);
...
...
@@ -384,11 +391,8 @@ static int tsdbCompactMeta(STsdbRepo *pRepo) {
pTh
->
bindex
=
*
(
pReadH
->
pBlkIdx
);
pTh
->
pBlkIdx
=
&
(
pTh
->
bindex
);
if
(
tsdbMakeRoom
((
void
**
)(
&
(
pTh
->
pInfo
)),
pTh
->
pBlkIdx
->
len
)
<
0
)
{
return
-
1
;
}
if
(
tsdbLoadBlockInfo
(
pReadH
,
(
void
*
)(
pTh
->
pInfo
))
<
0
)
{
uint32_t
originLen
=
0
;
if
(
tsdbLoadBlockInfo
(
pReadH
,
(
void
**
)(
&
(
pTh
->
pInfo
)),
&
originLen
)
<
0
)
{
return
-
1
;
}
}
...
...
@@ -421,6 +425,7 @@ static int tsdbCompactMeta(STsdbRepo *pRepo) {
SBlockIdx
blkIdx
;
void
**
ppBuf
=
&
(
TSDB_COMPACT_BUF
(
pComph
));
void
**
ppCBuf
=
&
(
TSDB_COMPACT_COMP_BUF
(
pComph
));
void
**
ppExBuf
=
&
(
TSDB_COMPACT_EXBUF
(
pComph
));
int
defaultRows
=
TSDB_DEFAULT_BLOCK_ROWS
(
pCfg
->
maxRowsPerFileBlock
);
taosArrayClear
(
pComph
->
aBlkIdx
);
...
...
@@ -451,7 +456,7 @@ static int tsdbCompactMeta(STsdbRepo *pRepo) {
// Merge pComph->pDataCols and pReadh->pDCols[0] and write data to file
if
(
pComph
->
pDataCols
->
numOfRows
==
0
&&
pBlock
->
numOfRows
>=
defaultRows
)
{
if
(
tsdbWriteBlockToRightFile
(
pComph
,
pTh
->
pTable
,
pReadh
->
pDCols
[
0
],
ppBuf
,
ppCBuf
)
<
0
)
{
if
(
tsdbWriteBlockToRightFile
(
pComph
,
pTh
->
pTable
,
pReadh
->
pDCols
[
0
],
ppBuf
,
ppCBuf
,
ppExBuf
)
<
0
)
{
return
-
1
;
}
}
else
{
...
...
@@ -467,7 +472,7 @@ static int tsdbCompactMeta(STsdbRepo *pRepo) {
break
;
}
if
(
tsdbWriteBlockToRightFile
(
pComph
,
pTh
->
pTable
,
pComph
->
pDataCols
,
ppBuf
,
ppCBuf
)
<
0
)
{
if
(
tsdbWriteBlockToRightFile
(
pComph
,
pTh
->
pTable
,
pComph
->
pDataCols
,
ppBuf
,
ppCBuf
,
ppExBuf
)
<
0
)
{
return
-
1
;
}
tdResetDataCols
(
pComph
->
pDataCols
);
...
...
@@ -476,7 +481,7 @@ static int tsdbCompactMeta(STsdbRepo *pRepo) {
}
if
(
pComph
->
pDataCols
->
numOfRows
>
0
&&
tsdbWriteBlockToRightFile
(
pComph
,
pTh
->
pTable
,
pComph
->
pDataCols
,
ppBuf
,
ppCBuf
)
<
0
)
{
tsdbWriteBlockToRightFile
(
pComph
,
pTh
->
pTable
,
pComph
->
pDataCols
,
ppBuf
,
ppCBuf
,
ppExBuf
)
<
0
)
{
return
-
1
;
}
...
...
@@ -499,7 +504,7 @@ static int tsdbCompactMeta(STsdbRepo *pRepo) {
}
static
int
tsdbWriteBlockToRightFile
(
SCompactH
*
pComph
,
STable
*
pTable
,
SDataCols
*
pDataCols
,
void
**
ppBuf
,
void
**
ppC
Buf
)
{
void
**
ppCBuf
,
void
**
ppEx
Buf
)
{
STsdbRepo
*
pRepo
=
TSDB_COMPACT_REPO
(
pComph
);
STsdbCfg
*
pCfg
=
REPO_CFG
(
pRepo
);
SDFile
*
pDFile
;
...
...
@@ -516,7 +521,9 @@ static int tsdbCompactMeta(STsdbRepo *pRepo) {
isLast
=
false
;
}
if
(
tsdbWriteBlockImpl
(
pRepo
,
pTable
,
pDFile
,
pDataCols
,
&
block
,
isLast
,
true
,
ppBuf
,
ppCBuf
)
<
0
)
{
if
(
tsdbWriteBlockImpl
(
pRepo
,
pTable
,
pDFile
,
isLast
?
TSDB_COMPACT_SMAL_FILE
(
pComph
)
:
TSDB_COMPACT_SMAD_FILE
(
pComph
),
pDataCols
,
&
block
,
isLast
,
true
,
ppBuf
,
ppCBuf
,
ppExBuf
)
<
0
)
{
return
-
1
;
}
...
...
@@ -526,5 +533,5 @@ static int tsdbCompactMeta(STsdbRepo *pRepo) {
}
return
0
;
}
}
src/tsdb/src/tsdbFS.c
浏览文件 @
3736e4d7
...
...
@@ -36,6 +36,7 @@ static int tsdbComparTFILE(const void *arg1, const void *arg2);
static
void
tsdbScanAndTryFixDFilesHeader
(
STsdbRepo
*
pRepo
,
int32_t
*
nExpired
);
static
int
tsdbProcessExpiredFS
(
STsdbRepo
*
pRepo
);
static
int
tsdbCreateMeta
(
STsdbRepo
*
pRepo
);
static
int
tsdbFetchTFileSet
(
STsdbRepo
*
pRepo
,
SArray
**
fArray
);
// For backward compatibility
// ================== CURRENT file header info
...
...
@@ -89,18 +90,33 @@ static int tsdbEncodeDFileSetArray(void **buf, SArray *pArray) {
return
tlen
;
}
static
void
*
tsdbDecodeDFileSetArray
(
void
*
buf
,
SArray
*
pArray
)
{
static
int
tsdbDecodeDFileSetArray
(
void
**
originBuf
,
void
*
buf
,
SArray
*
pArray
,
SFSHeader
*
pSFSHeader
)
{
uint64_t
nset
;
SDFileSet
dset
;
dset
.
ver
=
TSDB_FSET_VER_0
;
// default value
taosArrayClear
(
pArray
);
buf
=
taosDecodeFixedU64
(
buf
,
&
nset
);
if
(
pSFSHeader
->
version
==
TSDB_FS_VER_0
)
{
// record fver in new version of 'current' file
uint64_t
extendedSize
=
pSFSHeader
->
len
+
nset
*
TSDB_FILE_MAX
*
sizeof
(
TSDB_FVER_TYPE
);
if
(
taosTSizeof
(
*
originBuf
)
<
extendedSize
)
{
size_t
ptrDistance
=
POINTER_DISTANCE
(
buf
,
*
originBuf
);
if
(
tsdbMakeRoom
(
originBuf
,
(
size_t
)
extendedSize
)
<
0
)
{
terrno
=
TSDB_CODE_FS_OUT_OF_MEMORY
;
return
-
1
;
}
buf
=
POINTER_SHIFT
(
*
originBuf
,
ptrDistance
);
}
}
for
(
size_t
i
=
0
;
i
<
nset
;
i
++
)
{
buf
=
tsdbDecodeDFileSet
(
buf
,
&
dset
);
buf
=
tsdbDecodeDFileSet
(
buf
,
&
dset
,
pSFSHeader
->
version
);
taosArrayPush
(
pArray
,
(
void
*
)(
&
dset
));
}
return
buf
;
return
TSDB_CODE_SUCCESS
;
}
static
int
tsdbEncodeFSStatus
(
void
**
buf
,
SFSStatus
*
pStatus
)
{
...
...
@@ -114,15 +130,12 @@ static int tsdbEncodeFSStatus(void **buf, SFSStatus *pStatus) {
return
tlen
;
}
static
void
*
tsdbDecodeFSStatus
(
void
*
buf
,
SFSStatus
*
pStatus
)
{
static
int
tsdbDecodeFSStatus
(
void
**
originBuf
,
void
*
buf
,
SFSStatus
*
pStatus
,
SFSHeader
*
pSFSHeader
)
{
tsdbResetFSStatus
(
pStatus
);
pStatus
->
pmf
=
&
(
pStatus
->
mf
);
buf
=
tsdbDecodeSMFile
(
buf
,
pStatus
->
pmf
);
buf
=
tsdbDecodeDFileSetArray
(
buf
,
pStatus
->
df
);
return
buf
;
return
tsdbDecodeDFileSetArray
(
originBuf
,
buf
,
pStatus
->
df
,
pSFSHeader
);
}
static
SFSStatus
*
tsdbNewFSStatus
(
int
maxFSet
)
{
...
...
@@ -414,7 +427,7 @@ static int tsdbSaveFSStatus(SFSStatus *pStatus, int vid) {
return
-
1
;
}
fsheader
.
version
=
TSDB_
FS_VERSION
;
fsheader
.
version
=
TSDB_
LATEST_SFS_VER
;
if
(
pStatus
->
pmf
==
NULL
)
{
ASSERT
(
taosArrayGetSize
(
pStatus
->
df
)
==
0
);
fsheader
.
len
=
0
;
...
...
@@ -689,7 +702,7 @@ static int tsdbOpenFSFromCurrent(STsdbRepo *pRepo) {
ptr
=
tsdbDecodeFSHeader
(
ptr
,
&
fsheader
);
ptr
=
tsdbDecodeFSMeta
(
ptr
,
&
(
pStatus
->
meta
));
if
(
fsheader
.
version
!=
TSDB_FS_VER
SION
)
{
if
(
fsheader
.
version
!=
TSDB_FS_VER
_0
)
{
// TODO: handle file version change
}
...
...
@@ -718,7 +731,9 @@ static int tsdbOpenFSFromCurrent(STsdbRepo *pRepo) {
}
ptr
=
buffer
;
ptr
=
tsdbDecodeFSStatus
(
ptr
,
pStatus
);
if
(
tsdbDecodeFSStatus
(
&
buffer
,
ptr
,
pStatus
,
&
fsheader
)
<
0
)
{
goto
_err
;
}
}
else
{
tsdbResetFSStatus
(
pStatus
);
}
...
...
@@ -752,7 +767,7 @@ static int tsdbScanAndTryFixFS(STsdbRepo *pRepo) {
SDFileSet
*
pSet
=
(
SDFileSet
*
)
taosArrayGet
(
pStatus
->
df
,
i
);
if
(
tsdbScanAndTryFixDFileSet
(
pRepo
,
pSet
)
<
0
)
{
tsdbError
(
"vgId:%d failed to fix
MFile
since %s"
,
REPO_ID
(
pRepo
),
tstrerror
(
terrno
));
tsdbError
(
"vgId:%d failed to fix
DFileSet
since %s"
,
REPO_ID
(
pRepo
),
tstrerror
(
terrno
));
return
-
1
;
}
}
...
...
@@ -966,7 +981,7 @@ static bool tsdbIsTFileInFS(STsdbFS *pfs, const TFILE *pf) {
SDFileSet
*
pSet
;
while
((
pSet
=
tsdbFSIterNext
(
&
fsiter
)))
{
for
(
TSDB_FILE_T
ftype
=
0
;
ftype
<
TSDB_FILE_MAX
;
ftype
++
)
{
for
(
TSDB_FILE_T
ftype
=
0
;
ftype
<
tsdbGetNFiles
(
pSet
)
;
ftype
++
)
{
SDFile
*
pDFile
=
TSDB_DFILE_IN_SET
(
pSet
,
ftype
);
if
(
tfsIsSameFile
(
pf
,
TSDB_FILE_F
(
pDFile
)))
{
return
true
;
...
...
@@ -1098,25 +1113,23 @@ static int tsdbRestoreMeta(STsdbRepo *pRepo) {
return
0
;
}
static
int
tsdb
RestoreDFileSet
(
STsdbRepo
*
pRepo
)
{
static
int
tsdb
FetchTFileSet
(
STsdbRepo
*
pRepo
,
SArray
**
fArray
)
{
char
dataDir
[
TSDB_FILENAME_LEN
];
char
bname
[
TSDB_FILENAME_LEN
];
TDIR
*
tdir
=
NULL
;
const
TFILE
*
pf
=
NULL
;
const
char
*
pattern
=
"^v[0-9]+f[0-9]+
\\
.(head|data|last)(-ver[0-9]+)?$"
;
SArray
*
fArray
=
NULL
;
const
char
*
pattern
=
"^v[0-9]+f[0-9]+
\\
.(head|data|last|smad|smal)(-ver[0-9]+)?$"
;
regex_t
regex
;
STsdbFS
*
pfs
=
REPO_FS
(
pRepo
);
tsdbGetDataDir
(
REPO_ID
(
pRepo
),
dataDir
);
// Resource allocation and init
regcomp
(
&
regex
,
pattern
,
REG_EXTENDED
);
fArray
=
taosArrayInit
(
1024
,
sizeof
(
TFILE
));
if
(
fArray
==
NULL
)
{
*
fArray
=
taosArrayInit
(
1024
,
sizeof
(
TFILE
));
if
(
*
fArray
==
NULL
)
{
terrno
=
TSDB_CODE_TDB_OUT_OF_MEMORY
;
tsdbError
(
"vgId:%d failed to
restore D
FileSet while open directory %s since %s"
,
REPO_ID
(
pRepo
),
dataDir
,
tsdbError
(
"vgId:%d failed to
fetch T
FileSet while open directory %s since %s"
,
REPO_ID
(
pRepo
),
dataDir
,
tstrerror
(
terrno
));
regfree
(
&
regex
);
return
-
1
;
...
...
@@ -1124,9 +1137,9 @@ static int tsdbRestoreDFileSet(STsdbRepo *pRepo) {
tdir
=
tfsOpendir
(
dataDir
);
if
(
tdir
==
NULL
)
{
tsdbError
(
"vgId:%d failed to
restore D
FileSet while open directory %s since %s"
,
REPO_ID
(
pRepo
),
dataDir
,
tsdbError
(
"vgId:%d failed to
fetch T
FileSet while open directory %s since %s"
,
REPO_ID
(
pRepo
),
dataDir
,
tstrerror
(
terrno
));
taosArrayDestroy
(
fArray
);
taosArrayDestroy
(
*
fArray
);
regfree
(
&
regex
);
return
-
1
;
}
...
...
@@ -1136,10 +1149,10 @@ static int tsdbRestoreDFileSet(STsdbRepo *pRepo) {
int
code
=
regexec
(
&
regex
,
bname
,
0
,
NULL
,
0
);
if
(
code
==
0
)
{
if
(
taosArrayPush
(
fArray
,
(
void
*
)
pf
)
==
NULL
)
{
if
(
taosArrayPush
(
*
fArray
,
(
void
*
)
pf
)
==
NULL
)
{
terrno
=
TSDB_CODE_TDB_OUT_OF_MEMORY
;
tfsClosedir
(
tdir
);
taosArrayDestroy
(
fArray
);
taosArrayDestroy
(
*
fArray
);
regfree
(
&
regex
);
return
-
1
;
}
...
...
@@ -1150,10 +1163,10 @@ static int tsdbRestoreDFileSet(STsdbRepo *pRepo) {
continue
;
}
else
{
// Has other error
tsdbError
(
"vgId:%d failed to
restore D
FileSet Array while run regexec since %s"
,
REPO_ID
(
pRepo
),
strerror
(
code
));
tsdbError
(
"vgId:%d failed to
fetch T
FileSet Array while run regexec since %s"
,
REPO_ID
(
pRepo
),
strerror
(
code
));
terrno
=
TAOS_SYSTEM_ERROR
(
code
);
tfsClosedir
(
tdir
);
taosArrayDestroy
(
fArray
);
taosArrayDestroy
(
*
fArray
);
regfree
(
&
regex
);
return
-
1
;
}
...
...
@@ -1163,72 +1176,129 @@ static int tsdbRestoreDFileSet(STsdbRepo *pRepo) {
regfree
(
&
regex
);
// Sort the array according to file name
taosArraySort
(
fArray
,
tsdbComparTFILE
);
taosArraySort
(
*
fArray
,
tsdbComparTFILE
);
return
0
;
}
size_t
index
=
0
;
// Loop to recover each file set
for
(;;)
{
if
(
index
>=
taosArrayGetSize
(
fArray
))
{
break
;
// update the function if the DFileSet definition updates
static
bool
tsdbIsDFileSetValid
(
int
nFiles
)
{
switch
(
nFiles
)
{
case
TSDB_FILE_MIN
:
case
TSDB_FILE_MAX
:
return
true
;
default:
return
false
;
}
}
SDFileSet
fset
=
{
0
};
TSDB_FSET_SET_CLOSED
(
&
fset
);
static
int
tsdbRestoreDFileSet
(
STsdbRepo
*
pRepo
)
{
const
TFILE
*
pf
=
NULL
;
SArray
*
fArray
=
NULL
;
STsdbFS
*
pfs
=
REPO_FS
(
pRepo
);
char
dataDir
[
TSDB_FILENAME_LEN
]
=
"
\0
"
;
size_t
fArraySize
=
0
;
// Loop to recover ONE fset
for
(
TSDB_FILE_T
ftype
=
0
;
ftype
<
TSDB_FILE_MAX
;
ftype
++
)
{
SDFile
*
pDFile
=
TSDB_DFILE_IN_SET
(
&
fset
,
ftype
);
tsdbGetDataDir
(
REPO_ID
(
pRepo
),
dataDir
);
if
(
index
>=
taosArrayGetSize
(
fArray
)
)
{
tsdbError
(
"vgId:%d incomplete DFileSet, fid:%d"
,
REPO_ID
(
pRepo
),
fset
.
fid
);
taosArrayDestroy
(
fArray
);
if
(
tsdbFetchTFileSet
(
pRepo
,
&
fArray
)
<
0
)
{
tsdbError
(
"vgId:%d failed to fetch TFileSet from %s to restore since %s"
,
REPO_ID
(
pRepo
),
dataDir
,
tstrerror
(
terrno
)
);
return
-
1
;
}
pf
=
taosArrayGet
(
fArray
,
index
);
int
tvid
,
tfid
;
TSDB_FILE_T
ttype
;
uint32_t
tversion
;
char
_bname
[
TSDB_FILENAME_LEN
];
if
((
fArraySize
=
taosArrayGetSize
(
fArray
))
<=
0
)
{
taosArrayDestroy
(
fArray
);
tsdbInfo
(
"vgId:%d size of DFileSet from %s is %"
PRIu32
,
REPO_ID
(
pRepo
),
dataDir
,
(
uint32_t
)
fArraySize
);
return
0
;
}
tfsbasename
(
pf
,
_bname
);
tsdbParseDFilename
(
_bname
,
&
tvid
,
&
tfid
,
&
ttype
,
&
tversion
);
// Loop to recover each file set
SDFileSet
fset
=
{
0
};
uint8_t
nDFiles
=
0
;
bool
isOneFSetFinish
=
true
;
int
lastFType
=
-
1
;
// one fileset ends when (1) the array ends or (2) encounter different fid
for
(
size_t
index
=
0
;
index
<
fArraySize
;
++
index
)
{
int
tvid
=
-
1
,
tfid
=
-
1
;
TSDB_FILE_T
ttype
=
TSDB_FILE_MAX
;
uint32_t
tversion
=
-
1
;
char
bname
[
TSDB_FILENAME_LEN
]
=
"
\0
"
;
pf
=
taosArrayGet
(
fArray
,
index
);
tfsbasename
(
pf
,
bname
);
tsdbParseDFilename
(
bname
,
&
tvid
,
&
tfid
,
&
ttype
,
&
tversion
);
ASSERT
(
tvid
==
REPO_ID
(
pRepo
));
if
(
tfid
<
pRepo
->
rtn
.
minFid
)
{
// skip file expired
++
index
;
SDFile
*
pDFile
=
TSDB_DFILE_IN_SET
(
&
fset
,
ttype
);
if
(
tfid
<
pRepo
->
rtn
.
minFid
)
{
// skip the file expired
continue
;
}
if
((
isOneFSetFinish
==
false
)
&&
(
lastFType
==
ttype
))
{
// only fetch the 1st file with same fid and type.
continue
;
}
lastFType
=
ttype
;
if
(
ftype
==
0
)
{
if
(
index
==
0
)
{
memset
(
&
fset
,
0
,
sizeof
(
SDFileSet
));
TSDB_FSET_SET_CLOSED
(
&
fset
);
nDFiles
=
1
;
fset
.
fid
=
tfid
;
pDFile
->
f
=
*
pf
;
isOneFSetFinish
=
false
;
}
else
{
if
(
tfid
!=
fset
.
fid
)
{
tsdbError
(
"vgId:%d incomplete dFileSet, fid:%d"
,
REPO_ID
(
pRepo
),
fset
.
fid
);
if
(
fset
.
fid
==
tfid
)
{
++
nDFiles
;
pDFile
->
f
=
*
pf
;
// (1) the array ends
if
(
index
==
fArraySize
-
1
)
{
if
(
tsdbIsDFileSetValid
(
nDFiles
))
{
tsdbInfo
(
"vgId:%d DFileSet %d is fetched, nDFiles=%"
PRIu8
,
REPO_ID
(
pRepo
),
fset
.
fid
,
nDFiles
);
isOneFSetFinish
=
true
;
}
else
{
// return error in case of removing uncomplete DFileSets
terrno
=
TSDB_CODE_TDB_INCOMPLETE_DFILESET
;
tsdbError
(
"vgId:%d incomplete DFileSet, fid:%d, nDFiles=%"
PRIu8
,
REPO_ID
(
pRepo
),
fset
.
fid
,
nDFiles
);
taosArrayDestroy
(
fArray
);
return
-
1
;
}
}
if
(
ttype
!=
ftype
)
{
tsdbError
(
"vgId:%d incomplete dFileSet, fid:%d"
,
REPO_ID
(
pRepo
),
fset
.
fid
);
}
else
{
// (2) encounter different fid
if
(
tsdbIsDFileSetValid
(
nDFiles
))
{
tsdbInfo
(
"vgId:%d DFileSet %d is fetched, nDFiles=%"
PRIu8
,
REPO_ID
(
pRepo
),
fset
.
fid
,
nDFiles
);
isOneFSetFinish
=
true
;
}
else
{
// return error in case of removing uncomplete DFileSets
terrno
=
TSDB_CODE_TDB_INCOMPLETE_DFILESET
;
tsdbError
(
"vgId:%d incomplete DFileSet, fid:%d, nDFiles=%"
PRIu8
,
REPO_ID
(
pRepo
),
fset
.
fid
,
nDFiles
);
taosArrayDestroy
(
fArray
);
return
-
1
;
}
#if 0
// next FSet
memset(&fset, 0, sizeof(SDFileSet));
TSDB_FSET_SET_CLOSED(&fset);
nDFiles = 1;
fset.fid = tfid;
pDFile->f = *pf;
isOneFSetFinish = false;
continue;
#endif
}
}
}
if
(
tsdbOpenDFile
(
pDFile
,
O_RDONLY
)
<
0
)
{
tsdbError
(
"vgId:%d failed to open DFile %s since %s"
,
REPO_ID
(
pRepo
),
TSDB_FILE_FULL_NAME
(
pDFile
),
tstrerror
(
terrno
));
if
(
isOneFSetFinish
)
{
for
(
TSDB_FILE_T
ftype
=
0
;
ftype
<
nDFiles
;
++
ftype
)
{
SDFile
*
pDFile1
=
TSDB_DFILE_IN_SET
(
&
fset
,
ftype
);
if
(
tsdbOpenDFile
(
pDFile1
,
O_RDONLY
)
<
0
)
{
tsdbError
(
"vgId:%d failed to open DFile %s since %s"
,
REPO_ID
(
pRepo
),
TSDB_FILE_FULL_NAME
(
pDFile1
),
tstrerror
(
terrno
));
taosArrayDestroy
(
fArray
);
return
-
1
;
}
if
(
tsdbLoadDFileHeader
(
pDFile
,
&
(
pDFile
->
info
))
<
0
)
{
tsdbError
(
"vgId:%d failed to load DFile %s header since %s"
,
REPO_ID
(
pRepo
),
TSDB_FILE_FULL_NAME
(
pDFile
),
if
(
tsdbLoadDFileHeader
(
pDFile1
,
&
(
pDFile1
->
info
))
<
0
)
{
tsdbError
(
"vgId:%d failed to load DFile %s header since %s"
,
REPO_ID
(
pRepo
),
TSDB_FILE_FULL_NAME
(
pDFile1
),
tstrerror
(
terrno
));
taosArrayDestroy
(
fArray
);
return
-
1
;
...
...
@@ -1238,26 +1308,41 @@ static int tsdbRestoreDFileSet(STsdbRepo *pRepo) {
struct
stat
tfstat
;
// Get real file size
if
(
fstat
(
pDFile
->
fd
,
&
tfstat
)
<
0
)
{
if
(
fstat
(
pDFile1
->
fd
,
&
tfstat
)
<
0
)
{
terrno
=
TAOS_SYSTEM_ERROR
(
errno
);
taosArrayDestroy
(
fArray
);
return
-
1
;
}
if
(
pDFile
->
info
.
size
!=
tfstat
.
st_size
)
{
int64_t
tfsize
=
pDFile
->
info
.
size
;
pDFile
->
info
.
size
=
tfstat
.
st_size
;
if
(
pDFile1
->
info
.
size
!=
tfstat
.
st_size
)
{
int64_t
tfsize
=
pDFile1
->
info
.
size
;
pDFile1
->
info
.
size
=
tfstat
.
st_size
;
tsdbInfo
(
"vgId:%d file %s header size is changed from %"
PRId64
" to %"
PRId64
,
REPO_ID
(
pRepo
),
TSDB_FILE_FULL_NAME
(
pDFile
),
tfsize
,
pDFile
->
info
.
size
);
TSDB_FILE_FULL_NAME
(
pDFile1
),
tfsize
,
pDFile1
->
info
.
size
);
}
}
tsdbCloseDFile
(
pDFile
);
index
++
;
tsdbCloseDFile
(
pDFile1
);
}
tsdbInfo
(
"vgId:%d FSET %d is restored"
,
REPO_ID
(
pRepo
),
fset
.
fid
);
// TODO: update the logic when TSDB_FSET_VER definition update.
if
(
nDFiles
==
TSDB_FILE_MIN
)
{
fset
.
ver
=
TSDB_FSET_VER_0
;
}
else
{
fset
.
ver
=
TSDB_LATEST_FSET_VER
;
}
taosArrayPush
(
pfs
->
cstatus
->
df
,
&
fset
);
// next FSet
memset
(
&
fset
,
0
,
sizeof
(
SDFileSet
));
TSDB_FSET_SET_CLOSED
(
&
fset
);
nDFiles
=
1
;
fset
.
fid
=
tfid
;
pDFile
->
f
=
*
pf
;
isOneFSetFinish
=
false
;
}
}
// Resource release
...
...
@@ -1311,10 +1396,16 @@ static int tsdbComparTFILE(const void *arg1, const void *arg2) {
return
-
1
;
}
else
if
(
ftype1
>
ftype2
)
{
return
1
;
}
else
{
if
(
version1
<
version2
)
{
return
-
1
;
}
else
if
(
version1
>
version2
)
{
return
1
;
}
else
{
return
0
;
}
}
}
}
static
void
tsdbScanAndTryFixDFilesHeader
(
STsdbRepo
*
pRepo
,
int32_t
*
nExpired
)
{
...
...
@@ -1335,7 +1426,7 @@ static void tsdbScanAndTryFixDFilesHeader(STsdbRepo *pRepo, int32_t *nExpired) {
continue
;
}
for
(
TSDB_FILE_T
ftype
=
0
;
ftype
<
TSDB_FILE_MAX
;
ftype
++
)
{
for
(
TSDB_FILE_T
ftype
=
0
;
ftype
<
tsdbGetNFiles
(
&
fset
)
;
ftype
++
)
{
SDFile
*
pDFile
=
TSDB_DFILE_IN_SET
(
&
fset
,
ftype
);
if
((
tsdbLoadDFileHeader
(
pDFile
,
&
info
)
<
0
)
||
pDFile
->
info
.
size
!=
info
.
size
||
...
...
src/tsdb/src/tsdbFile.c
浏览文件 @
3736e4d7
...
...
@@ -19,6 +19,8 @@ static const char *TSDB_FNAME_SUFFIX[] = {
"head"
,
// TSDB_FILE_HEAD
"data"
,
// TSDB_FILE_DATA
"last"
,
// TSDB_FILE_LAST
"smad"
,
// TSDB_FILE_SMA_DATA(Small Materialized Aggregate for .data File)
"smal"
,
// TSDB_FILE_SMA_LAST(Small Materialized Aggregate for .last File)
""
,
// TSDB_FILE_MAX
"meta"
,
// TSDB_FILE_META
};
...
...
@@ -26,7 +28,7 @@ static const char *TSDB_FNAME_SUFFIX[] = {
static
void
tsdbGetFilename
(
int
vid
,
int
fid
,
uint32_t
ver
,
TSDB_FILE_T
ftype
,
char
*
fname
);
static
int
tsdbRollBackMFile
(
SMFile
*
pMFile
);
static
int
tsdbEncodeDFInfo
(
void
**
buf
,
SDFInfo
*
pInfo
);
static
void
*
tsdbDecodeDFInfo
(
void
*
buf
,
SDFInfo
*
pInfo
);
static
void
*
tsdbDecodeDFInfo
(
void
*
buf
,
SDFInfo
*
pInfo
,
TSDB_FVER_TYPE
sfver
);
static
int
tsdbRollBackDFile
(
SDFile
*
pDFile
);
// ============== SMFile
...
...
@@ -198,7 +200,7 @@ int tsdbScanAndTryFixMFile(STsdbRepo *pRepo) {
tsdbInitMFileEx
(
&
mf
,
pMFile
);
if
(
access
(
TSDB_FILE_FULL_NAME
(
pMFile
),
F_OK
)
!=
0
)
{
tsdbError
(
"vgId:%d meta file %s not exit, report to upper layer to fix it"
,
REPO_ID
(
pRepo
),
tsdbError
(
"vgId:%d meta file %s not exi
s
t, report to upper layer to fix it"
,
REPO_ID
(
pRepo
),
TSDB_FILE_FULL_NAME
(
pMFile
));
pRepo
->
state
|=
TSDB_STATE_BAD_META
;
TSDB_FILE_SET_STATE
(
pMFile
,
TSDB_FILE_STATE_BAD
);
...
...
@@ -301,6 +303,7 @@ void tsdbInitDFile(SDFile *pDFile, SDiskID did, int vid, int fid, uint32_t ver,
memset
(
&
(
pDFile
->
info
),
0
,
sizeof
(
pDFile
->
info
));
pDFile
->
info
.
magic
=
TSDB_FILE_INIT_MAGIC
;
pDFile
->
info
.
fver
=
tsdbGetDFSVersion
(
ftype
);
tsdbGetFilename
(
vid
,
fid
,
ver
,
ftype
,
fname
);
tfsInitFile
(
&
(
pDFile
->
f
),
did
.
level
,
did
.
id
,
fname
);
...
...
@@ -320,8 +323,8 @@ int tsdbEncodeSDFile(void **buf, SDFile *pDFile) {
return
tlen
;
}
void
*
tsdbDecodeSDFile
(
void
*
buf
,
SDFile
*
pDFile
)
{
buf
=
tsdbDecodeDFInfo
(
buf
,
&
(
pDFile
->
info
));
void
*
tsdbDecodeSDFile
(
void
*
buf
,
SDFile
*
pDFile
,
uint32_t
sfver
)
{
buf
=
tsdbDecodeDFInfo
(
buf
,
&
(
pDFile
->
info
)
,
sfver
);
buf
=
tfsDecodeFile
(
buf
,
&
(
pDFile
->
f
));
TSDB_FILE_SET_CLOSED
(
pDFile
);
...
...
@@ -339,8 +342,8 @@ static int tsdbEncodeSDFileEx(void **buf, SDFile *pDFile) {
static
void
*
tsdbDecodeSDFileEx
(
void
*
buf
,
SDFile
*
pDFile
)
{
char
*
aname
;
buf
=
tsdbDecodeDFInfo
(
buf
,
&
(
pDFile
->
info
));
// The sync module would send DFileSet with latest verion.
buf
=
tsdbDecodeDFInfo
(
buf
,
&
(
pDFile
->
info
)
,
TSDB_LATEST_SFS_VER
);
buf
=
taosDecodeString
(
buf
,
&
aname
);
strncpy
(
TSDB_FILE_FULL_NAME
(
pDFile
),
aname
,
TSDB_FILENAME_LEN
);
TSDB_FILE_SET_CLOSED
(
pDFile
);
...
...
@@ -349,7 +352,7 @@ static void *tsdbDecodeSDFileEx(void *buf, SDFile *pDFile) {
return
buf
;
}
int
tsdbCreateDFile
(
SDFile
*
pDFile
,
bool
updateHeader
)
{
int
tsdbCreateDFile
(
SDFile
*
pDFile
,
bool
updateHeader
,
TSDB_FILE_T
fType
)
{
ASSERT
(
pDFile
->
info
.
size
==
0
&&
pDFile
->
info
.
magic
==
TSDB_FILE_INIT_MAGIC
);
pDFile
->
fd
=
open
(
TSDB_FILE_FULL_NAME
(
pDFile
),
O_WRONLY
|
O_CREAT
|
O_TRUNC
|
O_BINARY
,
0755
);
...
...
@@ -379,6 +382,7 @@ int tsdbCreateDFile(SDFile *pDFile, bool updateHeader) {
}
pDFile
->
info
.
size
+=
TSDB_FILE_HEAD_SIZE
;
pDFile
->
info
.
fver
=
tsdbGetDFSVersion
(
fType
);
if
(
tsdbUpdateDFileHeader
(
pDFile
)
<
0
)
{
tsdbCloseDFile
(
pDFile
);
...
...
@@ -397,7 +401,6 @@ int tsdbUpdateDFileHeader(SDFile *pDFile) {
}
void
*
ptr
=
buf
;
taosEncodeFixedU32
(
&
ptr
,
TSDB_FS_VERSION
);
tsdbEncodeDFInfo
(
&
ptr
,
&
(
pDFile
->
info
));
taosCalcChecksumAppend
(
0
,
(
uint8_t
*
)
buf
,
TSDB_FILE_HEAD_SIZE
);
...
...
@@ -410,7 +413,7 @@ int tsdbUpdateDFileHeader(SDFile *pDFile) {
int
tsdbLoadDFileHeader
(
SDFile
*
pDFile
,
SDFInfo
*
pInfo
)
{
char
buf
[
TSDB_FILE_HEAD_SIZE
]
=
"
\0
"
;
uint32_t
_version
;
//
uint32_t _version;
ASSERT
(
TSDB_FILE_OPENED
(
pDFile
));
...
...
@@ -428,8 +431,7 @@ int tsdbLoadDFileHeader(SDFile *pDFile, SDFInfo *pInfo) {
}
void
*
pBuf
=
buf
;
pBuf
=
taosDecodeFixedU32
(
pBuf
,
&
_version
);
pBuf
=
tsdbDecodeDFInfo
(
pBuf
,
pInfo
);
pBuf
=
tsdbDecodeDFInfo
(
pBuf
,
pInfo
,
TSDB_LATEST_FVER
);
// only make sure the parameter sfver > 0
return
0
;
}
...
...
@@ -440,7 +442,7 @@ static int tsdbScanAndTryFixDFile(STsdbRepo *pRepo, SDFile *pDFile) {
tsdbInitDFileEx
(
&
df
,
pDFile
);
if
(
access
(
TSDB_FILE_FULL_NAME
(
pDFile
),
F_OK
)
!=
0
)
{
tsdbError
(
"vgId:%d data file %s not exit, report to upper layer to fix it"
,
REPO_ID
(
pRepo
),
tsdbError
(
"vgId:%d data file %s not exi
s
t, report to upper layer to fix it"
,
REPO_ID
(
pRepo
),
TSDB_FILE_FULL_NAME
(
pDFile
));
pRepo
->
state
|=
TSDB_STATE_BAD_DATA
;
TSDB_FILE_SET_STATE
(
pDFile
,
TSDB_FILE_STATE_BAD
);
...
...
@@ -487,7 +489,7 @@ static int tsdbScanAndTryFixDFile(STsdbRepo *pRepo, SDFile *pDFile) {
static
int
tsdbEncodeDFInfo
(
void
**
buf
,
SDFInfo
*
pInfo
)
{
int
tlen
=
0
;
tlen
+=
taosEncodeFixedU32
(
buf
,
pInfo
->
fver
);
tlen
+=
taosEncodeFixedU32
(
buf
,
pInfo
->
magic
);
tlen
+=
taosEncodeFixedU32
(
buf
,
pInfo
->
len
);
tlen
+=
taosEncodeFixedU32
(
buf
,
pInfo
->
totalBlocks
);
...
...
@@ -499,7 +501,12 @@ static int tsdbEncodeDFInfo(void **buf, SDFInfo *pInfo) {
return
tlen
;
}
static
void
*
tsdbDecodeDFInfo
(
void
*
buf
,
SDFInfo
*
pInfo
)
{
static
void
*
tsdbDecodeDFInfo
(
void
*
buf
,
SDFInfo
*
pInfo
,
TSDB_FVER_TYPE
sfver
)
{
if
(
sfver
>
TSDB_FS_VER_0
)
{
buf
=
taosDecodeFixedU32
(
buf
,
&
(
pInfo
->
fver
));
}
else
{
pInfo
->
fver
=
TSDB_FS_VER_0
;
// default value
}
buf
=
taosDecodeFixedU32
(
buf
,
&
(
pInfo
->
magic
));
buf
=
taosDecodeFixedU32
(
buf
,
&
(
pInfo
->
len
));
buf
=
taosDecodeFixedU32
(
buf
,
&
(
pInfo
->
totalBlocks
));
...
...
@@ -556,19 +563,22 @@ static int tsdbRollBackDFile(SDFile *pDFile) {
}
// ============== Operations on SDFileSet
void
tsdbInitDFileSet
(
SDFileSet
*
pSet
,
SDiskID
did
,
int
vid
,
int
fid
,
uint32_t
ver
)
{
void
tsdbInitDFileSet
(
SDFileSet
*
pSet
,
SDiskID
did
,
int
vid
,
int
fid
,
uint32_t
ver
,
uint16_t
fsetVer
)
{
pSet
->
fid
=
fid
;
pSet
->
state
=
0
;
pSet
->
ver
=
fsetVer
;
for
(
TSDB_FILE_T
ftype
=
0
;
ftype
<
TSDB_FILE_MAX
;
ftype
++
)
{
for
(
TSDB_FILE_T
ftype
=
0
;
ftype
<
tsdbGetNFiles
(
pSet
)
;
ftype
++
)
{
SDFile
*
pDFile
=
TSDB_DFILE_IN_SET
(
pSet
,
ftype
);
tsdbInitDFile
(
pDFile
,
did
,
vid
,
fid
,
ver
,
ftype
);
}
}
void
tsdbInitDFileSetEx
(
SDFileSet
*
pSet
,
SDFileSet
*
pOSet
)
{
ASSERT_TSDB_FSET_NFILES_VALID
(
pOSet
);
pSet
->
fid
=
pOSet
->
fid
;
for
(
TSDB_FILE_T
ftype
=
0
;
ftype
<
TSDB_FILE_MAX
;
ftype
++
)
{
pSet
->
ver
=
pOSet
->
ver
;
for
(
TSDB_FILE_T
ftype
=
0
;
ftype
<
tsdbGetNFiles
(
pSet
);
ftype
++
)
{
tsdbInitDFileEx
(
TSDB_DFILE_IN_SET
(
pSet
,
ftype
),
TSDB_DFILE_IN_SET
(
pOSet
,
ftype
));
}
}
...
...
@@ -577,21 +587,28 @@ int tsdbEncodeDFileSet(void **buf, SDFileSet *pSet) {
int
tlen
=
0
;
tlen
+=
taosEncodeFixedI32
(
buf
,
pSet
->
fid
);
for
(
TSDB_FILE_T
ftype
=
0
;
ftype
<
TSDB_FILE_MAX
;
ftype
++
)
{
tlen
+=
taosEncodeFixedU16
(
buf
,
pSet
->
ver
);
for
(
TSDB_FILE_T
ftype
=
0
;
ftype
<
tsdbGetNFiles
(
pSet
);
ftype
++
)
{
tlen
+=
tsdbEncodeSDFile
(
buf
,
TSDB_DFILE_IN_SET
(
pSet
,
ftype
));
}
return
tlen
;
}
void
*
tsdbDecodeDFileSet
(
void
*
buf
,
SDFileSet
*
pSet
)
{
void
*
tsdbDecodeDFileSet
(
void
*
buf
,
SDFileSet
*
pSet
,
uint32_t
sfver
)
{
int32_t
fid
;
buf
=
taosDecodeFixedI32
(
buf
,
&
(
fid
));
pSet
->
state
=
0
;
pSet
->
fid
=
fid
;
for
(
TSDB_FILE_T
ftype
=
0
;
ftype
<
TSDB_FILE_MAX
;
ftype
++
)
{
buf
=
tsdbDecodeSDFile
(
buf
,
TSDB_DFILE_IN_SET
(
pSet
,
ftype
));
if
(
sfver
>
TSDB_FS_VER_0
)
{
buf
=
taosDecodeFixedU16
(
buf
,
&
(
pSet
->
ver
));
}
ASSERT_TSDB_FSET_NFILES_VALID
(
pSet
);
for
(
TSDB_FILE_T
ftype
=
0
;
ftype
<
tsdbGetNFiles
(
pSet
);
ftype
++
)
{
buf
=
tsdbDecodeSDFile
(
buf
,
TSDB_DFILE_IN_SET
(
pSet
,
ftype
),
sfver
);
}
return
buf
;
}
...
...
@@ -600,7 +617,8 @@ int tsdbEncodeDFileSetEx(void **buf, SDFileSet *pSet) {
int
tlen
=
0
;
tlen
+=
taosEncodeFixedI32
(
buf
,
pSet
->
fid
);
for
(
TSDB_FILE_T
ftype
=
0
;
ftype
<
TSDB_FILE_MAX
;
ftype
++
)
{
tlen
+=
taosEncodeFixedU16
(
buf
,
pSet
->
ver
);
for
(
TSDB_FILE_T
ftype
=
0
;
ftype
<
tsdbGetNFiles
(
pSet
);
ftype
++
)
{
tlen
+=
tsdbEncodeSDFileEx
(
buf
,
TSDB_DFILE_IN_SET
(
pSet
,
ftype
));
}
...
...
@@ -611,17 +629,20 @@ void *tsdbDecodeDFileSetEx(void *buf, SDFileSet *pSet) {
int32_t
fid
;
buf
=
taosDecodeFixedI32
(
buf
,
&
(
fid
));
buf
=
taosDecodeFixedU16
(
buf
,
&
(
pSet
->
ver
));
pSet
->
fid
=
fid
;
for
(
TSDB_FILE_T
ftype
=
0
;
ftype
<
TSDB_FILE_MAX
;
ftype
++
)
{
for
(
TSDB_FILE_T
ftype
=
0
;
ftype
<
tsdbGetNFiles
(
pSet
)
;
ftype
++
)
{
buf
=
tsdbDecodeSDFileEx
(
buf
,
TSDB_DFILE_IN_SET
(
pSet
,
ftype
));
}
return
buf
;
}
int
tsdbApplyDFileSetChange
(
SDFileSet
*
from
,
SDFileSet
*
to
)
{
for
(
TSDB_FILE_T
ftype
=
0
;
ftype
<
TSDB_FILE_MAX
;
ftype
++
)
{
SDFile
*
pDFileFrom
=
(
from
)
?
TSDB_DFILE_IN_SET
(
from
,
ftype
)
:
NULL
;
SDFile
*
pDFileTo
=
(
to
)
?
TSDB_DFILE_IN_SET
(
to
,
ftype
)
:
NULL
;
uint8_t
nFilesFrom
=
from
?
tsdbGetNFiles
(
from
)
:
0
;
uint8_t
nFilesTo
=
to
?
tsdbGetNFiles
(
to
)
:
0
;
for
(
TSDB_FILE_T
ftype
=
0
;
ftype
<
MAX
(
nFilesFrom
,
nFilesTo
);
ftype
++
)
{
SDFile
*
pDFileFrom
=
ftype
<
nFilesFrom
?
TSDB_DFILE_IN_SET
(
from
,
ftype
)
:
NULL
;
SDFile
*
pDFileTo
=
ftype
<
nFilesTo
?
TSDB_DFILE_IN_SET
(
to
,
ftype
)
:
NULL
;
if
(
tsdbApplyDFileChange
(
pDFileFrom
,
pDFileTo
)
<
0
)
{
return
-
1
;
}
...
...
@@ -631,8 +652,8 @@ int tsdbApplyDFileSetChange(SDFileSet *from, SDFileSet *to) {
}
int
tsdbCreateDFileSet
(
SDFileSet
*
pSet
,
bool
updateHeader
)
{
for
(
TSDB_FILE_T
ftype
=
0
;
ftype
<
TSDB_FILE_MAX
;
ftype
++
)
{
if
(
tsdbCreateDFile
(
TSDB_DFILE_IN_SET
(
pSet
,
ftype
),
updateHeader
)
<
0
)
{
for
(
TSDB_FILE_T
ftype
=
0
;
ftype
<
tsdbGetNFiles
(
pSet
)
;
ftype
++
)
{
if
(
tsdbCreateDFile
(
TSDB_DFILE_IN_SET
(
pSet
,
ftype
),
updateHeader
,
ftype
)
<
0
)
{
tsdbCloseDFileSet
(
pSet
);
tsdbRemoveDFileSet
(
pSet
);
return
-
1
;
...
...
@@ -643,7 +664,7 @@ int tsdbCreateDFileSet(SDFileSet *pSet, bool updateHeader) {
}
int
tsdbUpdateDFileSetHeader
(
SDFileSet
*
pSet
)
{
for
(
TSDB_FILE_T
ftype
=
0
;
ftype
<
TSDB_FILE_MAX
;
ftype
++
)
{
for
(
TSDB_FILE_T
ftype
=
0
;
ftype
<
tsdbGetNFiles
(
pSet
)
;
ftype
++
)
{
if
(
tsdbUpdateDFileHeader
(
TSDB_DFILE_IN_SET
(
pSet
,
ftype
))
<
0
)
{
return
-
1
;
}
...
...
@@ -652,7 +673,8 @@ int tsdbUpdateDFileSetHeader(SDFileSet *pSet) {
}
int
tsdbScanAndTryFixDFileSet
(
STsdbRepo
*
pRepo
,
SDFileSet
*
pSet
)
{
for
(
TSDB_FILE_T
ftype
=
0
;
ftype
<
TSDB_FILE_MAX
;
ftype
++
)
{
ASSERT_TSDB_FSET_NFILES_VALID
(
pSet
);
for
(
TSDB_FILE_T
ftype
=
0
;
ftype
<
tsdbGetNFiles
(
pSet
);
ftype
++
)
{
if
(
tsdbScanAndTryFixDFile
(
pRepo
,
TSDB_DFILE_IN_SET
(
pSet
,
ftype
))
<
0
)
{
return
-
1
;
}
...
...
src/tsdb/src/tsdbMain.c
浏览文件 @
3736e4d7
...
...
@@ -680,7 +680,7 @@ static int tsdbRestoreLastColumns(STsdbRepo *pRepo, STable *pTable, SReadH* pRea
tdInitDataRow
(
memRowDataBody
(
row
),
pSchema
);
// first load block index info
if
(
tsdbLoadBlockInfo
(
pReadh
,
NULL
)
<
0
)
{
if
(
tsdbLoadBlockInfo
(
pReadh
,
NULL
,
NULL
)
<
0
)
{
err
=
-
1
;
goto
out
;
}
...
...
@@ -714,10 +714,11 @@ static int tsdbRestoreLastColumns(STsdbRepo *pRepo, STable *pTable, SReadH* pRea
// file block with sub-blocks has no statistics data
if
(
pBlock
->
numOfSubBlocks
<=
1
)
{
tsdbLoadBlockStatis
(
pReadh
,
pBlock
);
tsdbGetBlockStatis
(
pReadh
,
pBlockStatis
,
(
int
)
numColumns
);
if
(
tsdbLoadBlockStatis
(
pReadh
,
pBlock
)
==
TSDB_STATIS_OK
)
{
tsdbGetBlockStatis
(
pReadh
,
pBlockStatis
,
(
int
)
numColumns
,
pBlock
);
loadStatisData
=
true
;
}
}
for
(
int16_t
i
=
0
;
i
<
numColumns
&&
numColumns
>
pTable
->
restoreColumnNum
;
++
i
)
{
STColumn
*
pCol
=
schemaColAt
(
pSchema
,
i
);
...
...
@@ -782,7 +783,7 @@ out:
static
int
tsdbRestoreLastRow
(
STsdbRepo
*
pRepo
,
STable
*
pTable
,
SReadH
*
pReadh
,
SBlockIdx
*
pIdx
)
{
ASSERT
(
pTable
->
lastRow
==
NULL
);
if
(
tsdbLoadBlockInfo
(
pReadh
,
NULL
)
<
0
)
{
if
(
tsdbLoadBlockInfo
(
pReadh
,
NULL
,
NULL
)
<
0
)
{
return
-
1
;
}
...
...
src/tsdb/src/tsdbRead.c
浏览文件 @
3736e4d7
...
...
@@ -1054,21 +1054,10 @@ static int32_t loadBlockInfo(STsdbQueryHandle * pQueryHandle, int32_t index, int
return
0
;
// no data blocks in the file belongs to pCheckInfo->pTable
}
if
(
pCheckInfo
->
compSize
<
(
int32_t
)
compIndex
->
len
)
{
assert
(
compIndex
->
len
>
0
);
char
*
t
=
realloc
(
pCheckInfo
->
pCompInfo
,
compIndex
->
len
);
if
(
t
==
NULL
)
{
terrno
=
TSDB_CODE_TDB_OUT_OF_MEMORY
;
code
=
TSDB_CODE_TDB_OUT_OF_MEMORY
;
return
code
;
}
pCheckInfo
->
pCompInfo
=
(
SBlockInfo
*
)
t
;
pCheckInfo
->
compSize
=
compIndex
->
len
;
}
if
(
tsdbLoadBlockInfo
(
&
(
pQueryHandle
->
rhelper
),
(
void
*
)(
pCheckInfo
->
pCompInfo
))
<
0
)
{
if
(
tsdbLoadBlockInfo
(
&
(
pQueryHandle
->
rhelper
),
(
void
**
)(
&
pCheckInfo
->
pCompInfo
),
(
uint32_t
*
)(
&
pCheckInfo
->
compSize
))
<
0
)
{
return
terrno
;
}
SBlockInfo
*
pCompInfo
=
pCheckInfo
->
pCompInfo
;
...
...
@@ -3318,8 +3307,12 @@ int32_t tsdbRetrieveDataBlockStatisInfo(TsdbQueryHandleT* pQueryHandle, SDataSta
}
int64_t
stime
=
taosGetTimestampUs
();
if
(
tsdbLoadBlockStatis
(
&
pHandle
->
rhelper
,
pBlockInfo
->
compBlock
)
<
0
)
{
int
statisStatus
=
tsdbLoadBlockStatis
(
&
pHandle
->
rhelper
,
pBlockInfo
->
compBlock
);
if
(
statisStatus
<
TSDB_STATIS_OK
)
{
return
terrno
;
}
else
if
(
statisStatus
>
TSDB_STATIS_OK
)
{
*
pBlockStatis
=
NULL
;
return
TSDB_CODE_SUCCESS
;
}
int16_t
*
colIds
=
pHandle
->
defaultLoadColumn
->
pData
;
...
...
@@ -3330,7 +3323,7 @@ int32_t tsdbRetrieveDataBlockStatisInfo(TsdbQueryHandleT* pQueryHandle, SDataSta
pHandle
->
statis
[
i
].
colId
=
colIds
[
i
];
}
tsdbGetBlockStatis
(
&
pHandle
->
rhelper
,
pHandle
->
statis
,
(
int
)
numOfCols
);
tsdbGetBlockStatis
(
&
pHandle
->
rhelper
,
pHandle
->
statis
,
(
int
)
numOfCols
,
pBlockInfo
->
compBlock
);
// always load the first primary timestamp column data
SDataStatis
*
pPrimaryColStatis
=
&
pHandle
->
statis
[
0
];
...
...
src/tsdb/src/tsdbReadImpl.c
浏览文件 @
3736e4d7
...
...
@@ -25,6 +25,8 @@ static int tsdbCheckAndDecodeColumnData(SDataCol *pDataCol, void *content, int3
static
int
tsdbLoadBlockDataColsImpl
(
SReadH
*
pReadh
,
SBlock
*
pBlock
,
SDataCols
*
pDataCols
,
int16_t
*
colIds
,
int
numOfColIds
);
static
int
tsdbLoadColData
(
SReadH
*
pReadh
,
SDFile
*
pDFile
,
SBlock
*
pBlock
,
SBlockCol
*
pBlockCol
,
SDataCol
*
pDataCol
);
static
int
tsdbLoadBlockStatisFromDFile
(
SReadH
*
pReadh
,
SBlock
*
pBlock
);
static
int
tsdbLoadBlockStatisFromAggr
(
SReadH
*
pReadh
,
SBlock
*
pBlock
);
int
tsdbInitReadH
(
SReadH
*
pReadh
,
STsdbRepo
*
pRepo
)
{
ASSERT
(
pReadh
!=
NULL
&&
pRepo
!=
NULL
);
...
...
@@ -61,11 +63,12 @@ int tsdbInitReadH(SReadH *pReadh, STsdbRepo *pRepo) {
void
tsdbDestroyReadH
(
SReadH
*
pReadh
)
{
if
(
pReadh
==
NULL
)
return
;
pReadh
->
pExBuf
=
taosTZfree
(
pReadh
->
pExBuf
);
pReadh
->
pCBuf
=
taosTZfree
(
pReadh
->
pCBuf
);
pReadh
->
pBuf
=
taosTZfree
(
pReadh
->
pBuf
);
pReadh
->
pDCols
[
0
]
=
tdFreeDataCols
(
pReadh
->
pDCols
[
0
]);
pReadh
->
pDCols
[
1
]
=
tdFreeDataCols
(
pReadh
->
pDCols
[
1
]);
pReadh
->
pAggrBlkData
=
taosTZfree
(
pReadh
->
pAggrBlkData
);
pReadh
->
pBlkData
=
taosTZfree
(
pReadh
->
pBlkData
);
pReadh
->
pBlkInfo
=
taosTZfree
(
pReadh
->
pBlkInfo
);
pReadh
->
cidx
=
0
;
...
...
@@ -198,6 +201,7 @@ int tsdbSetReadTable(SReadH *pReadh, STable *pTable) {
return
0
;
}
#if 0
int tsdbLoadBlockInfo(SReadH *pReadh, void *pTarget) {
ASSERT(pReadh->pBlkIdx != NULL);
...
...
@@ -241,6 +245,129 @@ int tsdbLoadBlockInfo(SReadH *pReadh, void *pTarget) {
return 0;
}
#endif
static
FORCE_INLINE
int32_t
tsdbGetSBlockVer
(
int32_t
fver
)
{
switch
(
fver
)
{
case
TSDB_FS_VER_0
:
return
TSDB_SBLK_VER_0
;
case
TSDB_FS_VER_1
:
return
TSDB_SBLK_VER_1
;
default:
return
SBlockVerLatest
;
}
}
static
FORCE_INLINE
size_t
tsdbSizeOfSBlock
(
int32_t
sBlkVer
)
{
switch
(
sBlkVer
)
{
case
TSDB_SBLK_VER_0
:
return
sizeof
(
SBlockV0
);
case
TSDB_SBLK_VER_1
:
return
sizeof
(
SBlockV1
);
default:
return
sizeof
(
SBlock
);
}
}
static
int
tsdbSBlkInfoRefactor
(
SDFile
*
pHeadf
,
SBlockInfo
**
pDstBlkInfo
,
SBlockIdx
*
pBlkIdx
,
uint32_t
*
dstBlkInfoLen
)
{
int
sBlkVer
=
tsdbGetSBlockVer
(
pHeadf
->
info
.
fver
);
if
(
sBlkVer
>
TSDB_SBLK_VER_0
)
{
*
dstBlkInfoLen
=
pBlkIdx
->
len
;
return
TSDB_CODE_SUCCESS
;
}
size_t
originBlkSize
=
tsdbSizeOfSBlock
(
sBlkVer
);
size_t
nBlks
=
(
pBlkIdx
->
len
-
sizeof
(
SBlockInfo
))
/
originBlkSize
;
*
dstBlkInfoLen
=
(
uint32_t
)(
sizeof
(
SBlockInfo
)
+
nBlks
*
sizeof
(
SBlock
));
if
(
pBlkIdx
->
len
==
*
dstBlkInfoLen
)
{
return
TSDB_CODE_SUCCESS
;
}
ASSERT
(
*
dstBlkInfoLen
>=
pBlkIdx
->
len
);
SBlockInfo
*
tmpBlkInfo
=
NULL
;
if
(
tsdbMakeRoom
((
void
**
)(
&
tmpBlkInfo
),
*
dstBlkInfoLen
)
<
0
)
return
-
1
;
memset
(
tmpBlkInfo
,
0
,
*
dstBlkInfoLen
);
// the blkVer is set to 0
memcpy
(
tmpBlkInfo
,
*
pDstBlkInfo
,
sizeof
(
SBlockInfo
));
// copy header
uint32_t
nSubBlks
=
0
;
for
(
int
i
=
0
;
i
<
nBlks
;
++
i
)
{
SBlock
*
tmpBlk
=
tmpBlkInfo
->
blocks
+
i
;
memcpy
(
tmpBlk
,
POINTER_SHIFT
((
*
pDstBlkInfo
)
->
blocks
,
i
*
originBlkSize
),
originBlkSize
);
if
(
i
<
pBlkIdx
->
numOfBlocks
)
{
// super blocks
if
(
tmpBlk
->
numOfSubBlocks
>
1
)
{
// has sub blocks
tmpBlk
->
offset
=
sizeof
(
SBlockInfo
)
+
(
pBlkIdx
->
numOfBlocks
+
nSubBlks
)
*
sizeof
(
SBlock
);
nSubBlks
+=
tmpBlk
->
numOfSubBlocks
;
}
}
// TODO: update the fields if the SBlock definition change later
}
taosTZfree
(
*
pDstBlkInfo
);
*
pDstBlkInfo
=
tmpBlkInfo
;
return
TSDB_CODE_SUCCESS
;
}
int
tsdbLoadBlockInfo
(
SReadH
*
pReadh
,
void
**
pTarget
,
uint32_t
*
extendedLen
)
{
ASSERT
(
pReadh
->
pBlkIdx
!=
NULL
);
SDFile
*
pHeadf
=
TSDB_READ_HEAD_FILE
(
pReadh
);
SBlockIdx
*
pBlkIdx
=
pReadh
->
pBlkIdx
;
if
(
tsdbSeekDFile
(
pHeadf
,
pBlkIdx
->
offset
,
SEEK_SET
)
<
0
)
{
tsdbError
(
"vgId:%d failed to load SBlockInfo part while seek file %s since %s, offset:%u len:%u"
,
TSDB_READ_REPO_ID
(
pReadh
),
TSDB_FILE_FULL_NAME
(
pHeadf
),
tstrerror
(
terrno
),
pBlkIdx
->
offset
,
pBlkIdx
->
len
);
return
-
1
;
}
if
(
tsdbMakeRoom
((
void
**
)(
&
pReadh
->
pBlkInfo
),
pBlkIdx
->
len
)
<
0
)
return
-
1
;
int64_t
nread
=
tsdbReadDFile
(
pHeadf
,
(
void
*
)(
pReadh
->
pBlkInfo
),
pBlkIdx
->
len
);
if
(
nread
<
0
)
{
tsdbError
(
"vgId:%d failed to load SBlockInfo part while read file %s since %s, offset:%u len :%u"
,
TSDB_READ_REPO_ID
(
pReadh
),
TSDB_FILE_FULL_NAME
(
pHeadf
),
tstrerror
(
terrno
),
pBlkIdx
->
offset
,
pBlkIdx
->
len
);
return
-
1
;
}
if
(
nread
<
pBlkIdx
->
len
)
{
terrno
=
TSDB_CODE_TDB_FILE_CORRUPTED
;
tsdbError
(
"vgId:%d SBlockInfo part in file %s is corrupted, offset:%u expected bytes:%u read bytes:%"
PRId64
,
TSDB_READ_REPO_ID
(
pReadh
),
TSDB_FILE_FULL_NAME
(
pHeadf
),
pBlkIdx
->
offset
,
pBlkIdx
->
len
,
nread
);
return
-
1
;
}
if
(
!
taosCheckChecksumWhole
((
uint8_t
*
)(
pReadh
->
pBlkInfo
),
pBlkIdx
->
len
))
{
terrno
=
TSDB_CODE_TDB_FILE_CORRUPTED
;
tsdbError
(
"vgId:%d SBlockInfo part in file %s is corrupted since wrong checksum, offset:%u len :%u"
,
TSDB_READ_REPO_ID
(
pReadh
),
TSDB_FILE_FULL_NAME
(
pHeadf
),
pBlkIdx
->
offset
,
pBlkIdx
->
len
);
return
-
1
;
}
ASSERT
(
pBlkIdx
->
tid
==
pReadh
->
pBlkInfo
->
tid
&&
pBlkIdx
->
uid
==
pReadh
->
pBlkInfo
->
uid
);
uint32_t
dstBlkInfoLen
=
0
;
if
(
tsdbSBlkInfoRefactor
(
pHeadf
,
&
(
pReadh
->
pBlkInfo
),
pBlkIdx
,
&
dstBlkInfoLen
)
<
0
)
{
return
-
1
;
}
if
(
extendedLen
!=
NULL
)
{
if
(
pTarget
!=
NULL
)
{
if
(
*
extendedLen
<
dstBlkInfoLen
)
{
char
*
t
=
realloc
(
*
pTarget
,
dstBlkInfoLen
);
if
(
t
==
NULL
)
{
terrno
=
TSDB_CODE_TDB_OUT_OF_MEMORY
;
return
-
1
;
}
*
pTarget
=
t
;
}
memcpy
(
*
pTarget
,
(
void
*
)(
pReadh
->
pBlkInfo
),
dstBlkInfoLen
);
}
*
extendedLen
=
dstBlkInfoLen
;
}
return
TSDB_CODE_SUCCESS
;
}
int
tsdbLoadBlockData
(
SReadH
*
pReadh
,
SBlock
*
pBlock
,
SBlockInfo
*
pBlkInfo
)
{
ASSERT
(
pBlock
->
numOfSubBlocks
>
0
);
...
...
@@ -296,18 +423,15 @@ int tsdbLoadBlockDataCols(SReadH *pReadh, SBlock *pBlock, SBlockInfo *pBlkInfo,
return
0
;
}
int
tsdbLoadBlockStatis
(
SReadH
*
pReadh
,
SBlock
*
pBlock
)
{
ASSERT
(
pBlock
->
numOfSubBlocks
<=
1
);
static
int
tsdbLoadBlockStatisFromDFile
(
SReadH
*
pReadh
,
SBlock
*
pBlock
)
{
SDFile
*
pDFile
=
(
pBlock
->
last
)
?
TSDB_READ_LAST_FILE
(
pReadh
)
:
TSDB_READ_DATA_FILE
(
pReadh
);
if
(
tsdbSeekDFile
(
pDFile
,
pBlock
->
offset
,
SEEK_SET
)
<
0
)
{
tsdbError
(
"vgId:%d failed to load block statis part while seek file %s to offset %"
PRId64
" since %s"
,
TSDB_READ_REPO_ID
(
pReadh
),
TSDB_FILE_FULL_NAME
(
pDFile
),
(
int64_t
)
pBlock
->
offset
,
tstrerror
(
terrno
));
return
-
1
;
}
size_t
size
=
TSDB_BLOCK_STATIS_SIZE
(
pBlock
->
numOfCols
);
size_t
size
=
tsdbBlockStatisSize
(
pBlock
->
numOfCols
,
(
uint32_t
)
pBlock
->
blkVer
);
if
(
tsdbMakeRoom
((
void
**
)(
&
(
pReadh
->
pBlkData
)),
size
)
<
0
)
return
-
1
;
int64_t
nread
=
tsdbReadDFile
(
pDFile
,
(
void
*
)(
pReadh
->
pBlkData
),
size
);
...
...
@@ -331,10 +455,66 @@ int tsdbLoadBlockStatis(SReadH *pReadh, SBlock *pBlock) {
TSDB_READ_REPO_ID
(
pReadh
),
TSDB_FILE_FULL_NAME
(
pDFile
),
(
int64_t
)
pBlock
->
offset
,
size
);
return
-
1
;
}
return
0
;
}
static
int
tsdbLoadBlockStatisFromAggr
(
SReadH
*
pReadh
,
SBlock
*
pBlock
)
{
ASSERT
((
pBlock
->
blkVer
>
TSDB_SBLK_VER_0
)
&&
(
pBlock
->
aggrStat
));
// TODO: remove after pass all the test
SDFile
*
pDFileAggr
=
pBlock
->
last
?
TSDB_READ_SMAL_FILE
(
pReadh
)
:
TSDB_READ_SMAD_FILE
(
pReadh
);
if
(
tsdbSeekDFile
(
pDFileAggr
,
pBlock
->
aggrOffset
,
SEEK_SET
)
<
0
)
{
tsdbError
(
"vgId:%d failed to load block aggr part while seek file %s to offset %"
PRIu64
" since %s"
,
TSDB_READ_REPO_ID
(
pReadh
),
TSDB_FILE_FULL_NAME
(
pDFileAggr
),
(
uint64_t
)
pBlock
->
aggrOffset
,
tstrerror
(
terrno
));
return
-
1
;
}
size_t
sizeAggr
=
tsdbBlockAggrSize
(
pBlock
->
numOfCols
,
(
uint32_t
)
pBlock
->
blkVer
);
if
(
tsdbMakeRoom
((
void
**
)(
&
(
pReadh
->
pAggrBlkData
)),
sizeAggr
)
<
0
)
return
-
1
;
int64_t
nreadAggr
=
tsdbReadDFile
(
pDFileAggr
,
(
void
*
)(
pReadh
->
pAggrBlkData
),
sizeAggr
);
if
(
nreadAggr
<
0
)
{
tsdbError
(
"vgId:%d failed to load block aggr part while read file %s since %s, offset:%"
PRIu64
" len :%"
PRIzu
,
TSDB_READ_REPO_ID
(
pReadh
),
TSDB_FILE_FULL_NAME
(
pDFileAggr
),
tstrerror
(
terrno
),
(
uint64_t
)
pBlock
->
aggrOffset
,
sizeAggr
);
return
-
1
;
}
if
(
nreadAggr
<
sizeAggr
)
{
terrno
=
TSDB_CODE_TDB_FILE_CORRUPTED
;
tsdbError
(
"vgId:%d block aggr part in file %s is corrupted, offset:%"
PRIu64
" expected bytes:%"
PRIzu
" read bytes: %"
PRId64
,
TSDB_READ_REPO_ID
(
pReadh
),
TSDB_FILE_FULL_NAME
(
pDFileAggr
),
(
uint64_t
)
pBlock
->
aggrOffset
,
sizeAggr
,
nreadAggr
);
return
-
1
;
}
if
(
!
taosCheckChecksumWhole
((
uint8_t
*
)(
pReadh
->
pAggrBlkData
),
(
uint32_t
)
sizeAggr
))
{
terrno
=
TSDB_CODE_TDB_FILE_CORRUPTED
;
tsdbError
(
"vgId:%d block aggr part in file %s is corrupted since wrong checksum, offset:%"
PRIu64
" len :%"
PRIzu
,
TSDB_READ_REPO_ID
(
pReadh
),
TSDB_FILE_FULL_NAME
(
pDFileAggr
),
(
uint64_t
)
pBlock
->
aggrOffset
,
sizeAggr
);
return
-
1
;
}
return
0
;
}
int
tsdbLoadBlockStatis
(
SReadH
*
pReadh
,
SBlock
*
pBlock
)
{
ASSERT
(
pBlock
->
numOfSubBlocks
<=
1
);
if
(
pBlock
->
blkVer
>
TSDB_SBLK_VER_0
)
{
if
(
pBlock
->
aggrStat
)
{
return
tsdbLoadBlockStatisFromAggr
(
pReadh
,
pBlock
);
}
return
TSDB_STATIS_NONE
;
}
return
tsdbLoadBlockStatisFromDFile
(
pReadh
,
pBlock
);
}
int
tsdbLoadBlockOffset
(
SReadH
*
pReadh
,
SBlock
*
pBlock
)
{
ASSERT
(
pBlock
->
numOfSubBlocks
<=
1
);
return
tsdbLoadBlockStatisFromDFile
(
pReadh
,
pBlock
);
}
int
tsdbEncodeSBlockIdx
(
void
**
buf
,
SBlockIdx
*
pIdx
)
{
int
tlen
=
0
;
...
...
@@ -369,7 +549,8 @@ void *tsdbDecodeSBlockIdx(void *buf, SBlockIdx *pIdx) {
return
buf
;
}
void
tsdbGetBlockStatis
(
SReadH
*
pReadh
,
SDataStatis
*
pStatis
,
int
numOfCols
)
{
void
tsdbGetBlockStatis
(
SReadH
*
pReadh
,
SDataStatis
*
pStatis
,
int
numOfCols
,
SBlock
*
pBlock
)
{
if
(
pBlock
->
blkVer
==
TSDB_SBLK_VER_0
)
{
SBlockData
*
pBlockData
=
pReadh
->
pBlkData
;
for
(
int
i
=
0
,
j
=
0
;
i
<
numOfCols
;)
{
...
...
@@ -378,23 +559,50 @@ void tsdbGetBlockStatis(SReadH *pReadh, SDataStatis *pStatis, int numOfCols) {
i
++
;
continue
;
}
SBlockColV0
*
pSBlkCol
=
((
SBlockColV0
*
)(
pBlockData
->
cols
))
+
j
;
if
(
pStatis
[
i
].
colId
==
pSBlkCol
->
colId
)
{
pStatis
[
i
].
sum
=
pSBlkCol
->
sum
;
pStatis
[
i
].
max
=
pSBlkCol
->
max
;
pStatis
[
i
].
min
=
pSBlkCol
->
min
;
pStatis
[
i
].
maxIndex
=
pSBlkCol
->
maxIndex
;
pStatis
[
i
].
minIndex
=
pSBlkCol
->
minIndex
;
pStatis
[
i
].
numOfNull
=
pSBlkCol
->
numOfNull
;
i
++
;
j
++
;
}
else
if
(
pStatis
[
i
].
colId
<
pSBlkCol
->
colId
)
{
pStatis
[
i
].
numOfNull
=
-
1
;
i
++
;
}
else
{
j
++
;
}
}
}
else
if
(
pBlock
->
aggrStat
)
{
SAggrBlkData
*
pAggrBlkData
=
pReadh
->
pAggrBlkData
;
if
(
pStatis
[
i
].
colId
==
pBlockData
->
cols
[
j
].
colId
)
{
pStatis
[
i
].
sum
=
pBlockData
->
cols
[
j
].
sum
;
pStatis
[
i
].
max
=
pBlockData
->
cols
[
j
].
max
;
pStatis
[
i
].
min
=
pBlockData
->
cols
[
j
].
min
;
pStatis
[
i
].
maxIndex
=
pBlockData
->
cols
[
j
].
maxIndex
;
pStatis
[
i
].
minIndex
=
pBlockData
->
cols
[
j
].
minIndex
;
pStatis
[
i
].
numOfNull
=
pBlockData
->
cols
[
j
].
numOfNull
;
for
(
int
i
=
0
,
j
=
0
;
i
<
numOfCols
;)
{
if
(
j
>=
pAggrBlkData
->
numOfCols
)
{
pStatis
[
i
].
numOfNull
=
-
1
;
i
++
;
continue
;
}
SAggrBlkCol
*
pAggrBlkCol
=
((
SAggrBlkCol
*
)(
pAggrBlkData
->
cols
))
+
j
;
if
(
pStatis
[
i
].
colId
==
pAggrBlkCol
->
colId
)
{
pStatis
[
i
].
sum
=
pAggrBlkCol
->
sum
;
pStatis
[
i
].
max
=
pAggrBlkCol
->
max
;
pStatis
[
i
].
min
=
pAggrBlkCol
->
min
;
pStatis
[
i
].
maxIndex
=
pAggrBlkCol
->
maxIndex
;
pStatis
[
i
].
minIndex
=
pAggrBlkCol
->
minIndex
;
pStatis
[
i
].
numOfNull
=
pAggrBlkCol
->
numOfNull
;
i
++
;
j
++
;
}
else
if
(
pStatis
[
i
].
colId
<
pBlockData
->
cols
[
j
].
colId
)
{
}
else
if
(
pStatis
[
i
].
colId
<
pAggrBlkCol
->
colId
)
{
pStatis
[
i
].
numOfNull
=
-
1
;
i
++
;
}
else
{
j
++
;
}
}
}
}
static
void
tsdbResetReadTable
(
SReadH
*
pReadh
)
{
...
...
@@ -443,7 +651,7 @@ static int tsdbLoadBlockDataImpl(SReadH *pReadh, SBlock *pBlock, SDataCols *pDat
return
-
1
;
}
int32_t
tsize
=
TSDB_BLOCK_STATIS_SIZE
(
pBlock
->
numOfCols
);
int32_t
tsize
=
(
int32_t
)
tsdbBlockStatisSize
(
pBlock
->
numOfCols
,
(
uint32_t
)
pBlock
->
blkVer
);
if
(
!
taosCheckChecksumWhole
((
uint8_t
*
)
TSDB_READ_BUF
(
pReadh
),
tsize
))
{
terrno
=
TSDB_CODE_TDB_FILE_CORRUPTED
;
tsdbError
(
"vgId:%d block statis part in file %s is corrupted since wrong checksum, offset:%"
PRId64
" len :%d"
,
...
...
@@ -459,6 +667,8 @@ static int tsdbLoadBlockDataImpl(SReadH *pReadh, SBlock *pBlock, SDataCols *pDat
// Recover the data
int
ccol
=
0
;
// loop iter for SBlockCol object
int
dcol
=
0
;
// loop iter for SDataCols object
SBlockCol
blockCol
=
{
0
};
SBlockCol
*
pBlockCol
=
&
blockCol
;
while
(
dcol
<
pDataCols
->
numOfCols
)
{
SDataCol
*
pDataCol
=
&
(
pDataCols
->
cols
[
dcol
]);
if
(
dcol
!=
0
&&
ccol
>=
pBlockData
->
numOfCols
)
{
...
...
@@ -472,8 +682,9 @@ static int tsdbLoadBlockDataImpl(SReadH *pReadh, SBlock *pBlock, SDataCols *pDat
uint32_t
toffset
=
TSDB_KEY_COL_OFFSET
;
int32_t
tlen
=
pBlock
->
keyLen
;
if
(
dcol
!=
0
)
{
SBlockCol
*
pBlockCol
=
&
(
pBlockData
->
cols
[
ccol
]
);
tsdbGetSBlockCol
(
pBlock
,
&
pBlockCol
,
pBlockData
->
cols
,
ccol
);
tcolId
=
pBlockCol
->
colId
;
toffset
=
tsdbGetBlockColOffset
(
pBlockCol
);
tlen
=
pBlockCol
->
len
;
...
...
@@ -555,7 +766,7 @@ static int tsdbLoadBlockDataColsImpl(SReadH *pReadh, SBlock *pBlock, SDataCols *
tdResetDataCols
(
pDataCols
);
// If only load timestamp column, no need to load SBlockData part
if
(
numOfColIds
>
1
&&
tsdbLoadBlock
Statis
(
pReadh
,
pBlock
)
<
0
)
return
-
1
;
if
(
numOfColIds
>
1
&&
tsdbLoadBlock
Offset
(
pReadh
,
pBlock
)
<
0
)
return
-
1
;
pDataCols
->
numOfRows
=
pBlock
->
numOfRows
;
...
...
@@ -597,7 +808,9 @@ static int tsdbLoadBlockDataColsImpl(SReadH *pReadh, SBlock *pBlock, SDataCols *
break
;
}
pBlockCol
=
&
(
pReadh
->
pBlkData
->
cols
[
ccol
]);
pBlockCol
=
&
blockCol
;
tsdbGetSBlockCol
(
pBlock
,
&
pBlockCol
,
pReadh
->
pBlkData
->
cols
,
ccol
);
if
(
pBlockCol
->
colId
>
colId
)
{
pBlockCol
=
NULL
;
break
;
...
...
@@ -631,7 +844,8 @@ static int tsdbLoadColData(SReadH *pReadh, SDFile *pDFile, SBlock *pBlock, SBloc
if
(
tsdbMakeRoom
((
void
**
)(
&
TSDB_READ_BUF
(
pReadh
)),
pBlockCol
->
len
)
<
0
)
return
-
1
;
if
(
tsdbMakeRoom
((
void
**
)(
&
TSDB_READ_COMP_BUF
(
pReadh
)),
tsize
)
<
0
)
return
-
1
;
int64_t
offset
=
pBlock
->
offset
+
TSDB_BLOCK_STATIS_SIZE
(
pBlock
->
numOfCols
)
+
tsdbGetBlockColOffset
(
pBlockCol
);
int64_t
offset
=
pBlock
->
offset
+
tsdbBlockStatisSize
(
pBlock
->
numOfCols
,
(
uint32_t
)
pBlock
->
blkVer
)
+
tsdbGetBlockColOffset
(
pBlockCol
);
if
(
tsdbSeekDFile
(
pDFile
,
offset
,
SEEK_SET
)
<
0
)
{
tsdbError
(
"vgId:%d failed to load block column data while seek file %s to offset %"
PRId64
" since %s"
,
TSDB_READ_REPO_ID
(
pReadh
),
TSDB_FILE_FULL_NAME
(
pDFile
),
offset
,
tstrerror
(
terrno
));
...
...
src/tsdb/src/tsdbSync.c
浏览文件 @
3736e4d7
...
...
@@ -466,7 +466,7 @@ static int32_t tsdbSyncRecvDFileSetArray(SSyncH *pSynch) {
return
-
1
;
}
tsdbInitDFileSet
(
&
fset
,
did
,
REPO_ID
(
pRepo
),
pSynch
->
pdf
->
fid
,
FS_TXN_VERSION
(
pfs
));
tsdbInitDFileSet
(
&
fset
,
did
,
REPO_ID
(
pRepo
),
pSynch
->
pdf
->
fid
,
FS_TXN_VERSION
(
pfs
)
,
pSynch
->
pdf
->
ver
);
// Create new FSET
if
(
tsdbCreateDFileSet
(
&
fset
,
false
)
<
0
)
{
...
...
@@ -474,7 +474,7 @@ static int32_t tsdbSyncRecvDFileSetArray(SSyncH *pSynch) {
return
-
1
;
}
for
(
TSDB_FILE_T
ftype
=
0
;
ftype
<
TSDB_FILE_MAX
;
ftype
++
)
{
for
(
TSDB_FILE_T
ftype
=
0
;
ftype
<
tsdbGetNFiles
(
pSynch
->
pdf
)
;
ftype
++
)
{
SDFile
*
pDFile
=
TSDB_DFILE_IN_SET
(
&
fset
,
ftype
);
// local file
SDFile
*
pRDFile
=
TSDB_DFILE_IN_SET
(
pSynch
->
pdf
,
ftype
);
// remote file
...
...
@@ -550,7 +550,10 @@ static int32_t tsdbSyncRecvDFileSetArray(SSyncH *pSynch) {
}
static
bool
tsdbIsTowFSetSame
(
SDFileSet
*
pSet1
,
SDFileSet
*
pSet2
)
{
for
(
TSDB_FILE_T
ftype
=
0
;
ftype
<
TSDB_FILE_MAX
;
ftype
++
)
{
if
(
pSet1
->
ver
!=
pSet2
->
ver
)
{
return
false
;
}
for
(
TSDB_FILE_T
ftype
=
0
;
ftype
<
tsdbGetNFiles
(
pSet1
);
ftype
++
)
{
SDFile
*
pDFile1
=
TSDB_DFILE_IN_SET
(
pSet1
,
ftype
);
SDFile
*
pDFile2
=
TSDB_DFILE_IN_SET
(
pSet2
,
ftype
);
...
...
@@ -592,7 +595,7 @@ static int32_t tsdbSyncSendDFileSet(SSyncH *pSynch, SDFileSet *pSet) {
if
(
toSend
)
{
tsdbInfo
(
"vgId:%d, fileset:%d will be sent"
,
REPO_ID
(
pRepo
),
pSet
->
fid
);
for
(
TSDB_FILE_T
ftype
=
0
;
ftype
<
TSDB_FILE_MAX
;
ftype
++
)
{
for
(
TSDB_FILE_T
ftype
=
0
;
ftype
<
tsdbGetNFiles
(
pSet
)
;
ftype
++
)
{
SDFile
df
=
*
TSDB_DFILE_IN_SET
(
pSet
,
ftype
);
if
(
tsdbOpenDFile
(
&
df
,
O_RDONLY
)
<
0
)
{
...
...
src/util/src/terror.c
浏览文件 @
3736e4d7
...
...
@@ -278,6 +278,8 @@ TAOS_DEFINE_ERROR(TSDB_CODE_TDB_IVD_CREATE_TABLE_INFO, "Invalid information t
TAOS_DEFINE_ERROR
(
TSDB_CODE_TDB_NO_AVAIL_DISK
,
"No available disk"
)
TAOS_DEFINE_ERROR
(
TSDB_CODE_TDB_MESSED_MSG
,
"TSDB messed message"
)
TAOS_DEFINE_ERROR
(
TSDB_CODE_TDB_IVLD_TAG_VAL
,
"TSDB invalid tag value"
)
TAOS_DEFINE_ERROR
(
TSDB_CODE_TDB_NO_CACHE_LAST_ROW
,
"TSDB no cache last row data"
)
TAOS_DEFINE_ERROR
(
TSDB_CODE_TDB_INCOMPLETE_DFILESET
,
"Incomplete DFileSet"
)
// query
TAOS_DEFINE_ERROR
(
TSDB_CODE_QRY_INVALID_QHANDLE
,
"Invalid handle"
)
...
...
tests/script/unique/arbitrator/dn3_mn1_vnode_noCorruptFile_offline.sim
浏览文件 @
3736e4d7
...
...
@@ -170,8 +170,8 @@ if $system_content != 0 then
endi
system_content ls ../../../sim/dnode3/data/vnode/vnode2/tsdb/data/ -l | grep "^-" | wc -l | tr -d '\n'
print ---->dnode3 data files: $system_content expect:
3
if $system_content !=
3
then
print ---->dnode3 data files: $system_content expect:
5
if $system_content !=
5
then
return -1
endi
...
...
@@ -409,26 +409,3 @@ print data00 $data00
if $data00 != $totalRows then
return -1
endi
\ No newline at end of file
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录