Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
d4c8fed3
T
TDengine
项目概览
taosdata
/
TDengine
大约 2 年 前同步成功
通知
1192
Star
22018
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
d4c8fed3
编写于
3月 02, 2022
作者:
C
Cary Xu
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
Block-wise SMA extraction
上级
fd8fb4da
变更
13
隐藏空白更改
内联
并排
Showing
13 changed file
with
377 addition
and
100 deletion
+377
-100
include/common/taosdef.h
include/common/taosdef.h
+4
-0
source/dnode/vnode/inc/tsdb.h
source/dnode/vnode/inc/tsdb.h
+5
-5
source/dnode/vnode/src/inc/tsdbFS.h
source/dnode/vnode/src/inc/tsdbFS.h
+0
-2
source/dnode/vnode/src/inc/tsdbFile.h
source/dnode/vnode/src/inc/tsdbFile.h
+42
-5
source/dnode/vnode/src/inc/tsdbReadImpl.h
source/dnode/vnode/src/inc/tsdbReadImpl.h
+100
-27
source/dnode/vnode/src/tsdb/tsdbCommit.c
source/dnode/vnode/src/tsdb/tsdbCommit.c
+123
-19
source/dnode/vnode/src/tsdb/tsdbCompact.c
source/dnode/vnode/src/tsdb/tsdbCompact.c
+2
-0
source/dnode/vnode/src/tsdb/tsdbFS.c
source/dnode/vnode/src/tsdb/tsdbFS.c
+2
-2
source/dnode/vnode/src/tsdb/tsdbFile.c
source/dnode/vnode/src/tsdb/tsdbFile.c
+28
-15
source/dnode/vnode/src/tsdb/tsdbMain.c
source/dnode/vnode/src/tsdb/tsdbMain.c
+4
-3
source/dnode/vnode/src/tsdb/tsdbRead.c
source/dnode/vnode/src/tsdb/tsdbRead.c
+6
-2
source/dnode/vnode/src/tsdb/tsdbReadImpl.c
source/dnode/vnode/src/tsdb/tsdbReadImpl.c
+60
-20
source/util/src/terror.c
source/util/src/terror.c
+1
-0
未找到文件。
include/common/taosdef.h
浏览文件 @
d4c8fed3
...
@@ -51,6 +51,10 @@ typedef enum {
...
@@ -51,6 +51,10 @@ typedef enum {
}
ECheckItemType
;
}
ECheckItemType
;
typedef
enum
{
TD_ROW_DISCARD_UPDATE
=
0
,
TD_ROW_OVERWRITE_UPDATE
=
1
,
TD_ROW_PARTIAL_UPDATE
=
2
}
TDUpdateConfig
;
typedef
enum
{
TD_ROW_DISCARD_UPDATE
=
0
,
TD_ROW_OVERWRITE_UPDATE
=
1
,
TD_ROW_PARTIAL_UPDATE
=
2
}
TDUpdateConfig
;
typedef
enum
{
TSDB_STATIS_OK
=
0
,
// statis part exist and load successfully
TSDB_STATIS_NONE
=
1
,
// statis part not exist
}
ETsdbStatisStatus
;
extern
char
*
qtypeStr
[];
extern
char
*
qtypeStr
[];
...
...
source/dnode/vnode/inc/tsdb.h
浏览文件 @
d4c8fed3
...
@@ -27,12 +27,12 @@ extern "C" {
...
@@ -27,12 +27,12 @@ extern "C" {
typedef
struct
SDataStatis
{
typedef
struct
SDataStatis
{
int16_t
colId
;
int16_t
colId
;
int64_t
sum
;
int64_t
max
;
int64_t
min
;
int16_t
maxIndex
;
int16_t
maxIndex
;
int16_t
minIndex
;
int16_t
minIndex
;
int16_t
numOfNull
;
int16_t
numOfNull
;
int64_t
sum
;
int64_t
max
;
int64_t
min
;
}
SDataStatis
;
}
SDataStatis
;
typedef
struct
STable
{
typedef
struct
STable
{
...
@@ -53,6 +53,8 @@ typedef struct STsdb STsdb;
...
@@ -53,6 +53,8 @@ typedef struct STsdb STsdb;
typedef
struct
STsdbCfg
{
typedef
struct
STsdbCfg
{
int8_t
precision
;
int8_t
precision
;
int8_t
update
;
int8_t
compression
;
uint64_t
lruCacheSize
;
uint64_t
lruCacheSize
;
int32_t
daysPerFile
;
int32_t
daysPerFile
;
int32_t
minRowsPerFileBlock
;
int32_t
minRowsPerFileBlock
;
...
@@ -60,8 +62,6 @@ typedef struct STsdbCfg {
...
@@ -60,8 +62,6 @@ typedef struct STsdbCfg {
int32_t
keep
;
int32_t
keep
;
int32_t
keep1
;
int32_t
keep1
;
int32_t
keep2
;
int32_t
keep2
;
int8_t
update
;
int8_t
compression
;
}
STsdbCfg
;
}
STsdbCfg
;
// query condition to build multi-table data block iterator
// query condition to build multi-table data block iterator
...
...
source/dnode/vnode/src/inc/tsdbFS.h
浏览文件 @
d4c8fed3
...
@@ -18,8 +18,6 @@
...
@@ -18,8 +18,6 @@
#include "tsdbFile.h"
#include "tsdbFile.h"
#define TSDB_FS_VERSION 0
// ================== TSDB global config
// ================== TSDB global config
extern
bool
tsdbForceKeepFile
;
extern
bool
tsdbForceKeepFile
;
...
...
source/dnode/vnode/src/inc/tsdbFile.h
浏览文件 @
d4c8fed3
...
@@ -44,7 +44,37 @@
...
@@ -44,7 +44,37 @@
#define TSDB_FILE_IS_OK(tf) (TSDB_FILE_STATE(tf) == TSDB_FILE_STATE_OK)
#define TSDB_FILE_IS_OK(tf) (TSDB_FILE_STATE(tf) == TSDB_FILE_STATE_OK)
#define TSDB_FILE_IS_BAD(tf) (TSDB_FILE_STATE(tf) == TSDB_FILE_STATE_BAD)
#define TSDB_FILE_IS_BAD(tf) (TSDB_FILE_STATE(tf) == TSDB_FILE_STATE_BAD)
typedef
enum
{
TSDB_FILE_HEAD
=
0
,
TSDB_FILE_DATA
,
TSDB_FILE_LAST
,
TSDB_FILE_MAX
,
TSDB_FILE_META
}
TSDB_FILE_T
;
typedef
enum
{
TSDB_FILE_HEAD
=
0
,
// .head
TSDB_FILE_DATA
,
// .data
TSDB_FILE_LAST
,
// .last
TSDB_FILE_SMAD
,
// .smad(Block-wise SMA)
TSDB_FILE_SMAL
,
// .smal(Block-wise SMA)
TSDB_FILE_MAX
,
//
TSDB_FILE_TSMA
,
// .tsma.${sma_index_name}, Time-range-wise SMA
TSDB_FILE_RSMA
,
// .rsma.${sma_index_name}, Time-range-wise Rollup SMA
TSDB_FILE_META
// meta
}
TSDB_FILE_T
;
typedef
enum
{
TSDB_FS_VER_0
=
0
,
TSDB_FS_VER_MAX
,
}
ETsdbFsVer
;
#define TSDB_LATEST_FVER TSDB_FS_VER_0 // latest version for DFile
#define TSDB_LATEST_SFS_VER TSDB_FS_VER_0 // latest version for 'current' file
static
FORCE_INLINE
uint32_t
tsdbGetDFSVersion
(
TSDB_FILE_T
fType
)
{
// latest version for DFile
switch
(
fType
)
{
case
TSDB_FILE_HEAD
:
// .head
case
TSDB_FILE_DATA
:
// .data
case
TSDB_FILE_LAST
:
// .last
case
TSDB_FILE_SMAD
:
// .smad(Block-wise SMA)
case
TSDB_FILE_SMAL
:
// .smal(Block-wise SMA)
default:
return
TSDB_LATEST_FVER
;
}
}
#if 0
#if 0
// =============== SMFile
// =============== SMFile
...
@@ -169,6 +199,7 @@ static FORCE_INLINE int64_t tsdbReadMFile(SMFile* pMFile, void* buf, int64_t nby
...
@@ -169,6 +199,7 @@ static FORCE_INLINE int64_t tsdbReadMFile(SMFile* pMFile, void* buf, int64_t nby
// =============== SDFile
// =============== SDFile
typedef
struct
{
typedef
struct
{
uint32_t
magic
;
uint32_t
magic
;
uint32_t
fver
;
uint32_t
len
;
uint32_t
len
;
uint32_t
totalBlocks
;
uint32_t
totalBlocks
;
uint32_t
totalSubBlocks
;
uint32_t
totalSubBlocks
;
...
@@ -188,7 +219,7 @@ void tsdbInitDFile(STsdb *pRepo, SDFile* pDFile, SDiskID did, int fid, uint32_t
...
@@ -188,7 +219,7 @@ void tsdbInitDFile(STsdb *pRepo, SDFile* pDFile, SDiskID did, int fid, uint32_t
void
tsdbInitDFileEx
(
SDFile
*
pDFile
,
SDFile
*
pODFile
);
void
tsdbInitDFileEx
(
SDFile
*
pDFile
,
SDFile
*
pODFile
);
int
tsdbEncodeSDFile
(
void
**
buf
,
SDFile
*
pDFile
);
int
tsdbEncodeSDFile
(
void
**
buf
,
SDFile
*
pDFile
);
void
*
tsdbDecodeSDFile
(
STsdb
*
pRepo
,
void
*
buf
,
SDFile
*
pDFile
);
void
*
tsdbDecodeSDFile
(
STsdb
*
pRepo
,
void
*
buf
,
SDFile
*
pDFile
);
int
tsdbCreateDFile
(
STsdb
*
pRepo
,
SDFile
*
pDFile
,
bool
updateHeader
);
int
tsdbCreateDFile
(
STsdb
*
pRepo
,
SDFile
*
pDFile
,
bool
updateHeader
,
TSDB_FILE_T
fType
);
int
tsdbUpdateDFileHeader
(
SDFile
*
pDFile
);
int
tsdbUpdateDFileHeader
(
SDFile
*
pDFile
);
int
tsdbLoadDFileHeader
(
SDFile
*
pDFile
,
SDFInfo
*
pInfo
);
int
tsdbLoadDFileHeader
(
SDFile
*
pDFile
,
SDFInfo
*
pInfo
);
int
tsdbParseDFilename
(
const
char
*
fname
,
int
*
vid
,
int
*
fid
,
TSDB_FILE_T
*
ftype
,
uint32_t
*
version
);
int
tsdbParseDFilename
(
const
char
*
fname
,
int
*
vid
,
int
*
fid
,
TSDB_FILE_T
*
ftype
,
uint32_t
*
version
);
...
@@ -292,12 +323,18 @@ static FORCE_INLINE int tsdbCopyDFile(SDFile* pSrc, SDFile* pDest) {
...
@@ -292,12 +323,18 @@ static FORCE_INLINE int tsdbCopyDFile(SDFile* pSrc, SDFile* pDest) {
// =============== SDFileSet
// =============== SDFileSet
typedef
struct
{
typedef
struct
{
int
fid
;
int
fid
;
int
state
;
int8_t
state
;
// -128~127
SDFile
files
[
TSDB_FILE_MAX
];
uint8_t
ver
;
// 0~255, DFileSet version
uint16_t
reserve
;
SDFile
files
[
TSDB_FILE_MAX
];
}
SDFileSet
;
}
SDFileSet
;
#define TSDB_LATEST_FSET_VER 0
#define TSDB_FSET_FID(s) ((s)->fid)
#define TSDB_FSET_FID(s) ((s)->fid)
#define TSDB_FSET_STATE(s) ((s)->state)
#define TSDB_FSET_VER(s) ((s)->ver)
#define TSDB_DFILE_IN_SET(s, t) ((s)->files + (t))
#define TSDB_DFILE_IN_SET(s, t) ((s)->files + (t))
#define TSDB_FSET_LEVEL(s) TSDB_FILE_LEVEL(TSDB_DFILE_IN_SET(s, 0))
#define TSDB_FSET_LEVEL(s) TSDB_FILE_LEVEL(TSDB_DFILE_IN_SET(s, 0))
#define TSDB_FSET_ID(s) TSDB_FILE_ID(TSDB_DFILE_IN_SET(s, 0))
#define TSDB_FSET_ID(s) TSDB_FILE_ID(TSDB_DFILE_IN_SET(s, 0))
...
...
source/dnode/vnode/src/inc/tsdbReadImpl.h
浏览文件 @
d4c8fed3
...
@@ -36,6 +36,7 @@ typedef struct {
...
@@ -36,6 +36,7 @@ typedef struct {
TSKEY
maxKey
;
TSKEY
maxKey
;
}
SBlockIdx
;
}
SBlockIdx
;
#ifdef TD_REFACTOR_3
typedef
struct
{
typedef
struct
{
int64_t
last
:
1
;
int64_t
last
:
1
;
int64_t
offset
:
63
;
int64_t
offset
:
63
;
...
@@ -48,23 +49,35 @@ typedef struct {
...
@@ -48,23 +49,35 @@ typedef struct {
TSKEY
keyFirst
;
TSKEY
keyFirst
;
TSKEY
keyLast
;
TSKEY
keyLast
;
}
SBlock
;
}
SBlock
;
#else
typedef
enum
{
TSDB_SBLK_VER_0
=
0
,
TSDB_SBLK_VER_MAX
,
}
ESBlockVer
;
#define SBlockVerLatest TSDB_SBLK_VER_0
typedef
struct
{
typedef
struct
{
int64_t
last
:
1
;
int64_t
offset
;
int64_t
offset
:
63
;
int32_t
algorithm
:
8
;
int32_t
algorithm
:
8
;
int32_t
numOfRows
:
24
;
int32_t
numOfRows
:
24
;
uint8_t
reserve0
;
uint8_t
last
:
1
;
uint8_t
blkVer
:
7
;
uint8_t
numOfSubBlocks
;
uint8_t
numOfSubBlocks
;
int16_t
numOfCols
;
// not including timestamp column
int16_t
numOfCols
;
// not including timestamp column
uint32_t
len
:
32
;
// data block length
uint32_t
len
;
// data block length
uint32_t
keyLen
:
24
;
// key column length, keyOffset = offset+sizeof(SBlockData)+sizeof(SBlockCol)*numOfCols
uint32_t
keyLen
:
24
;
// key column length, keyOffset = offset+sizeof(SBlockData)+sizeof(SBlockCol)*numOfCols
uint32_t
reserve
1
:
8
;
uint32_t
reserve
:
8
;
uint64_t
blkVer
:
8
;
uint64_t
aggrStat
:
1
;
uint64_t
aggrOffset
:
56
;
uint64_t
aggrOffset
:
63
;
TSKEY
keyFirst
;
TSKEY
keyFirst
;
TSKEY
keyLast
;
TSKEY
keyLast
;
}
SBlock_3
;
}
SBlockV0
;
#define SBlock SBlockV0 // latest SBlock definition
#endif
typedef
struct
{
typedef
struct
{
int32_t
delimiter
;
// For recovery usage
int32_t
delimiter
;
// For recovery usage
...
@@ -73,6 +86,7 @@ typedef struct {
...
@@ -73,6 +86,7 @@ typedef struct {
SBlock
blocks
[];
SBlock
blocks
[];
}
SBlockInfo
;
}
SBlockInfo
;
#ifdef TD_REFACTOR_3
typedef
struct
{
typedef
struct
{
int16_t
colId
;
int16_t
colId
;
uint16_t
bitmap
:
1
;
// 0: has bitmap if has NULL/NORM rows, 1: no bitmap if all rows are NORM
uint16_t
bitmap
:
1
;
// 0: has bitmap if has NULL/NORM rows, 1: no bitmap if all rows are NORM
...
@@ -89,17 +103,50 @@ typedef struct {
...
@@ -89,17 +103,50 @@ typedef struct {
uint8_t
offsetH
;
uint8_t
offsetH
;
char
padding
[
1
];
char
padding
[
1
];
}
SBlockCol
;
}
SBlockCol
;
#else
typedef
struct
{
int16_t
colId
;
uint8_t
bitmap
:
1
;
// 0: has bitmap if has NULL/NORM rows, 1: no bitmap if all rows are NORM
uint8_t
reserve
:
7
;
uint8_t
type
;
int32_t
len
;
uint32_t
offset
;
}
SBlockColV0
;
#define SBlockCol SBlockColV0 // latest SBlockCol definition
typedef
struct
{
int16_t
colId
;
int16_t
maxIndex
;
int16_t
minIndex
;
int16_t
numOfNull
;
int64_t
sum
;
int64_t
max
;
int64_t
min
;
}
SAggrBlkColV0
;
#define SAggrBlkCol SAggrBlkColV0 // latest SAggrBlkCol definition
#endif
// Code here just for back-ward compatibility
// Code here just for back-ward compatibility
static
FORCE_INLINE
void
tsdbSetBlockColOffset
(
SBlockCol
*
pBlockCol
,
uint32_t
offset
)
{
static
FORCE_INLINE
void
tsdbSetBlockColOffset
(
SBlockCol
*
pBlockCol
,
uint32_t
offset
)
{
#ifdef TD_REFACTOR_3
pBlockCol
->
offset
=
offset
&
((((
uint32_t
)
1
)
<<
24
)
-
1
);
pBlockCol
->
offset
=
offset
&
((((
uint32_t
)
1
)
<<
24
)
-
1
);
pBlockCol
->
offsetH
=
(
uint8_t
)(
offset
>>
24
);
pBlockCol
->
offsetH
=
(
uint8_t
)(
offset
>>
24
);
#else
pBlockCol
->
offset
=
offset
;
#endif
}
}
static
FORCE_INLINE
uint32_t
tsdbGetBlockColOffset
(
SBlockCol
*
pBlockCol
)
{
static
FORCE_INLINE
uint32_t
tsdbGetBlockColOffset
(
SBlockCol
*
pBlockCol
)
{
#ifdef TD_REFACTOR_3
uint32_t
offset1
=
pBlockCol
->
offset
;
uint32_t
offset1
=
pBlockCol
->
offset
;
uint32_t
offset2
=
pBlockCol
->
offsetH
;
uint32_t
offset2
=
pBlockCol
->
offsetH
;
return
(
offset1
|
(
offset2
<<
24
));
return
(
offset1
|
(
offset2
<<
24
));
#else
return
pBlockCol
->
offset
;
#endif
}
}
typedef
struct
{
typedef
struct
{
...
@@ -109,31 +156,57 @@ typedef struct {
...
@@ -109,31 +156,57 @@ typedef struct {
SBlockCol
cols
[];
SBlockCol
cols
[];
}
SBlockData
;
}
SBlockData
;
typedef
void
SAggrBlkData
;
// SBlockCol cols[];
struct
SReadH
{
struct
SReadH
{
STsdb
*
pRepo
;
STsdb
*
pRepo
;
SDFileSet
rSet
;
// FSET to read
SDFileSet
rSet
;
// FSET to read
SArray
*
aBlkIdx
;
// SBlockIdx array
SArray
*
aBlkIdx
;
// SBlockIdx array
STable
*
pTable
;
// table to read
STable
*
pTable
;
// table to read
SBlockIdx
*
pBlkIdx
;
// current reading table SBlockIdx
SBlockIdx
*
pBlkIdx
;
// current reading table SBlockIdx
int
cidx
;
int
cidx
;
SBlockInfo
*
pBlkInfo
;
SBlockInfo
*
pBlkInfo
;
SBlockData
*
pBlkData
;
// Block info
SBlockData
*
pBlkData
;
// Block info
SDataCols
*
pDCols
[
2
];
SAggrBlkData
*
pAggrBlkData
;
// Aggregate Block info
void
*
pBuf
;
// buffer
SDataCols
*
pDCols
[
2
];
void
*
pCBuf
;
// compression buffer
void
*
pBuf
;
// buffer
void
*
pCBuf
;
// compression buffer
void
*
pExBuf
;
// extra buffer
};
};
#define TSDB_READ_REPO(rh) ((rh)->pRepo)
#define TSDB_READ_REPO(rh)
((rh)->pRepo)
#define TSDB_READ_REPO_ID(rh) REPO_ID(TSDB_READ_REPO(rh))
#define TSDB_READ_REPO_ID(rh)
REPO_ID(TSDB_READ_REPO(rh))
#define TSDB_READ_FSET(rh) (&((rh)->rSet))
#define TSDB_READ_FSET(rh)
(&((rh)->rSet))
#define TSDB_READ_TABLE(rh) ((rh)->pTable)
#define TSDB_READ_TABLE(rh)
((rh)->pTable)
#define TSDB_READ_HEAD_FILE(rh) TSDB_DFILE_IN_SET(TSDB_READ_FSET(rh), TSDB_FILE_HEAD)
#define TSDB_READ_HEAD_FILE(rh) TSDB_DFILE_IN_SET(TSDB_READ_FSET(rh), TSDB_FILE_HEAD)
#define TSDB_READ_DATA_FILE(rh) TSDB_DFILE_IN_SET(TSDB_READ_FSET(rh), TSDB_FILE_DATA)
#define TSDB_READ_DATA_FILE(rh) TSDB_DFILE_IN_SET(TSDB_READ_FSET(rh), TSDB_FILE_DATA)
#define TSDB_READ_LAST_FILE(rh) TSDB_DFILE_IN_SET(TSDB_READ_FSET(rh), TSDB_FILE_LAST)
#define TSDB_READ_LAST_FILE(rh) TSDB_DFILE_IN_SET(TSDB_READ_FSET(rh), TSDB_FILE_LAST)
#define TSDB_READ_BUF(rh) ((rh)->pBuf)
#define TSDB_READ_SMAD_FILE(rh) TSDB_DFILE_IN_SET(TSDB_READ_FSET(rh), TSDB_FILE_SMAD)
#define TSDB_READ_COMP_BUF(rh) ((rh)->pCBuf)
#define TSDB_READ_SMAL_FILE(rh) TSDB_DFILE_IN_SET(TSDB_READ_FSET(rh), TSDB_FILE_SMAL)
#define TSDB_READ_BUF(rh) ((rh)->pBuf)
#define TSDB_READ_COMP_BUF(rh) ((rh)->pCBuf)
#define TSDB_READ_EXBUF(rh) ((rh)->pExBuf)
#define TSDB_BLOCK_STATIS_SIZE(ncols, blkVer) \
(sizeof(SBlockData) + sizeof(SBlockColV##blkVer) * (ncols) + sizeof(TSCKSUM))
static
FORCE_INLINE
size_t
tsdbBlockStatisSize
(
int
nCols
,
uint32_t
blkVer
)
{
switch
(
blkVer
)
{
case
TSDB_SBLK_VER_0
:
default:
return
TSDB_BLOCK_STATIS_SIZE
(
nCols
,
0
);
}
}
#define TSDB_BLOCK_AGGR_SIZE(ncols, blkVer) (sizeof(SAggrBlkColV##blkVer) * (ncols) + sizeof(TSCKSUM))
#define TSDB_BLOCK_STATIS_SIZE(ncols) (sizeof(SBlockData) + sizeof(SBlockCol) * (ncols) + sizeof(TSCKSUM))
static
FORCE_INLINE
size_t
tsdbBlockAggrSize
(
int
nCols
,
uint32_t
blkVer
)
{
switch
(
blkVer
)
{
case
TSDB_SBLK_VER_0
:
default:
return
TSDB_BLOCK_AGGR_SIZE
(
nCols
,
0
);
}
}
int
tsdbInitReadH
(
SReadH
*
pReadh
,
STsdb
*
pRepo
);
int
tsdbInitReadH
(
SReadH
*
pReadh
,
STsdb
*
pRepo
);
void
tsdbDestroyReadH
(
SReadH
*
pReadh
);
void
tsdbDestroyReadH
(
SReadH
*
pReadh
);
...
@@ -147,7 +220,7 @@ int tsdbLoadBlockDataCols(SReadH *pReadh, SBlock *pBlock, SBlockInfo *pBlkInfo
...
@@ -147,7 +220,7 @@ int tsdbLoadBlockDataCols(SReadH *pReadh, SBlock *pBlock, SBlockInfo *pBlkInfo
int
tsdbLoadBlockStatis
(
SReadH
*
pReadh
,
SBlock
*
pBlock
);
int
tsdbLoadBlockStatis
(
SReadH
*
pReadh
,
SBlock
*
pBlock
);
int
tsdbEncodeSBlockIdx
(
void
**
buf
,
SBlockIdx
*
pIdx
);
int
tsdbEncodeSBlockIdx
(
void
**
buf
,
SBlockIdx
*
pIdx
);
void
*
tsdbDecodeSBlockIdx
(
void
*
buf
,
SBlockIdx
*
pIdx
);
void
*
tsdbDecodeSBlockIdx
(
void
*
buf
,
SBlockIdx
*
pIdx
);
void
tsdbGetBlockStatis
(
SReadH
*
pReadh
,
SDataStatis
*
pStatis
,
int
numOfCols
);
void
tsdbGetBlockStatis
(
SReadH
*
pReadh
,
SDataStatis
*
pStatis
,
int
numOfCols
,
SBlock
*
pBlock
);
static
FORCE_INLINE
int
tsdbMakeRoom
(
void
**
ppBuf
,
size_t
size
)
{
static
FORCE_INLINE
int
tsdbMakeRoom
(
void
**
ppBuf
,
size_t
size
)
{
void
*
pBuf
=
*
ppBuf
;
void
*
pBuf
=
*
ppBuf
;
...
...
source/dnode/vnode/src/tsdb/tsdbCommit.c
浏览文件 @
d4c8fed3
...
@@ -50,8 +50,11 @@ typedef struct {
...
@@ -50,8 +50,11 @@ typedef struct {
#define TSDB_COMMIT_HEAD_FILE(ch) TSDB_DFILE_IN_SET(TSDB_COMMIT_WRITE_FSET(ch), TSDB_FILE_HEAD)
#define TSDB_COMMIT_HEAD_FILE(ch) TSDB_DFILE_IN_SET(TSDB_COMMIT_WRITE_FSET(ch), TSDB_FILE_HEAD)
#define TSDB_COMMIT_DATA_FILE(ch) TSDB_DFILE_IN_SET(TSDB_COMMIT_WRITE_FSET(ch), TSDB_FILE_DATA)
#define TSDB_COMMIT_DATA_FILE(ch) TSDB_DFILE_IN_SET(TSDB_COMMIT_WRITE_FSET(ch), TSDB_FILE_DATA)
#define TSDB_COMMIT_LAST_FILE(ch) TSDB_DFILE_IN_SET(TSDB_COMMIT_WRITE_FSET(ch), TSDB_FILE_LAST)
#define TSDB_COMMIT_LAST_FILE(ch) TSDB_DFILE_IN_SET(TSDB_COMMIT_WRITE_FSET(ch), TSDB_FILE_LAST)
#define TSDB_COMMIT_SMAD_FILE(ch) TSDB_DFILE_IN_SET(TSDB_COMMIT_WRITE_FSET(ch), TSDB_FILE_SMAD)
#define TSDB_COMMIT_SMAL_FILE(ch) TSDB_DFILE_IN_SET(TSDB_COMMIT_WRITE_FSET(ch), TSDB_FILE_SMAL)
#define TSDB_COMMIT_BUF(ch) TSDB_READ_BUF(&((ch)->readh))
#define TSDB_COMMIT_BUF(ch) TSDB_READ_BUF(&((ch)->readh))
#define TSDB_COMMIT_COMP_BUF(ch) TSDB_READ_COMP_BUF(&((ch)->readh))
#define TSDB_COMMIT_COMP_BUF(ch) TSDB_READ_COMP_BUF(&((ch)->readh))
#define TSDB_COMMIT_EXBUF(ch) TSDB_READ_EXBUF(&((ch)->readh))
#define TSDB_COMMIT_DEFAULT_ROWS(ch) TSDB_DEFAULT_BLOCK_ROWS(TSDB_COMMIT_REPO(ch)->config.maxRowsPerFileBlock)
#define TSDB_COMMIT_DEFAULT_ROWS(ch) TSDB_DEFAULT_BLOCK_ROWS(TSDB_COMMIT_REPO(ch)->config.maxRowsPerFileBlock)
#define TSDB_COMMIT_TXN_VERSION(ch) FS_TXN_VERSION(REPO_FS(TSDB_COMMIT_REPO(ch)))
#define TSDB_COMMIT_TXN_VERSION(ch) FS_TXN_VERSION(REPO_FS(TSDB_COMMIT_REPO(ch)))
...
@@ -509,7 +512,7 @@ static int tsdbSetAndOpenCommitFile(SCommitH *pCommith, SDFileSet *pSet, int fid
...
@@ -509,7 +512,7 @@ static int tsdbSetAndOpenCommitFile(SCommitH *pCommith, SDFileSet *pSet, int fid
// TSDB_FILE_HEAD
// TSDB_FILE_HEAD
SDFile
*
pWHeadf
=
TSDB_COMMIT_HEAD_FILE
(
pCommith
);
SDFile
*
pWHeadf
=
TSDB_COMMIT_HEAD_FILE
(
pCommith
);
tsdbInitDFile
(
pRepo
,
pWHeadf
,
did
,
fid
,
FS_TXN_VERSION
(
REPO_FS
(
pRepo
)),
TSDB_FILE_HEAD
);
tsdbInitDFile
(
pRepo
,
pWHeadf
,
did
,
fid
,
FS_TXN_VERSION
(
REPO_FS
(
pRepo
)),
TSDB_FILE_HEAD
);
if
(
tsdbCreateDFile
(
pRepo
,
pWHeadf
,
true
)
<
0
)
{
if
(
tsdbCreateDFile
(
pRepo
,
pWHeadf
,
true
,
TSDB_FILE_HEAD
)
<
0
)
{
tsdbError
(
"vgId:%d failed to create file %s to commit since %s"
,
REPO_ID
(
pRepo
),
TSDB_FILE_FULL_NAME
(
pWHeadf
),
tsdbError
(
"vgId:%d failed to create file %s to commit since %s"
,
REPO_ID
(
pRepo
),
TSDB_FILE_FULL_NAME
(
pWHeadf
),
tstrerror
(
terrno
));
tstrerror
(
terrno
));
...
@@ -560,7 +563,7 @@ static int tsdbSetAndOpenCommitFile(SCommitH *pCommith, SDFileSet *pSet, int fid
...
@@ -560,7 +563,7 @@ static int tsdbSetAndOpenCommitFile(SCommitH *pCommith, SDFileSet *pSet, int fid
tsdbInitDFile
(
pRepo
,
pWLastf
,
did
,
fid
,
FS_TXN_VERSION
(
REPO_FS
(
pRepo
)),
TSDB_FILE_LAST
);
tsdbInitDFile
(
pRepo
,
pWLastf
,
did
,
fid
,
FS_TXN_VERSION
(
REPO_FS
(
pRepo
)),
TSDB_FILE_LAST
);
pCommith
->
isLFileSame
=
false
;
pCommith
->
isLFileSame
=
false
;
if
(
tsdbCreateDFile
(
pRepo
,
pWLastf
,
true
)
<
0
)
{
if
(
tsdbCreateDFile
(
pRepo
,
pWLastf
,
true
,
TSDB_FILE_LAST
)
<
0
)
{
tsdbError
(
"vgId:%d failed to create file %s to commit since %s"
,
REPO_ID
(
pRepo
),
TSDB_FILE_FULL_NAME
(
pWLastf
),
tsdbError
(
"vgId:%d failed to create file %s to commit since %s"
,
REPO_ID
(
pRepo
),
TSDB_FILE_FULL_NAME
(
pWLastf
),
tstrerror
(
terrno
));
tstrerror
(
terrno
));
...
@@ -572,6 +575,74 @@ static int tsdbSetAndOpenCommitFile(SCommitH *pCommith, SDFileSet *pSet, int fid
...
@@ -572,6 +575,74 @@ static int tsdbSetAndOpenCommitFile(SCommitH *pCommith, SDFileSet *pSet, int fid
}
}
}
}
}
}
// TSDB_FILE_SMAD
SDFile
*
pRSmadF
=
TSDB_READ_SMAD_FILE
(
&
(
pCommith
->
readh
));
SDFile
*
pWSmadF
=
TSDB_COMMIT_SMAD_FILE
(
pCommith
);
if
(
access
(
TSDB_FILE_FULL_NAME
(
pRSmadF
),
F_OK
)
!=
0
)
{
tsdbDebug
(
"vgId:%d create data file %s as not exist"
,
REPO_ID
(
pRepo
),
TSDB_FILE_FULL_NAME
(
pRSmadF
));
tsdbInitDFile
(
pRepo
,
pWSmadF
,
did
,
fid
,
FS_TXN_VERSION
(
REPO_FS
(
pRepo
)),
TSDB_FILE_SMAD
);
if
(
tsdbCreateDFile
(
pRepo
,
pWSmadF
,
true
,
TSDB_FILE_SMAD
)
<
0
)
{
tsdbError
(
"vgId:%d failed to create file %s to commit since %s"
,
REPO_ID
(
pRepo
),
TSDB_FILE_FULL_NAME
(
pWSmadF
),
tstrerror
(
terrno
));
tsdbCloseDFileSet
(
pWSet
);
(
void
)
tsdbRemoveDFile
(
pWHeadf
);
if
(
pCommith
->
isRFileSet
)
{
tsdbCloseAndUnsetFSet
(
&
(
pCommith
->
readh
));
return
-
1
;
}
}
}
else
{
tsdbInitDFileEx
(
pWSmadF
,
pRSmadF
);
if
(
tsdbOpenDFile
(
pWSmadF
,
O_RDWR
)
<
0
)
{
tsdbError
(
"vgId:%d failed to open file %s to commit since %s"
,
REPO_ID
(
pRepo
),
TSDB_FILE_FULL_NAME
(
pWSmadF
),
tstrerror
(
terrno
));
tsdbCloseDFileSet
(
pWSet
);
tsdbRemoveDFile
(
pWHeadf
);
if
(
pCommith
->
isRFileSet
)
{
tsdbCloseAndUnsetFSet
(
&
(
pCommith
->
readh
));
return
-
1
;
}
}
}
// TSDB_FILE_SMAL
SDFile
*
pRSmalF
=
TSDB_READ_SMAL_FILE
(
&
(
pCommith
->
readh
));
SDFile
*
pWSmalF
=
TSDB_COMMIT_SMAL_FILE
(
pCommith
);
if
((
pCommith
->
isLFileSame
)
&&
access
(
TSDB_FILE_FULL_NAME
(
pRSmalF
),
F_OK
)
==
0
)
{
tsdbInitDFileEx
(
pWSmalF
,
pRSmalF
);
if
(
tsdbOpenDFile
(
pWSmalF
,
O_RDWR
)
<
0
)
{
tsdbError
(
"vgId:%d failed to open file %s to commit since %s"
,
REPO_ID
(
pRepo
),
TSDB_FILE_FULL_NAME
(
pWSmalF
),
tstrerror
(
terrno
));
tsdbCloseDFileSet
(
pWSet
);
tsdbRemoveDFile
(
pWHeadf
);
if
(
pCommith
->
isRFileSet
)
{
tsdbCloseAndUnsetFSet
(
&
(
pCommith
->
readh
));
return
-
1
;
}
}
}
else
{
tsdbDebug
(
"vgId:%d create data file %s as not exist"
,
REPO_ID
(
pRepo
),
TSDB_FILE_FULL_NAME
(
pRSmalF
));
tsdbInitDFile
(
pRepo
,
pWSmalF
,
did
,
fid
,
FS_TXN_VERSION
(
REPO_FS
(
pRepo
)),
TSDB_FILE_SMAL
);
if
(
tsdbCreateDFile
(
pRepo
,
pWSmalF
,
true
,
TSDB_FILE_SMAL
)
<
0
)
{
tsdbError
(
"vgId:%d failed to create file %s to commit since %s"
,
REPO_ID
(
pRepo
),
TSDB_FILE_FULL_NAME
(
pWSmalF
),
tstrerror
(
terrno
));
tsdbCloseDFileSet
(
pWSet
);
(
void
)
tsdbRemoveDFile
(
pWHeadf
);
if
(
pCommith
->
isRFileSet
)
{
tsdbCloseAndUnsetFSet
(
&
(
pCommith
->
readh
));
return
-
1
;
}
}
}
}
}
return
0
;
return
0
;
...
@@ -1131,41 +1202,57 @@ static int tsdbComparKeyBlock(const void *arg1, const void *arg2) {
...
@@ -1131,41 +1202,57 @@ static int tsdbComparKeyBlock(const void *arg1, const void *arg2) {
}
}
}
}
int
tsdbWriteBlockImpl
(
STsdb
*
pRepo
,
STable
*
pTable
,
SDFile
*
pDFile
,
SDataCols
*
pDataCols
,
SBlock
*
pBlock
,
bool
isLast
,
int
tsdbWriteBlockImpl
(
STsdb
*
pRepo
,
STable
*
pTable
,
SDFile
*
pDFile
,
SDFile
*
pDFileAggr
,
SDataCols
*
pDataCols
,
bool
isSuper
,
void
**
ppBuf
,
void
**
ppCBuf
)
{
SBlock
*
pBlock
,
bool
isLast
,
bool
isSuper
,
void
**
ppBuf
,
void
**
ppCBuf
,
void
**
ppExBuf
)
{
STsdbCfg
*
pCfg
=
REPO_CFG
(
pRepo
);
STsdbCfg
*
pCfg
=
REPO_CFG
(
pRepo
);
SBlockData
*
pBlockData
;
SBlockData
*
pBlockData
=
NULL
;
int64_t
offset
=
0
;
SAggrBlkData
*
pAggrBlkData
=
NULL
;
int
rowsToWrite
=
pDataCols
->
numOfRows
;
int64_t
offset
=
0
,
offsetAggr
=
0
;
int
rowsToWrite
=
pDataCols
->
numOfRows
;
ASSERT
(
rowsToWrite
>
0
&&
rowsToWrite
<=
pCfg
->
maxRowsPerFileBlock
);
ASSERT
(
rowsToWrite
>
0
&&
rowsToWrite
<=
pCfg
->
maxRowsPerFileBlock
);
ASSERT
((
!
isLast
)
||
rowsToWrite
<
pCfg
->
minRowsPerFileBlock
);
ASSERT
((
!
isLast
)
||
rowsToWrite
<
pCfg
->
minRowsPerFileBlock
);
// Make buffer space
// Make buffer space
if
(
tsdbMakeRoom
(
ppBuf
,
TSDB_BLOCK_STATIS_SIZE
(
pDataCols
->
numOfCols
))
<
0
)
{
if
(
tsdbMakeRoom
(
ppBuf
,
tsdbBlockStatisSize
(
pDataCols
->
numOfCols
,
SBlockVerLatest
))
<
0
)
{
return
-
1
;
return
-
1
;
}
}
pBlockData
=
(
SBlockData
*
)(
*
ppBuf
);
pBlockData
=
(
SBlockData
*
)(
*
ppBuf
);
if
(
tsdbMakeRoom
(
ppExBuf
,
tsdbBlockAggrSize
(
pDataCols
->
numOfCols
,
SBlockVerLatest
))
<
0
)
{
return
-
1
;
}
pAggrBlkData
=
(
SAggrBlkData
*
)(
*
ppExBuf
);
// Get # of cols not all NULL(not including key column)
// Get # of cols not all NULL(not including key column)
int
nColsNotAllNull
=
0
;
int
nColsNotAllNull
=
0
;
for
(
int
ncol
=
1
;
ncol
<
pDataCols
->
numOfCols
;
ncol
++
)
{
// ncol from 1, we skip the timestamp column
for
(
int
ncol
=
1
;
ncol
<
pDataCols
->
numOfCols
;
++
ncol
)
{
// ncol from 1, we skip the timestamp column
SDataCol
*
pDataCol
=
pDataCols
->
cols
+
ncol
;
SDataCol
*
pDataCol
=
pDataCols
->
cols
+
ncol
;
SBlockCol
*
pBlockCol
=
pBlockData
->
cols
+
nColsNotAllNull
;
SBlockCol
*
pBlockCol
=
pBlockData
->
cols
+
nColsNotAllNull
;
SAggrBlkCol
*
pAggrBlkCol
=
(
SAggrBlkCol
*
)
pAggrBlkData
+
nColsNotAllNull
;
if
(
isAllRowsNull
(
pDataCol
))
{
// all data to commit are NULL, just ignore it
if
(
isAllRowsNull
(
pDataCol
))
{
// all data to commit are NULL, just ignore it
continue
;
continue
;
}
}
memset
(
pBlockCol
,
0
,
sizeof
(
*
pBlockCol
));
memset
(
pBlockCol
,
0
,
sizeof
(
*
pBlockCol
));
memset
(
pAggrBlkCol
,
0
,
sizeof
(
*
pAggrBlkCol
));
pBlockCol
->
colId
=
pDataCol
->
colId
;
pBlockCol
->
colId
=
pDataCol
->
colId
;
pBlockCol
->
type
=
pDataCol
->
type
;
pBlockCol
->
type
=
pDataCol
->
type
;
pAggrBlkCol
->
colId
=
pDataCol
->
colId
;
if
(
tDataTypes
[
pDataCol
->
type
].
statisFunc
)
{
if
(
tDataTypes
[
pDataCol
->
type
].
statisFunc
)
{
#if 0
(*tDataTypes[pDataCol->type].statisFunc)(pDataCol->pData, rowsToWrite, &(pBlockCol->min), &(pBlockCol->max),
(*tDataTypes[pDataCol->type].statisFunc)(pDataCol->pData, rowsToWrite, &(pBlockCol->min), &(pBlockCol->max),
&(pBlockCol->sum), &(pBlockCol->minIndex), &(pBlockCol->maxIndex),
&(pBlockCol->sum), &(pBlockCol->minIndex), &(pBlockCol->maxIndex),
&(pBlockCol->numOfNull));
&(pBlockCol->numOfNull));
if
(
pBlockCol
->
numOfNull
==
0
)
{
#endif
(
*
tDataTypes
[
pDataCol
->
type
].
statisFunc
)(
pDataCol
->
pData
,
rowsToWrite
,
&
(
pAggrBlkCol
->
min
),
&
(
pAggrBlkCol
->
max
),
&
(
pAggrBlkCol
->
sum
),
&
(
pAggrBlkCol
->
minIndex
),
&
(
pAggrBlkCol
->
maxIndex
),
&
(
pAggrBlkCol
->
numOfNull
));
if
(
pAggrBlkCol
->
numOfNull
==
0
)
{
TD_SET_COL_ROWS_NORM
(
pBlockCol
);
TD_SET_COL_ROWS_NORM
(
pBlockCol
);
}
else
{
}
else
{
TD_SET_COL_ROWS_MISC
(
pBlockCol
);
TD_SET_COL_ROWS_MISC
(
pBlockCol
);
...
@@ -1181,13 +1268,14 @@ int tsdbWriteBlockImpl(STsdb *pRepo, STable *pTable, SDFile *pDFile, SDataCols *
...
@@ -1181,13 +1268,14 @@ int tsdbWriteBlockImpl(STsdb *pRepo, STable *pTable, SDFile *pDFile, SDataCols *
// Compress the data if neccessary
// Compress the data if neccessary
int
tcol
=
0
;
// counter of not all NULL and written columns
int
tcol
=
0
;
// counter of not all NULL and written columns
uint32_t
toffset
=
0
;
uint32_t
toffset
=
0
;
int32_t
tsize
=
TSDB_BLOCK_STATIS_SIZE
(
nColsNotAllNull
);
int32_t
tsize
=
(
int32_t
)
tsdbBlockStatisSize
(
nColsNotAllNull
,
SBlockVerLatest
);
int32_t
lsize
=
tsize
;
int32_t
lsize
=
tsize
;
uint32_t
tsizeAggr
=
(
uint32_t
)
tsdbBlockAggrSize
(
nColsNotAllNull
,
SBlockVerLatest
);
int32_t
keyLen
=
0
;
int32_t
keyLen
=
0
;
int32_t
nBitmaps
=
(
int32_t
)
TD_BITMAP_BYTES
(
rowsToWrite
);
int32_t
nBitmaps
=
(
int32_t
)
TD_BITMAP_BYTES
(
rowsToWrite
);
int32_t
tBitmaps
=
0
;
int32_t
tBitmaps
=
0
;
for
(
int
ncol
=
0
;
ncol
<
pDataCols
->
numOfCols
;
ncol
++
)
{
for
(
int
ncol
=
0
;
ncol
<
pDataCols
->
numOfCols
;
++
ncol
)
{
// All not NULL columns finish
// All not NULL columns finish
if
(
ncol
!=
0
&&
tcol
>=
nColsNotAllNull
)
break
;
if
(
ncol
!=
0
&&
tcol
>=
nColsNotAllNull
)
break
;
...
@@ -1248,7 +1336,7 @@ int tsdbWriteBlockImpl(STsdb *pRepo, STable *pTable, SDFile *pDFile, SDataCols *
...
@@ -1248,7 +1336,7 @@ int tsdbWriteBlockImpl(STsdb *pRepo, STable *pTable, SDFile *pDFile, SDataCols *
if
(
ncol
!=
0
)
{
if
(
ncol
!=
0
)
{
tsdbSetBlockColOffset
(
pBlockCol
,
toffset
);
tsdbSetBlockColOffset
(
pBlockCol
,
toffset
);
pBlockCol
->
len
=
flen
;
pBlockCol
->
len
=
flen
;
tcol
++
;
++
tcol
;
}
else
{
}
else
{
keyLen
=
flen
;
keyLen
=
flen
;
}
}
...
@@ -1269,6 +1357,18 @@ int tsdbWriteBlockImpl(STsdb *pRepo, STable *pTable, SDFile *pDFile, SDataCols *
...
@@ -1269,6 +1357,18 @@ int tsdbWriteBlockImpl(STsdb *pRepo, STable *pTable, SDFile *pDFile, SDataCols *
return
-
1
;
return
-
1
;
}
}
uint32_t
aggrStatus
=
nColsNotAllNull
>
0
?
1
:
0
;
if
(
aggrStatus
>
0
)
{
taosCalcChecksumAppend
(
0
,
(
uint8_t
*
)
pAggrBlkData
,
tsizeAggr
);
tsdbUpdateDFileMagic
(
pDFileAggr
,
POINTER_SHIFT
(
pAggrBlkData
,
tsizeAggr
-
sizeof
(
TSCKSUM
)));
// Write the whole block to file
if
(
tsdbAppendDFile
(
pDFileAggr
,
(
void
*
)
pAggrBlkData
,
tsizeAggr
,
&
offsetAggr
)
<
tsizeAggr
)
{
return
-
1
;
}
}
// Update pBlock membership vairables
// Update pBlock membership vairables
pBlock
->
last
=
isLast
;
pBlock
->
last
=
isLast
;
pBlock
->
offset
=
offset
;
pBlock
->
offset
=
offset
;
...
@@ -1280,6 +1380,9 @@ int tsdbWriteBlockImpl(STsdb *pRepo, STable *pTable, SDFile *pDFile, SDataCols *
...
@@ -1280,6 +1380,9 @@ int tsdbWriteBlockImpl(STsdb *pRepo, STable *pTable, SDFile *pDFile, SDataCols *
pBlock
->
numOfCols
=
nColsNotAllNull
;
pBlock
->
numOfCols
=
nColsNotAllNull
;
pBlock
->
keyFirst
=
dataColsKeyFirst
(
pDataCols
);
pBlock
->
keyFirst
=
dataColsKeyFirst
(
pDataCols
);
pBlock
->
keyLast
=
dataColsKeyLast
(
pDataCols
);
pBlock
->
keyLast
=
dataColsKeyLast
(
pDataCols
);
pBlock
->
aggrStat
=
aggrStatus
;
pBlock
->
blkVer
=
SBlockVerLatest
;
pBlock
->
aggrOffset
=
(
uint64_t
)
offsetAggr
;
tsdbDebug
(
"vgId:%d uid:%"
PRId64
" a block of data is written to file %s, offset %"
PRId64
tsdbDebug
(
"vgId:%d uid:%"
PRId64
" a block of data is written to file %s, offset %"
PRId64
" numOfRows %d len %d numOfCols %"
PRId16
" keyFirst %"
PRId64
" keyLast %"
PRId64
,
" numOfRows %d len %d numOfCols %"
PRId16
" keyFirst %"
PRId64
" keyLast %"
PRId64
,
...
@@ -1291,9 +1394,10 @@ int tsdbWriteBlockImpl(STsdb *pRepo, STable *pTable, SDFile *pDFile, SDataCols *
...
@@ -1291,9 +1394,10 @@ int tsdbWriteBlockImpl(STsdb *pRepo, STable *pTable, SDFile *pDFile, SDataCols *
static
int
tsdbWriteBlock
(
SCommitH
*
pCommith
,
SDFile
*
pDFile
,
SDataCols
*
pDataCols
,
SBlock
*
pBlock
,
bool
isLast
,
static
int
tsdbWriteBlock
(
SCommitH
*
pCommith
,
SDFile
*
pDFile
,
SDataCols
*
pDataCols
,
SBlock
*
pBlock
,
bool
isLast
,
bool
isSuper
)
{
bool
isSuper
)
{
return
tsdbWriteBlockImpl
(
TSDB_COMMIT_REPO
(
pCommith
),
TSDB_COMMIT_TABLE
(
pCommith
),
pDFile
,
pDataCols
,
pBlock
,
isLast
,
return
tsdbWriteBlockImpl
(
TSDB_COMMIT_REPO
(
pCommith
),
TSDB_COMMIT_TABLE
(
pCommith
),
pDFile
,
isSuper
,
(
void
**
)(
&
(
TSDB_COMMIT_BUF
(
pCommith
))),
isLast
?
TSDB_COMMIT_SMAL_FILE
(
pCommith
)
:
TSDB_COMMIT_SMAD_FILE
(
pCommith
),
pDataCols
,
(
void
**
)(
&
(
TSDB_COMMIT_COMP_BUF
(
pCommith
))));
pBlock
,
isLast
,
isSuper
,
(
void
**
)(
&
(
TSDB_COMMIT_BUF
(
pCommith
))),
(
void
**
)(
&
(
TSDB_COMMIT_COMP_BUF
(
pCommith
))),
(
void
**
)(
&
(
TSDB_COMMIT_EXBUF
(
pCommith
))));
}
}
static
int
tsdbWriteBlockInfo
(
SCommitH
*
pCommih
)
{
static
int
tsdbWriteBlockInfo
(
SCommitH
*
pCommih
)
{
...
...
source/dnode/vnode/src/tsdb/tsdbCompact.c
浏览文件 @
d4c8fed3
...
@@ -38,6 +38,8 @@ typedef struct {
...
@@ -38,6 +38,8 @@ typedef struct {
#define TSDB_COMPACT_HEAD_FILE(pComph) TSDB_DFILE_IN_SET(TSDB_COMPACT_WSET(pComph), TSDB_FILE_HEAD)
#define TSDB_COMPACT_HEAD_FILE(pComph) TSDB_DFILE_IN_SET(TSDB_COMPACT_WSET(pComph), TSDB_FILE_HEAD)
#define TSDB_COMPACT_DATA_FILE(pComph) TSDB_DFILE_IN_SET(TSDB_COMPACT_WSET(pComph), TSDB_FILE_DATA)
#define TSDB_COMPACT_DATA_FILE(pComph) TSDB_DFILE_IN_SET(TSDB_COMPACT_WSET(pComph), TSDB_FILE_DATA)
#define TSDB_COMPACT_LAST_FILE(pComph) TSDB_DFILE_IN_SET(TSDB_COMPACT_WSET(pComph), TSDB_FILE_LAST)
#define TSDB_COMPACT_LAST_FILE(pComph) TSDB_DFILE_IN_SET(TSDB_COMPACT_WSET(pComph), TSDB_FILE_LAST)
#define TSDB_COMPACT_SMAD_FILE(pComph) TSDB_DFILE_IN_SET(TSDB_COMPACT_WSET(pComph), TSDB_FILE_SMAD)
#define TSDB_COMPACT_SMAL_FILE(pComph) TSDB_DFILE_IN_SET(TSDB_COMPACT_WSET(pComph), TSDB_FILE_SMAL)
#define TSDB_COMPACT_BUF(pComph) TSDB_READ_BUF(&((pComph)->readh))
#define TSDB_COMPACT_BUF(pComph) TSDB_READ_BUF(&((pComph)->readh))
#define TSDB_COMPACT_COMP_BUF(pComph) TSDB_READ_COMP_BUF(&((pComph)->readh))
#define TSDB_COMPACT_COMP_BUF(pComph) TSDB_READ_COMP_BUF(&((pComph)->readh))
...
...
source/dnode/vnode/src/tsdb/tsdbFS.c
浏览文件 @
d4c8fed3
...
@@ -422,7 +422,7 @@ static int tsdbSaveFSStatus(STsdb *pRepo, SFSStatus *pStatus) {
...
@@ -422,7 +422,7 @@ static int tsdbSaveFSStatus(STsdb *pRepo, SFSStatus *pStatus) {
return
-
1
;
return
-
1
;
}
}
fsheader
.
version
=
TSDB_
FS_VERSION
;
fsheader
.
version
=
TSDB_
LATEST_SFS_VER
;
if
(
taosArrayGetSize
(
pStatus
->
df
)
==
0
)
{
if
(
taosArrayGetSize
(
pStatus
->
df
)
==
0
)
{
fsheader
.
len
=
0
;
fsheader
.
len
=
0
;
}
else
{
}
else
{
...
@@ -697,7 +697,7 @@ static int tsdbOpenFSFromCurrent(STsdb *pRepo) {
...
@@ -697,7 +697,7 @@ static int tsdbOpenFSFromCurrent(STsdb *pRepo) {
ptr
=
tsdbDecodeFSHeader
(
ptr
,
&
fsheader
);
ptr
=
tsdbDecodeFSHeader
(
ptr
,
&
fsheader
);
ptr
=
tsdbDecodeFSMeta
(
ptr
,
&
(
pStatus
->
meta
));
ptr
=
tsdbDecodeFSMeta
(
ptr
,
&
(
pStatus
->
meta
));
if
(
fsheader
.
version
!=
TSDB_
FS_VERSION
)
{
if
(
fsheader
.
version
!=
TSDB_
LATEST_SFS_VER
)
{
// TODO: handle file version change
// TODO: handle file version change
}
}
...
...
source/dnode/vnode/src/tsdb/tsdbFile.c
浏览文件 @
d4c8fed3
...
@@ -19,6 +19,8 @@ static const char *TSDB_FNAME_SUFFIX[] = {
...
@@ -19,6 +19,8 @@ static const char *TSDB_FNAME_SUFFIX[] = {
"head"
,
// TSDB_FILE_HEAD
"head"
,
// TSDB_FILE_HEAD
"data"
,
// TSDB_FILE_DATA
"data"
,
// TSDB_FILE_DATA
"last"
,
// TSDB_FILE_LAST
"last"
,
// TSDB_FILE_LAST
"smad"
,
// TSDB_FILE_SMAD
"smal"
,
// TSDB_FILE_SMAL
""
,
// TSDB_FILE_MAX
""
,
// TSDB_FILE_MAX
"meta"
,
// TSDB_FILE_META
"meta"
,
// TSDB_FILE_META
};
};
...
@@ -304,6 +306,7 @@ void tsdbInitDFile(STsdb *pRepo, SDFile *pDFile, SDiskID did, int fid, uint32_t
...
@@ -304,6 +306,7 @@ void tsdbInitDFile(STsdb *pRepo, SDFile *pDFile, SDiskID did, int fid, uint32_t
memset
(
&
(
pDFile
->
info
),
0
,
sizeof
(
pDFile
->
info
));
memset
(
&
(
pDFile
->
info
),
0
,
sizeof
(
pDFile
->
info
));
pDFile
->
info
.
magic
=
TSDB_FILE_INIT_MAGIC
;
pDFile
->
info
.
magic
=
TSDB_FILE_INIT_MAGIC
;
pDFile
->
info
.
fver
=
tsdbGetDFSVersion
(
ftype
);
tsdbGetFilename
(
pRepo
->
vgId
,
fid
,
ver
,
ftype
,
fname
);
tsdbGetFilename
(
pRepo
->
vgId
,
fid
,
ver
,
ftype
,
fname
);
tfsInitFile
(
pRepo
->
pTfs
,
&
(
pDFile
->
f
),
did
,
fname
);
tfsInitFile
(
pRepo
->
pTfs
,
&
(
pDFile
->
f
),
did
,
fname
);
...
@@ -341,7 +344,7 @@ static int tsdbEncodeSDFileEx(void **buf, SDFile *pDFile) {
...
@@ -341,7 +344,7 @@ static int tsdbEncodeSDFileEx(void **buf, SDFile *pDFile) {
}
}
static
void
*
tsdbDecodeSDFileEx
(
void
*
buf
,
SDFile
*
pDFile
)
{
static
void
*
tsdbDecodeSDFileEx
(
void
*
buf
,
SDFile
*
pDFile
)
{
char
*
aname
;
char
*
aname
=
NULL
;
buf
=
tsdbDecodeDFInfo
(
buf
,
&
(
pDFile
->
info
));
buf
=
tsdbDecodeDFInfo
(
buf
,
&
(
pDFile
->
info
));
buf
=
taosDecodeString
(
buf
,
&
aname
);
buf
=
taosDecodeString
(
buf
,
&
aname
);
...
@@ -352,7 +355,7 @@ static void *tsdbDecodeSDFileEx(void *buf, SDFile *pDFile) {
...
@@ -352,7 +355,7 @@ static void *tsdbDecodeSDFileEx(void *buf, SDFile *pDFile) {
return
buf
;
return
buf
;
}
}
int
tsdbCreateDFile
(
STsdb
*
pRepo
,
SDFile
*
pDFile
,
bool
updateHeader
)
{
int
tsdbCreateDFile
(
STsdb
*
pRepo
,
SDFile
*
pDFile
,
bool
updateHeader
,
TSDB_FILE_T
fType
)
{
ASSERT
(
pDFile
->
info
.
size
==
0
&&
pDFile
->
info
.
magic
==
TSDB_FILE_INIT_MAGIC
);
ASSERT
(
pDFile
->
info
.
size
==
0
&&
pDFile
->
info
.
magic
==
TSDB_FILE_INIT_MAGIC
);
pDFile
->
pFile
=
taosOpenFile
(
TSDB_FILE_FULL_NAME
(
pDFile
),
TD_FILE_CTEATE
|
TD_FILE_WRITE
|
TD_FILE_TRUNC
);
pDFile
->
pFile
=
taosOpenFile
(
TSDB_FILE_FULL_NAME
(
pDFile
),
TD_FILE_CTEATE
|
TD_FILE_WRITE
|
TD_FILE_TRUNC
);
...
@@ -382,6 +385,7 @@ int tsdbCreateDFile(STsdb *pRepo, SDFile *pDFile, bool updateHeader) {
...
@@ -382,6 +385,7 @@ int tsdbCreateDFile(STsdb *pRepo, SDFile *pDFile, bool updateHeader) {
}
}
pDFile
->
info
.
size
+=
TSDB_FILE_HEAD_SIZE
;
pDFile
->
info
.
size
+=
TSDB_FILE_HEAD_SIZE
;
pDFile
->
info
.
fver
=
tsdbGetDFSVersion
(
fType
);
if
(
tsdbUpdateDFileHeader
(
pDFile
)
<
0
)
{
if
(
tsdbUpdateDFileHeader
(
pDFile
)
<
0
)
{
tsdbCloseDFile
(
pDFile
);
tsdbCloseDFile
(
pDFile
);
...
@@ -493,6 +497,7 @@ static int tsdbEncodeDFInfo(void **buf, SDFInfo *pInfo) {
...
@@ -493,6 +497,7 @@ static int tsdbEncodeDFInfo(void **buf, SDFInfo *pInfo) {
int
tlen
=
0
;
int
tlen
=
0
;
tlen
+=
taosEncodeFixedU32
(
buf
,
pInfo
->
magic
);
tlen
+=
taosEncodeFixedU32
(
buf
,
pInfo
->
magic
);
tlen
+=
taosEncodeFixedU32
(
buf
,
pInfo
->
fver
);
tlen
+=
taosEncodeFixedU32
(
buf
,
pInfo
->
len
);
tlen
+=
taosEncodeFixedU32
(
buf
,
pInfo
->
len
);
tlen
+=
taosEncodeFixedU32
(
buf
,
pInfo
->
totalBlocks
);
tlen
+=
taosEncodeFixedU32
(
buf
,
pInfo
->
totalBlocks
);
tlen
+=
taosEncodeFixedU32
(
buf
,
pInfo
->
totalSubBlocks
);
tlen
+=
taosEncodeFixedU32
(
buf
,
pInfo
->
totalSubBlocks
);
...
@@ -505,6 +510,7 @@ static int tsdbEncodeDFInfo(void **buf, SDFInfo *pInfo) {
...
@@ -505,6 +510,7 @@ static int tsdbEncodeDFInfo(void **buf, SDFInfo *pInfo) {
static
void
*
tsdbDecodeDFInfo
(
void
*
buf
,
SDFInfo
*
pInfo
)
{
static
void
*
tsdbDecodeDFInfo
(
void
*
buf
,
SDFInfo
*
pInfo
)
{
buf
=
taosDecodeFixedU32
(
buf
,
&
(
pInfo
->
magic
));
buf
=
taosDecodeFixedU32
(
buf
,
&
(
pInfo
->
magic
));
buf
=
taosDecodeFixedU32
(
buf
,
&
(
pInfo
->
fver
));
buf
=
taosDecodeFixedU32
(
buf
,
&
(
pInfo
->
len
));
buf
=
taosDecodeFixedU32
(
buf
,
&
(
pInfo
->
len
));
buf
=
taosDecodeFixedU32
(
buf
,
&
(
pInfo
->
totalBlocks
));
buf
=
taosDecodeFixedU32
(
buf
,
&
(
pInfo
->
totalBlocks
));
buf
=
taosDecodeFixedU32
(
buf
,
&
(
pInfo
->
totalSubBlocks
));
buf
=
taosDecodeFixedU32
(
buf
,
&
(
pInfo
->
totalSubBlocks
));
...
@@ -562,8 +568,10 @@ static int tsdbRollBackDFile(SDFile *pDFile) {
...
@@ -562,8 +568,10 @@ static int tsdbRollBackDFile(SDFile *pDFile) {
// ============== Operations on SDFileSet
// ============== Operations on SDFileSet
void
tsdbInitDFileSet
(
STsdb
*
pRepo
,
SDFileSet
*
pSet
,
SDiskID
did
,
int
fid
,
uint32_t
ver
)
{
void
tsdbInitDFileSet
(
STsdb
*
pRepo
,
SDFileSet
*
pSet
,
SDiskID
did
,
int
fid
,
uint32_t
ver
)
{
pSet
->
fid
=
fid
;
TSDB_FSET_FID
(
pSet
)
=
fid
;
pSet
->
state
=
0
;
TSDB_FSET_VER
(
pSet
)
=
TSDB_LATEST_FSET_VER
;
TSDB_FSET_STATE
(
pSet
)
=
0
;
pSet
->
reserve
=
0
;
for
(
TSDB_FILE_T
ftype
=
0
;
ftype
<
TSDB_FILE_MAX
;
ftype
++
)
{
for
(
TSDB_FILE_T
ftype
=
0
;
ftype
<
TSDB_FILE_MAX
;
ftype
++
)
{
SDFile
*
pDFile
=
TSDB_DFILE_IN_SET
(
pSet
,
ftype
);
SDFile
*
pDFile
=
TSDB_DFILE_IN_SET
(
pSet
,
ftype
);
...
@@ -572,7 +580,7 @@ void tsdbInitDFileSet(STsdb *pRepo, SDFileSet *pSet, SDiskID did, int fid, uint3
...
@@ -572,7 +580,7 @@ void tsdbInitDFileSet(STsdb *pRepo, SDFileSet *pSet, SDiskID did, int fid, uint3
}
}
void
tsdbInitDFileSetEx
(
SDFileSet
*
pSet
,
SDFileSet
*
pOSet
)
{
void
tsdbInitDFileSetEx
(
SDFileSet
*
pSet
,
SDFileSet
*
pOSet
)
{
pSet
->
fid
=
pOSet
->
fid
;
TSDB_FSET_FID
(
pSet
)
=
TSDB_FSET_FID
(
pOSet
)
;
for
(
TSDB_FILE_T
ftype
=
0
;
ftype
<
TSDB_FILE_MAX
;
ftype
++
)
{
for
(
TSDB_FILE_T
ftype
=
0
;
ftype
<
TSDB_FILE_MAX
;
ftype
++
)
{
tsdbInitDFileEx
(
TSDB_DFILE_IN_SET
(
pSet
,
ftype
),
TSDB_DFILE_IN_SET
(
pOSet
,
ftype
));
tsdbInitDFileEx
(
TSDB_DFILE_IN_SET
(
pSet
,
ftype
),
TSDB_DFILE_IN_SET
(
pOSet
,
ftype
));
}
}
...
@@ -581,7 +589,10 @@ void tsdbInitDFileSetEx(SDFileSet *pSet, SDFileSet *pOSet) {
...
@@ -581,7 +589,10 @@ void tsdbInitDFileSetEx(SDFileSet *pSet, SDFileSet *pOSet) {
int
tsdbEncodeDFileSet
(
void
**
buf
,
SDFileSet
*
pSet
)
{
int
tsdbEncodeDFileSet
(
void
**
buf
,
SDFileSet
*
pSet
)
{
int
tlen
=
0
;
int
tlen
=
0
;
tlen
+=
taosEncodeFixedI32
(
buf
,
pSet
->
fid
);
tlen
+=
taosEncodeFixedI32
(
buf
,
TSDB_FSET_FID
(
pSet
));
// state not included
tlen
+=
taosEncodeFixedU8
(
buf
,
TSDB_FSET_VER
(
pSet
));
tlen
+=
taosEncodeFixedU16
(
buf
,
pSet
->
reserve
);
for
(
TSDB_FILE_T
ftype
=
0
;
ftype
<
TSDB_FILE_MAX
;
ftype
++
)
{
for
(
TSDB_FILE_T
ftype
=
0
;
ftype
<
TSDB_FILE_MAX
;
ftype
++
)
{
tlen
+=
tsdbEncodeSDFile
(
buf
,
TSDB_DFILE_IN_SET
(
pSet
,
ftype
));
tlen
+=
tsdbEncodeSDFile
(
buf
,
TSDB_DFILE_IN_SET
(
pSet
,
ftype
));
}
}
...
@@ -590,11 +601,11 @@ int tsdbEncodeDFileSet(void **buf, SDFileSet *pSet) {
...
@@ -590,11 +601,11 @@ int tsdbEncodeDFileSet(void **buf, SDFileSet *pSet) {
}
}
void
*
tsdbDecodeDFileSet
(
STsdb
*
pRepo
,
void
*
buf
,
SDFileSet
*
pSet
)
{
void
*
tsdbDecodeDFileSet
(
STsdb
*
pRepo
,
void
*
buf
,
SDFileSet
*
pSet
)
{
int32_t
fid
;
buf
=
taosDecodeFixedI32
(
buf
,
&
(
TSDB_FSET_FID
(
pSet
)));
TSDB_FSET_STATE
(
pSet
)
=
0
;
buf
=
taosDecodeFixedU8
(
buf
,
&
(
TSDB_FSET_VER
(
pSet
)));
buf
=
taosDecodeFixedU16
(
buf
,
&
(
pSet
->
reserve
));
buf
=
taosDecodeFixedI32
(
buf
,
&
(
fid
));
pSet
->
state
=
0
;
pSet
->
fid
=
fid
;
for
(
TSDB_FILE_T
ftype
=
0
;
ftype
<
TSDB_FILE_MAX
;
ftype
++
)
{
for
(
TSDB_FILE_T
ftype
=
0
;
ftype
<
TSDB_FILE_MAX
;
ftype
++
)
{
buf
=
tsdbDecodeSDFile
(
pRepo
,
buf
,
TSDB_DFILE_IN_SET
(
pSet
,
ftype
));
buf
=
tsdbDecodeSDFile
(
pRepo
,
buf
,
TSDB_DFILE_IN_SET
(
pSet
,
ftype
));
}
}
...
@@ -604,7 +615,9 @@ void *tsdbDecodeDFileSet(STsdb *pRepo, void *buf, SDFileSet *pSet) {
...
@@ -604,7 +615,9 @@ void *tsdbDecodeDFileSet(STsdb *pRepo, void *buf, SDFileSet *pSet) {
int
tsdbEncodeDFileSetEx
(
void
**
buf
,
SDFileSet
*
pSet
)
{
int
tsdbEncodeDFileSetEx
(
void
**
buf
,
SDFileSet
*
pSet
)
{
int
tlen
=
0
;
int
tlen
=
0
;
tlen
+=
taosEncodeFixedI32
(
buf
,
pSet
->
fid
);
tlen
+=
taosEncodeFixedI32
(
buf
,
TSDB_FSET_FID
(
pSet
));
tlen
+=
taosEncodeFixedU8
(
buf
,
TSDB_FSET_VER
(
pSet
));
tlen
+=
taosEncodeFixedU16
(
buf
,
pSet
->
reserve
);
for
(
TSDB_FILE_T
ftype
=
0
;
ftype
<
TSDB_FILE_MAX
;
ftype
++
)
{
for
(
TSDB_FILE_T
ftype
=
0
;
ftype
<
TSDB_FILE_MAX
;
ftype
++
)
{
tlen
+=
tsdbEncodeSDFileEx
(
buf
,
TSDB_DFILE_IN_SET
(
pSet
,
ftype
));
tlen
+=
tsdbEncodeSDFileEx
(
buf
,
TSDB_DFILE_IN_SET
(
pSet
,
ftype
));
}
}
...
@@ -613,10 +626,10 @@ int tsdbEncodeDFileSetEx(void **buf, SDFileSet *pSet) {
...
@@ -613,10 +626,10 @@ int tsdbEncodeDFileSetEx(void **buf, SDFileSet *pSet) {
}
}
void
*
tsdbDecodeDFileSetEx
(
void
*
buf
,
SDFileSet
*
pSet
)
{
void
*
tsdbDecodeDFileSetEx
(
void
*
buf
,
SDFileSet
*
pSet
)
{
int32_t
fid
;
buf
=
taosDecodeFixedI32
(
buf
,
&
(
TSDB_FSET_FID
(
pSet
)));
buf
=
taosDecodeFixedU8
(
buf
,
&
(
TSDB_FSET_VER
(
pSet
)));
buf
=
taosDecodeFixedU16
(
buf
,
&
(
pSet
->
reserve
));
buf
=
taosDecodeFixedI32
(
buf
,
&
(
fid
));
pSet
->
fid
=
fid
;
for
(
TSDB_FILE_T
ftype
=
0
;
ftype
<
TSDB_FILE_MAX
;
ftype
++
)
{
for
(
TSDB_FILE_T
ftype
=
0
;
ftype
<
TSDB_FILE_MAX
;
ftype
++
)
{
buf
=
tsdbDecodeSDFileEx
(
buf
,
TSDB_DFILE_IN_SET
(
pSet
,
ftype
));
buf
=
tsdbDecodeSDFileEx
(
buf
,
TSDB_DFILE_IN_SET
(
pSet
,
ftype
));
}
}
...
@@ -637,7 +650,7 @@ int tsdbApplyDFileSetChange(SDFileSet *from, SDFileSet *to) {
...
@@ -637,7 +650,7 @@ int tsdbApplyDFileSetChange(SDFileSet *from, SDFileSet *to) {
int
tsdbCreateDFileSet
(
STsdb
*
pRepo
,
SDFileSet
*
pSet
,
bool
updateHeader
)
{
int
tsdbCreateDFileSet
(
STsdb
*
pRepo
,
SDFileSet
*
pSet
,
bool
updateHeader
)
{
for
(
TSDB_FILE_T
ftype
=
0
;
ftype
<
TSDB_FILE_MAX
;
ftype
++
)
{
for
(
TSDB_FILE_T
ftype
=
0
;
ftype
<
TSDB_FILE_MAX
;
ftype
++
)
{
if
(
tsdbCreateDFile
(
pRepo
,
TSDB_DFILE_IN_SET
(
pSet
,
ftype
),
updateHeader
)
<
0
)
{
if
(
tsdbCreateDFile
(
pRepo
,
TSDB_DFILE_IN_SET
(
pSet
,
ftype
),
updateHeader
,
ftype
)
<
0
)
{
tsdbCloseDFileSet
(
pSet
);
tsdbCloseDFileSet
(
pSet
);
tsdbRemoveDFileSet
(
pSet
);
tsdbRemoveDFileSet
(
pSet
);
return
-
1
;
return
-
1
;
...
...
source/dnode/vnode/src/tsdb/tsdbMain.c
浏览文件 @
d4c8fed3
...
@@ -821,9 +821,10 @@ static int tsdbRestoreLastColumns(STsdbRepo *pRepo, STable *pTable, SReadH* pRea
...
@@ -821,9 +821,10 @@ static int tsdbRestoreLastColumns(STsdbRepo *pRepo, STable *pTable, SReadH* pRea
// file block with sub-blocks has no statistics data
// file block with sub-blocks has no statistics data
if (pBlock->numOfSubBlocks <= 1) {
if (pBlock->numOfSubBlocks <= 1) {
tsdbLoadBlockStatis(pReadh, pBlock);
if (tsdbLoadBlockStatis(pReadh, pBlock) == TSDB_STATIS_OK) {
tsdbGetBlockStatis(pReadh, pBlockStatis, (int)numColumns);
tsdbGetBlockStatis(pReadh, pBlockStatis, (int)numColumns, pBlock);
loadStatisData = true;
loadStatisData = true;
}
}
}
for (int16_t i = 0; i < numColumns && numColumns > pTable->restoreColumnNum; ++i) {
for (int16_t i = 0; i < numColumns && numColumns > pTable->restoreColumnNum; ++i) {
...
...
source/dnode/vnode/src/tsdb/tsdbRead.c
浏览文件 @
d4c8fed3
...
@@ -3276,8 +3276,12 @@ int32_t tsdbRetrieveDataBlockStatisInfo(tsdbReaderT* pTsdbReadHandle, SDataStati
...
@@ -3276,8 +3276,12 @@ int32_t tsdbRetrieveDataBlockStatisInfo(tsdbReaderT* pTsdbReadHandle, SDataStati
}
}
int64_t
stime
=
taosGetTimestampUs
();
int64_t
stime
=
taosGetTimestampUs
();
if
(
tsdbLoadBlockStatis
(
&
pHandle
->
rhelper
,
pBlockInfo
->
compBlock
)
<
0
)
{
int
statisStatus
=
tsdbLoadBlockStatis
(
&
pHandle
->
rhelper
,
pBlockInfo
->
compBlock
);
if
(
statisStatus
<
TSDB_STATIS_OK
)
{
return
terrno
;
return
terrno
;
}
else
if
(
statisStatus
>
TSDB_STATIS_OK
)
{
*
pBlockStatis
=
NULL
;
return
TSDB_CODE_SUCCESS
;
}
}
int16_t
*
colIds
=
pHandle
->
defaultLoadColumn
->
pData
;
int16_t
*
colIds
=
pHandle
->
defaultLoadColumn
->
pData
;
...
@@ -3288,7 +3292,7 @@ int32_t tsdbRetrieveDataBlockStatisInfo(tsdbReaderT* pTsdbReadHandle, SDataStati
...
@@ -3288,7 +3292,7 @@ int32_t tsdbRetrieveDataBlockStatisInfo(tsdbReaderT* pTsdbReadHandle, SDataStati
pHandle
->
statis
[
i
].
colId
=
colIds
[
i
];
pHandle
->
statis
[
i
].
colId
=
colIds
[
i
];
}
}
tsdbGetBlockStatis
(
&
pHandle
->
rhelper
,
pHandle
->
statis
,
(
int
)
numOfCols
);
tsdbGetBlockStatis
(
&
pHandle
->
rhelper
,
pHandle
->
statis
,
(
int
)
numOfCols
,
pBlockInfo
->
compBlock
);
// always load the first primary timestamp column data
// always load the first primary timestamp column data
SDataStatis
*
pPrimaryColStatis
=
&
pHandle
->
statis
[
0
];
SDataStatis
*
pPrimaryColStatis
=
&
pHandle
->
statis
[
0
];
...
...
source/dnode/vnode/src/tsdb/tsdbReadImpl.c
浏览文件 @
d4c8fed3
...
@@ -63,10 +63,12 @@ int tsdbInitReadH(SReadH *pReadh, STsdb *pRepo) {
...
@@ -63,10 +63,12 @@ int tsdbInitReadH(SReadH *pReadh, STsdb *pRepo) {
void
tsdbDestroyReadH
(
SReadH
*
pReadh
)
{
void
tsdbDestroyReadH
(
SReadH
*
pReadh
)
{
if
(
pReadh
==
NULL
)
return
;
if
(
pReadh
==
NULL
)
return
;
pReadh
->
pExBuf
=
taosTZfree
(
pReadh
->
pExBuf
);
pReadh
->
pCBuf
=
taosTZfree
(
pReadh
->
pCBuf
);
pReadh
->
pCBuf
=
taosTZfree
(
pReadh
->
pCBuf
);
pReadh
->
pBuf
=
taosTZfree
(
pReadh
->
pBuf
);
pReadh
->
pBuf
=
taosTZfree
(
pReadh
->
pBuf
);
pReadh
->
pDCols
[
0
]
=
tdFreeDataCols
(
pReadh
->
pDCols
[
0
]);
pReadh
->
pDCols
[
0
]
=
tdFreeDataCols
(
pReadh
->
pDCols
[
0
]);
pReadh
->
pDCols
[
1
]
=
tdFreeDataCols
(
pReadh
->
pDCols
[
1
]);
pReadh
->
pDCols
[
1
]
=
tdFreeDataCols
(
pReadh
->
pDCols
[
1
]);
pReadh
->
pAggrBlkData
=
taosTZfree
(
pReadh
->
pAggrBlkData
);
pReadh
->
pBlkData
=
taosTZfree
(
pReadh
->
pBlkData
);
pReadh
->
pBlkData
=
taosTZfree
(
pReadh
->
pBlkData
);
pReadh
->
pBlkInfo
=
taosTZfree
(
pReadh
->
pBlkInfo
);
pReadh
->
pBlkInfo
=
taosTZfree
(
pReadh
->
pBlkInfo
);
pReadh
->
cidx
=
0
;
pReadh
->
cidx
=
0
;
...
@@ -305,39 +307,45 @@ int tsdbLoadBlockDataCols(SReadH *pReadh, SBlock *pBlock, SBlockInfo *pBlkInfo,
...
@@ -305,39 +307,45 @@ int tsdbLoadBlockDataCols(SReadH *pReadh, SBlock *pBlock, SBlockInfo *pBlkInfo,
int
tsdbLoadBlockStatis
(
SReadH
*
pReadh
,
SBlock
*
pBlock
)
{
int
tsdbLoadBlockStatis
(
SReadH
*
pReadh
,
SBlock
*
pBlock
)
{
ASSERT
(
pBlock
->
numOfSubBlocks
<=
1
);
ASSERT
(
pBlock
->
numOfSubBlocks
<=
1
);
SDFile
*
pDFile
=
(
pBlock
->
last
)
?
TSDB_READ_LAST_FILE
(
pReadh
)
:
TSDB_READ_DATA_FILE
(
pReadh
);
if
(
!
pBlock
->
aggrStat
)
{
return
TSDB_STATIS_NONE
;
}
if
(
tsdbSeekDFile
(
pDFile
,
pBlock
->
offset
,
SEEK_SET
)
<
0
)
{
SDFile
*
pDFileAggr
=
pBlock
->
last
?
TSDB_READ_SMAL_FILE
(
pReadh
)
:
TSDB_READ_SMAD_FILE
(
pReadh
);
tsdbError
(
"vgId:%d failed to load block statis part while seek file %s to offset %"
PRId64
" since %s"
,
TSDB_READ_REPO_ID
(
pReadh
),
TSDB_FILE_FULL_NAME
(
pDFile
),
(
int64_t
)
pBlock
->
offset
,
tstrerror
(
terrno
));
if
(
tsdbSeekDFile
(
pDFileAggr
,
pBlock
->
aggrOffset
,
SEEK_SET
)
<
0
)
{
tsdbError
(
"vgId:%d failed to load block aggr part while seek file %s to offset %"
PRIu64
" since %s"
,
TSDB_READ_REPO_ID
(
pReadh
),
TSDB_FILE_FULL_NAME
(
pDFileAggr
),
(
uint64_t
)
pBlock
->
aggrOffset
,
tstrerror
(
terrno
));
return
-
1
;
return
-
1
;
}
}
size_t
size
=
TSDB_BLOCK_STATIS_SIZE
(
pBlock
->
numOfCols
);
size_t
size
Aggr
=
tsdbBlockAggrSize
(
pBlock
->
numOfCols
,
(
uint32_t
)
pBlock
->
blkVer
);
if
(
tsdbMakeRoom
((
void
**
)(
&
(
pReadh
->
p
BlkData
)),
size
)
<
0
)
return
-
1
;
if
(
tsdbMakeRoom
((
void
**
)(
&
(
pReadh
->
p
AggrBlkData
)),
sizeAggr
)
<
0
)
return
-
1
;
int64_t
nread
=
tsdbReadDFile
(
pDFile
,
(
void
*
)(
pReadh
->
pBlkData
),
size
);
int64_t
nreadAggr
=
tsdbReadDFile
(
pDFileAggr
,
(
void
*
)(
pReadh
->
pAggrBlkData
),
sizeAggr
);
if
(
nread
<
0
)
{
if
(
nreadAggr
<
0
)
{
tsdbError
(
"vgId:%d failed to load block statis part while read file %s since %s, offset:%"
PRId64
" len :%"
PRIzu
,
tsdbError
(
"vgId:%d failed to load block aggr part while read file %s since %s, offset:%"
PRIu64
" len :%"
PRIzu
,
TSDB_READ_REPO_ID
(
pReadh
),
TSDB_FILE_FULL_NAME
(
pDFile
),
tstrerror
(
terrno
),
(
int64_t
)
pBlock
->
offset
,
size
);
TSDB_READ_REPO_ID
(
pReadh
),
TSDB_FILE_FULL_NAME
(
pDFileAggr
),
tstrerror
(
terrno
),
(
uint64_t
)
pBlock
->
aggrOffset
,
sizeAggr
);
return
-
1
;
return
-
1
;
}
}
if
(
nread
<
size
)
{
if
(
nread
Aggr
<
sizeAggr
)
{
terrno
=
TSDB_CODE_TDB_FILE_CORRUPTED
;
terrno
=
TSDB_CODE_TDB_FILE_CORRUPTED
;
tsdbError
(
"vgId:%d block
statis part in file %s is corrupted, offset:%"
PRId
64
" expected bytes:%"
PRIzu
tsdbError
(
"vgId:%d block
aggr part in file %s is corrupted, offset:%"
PRIu
64
" expected bytes:%"
PRIzu
" read bytes: %"
PRId64
,
" read bytes: %"
PRId64
,
TSDB_READ_REPO_ID
(
pReadh
),
TSDB_FILE_FULL_NAME
(
pDFile
),
(
int64_t
)
pBlock
->
offset
,
size
,
nread
);
TSDB_READ_REPO_ID
(
pReadh
),
TSDB_FILE_FULL_NAME
(
pDFileAggr
),
(
uint64_t
)
pBlock
->
aggrOffset
,
sizeAggr
,
nreadAggr
);
return
-
1
;
return
-
1
;
}
}
if
(
!
taosCheckChecksumWhole
((
uint8_t
*
)(
pReadh
->
p
BlkData
),
(
uint32_t
)
size
))
{
if
(
!
taosCheckChecksumWhole
((
uint8_t
*
)(
pReadh
->
p
AggrBlkData
),
(
uint32_t
)
sizeAggr
))
{
terrno
=
TSDB_CODE_TDB_FILE_CORRUPTED
;
terrno
=
TSDB_CODE_TDB_FILE_CORRUPTED
;
tsdbError
(
"vgId:%d block
statis part in file %s is corrupted since wrong checksum, offset:%"
PRId
64
" len :%"
PRIzu
,
tsdbError
(
"vgId:%d block
aggr part in file %s is corrupted since wrong checksum, offset:%"
PRIu
64
" len :%"
PRIzu
,
TSDB_READ_REPO_ID
(
pReadh
),
TSDB_FILE_FULL_NAME
(
pDFile
),
(
int64_t
)
pBlock
->
offset
,
size
);
TSDB_READ_REPO_ID
(
pReadh
),
TSDB_FILE_FULL_NAME
(
pDFile
Aggr
),
(
uint64_t
)
pBlock
->
aggrOffset
,
sizeAggr
);
return
-
1
;
return
-
1
;
}
}
return
0
;
return
0
;
}
}
...
@@ -375,7 +383,8 @@ void *tsdbDecodeSBlockIdx(void *buf, SBlockIdx *pIdx) {
...
@@ -375,7 +383,8 @@ void *tsdbDecodeSBlockIdx(void *buf, SBlockIdx *pIdx) {
return
buf
;
return
buf
;
}
}
void
tsdbGetBlockStatis
(
SReadH
*
pReadh
,
SDataStatis
*
pStatis
,
int
numOfCols
)
{
void
tsdbGetBlockStatis
(
SReadH
*
pReadh
,
SDataStatis
*
pStatis
,
int
numOfCols
,
SBlock
*
pBlock
)
{
#ifdef TD_REFACTOR_3
SBlockData
*
pBlockData
=
pReadh
->
pBlkData
;
SBlockData
*
pBlockData
=
pReadh
->
pBlkData
;
for
(
int
i
=
0
,
j
=
0
;
i
<
numOfCols
;)
{
for
(
int
i
=
0
,
j
=
0
;
i
<
numOfCols
;)
{
...
@@ -401,6 +410,36 @@ void tsdbGetBlockStatis(SReadH *pReadh, SDataStatis *pStatis, int numOfCols) {
...
@@ -401,6 +410,36 @@ void tsdbGetBlockStatis(SReadH *pReadh, SDataStatis *pStatis, int numOfCols) {
j
++
;
j
++
;
}
}
}
}
#else
if
(
pBlock
->
aggrStat
)
{
SAggrBlkData
*
pAggrBlkData
=
pReadh
->
pAggrBlkData
;
for
(
int
i
=
0
,
j
=
0
;
i
<
numOfCols
;)
{
if
(
j
>=
pBlock
->
numOfCols
)
{
pStatis
[
i
].
numOfNull
=
-
1
;
i
++
;
continue
;
}
SAggrBlkCol
*
pAggrBlkCol
=
((
SAggrBlkCol
*
)(
pAggrBlkData
))
+
j
;
if
(
pStatis
[
i
].
colId
==
pAggrBlkCol
->
colId
)
{
pStatis
[
i
].
sum
=
pAggrBlkCol
->
sum
;
pStatis
[
i
].
max
=
pAggrBlkCol
->
max
;
pStatis
[
i
].
min
=
pAggrBlkCol
->
min
;
pStatis
[
i
].
maxIndex
=
pAggrBlkCol
->
maxIndex
;
pStatis
[
i
].
minIndex
=
pAggrBlkCol
->
minIndex
;
pStatis
[
i
].
numOfNull
=
pAggrBlkCol
->
numOfNull
;
i
++
;
j
++
;
}
else
if
(
pStatis
[
i
].
colId
<
pAggrBlkCol
->
colId
)
{
pStatis
[
i
].
numOfNull
=
-
1
;
i
++
;
}
else
{
j
++
;
}
}
}
#endif
}
}
static
void
tsdbResetReadTable
(
SReadH
*
pReadh
)
{
static
void
tsdbResetReadTable
(
SReadH
*
pReadh
)
{
...
@@ -449,7 +488,7 @@ static int tsdbLoadBlockDataImpl(SReadH *pReadh, SBlock *pBlock, SDataCols *pDat
...
@@ -449,7 +488,7 @@ static int tsdbLoadBlockDataImpl(SReadH *pReadh, SBlock *pBlock, SDataCols *pDat
return
-
1
;
return
-
1
;
}
}
int32_t
tsize
=
TSDB_BLOCK_STATIS_SIZE
(
pBlock
->
numOfCols
);
int32_t
tsize
=
(
int32_t
)
tsdbBlockStatisSize
(
pBlock
->
numOfCols
,
(
uint32_t
)
pBlock
->
blkVer
);
if
(
!
taosCheckChecksumWhole
((
uint8_t
*
)
TSDB_READ_BUF
(
pReadh
),
tsize
))
{
if
(
!
taosCheckChecksumWhole
((
uint8_t
*
)
TSDB_READ_BUF
(
pReadh
),
tsize
))
{
terrno
=
TSDB_CODE_TDB_FILE_CORRUPTED
;
terrno
=
TSDB_CODE_TDB_FILE_CORRUPTED
;
tsdbError
(
"vgId:%d block statis part in file %s is corrupted since wrong checksum, offset:%"
PRId64
" len :%d"
,
tsdbError
(
"vgId:%d block statis part in file %s is corrupted since wrong checksum, offset:%"
PRId64
" len :%d"
,
...
@@ -686,7 +725,8 @@ static int tsdbLoadColData(SReadH *pReadh, SDFile *pDFile, SBlock *pBlock, SBloc
...
@@ -686,7 +725,8 @@ static int tsdbLoadColData(SReadH *pReadh, SDFile *pDFile, SBlock *pBlock, SBloc
if
(
tsdbMakeRoom
((
void
**
)(
&
TSDB_READ_BUF
(
pReadh
)),
pBlockCol
->
len
)
<
0
)
return
-
1
;
if
(
tsdbMakeRoom
((
void
**
)(
&
TSDB_READ_BUF
(
pReadh
)),
pBlockCol
->
len
)
<
0
)
return
-
1
;
if
(
tsdbMakeRoom
((
void
**
)(
&
TSDB_READ_COMP_BUF
(
pReadh
)),
tsize
)
<
0
)
return
-
1
;
if
(
tsdbMakeRoom
((
void
**
)(
&
TSDB_READ_COMP_BUF
(
pReadh
)),
tsize
)
<
0
)
return
-
1
;
int64_t
offset
=
pBlock
->
offset
+
TSDB_BLOCK_STATIS_SIZE
(
pBlock
->
numOfCols
)
+
tsdbGetBlockColOffset
(
pBlockCol
);
int64_t
offset
=
pBlock
->
offset
+
tsdbBlockStatisSize
(
pBlock
->
numOfCols
,
(
uint32_t
)
pBlock
->
blkVer
)
+
tsdbGetBlockColOffset
(
pBlockCol
);
if
(
tsdbSeekDFile
(
pDFile
,
offset
,
SEEK_SET
)
<
0
)
{
if
(
tsdbSeekDFile
(
pDFile
,
offset
,
SEEK_SET
)
<
0
)
{
tsdbError
(
"vgId:%d failed to load block column data while seek file %s to offset %"
PRId64
" since %s"
,
tsdbError
(
"vgId:%d failed to load block column data while seek file %s to offset %"
PRId64
" since %s"
,
TSDB_READ_REPO_ID
(
pReadh
),
TSDB_FILE_FULL_NAME
(
pDFile
),
offset
,
tstrerror
(
terrno
));
TSDB_READ_REPO_ID
(
pReadh
),
TSDB_FILE_FULL_NAME
(
pDFile
),
offset
,
tstrerror
(
terrno
));
...
...
source/util/src/terror.c
浏览文件 @
d4c8fed3
...
@@ -345,6 +345,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_TDB_IVD_CREATE_TABLE_INFO, "Invalid information t
...
@@ -345,6 +345,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_TDB_IVD_CREATE_TABLE_INFO, "Invalid information t
TAOS_DEFINE_ERROR
(
TSDB_CODE_TDB_NO_AVAIL_DISK
,
"No available disk"
)
TAOS_DEFINE_ERROR
(
TSDB_CODE_TDB_NO_AVAIL_DISK
,
"No available disk"
)
TAOS_DEFINE_ERROR
(
TSDB_CODE_TDB_MESSED_MSG
,
"TSDB messed message"
)
TAOS_DEFINE_ERROR
(
TSDB_CODE_TDB_MESSED_MSG
,
"TSDB messed message"
)
TAOS_DEFINE_ERROR
(
TSDB_CODE_TDB_IVLD_TAG_VAL
,
"TSDB invalid tag value"
)
TAOS_DEFINE_ERROR
(
TSDB_CODE_TDB_IVLD_TAG_VAL
,
"TSDB invalid tag value"
)
TAOS_DEFINE_ERROR
(
TSDB_CODE_TDB_NO_CACHE_LAST_ROW
,
"TSDB no cache last row data"
)
// query
// query
TAOS_DEFINE_ERROR
(
TSDB_CODE_QRY_INVALID_QHANDLE
,
"Invalid handle"
)
TAOS_DEFINE_ERROR
(
TSDB_CODE_QRY_INVALID_QHANDLE
,
"Invalid handle"
)
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录