Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
0e7cfbb0
T
TDengine
项目概览
taosdata
/
TDengine
大约 2 年 前同步成功
通知
1192
Star
22018
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
未验证
提交
0e7cfbb0
编写于
7月 22, 2020
作者:
S
Shengliang Guan
提交者:
GitHub
7月 22, 2020
浏览文件
操作
浏览文件
下载
差异文件
Merge pull request #2728 from taosdata/feature/2.0tsdb
Feature/2.0tsdb
上级
dd703941
946f6cb8
变更
9
显示空白变更内容
内联
并排
Showing
9 changed file
with
422 addition
and
270 deletion
+422
-270
src/common/src/tdataformat.c
src/common/src/tdataformat.c
+1
-1
src/tsdb/inc/tsdbMain.h
src/tsdb/inc/tsdbMain.h
+43
-17
src/tsdb/src/tsdbFile.c
src/tsdb/src/tsdbFile.c
+19
-12
src/tsdb/src/tsdbMain.c
src/tsdb/src/tsdbMain.c
+17
-14
src/tsdb/src/tsdbMemTable.c
src/tsdb/src/tsdbMemTable.c
+6
-3
src/tsdb/src/tsdbRWHelper.c
src/tsdb/src/tsdbRWHelper.c
+300
-209
src/tsdb/src/tsdbRead.c
src/tsdb/src/tsdbRead.c
+21
-9
src/util/inc/tkvstore.h
src/util/inc/tkvstore.h
+7
-4
src/util/src/tkvstore.c
src/util/src/tkvstore.c
+8
-1
未找到文件。
src/common/src/tdataformat.c
浏览文件 @
0e7cfbb0
...
...
@@ -318,7 +318,7 @@ SDataCols *tdNewDataCols(int maxRowSize, int maxCols, int maxRows) {
pCols
->
maxPoints
=
maxRows
;
pCols
->
bufSize
=
maxRowSize
*
maxRows
;
pCols
->
buf
=
calloc
(
1
,
pCols
->
bufSize
);
pCols
->
buf
=
malloc
(
pCols
->
bufSize
);
if
(
pCols
->
buf
==
NULL
)
{
free
(
pCols
);
return
NULL
;
...
...
src/tsdb/inc/tsdbMain.h
浏览文件 @
0e7cfbb0
...
...
@@ -42,6 +42,7 @@ extern int tsdbDebugFlag;
#define TSDB_MAX_TABLE_SCHEMAS 16
#define TSDB_FILE_HEAD_SIZE 512
#define TSDB_FILE_DELIMITER 0xF00AFA0F
#define TSDB_FILE_INIT_MAGIC 0xFFFFFFFF
// Definitions
// ------------------ tsdbMeta.c
...
...
@@ -132,21 +133,30 @@ typedef struct {
// ------------------ tsdbFile.c
extern
const
char
*
tsdbFileSuffix
[];
typedef
enum
{
#ifdef TSDB_IDX
TSDB_FILE_TYPE_IDX
=
0
,
TSDB_FILE_TYPE_HEAD
,
#else
TSDB_FILE_TYPE_HEAD
=
0
,
#endif
TSDB_FILE_TYPE_DATA
,
TSDB_FILE_TYPE_LAST
,
TSDB_FILE_TYPE_MAX
,
#ifdef TSDB_IDX
TSDB_FILE_TYPE_NIDX
,
#endif
TSDB_FILE_TYPE_NHEAD
,
TSDB_FILE_TYPE_NLAST
}
TSDB_FILE_TYPE
;
typedef
struct
{
uint32_t
offset
;
uint32_t
magic
;
uint32_t
len
;
uint64_t
size
;
// total size of the file
uint64_t
tombSize
;
// unused file size
uint32_t
totalBlocks
;
uint32_t
totalSubBlocks
;
uint32_t
offset
;
uint64_t
size
;
// total size of the file
uint64_t
tombSize
;
// unused file size
}
STsdbFileInfo
;
typedef
struct
{
...
...
@@ -197,6 +207,7 @@ typedef struct {
// ------------------ tsdbRWHelper.c
typedef
struct
{
int32_t
tid
;
uint32_t
len
;
uint32_t
offset
;
uint32_t
hasLast
:
2
;
...
...
@@ -220,7 +231,7 @@ typedef struct {
typedef
struct
{
int32_t
delimiter
;
// For recovery usage
int32_t
checksum
;
// TODO: decide if checksum logic in this file or make it one API
int32_t
tid
;
uint64_t
uid
;
SCompBlock
blocks
[];
}
SCompInfo
;
...
...
@@ -249,14 +260,12 @@ typedef struct {
typedef
enum
{
TSDB_WRITE_HELPER
,
TSDB_READ_HELPER
}
tsdb_rw_helper_t
;
typedef
struct
{
int
fid
;
TSKEY
minKey
;
TSKEY
maxKey
;
// For read/write purpose
SFile
headF
;
SFile
dataF
;
SFile
lastF
;
// For write purpose only
SFileGroup
fGroup
;
#ifdef TSDB_IDX
SFile
nIdxF
;
#endif
SFile
nHeadF
;
SFile
nLastF
;
}
SHelperFile
;
...
...
@@ -264,9 +273,14 @@ typedef struct {
typedef
struct
{
uint64_t
uid
;
int32_t
tid
;
int32_t
sversion
;
}
SHelperTable
;
typedef
struct
{
SCompIdx
*
pIdxArray
;
int
numOfIdx
;
int
curIdx
;
}
SIdxH
;
typedef
struct
{
tsdb_rw_helper_t
type
;
...
...
@@ -274,7 +288,9 @@ typedef struct {
int8_t
state
;
// For file set usage
SHelperFile
files
;
SCompIdx
*
pCompIdx
;
SIdxH
idxH
;
SCompIdx
curCompIdx
;
void
*
pWIdx
;
// For table set usage
SHelperTable
tableInfo
;
SCompInfo
*
pCompInfo
;
...
...
@@ -286,7 +302,6 @@ typedef struct {
void
*
compBuffer
;
// Buffer for temperary compress/decompress purpose
}
SRWHelper
;
// Operations
// ------------------ tsdbMeta.c
#define TABLE_TYPE(t) (t)->type
...
...
@@ -296,6 +311,7 @@ typedef struct {
#define TABLE_TID(t) (t)->tableId.tid
#define TABLE_SUID(t) (t)->suid
#define TABLE_LASTKEY(t) (t)->lastKey
#define TSDB_META_FILE_MAGIC(m) KVSTORE_MAGIC((m)->pStore)
STsdbMeta
*
tsdbNewMeta
(
STsdbCfg
*
pCfg
);
void
tsdbFreeMeta
(
STsdbMeta
*
pMeta
);
...
...
@@ -445,6 +461,16 @@ void tsdbGetFidKeyRange(int daysPerFile, int8_t precision, int fileId, TS
#define helperRepo(h) (h)->pRepo
#define helperState(h) (h)->state
#define TSDB_NLAST_FILE_OPENED(h) ((h)->files.nLastF.fd > 0)
#define helperFileId(h) ((h)->files.fGroup.fileId)
#ifdef TSDB_IDX
#define helperIdxF(h) (&((h)->files.fGroup.files[TSDB_FILE_TYPE_IDX]))
#define helperNewIdxF(h) (&((h)->files.nIdxF))
#endif
#define helperHeadF(h) (&((h)->files.fGroup.files[TSDB_FILE_TYPE_HEAD]))
#define helperDataF(h) (&((h)->files.fGroup.files[TSDB_FILE_TYPE_DATA]))
#define helperLastF(h) (&((h)->files.fGroup.files[TSDB_FILE_TYPE_LAST]))
#define helperNewHeadF(h) (&((h)->files.nHeadF))
#define helperNewLastF(h) (&((h)->files.nLastF))
int
tsdbInitReadHelper
(
SRWHelper
*
pHelper
,
STsdbRepo
*
pRepo
);
int
tsdbInitWriteHelper
(
SRWHelper
*
pHelper
,
STsdbRepo
*
pRepo
);
...
...
src/tsdb/src/tsdbFile.c
浏览文件 @
0e7cfbb0
...
...
@@ -30,7 +30,11 @@
#include "ttime.h"
#include "tfile.h"
#ifdef TSDB_IDX
const
char
*
tsdbFileSuffix
[]
=
{
".idx"
,
".head"
,
".data"
,
".last"
,
""
,
".i"
,
".h"
,
".l"
};
#else
const
char
*
tsdbFileSuffix
[]
=
{
".head"
,
".data"
,
".last"
,
""
,
".h"
,
".l"
};
#endif
static
int
tsdbInitFile
(
SFile
*
pFile
,
STsdbRepo
*
pRepo
,
int
fid
,
int
type
);
static
void
tsdbDestroyFile
(
SFile
*
pFile
);
...
...
@@ -108,7 +112,7 @@ int tsdbOpenFileH(STsdbRepo *pRepo) {
memset
((
void
*
)(
&
fileGroup
),
0
,
sizeof
(
SFileGroup
));
fileGroup
.
fileId
=
fid
;
for
(
int
type
=
TSDB_FILE_TYPE_HEAD
;
type
<
TSDB_FILE_TYPE_MAX
;
type
++
)
{
for
(
int
type
=
0
;
type
<
TSDB_FILE_TYPE_MAX
;
type
++
)
{
if
(
tsdbInitFile
(
&
fileGroup
.
files
[
type
],
pRepo
,
fid
,
type
)
<
0
)
{
tsdbError
(
"vgId:%d failed to init file fid %d type %d"
,
REPO_ID
(
pRepo
),
fid
,
type
);
goto
_err
;
...
...
@@ -126,7 +130,7 @@ int tsdbOpenFileH(STsdbRepo *pRepo) {
return
0
;
_err:
for
(
int
type
=
TSDB_FILE_TYPE_HEAD
;
type
<
TSDB_FILE_TYPE_MAX
;
type
++
)
tsdbDestroyFile
(
&
fileGroup
.
files
[
type
]);
for
(
int
type
=
0
;
type
<
TSDB_FILE_TYPE_MAX
;
type
++
)
tsdbDestroyFile
(
&
fileGroup
.
files
[
type
]);
tfree
(
tDataDir
);
if
(
dir
!=
NULL
)
closedir
(
dir
);
...
...
@@ -139,7 +143,7 @@ void tsdbCloseFileH(STsdbRepo *pRepo) {
for
(
int
i
=
0
;
i
<
pFileH
->
nFGroups
;
i
++
)
{
SFileGroup
*
pFGroup
=
pFileH
->
pFGroup
+
i
;
for
(
int
type
=
TSDB_FILE_TYPE_HEAD
;
type
<
TSDB_FILE_TYPE_MAX
;
type
++
)
{
for
(
int
type
=
0
;
type
<
TSDB_FILE_TYPE_MAX
;
type
++
)
{
tsdbDestroyFile
(
&
pFGroup
->
files
[
type
]);
}
}
...
...
@@ -156,7 +160,7 @@ SFileGroup *tsdbCreateFGroupIfNeed(STsdbRepo *pRepo, char *dataDir, int fid, int
SFileGroup
*
pGroup
=
tsdbSearchFGroup
(
pFileH
,
fid
,
TD_EQ
);
if
(
pGroup
==
NULL
)
{
// if not exists, create one
pFGroup
->
fileId
=
fid
;
for
(
int
type
=
TSDB_FILE_TYPE_HEAD
;
type
<
TSDB_FILE_TYPE_MAX
;
type
++
)
{
for
(
int
type
=
0
;
type
<
TSDB_FILE_TYPE_MAX
;
type
++
)
{
if
(
tsdbCreateFile
(
&
pFGroup
->
files
[
type
],
pRepo
,
fid
,
type
)
<
0
)
goto
_err
;
}
...
...
@@ -169,7 +173,7 @@ SFileGroup *tsdbCreateFGroupIfNeed(STsdbRepo *pRepo, char *dataDir, int fid, int
return
pGroup
;
_err:
for
(
int
type
=
TSDB_FILE_TYPE_HEAD
;
type
<
TSDB_FILE_TYPE_MAX
;
type
++
)
tsdbDestroyFile
(
&
pGroup
->
files
[
type
]);
for
(
int
type
=
0
;
type
<
TSDB_FILE_TYPE_MAX
;
type
++
)
tsdbDestroyFile
(
&
pGroup
->
files
[
type
]);
return
NULL
;
}
...
...
@@ -260,6 +264,7 @@ int tsdbCreateFile(SFile *pFile, STsdbRepo *pRepo, int fid, int type) {
}
pFile
->
info
.
size
=
TSDB_FILE_HEAD_SIZE
;
pFile
->
info
.
magic
=
TSDB_FILE_INIT_MAGIC
;
if
(
tsdbUpdateFileHeader
(
pFile
,
0
)
<
0
)
{
tsdbCloseFile
(
pFile
);
...
...
@@ -323,23 +328,25 @@ int tsdbUpdateFileHeader(SFile *pFile, uint32_t version) {
int
tsdbEncodeSFileInfo
(
void
**
buf
,
const
STsdbFileInfo
*
pInfo
)
{
int
tlen
=
0
;
tlen
+=
taosEncodeFixedU32
(
buf
,
pInfo
->
offset
);
tlen
+=
taosEncodeFixedU32
(
buf
,
pInfo
->
magic
);
tlen
+=
taosEncodeFixedU32
(
buf
,
pInfo
->
len
);
tlen
+=
taosEncodeFixedU64
(
buf
,
pInfo
->
size
);
tlen
+=
taosEncodeFixedU64
(
buf
,
pInfo
->
tombSize
);
tlen
+=
taosEncodeFixedU32
(
buf
,
pInfo
->
totalBlocks
);
tlen
+=
taosEncodeFixedU32
(
buf
,
pInfo
->
totalSubBlocks
);
tlen
+=
taosEncodeFixedU32
(
buf
,
pInfo
->
offset
);
tlen
+=
taosEncodeFixedU64
(
buf
,
pInfo
->
size
);
tlen
+=
taosEncodeFixedU64
(
buf
,
pInfo
->
tombSize
);
return
tlen
;
}
void
*
tsdbDecodeSFileInfo
(
void
*
buf
,
STsdbFileInfo
*
pInfo
)
{
buf
=
taosDecodeFixedU32
(
buf
,
&
(
pInfo
->
offset
));
buf
=
taosDecodeFixedU32
(
buf
,
&
(
pInfo
->
magic
));
buf
=
taosDecodeFixedU32
(
buf
,
&
(
pInfo
->
len
));
buf
=
taosDecodeFixedU64
(
buf
,
&
(
pInfo
->
size
));
buf
=
taosDecodeFixedU64
(
buf
,
&
(
pInfo
->
tombSize
));
buf
=
taosDecodeFixedU32
(
buf
,
&
(
pInfo
->
totalBlocks
));
buf
=
taosDecodeFixedU32
(
buf
,
&
(
pInfo
->
totalSubBlocks
));
buf
=
taosDecodeFixedU32
(
buf
,
&
(
pInfo
->
offset
));
buf
=
taosDecodeFixedU64
(
buf
,
&
(
pInfo
->
size
));
buf
=
taosDecodeFixedU64
(
buf
,
&
(
pInfo
->
tombSize
));
return
buf
;
}
...
...
@@ -358,7 +365,7 @@ void tsdbRemoveFileGroup(STsdbRepo *pRepo, SFileGroup *pFGroup) {
pFileH
->
nFGroups
--
;
ASSERT
(
pFileH
->
nFGroups
>=
0
);
for
(
int
type
=
TSDB_FILE_TYPE_HEAD
;
type
<
TSDB_FILE_TYPE_MAX
;
type
++
)
{
for
(
int
type
=
0
;
type
<
TSDB_FILE_TYPE_MAX
;
type
++
)
{
if
(
remove
(
fileGroup
.
files
[
type
].
fname
)
<
0
)
{
tsdbError
(
"vgId:%d failed to remove file %s"
,
REPO_ID
(
pRepo
),
fileGroup
.
files
[
type
].
fname
);
}
...
...
src/tsdb/src/tsdbMain.c
浏览文件 @
0e7cfbb0
...
...
@@ -212,59 +212,61 @@ uint32_t tsdbGetFileInfo(TSDB_REPO_T *repo, char *name, uint32_t *index, uint32_
char
*
sdup
=
strdup
(
pRepo
->
rootDir
);
char
*
prefix
=
dirname
(
sdup
);
int
prefixLen
=
strlen
(
prefix
);
tfree
(
sdup
);
if
(
name
[
0
]
==
0
)
{
// get the file from index or after, but not larger than eindex
int
fid
=
(
*
index
)
/
3
;
int
fid
=
(
*
index
)
/
TSDB_FILE_TYPE_MAX
;
if
(
pFileH
->
nFGroups
==
0
||
fid
>
pFileH
->
pFGroup
[
pFileH
->
nFGroups
-
1
].
fileId
)
{
if
(
*
index
<=
TSDB_META_FILE_INDEX
&&
TSDB_META_FILE_INDEX
<=
eindex
)
{
fname
=
tsdbGetMetaFileName
(
pRepo
->
rootDir
);
*
index
=
TSDB_META_FILE_INDEX
;
magic
=
TSDB_META_FILE_MAGIC
(
pRepo
->
tsdbMeta
);
}
else
{
tfree
(
sdup
);
return
0
;
}
}
else
{
SFileGroup
*
pFGroup
=
taosbsearch
(
&
fid
,
pFileH
->
pFGroup
,
pFileH
->
nFGroups
,
sizeof
(
SFileGroup
),
keyFGroupCompFunc
,
TD_GE
);
if
(
pFGroup
->
fileId
==
fid
)
{
fname
=
strdup
(
pFGroup
->
files
[(
*
index
)
%
3
].
fname
);
fname
=
strdup
(
pFGroup
->
files
[(
*
index
)
%
TSDB_FILE_TYPE_MAX
].
fname
);
magic
=
pFGroup
->
files
[(
*
index
)
%
TSDB_FILE_TYPE_MAX
].
info
.
magic
;
}
else
{
if
(
pFGroup
->
fileId
*
3
+
2
<
eindex
)
{
if
(
(
pFGroup
->
fileId
+
1
)
*
TSDB_FILE_TYPE_MAX
-
1
<
eindex
)
{
fname
=
strdup
(
pFGroup
->
files
[
0
].
fname
);
*
index
=
pFGroup
->
fileId
*
3
;
*
index
=
pFGroup
->
fileId
*
TSDB_FILE_TYPE_MAX
;
magic
=
pFGroup
->
files
[
0
].
info
.
magic
;
}
else
{
tfree
(
sdup
);
return
0
;
}
}
}
strcpy
(
name
,
fname
+
strlen
(
prefix
)
);
strcpy
(
name
,
fname
+
prefixLen
);
}
else
{
// get the named file at the specified index. If not there, return 0
if
(
*
index
==
TSDB_META_FILE_INDEX
)
{
// get meta file
fname
=
tsdbGetMetaFileName
(
pRepo
->
rootDir
);
magic
=
TSDB_META_FILE_MAGIC
(
pRepo
->
tsdbMeta
);
}
else
{
int
fid
=
(
*
index
)
/
3
;
int
fid
=
(
*
index
)
/
TSDB_FILE_TYPE_MAX
;
SFileGroup
*
pFGroup
=
tsdbSearchFGroup
(
pFileH
,
fid
,
TD_EQ
);
if
(
pFGroup
==
NULL
)
{
// not found
tfree
(
sdup
);
return
0
;
}
SFile
*
pFile
=
&
pFGroup
->
files
[(
*
index
)
%
3
];
SFile
*
pFile
=
&
pFGroup
->
files
[(
*
index
)
%
TSDB_FILE_TYPE_MAX
];
fname
=
strdup
(
pFile
->
fname
);
magic
=
pFile
->
info
.
magic
;
}
}
if
(
stat
(
fname
,
&
fState
)
<
0
)
{
tfree
(
sdup
);
tfree
(
fname
);
return
0
;
}
tfree
(
sdup
);
*
size
=
fState
.
st_size
;
magic
=
*
size
;
//
magic = *size;
tfree
(
fname
);
return
magic
;
...
...
@@ -793,7 +795,8 @@ static int tsdbRestoreInfo(STsdbRepo *pRepo) {
for
(
int
i
=
1
;
i
<
pRepo
->
config
.
maxTables
;
i
++
)
{
STable
*
pTable
=
pMeta
->
tables
[
i
];
if
(
pTable
==
NULL
)
continue
;
SCompIdx
*
pIdx
=
&
rhelper
.
pCompIdx
[
i
];
tsdbSetHelperTable
(
&
rhelper
,
pTable
,
pRepo
);
SCompIdx
*
pIdx
=
&
(
rhelper
.
curCompIdx
);
if
(
pIdx
->
offset
>
0
&&
pTable
->
lastKey
<
pIdx
->
maxKey
)
pTable
->
lastKey
=
pIdx
->
maxKey
;
}
...
...
src/tsdb/src/tsdbMemTable.c
浏览文件 @
0e7cfbb0
...
...
@@ -627,9 +627,12 @@ static int tsdbCommitToFile(STsdbRepo *pRepo, int fid, SCommitIter *iters, SRWHe
tsdbCloseHelperFile
(
pHelper
,
0
);
pthread_rwlock_wrlock
(
&
(
pFileH
->
fhlock
));
pGroup
->
files
[
TSDB_FILE_TYPE_HEAD
]
=
pHelper
->
files
.
headF
;
pGroup
->
files
[
TSDB_FILE_TYPE_DATA
]
=
pHelper
->
files
.
dataF
;
pGroup
->
files
[
TSDB_FILE_TYPE_LAST
]
=
pHelper
->
files
.
lastF
;
#ifdef TSDB_IDX
pGroup
->
files
[
TSDB_FILE_TYPE_IDX
]
=
*
(
helperIdxF
(
pHelper
));
#endif
pGroup
->
files
[
TSDB_FILE_TYPE_HEAD
]
=
*
(
helperHeadF
(
pHelper
));
pGroup
->
files
[
TSDB_FILE_TYPE_DATA
]
=
*
(
helperDataF
(
pHelper
));
pGroup
->
files
[
TSDB_FILE_TYPE_LAST
]
=
*
(
helperLastF
(
pHelper
));
pthread_rwlock_unlock
(
&
(
pFileH
->
fhlock
));
return
0
;
...
...
src/tsdb/src/tsdbRWHelper.c
浏览文件 @
0e7cfbb0
...
...
@@ -99,6 +99,7 @@ void tsdbResetHelper(SRWHelper *pHelper) {
int
tsdbSetAndOpenHelperFile
(
SRWHelper
*
pHelper
,
SFileGroup
*
pGroup
)
{
ASSERT
(
pHelper
!=
NULL
&&
pGroup
!=
NULL
);
SFile
*
pFile
=
NULL
;
// Clear the helper object
tsdbResetHelper
(
pHelper
);
...
...
@@ -106,44 +107,51 @@ int tsdbSetAndOpenHelperFile(SRWHelper *pHelper, SFileGroup *pGroup) {
ASSERT
(
pHelper
->
state
==
TSDB_HELPER_CLEAR_STATE
);
// Set the files
pHelper
->
files
.
fid
=
pGroup
->
fileId
;
pHelper
->
files
.
headF
=
pGroup
->
files
[
TSDB_FILE_TYPE_HEAD
];
pHelper
->
files
.
dataF
=
pGroup
->
files
[
TSDB_FILE_TYPE_DATA
];
pHelper
->
files
.
lastF
=
pGroup
->
files
[
TSDB_FILE_TYPE_LAST
];
pHelper
->
files
.
fGroup
=
*
pGroup
;
if
(
helperType
(
pHelper
)
==
TSDB_WRITE_HELPER
)
{
tsdbGetDataFileName
(
pHelper
->
pRepo
,
pGroup
->
fileId
,
TSDB_FILE_TYPE_NHEAD
,
pHelper
->
files
.
nHeadF
.
fname
);
tsdbGetDataFileName
(
pHelper
->
pRepo
,
pGroup
->
fileId
,
TSDB_FILE_TYPE_NLAST
,
pHelper
->
files
.
nLastF
.
fname
);
#ifdef TSDB_IDX
tsdbGetDataFileName
(
pHelper
->
pRepo
,
pGroup
->
fileId
,
TSDB_FILE_TYPE_NIDX
,
helperNewIdxF
(
pHelper
)
->
fname
);
#endif
tsdbGetDataFileName
(
pHelper
->
pRepo
,
pGroup
->
fileId
,
TSDB_FILE_TYPE_NHEAD
,
helperNewHeadF
(
pHelper
)
->
fname
);
tsdbGetDataFileName
(
pHelper
->
pRepo
,
pGroup
->
fileId
,
TSDB_FILE_TYPE_NLAST
,
helperNewLastF
(
pHelper
)
->
fname
);
}
// Open the files
if
(
tsdbOpenFile
(
&
(
pHelper
->
files
.
headF
),
O_RDONLY
)
<
0
)
goto
_err
;
#ifdef TSDB_IDX
if
(
tsdbOpenFile
(
helperIdxF
(
pHelper
),
O_RDONLY
)
<
0
)
goto
_err
;
#endif
if
(
tsdbOpenFile
(
helperHeadF
(
pHelper
),
O_RDONLY
)
<
0
)
goto
_err
;
if
(
helperType
(
pHelper
)
==
TSDB_WRITE_HELPER
)
{
if
(
tsdbOpenFile
(
&
(
pHelper
->
files
.
dataF
),
O_RDWR
)
<
0
)
goto
_err
;
if
(
tsdbOpenFile
(
&
(
pHelper
->
files
.
lastF
),
O_RDWR
)
<
0
)
goto
_err
;
if
(
tsdbOpenFile
(
helperDataF
(
pHelper
),
O_RDWR
)
<
0
)
goto
_err
;
if
(
tsdbOpenFile
(
helperLastF
(
pHelper
),
O_RDWR
)
<
0
)
goto
_err
;
#ifdef TSDB_IDX
// Create and open .i file
pFile
=
helperNewIdxF
(
pHelper
);
if
(
tsdbOpenFile
(
pFile
,
O_WRONLY
|
O_CREAT
)
<
0
)
return
-
1
;
pFile
->
info
.
size
=
TSDB_FILE_HEAD_SIZE
;
pFile
->
info
.
magic
=
TSDB_FILE_INIT_MAGIC
;
if
(
tsdbUpdateFileHeader
(
pFile
,
0
)
<
0
)
return
-
1
;
#endif
// Create and open .h
if
(
tsdbOpenFile
(
&
(
pHelper
->
files
.
nHeadF
),
O_WRONLY
|
O_CREAT
)
<
0
)
return
-
1
;
// size_t tsize = TSDB_FILE_HEAD_SIZE + sizeof(SCompIdx) * pCfg->maxTables + sizeof(TSCKSUM);
if
(
tsendfile
(
pHelper
->
files
.
nHeadF
.
fd
,
pHelper
->
files
.
headF
.
fd
,
NULL
,
TSDB_FILE_HEAD_SIZE
)
<
TSDB_FILE_HEAD_SIZE
)
{
tsdbError
(
"vgId:%d failed to sendfile %d bytes from file %s to %s since %s"
,
REPO_ID
(
pHelper
->
pRepo
),
TSDB_FILE_HEAD_SIZE
,
pHelper
->
files
.
headF
.
fname
,
pHelper
->
files
.
nHeadF
.
fname
,
strerror
(
errno
));
terrno
=
TAOS_SYSTEM_ERROR
(
errno
);
goto
_err
;
}
pFile
=
helperNewHeadF
(
pHelper
);
if
(
tsdbOpenFile
(
pFile
,
O_WRONLY
|
O_CREAT
)
<
0
)
return
-
1
;
pFile
->
info
.
size
=
TSDB_FILE_HEAD_SIZE
;
pFile
->
info
.
magic
=
TSDB_FILE_INIT_MAGIC
;
if
(
tsdbUpdateFileHeader
(
pFile
,
0
)
<
0
)
return
-
1
;
// Create and open .l file if should
if
(
tsdbShouldCreateNewLast
(
pHelper
))
{
if
(
tsdbOpenFile
(
&
(
pHelper
->
files
.
nLastF
),
O_WRONLY
|
O_CREAT
)
<
0
)
goto
_err
;
if
(
tsendfile
(
pHelper
->
files
.
nLastF
.
fd
,
pHelper
->
files
.
lastF
.
fd
,
NULL
,
TSDB_FILE_HEAD_SIZE
)
<
TSDB_FILE_HEAD_SIZE
)
{
tsdbError
(
"vgId:%d failed to sendfile %d bytes from file %s to %s since %s"
,
REPO_ID
(
pHelper
->
pRepo
),
TSDB_FILE_HEAD_SIZE
,
pHelper
->
files
.
lastF
.
fname
,
pHelper
->
files
.
nLastF
.
fname
,
strerror
(
errno
));
terrno
=
TAOS_SYSTEM_ERROR
(
errno
);
goto
_err
;
}
pFile
=
helperNewLastF
(
pHelper
);
if
(
tsdbOpenFile
(
pFile
,
O_WRONLY
|
O_CREAT
)
<
0
)
goto
_err
;
pFile
->
info
.
size
=
TSDB_FILE_HEAD_SIZE
;
pFile
->
info
.
magic
=
TSDB_FILE_INIT_MAGIC
;
if
(
tsdbUpdateFileHeader
(
pFile
,
0
)
<
0
)
return
-
1
;
}
}
else
{
if
(
tsdbOpenFile
(
&
(
pHelper
->
files
.
dataF
),
O_RDONLY
)
<
0
)
goto
_err
;
if
(
tsdbOpenFile
(
&
(
pHelper
->
files
.
lastF
),
O_RDONLY
)
<
0
)
goto
_err
;
if
(
tsdbOpenFile
(
helperDataF
(
pHelper
),
O_RDONLY
)
<
0
)
goto
_err
;
if
(
tsdbOpenFile
(
helperLastF
(
pHelper
),
O_RDONLY
)
<
0
)
goto
_err
;
}
helperSetState
(
pHelper
,
TSDB_HELPER_FILE_SET_AND_OPEN
);
...
...
@@ -155,59 +163,98 @@ _err:
}
int
tsdbCloseHelperFile
(
SRWHelper
*
pHelper
,
bool
hasError
)
{
if
(
pHelper
->
files
.
headF
.
fd
>
0
)
{
close
(
pHelper
->
files
.
headF
.
fd
);
pHelper
->
files
.
headF
.
fd
=
-
1
;
SFile
*
pFile
=
NULL
;
#ifdef TSDB_IDX
pFile
=
helperIdxF
(
pHelper
);
if
(
pFile
->
fd
>
0
)
{
close
(
pFile
->
fd
);
pFile
->
fd
=
-
1
;
}
#endif
pFile
=
helperHeadF
(
pHelper
);
if
(
pFile
->
fd
>
0
)
{
close
(
pFile
->
fd
);
pFile
->
fd
=
-
1
;
}
if
(
pHelper
->
files
.
dataF
.
fd
>
0
)
{
pFile
=
helperDataF
(
pHelper
);
if
(
pFile
->
fd
>
0
)
{
if
(
helperType
(
pHelper
)
==
TSDB_WRITE_HELPER
)
{
tsdbUpdateFileHeader
(
&
(
pHelper
->
files
.
dataF
)
,
0
);
fsync
(
p
Helper
->
files
.
dataF
.
fd
);
tsdbUpdateFileHeader
(
pFile
,
0
);
fsync
(
p
File
->
fd
);
}
close
(
p
Helper
->
files
.
dataF
.
fd
);
p
Helper
->
files
.
dataF
.
fd
=
-
1
;
close
(
p
File
->
fd
);
p
File
->
fd
=
-
1
;
}
if
(
pHelper
->
files
.
lastF
.
fd
>
0
)
{
if
(
helperType
(
pHelper
)
==
TSDB_WRITE_HELPER
)
{
fsync
(
pHelper
->
files
.
lastF
.
fd
);
pFile
=
helperLastF
(
pHelper
);
if
(
pFile
->
fd
>
0
)
{
if
(
helperType
(
pHelper
)
==
TSDB_WRITE_HELPER
&&
!
TSDB_NLAST_FILE_OPENED
(
pHelper
))
{
fsync
(
pFile
->
fd
);
}
close
(
p
Helper
->
files
.
lastF
.
fd
);
p
Helper
->
files
.
lastF
.
fd
=
-
1
;
close
(
p
File
->
fd
);
p
File
->
fd
=
-
1
;
}
if
(
helperType
(
pHelper
)
==
TSDB_WRITE_HELPER
)
{
if
(
pHelper
->
files
.
nHeadF
.
fd
>
0
)
{
if
(
!
hasError
)
tsdbUpdateFileHeader
(
&
(
pHelper
->
files
.
nHeadF
),
0
);
fsync
(
pHelper
->
files
.
nHeadF
.
fd
);
close
(
pHelper
->
files
.
nHeadF
.
fd
);
pHelper
->
files
.
nHeadF
.
fd
=
-
1
;
#ifdef TSDB_IDX
pFile
=
helperNewIdxF
(
pHelper
);
if
(
pFile
->
fd
>
0
)
{
if
(
!
hasError
)
tsdbUpdateFileHeader
(
pFile
,
0
);
fsync
(
pFile
->
fd
);
close
(
pFile
->
fd
);
pFile
->
fd
=
-
1
;
if
(
hasError
)
{
(
void
)
remove
(
pFile
->
fname
);
}
else
{
if
(
rename
(
pFile
->
fname
,
helperIdxF
(
pHelper
)
->
fname
)
<
0
)
{
tsdbError
(
"failed to rename file from %s to %s since %s"
,
pFile
->
fname
,
helperIdxF
(
pHelper
)
->
fname
,
strerror
(
errno
));
terrno
=
TAOS_SYSTEM_ERROR
(
errno
);
return
-
1
;
}
helperIdxF
(
pHelper
)
->
info
=
pFile
->
info
;
}
}
#endif
pFile
=
helperNewHeadF
(
pHelper
);
if
(
pFile
->
fd
>
0
)
{
if
(
!
hasError
)
tsdbUpdateFileHeader
(
pFile
,
0
);
fsync
(
pFile
->
fd
);
close
(
pFile
->
fd
);
pFile
->
fd
=
-
1
;
if
(
hasError
)
{
(
void
)
remove
(
p
Helper
->
files
.
nHeadF
.
fname
);
(
void
)
remove
(
p
File
->
fname
);
}
else
{
if
(
rename
(
p
Helper
->
files
.
nHeadF
.
fname
,
pHelper
->
files
.
headF
.
fname
)
<
0
)
{
tsdbError
(
"failed to rename file from %s to %s since %s"
,
p
Helper
->
files
.
nHeadF
.
fname
,
pHelper
->
files
.
headF
.
fname
,
strerror
(
errno
));
if
(
rename
(
p
File
->
fname
,
helperHeadF
(
pHelper
)
->
fname
)
<
0
)
{
tsdbError
(
"failed to rename file from %s to %s since %s"
,
p
File
->
fname
,
helperHeadF
(
pHelper
)
->
fname
,
strerror
(
errno
));
terrno
=
TAOS_SYSTEM_ERROR
(
errno
);
return
-
1
;
}
pHelper
->
files
.
headF
.
info
=
pHelper
->
files
.
nHeadF
.
info
;
helperHeadF
(
pHelper
)
->
info
=
pFile
->
info
;
}
}
if
(
pHelper
->
files
.
nLastF
.
fd
>
0
)
{
if
(
!
hasError
)
tsdbUpdateFileHeader
(
&
(
pHelper
->
files
.
nLastF
),
0
);
fsync
(
pHelper
->
files
.
nLastF
.
fd
);
close
(
pHelper
->
files
.
nLastF
.
fd
);
pHelper
->
files
.
nLastF
.
fd
=
-
1
;
pFile
=
helperNewLastF
(
pHelper
);
if
(
pFile
->
fd
>
0
)
{
if
(
!
hasError
)
tsdbUpdateFileHeader
(
pFile
,
0
);
fsync
(
pFile
->
fd
);
close
(
pFile
->
fd
);
pFile
->
fd
=
-
1
;
if
(
hasError
)
{
(
void
)
remove
(
p
Helper
->
files
.
nLastF
.
fname
);
(
void
)
remove
(
p
File
->
fname
);
}
else
{
if
(
rename
(
p
Helper
->
files
.
nLastF
.
fname
,
pHelper
->
files
.
lastF
.
fname
)
<
0
)
{
tsdbError
(
"failed to rename file from %s to %s since %s"
,
p
Helper
->
files
.
nLastF
.
fname
,
pHelper
->
files
.
lastF
.
fname
,
strerror
(
errno
));
if
(
rename
(
p
File
->
fname
,
helperLastF
(
pHelper
)
->
fname
)
<
0
)
{
tsdbError
(
"failed to rename file from %s to %s since %s"
,
p
File
->
fname
,
helperLastF
(
pHelper
)
->
fname
,
strerror
(
errno
));
terrno
=
TAOS_SYSTEM_ERROR
(
errno
);
return
-
1
;
}
pHelper
->
files
.
lastF
.
info
=
pHelper
->
files
.
nLastF
.
info
;
helperLastF
(
pHelper
)
->
info
=
helperNewLastF
(
pHelper
)
->
info
;
}
}
}
...
...
@@ -224,18 +271,35 @@ void tsdbSetHelperTable(SRWHelper *pHelper, STable *pTable, STsdbRepo *pRepo) {
pHelper
->
tableInfo
.
tid
=
pTable
->
tableId
.
tid
;
pHelper
->
tableInfo
.
uid
=
pTable
->
tableId
.
uid
;
STSchema
*
pSchema
=
tsdbGetTableSchemaImpl
(
pTable
,
false
,
false
,
-
1
);
pHelper
->
tableInfo
.
sversion
=
schemaVersion
(
pSchema
);
tdInitDataCols
(
pHelper
->
pDataCols
[
0
],
pSchema
);
tdInitDataCols
(
pHelper
->
pDataCols
[
1
],
pSchema
);
SCompIdx
*
pIdx
=
pHelper
->
pCompIdx
+
pTable
->
tableId
.
tid
;
if
(
pIdx
->
offset
>
0
)
{
if
(
pIdx
->
uid
!=
TABLE_UID
(
pTable
))
{
memset
((
void
*
)
pIdx
,
0
,
sizeof
(
SCompIdx
));
if
(
pHelper
->
idxH
.
numOfIdx
>
0
)
{
while
(
true
)
{
if
(
pHelper
->
idxH
.
curIdx
>=
pHelper
->
idxH
.
numOfIdx
)
{
memset
(
&
(
pHelper
->
curCompIdx
),
0
,
sizeof
(
SCompIdx
));
break
;
}
SCompIdx
*
pIdx
=
&
(
pHelper
->
idxH
.
pIdxArray
[
pHelper
->
idxH
.
curIdx
]);
if
(
pIdx
->
tid
==
TABLE_TID
(
pTable
))
{
if
(
pIdx
->
uid
==
TABLE_UID
(
pTable
))
{
pHelper
->
curCompIdx
=
*
pIdx
;
}
else
{
if
(
pIdx
->
hasLast
)
pHelper
->
hasOldLastBlock
=
true
;
memset
(
&
(
pHelper
->
curCompIdx
),
0
,
sizeof
(
SCompIdx
))
;
}
pHelper
->
idxH
.
curIdx
++
;
break
;
}
else
if
(
pIdx
->
tid
>
TABLE_TID
(
pTable
))
{
memset
(
&
(
pHelper
->
curCompIdx
),
0
,
sizeof
(
SCompIdx
));
break
;
}
else
{
pHelper
->
idxH
.
curIdx
++
;
}
}
}
else
{
memset
(
&
(
pHelper
->
curCompIdx
),
0
,
sizeof
(
SCompIdx
));
}
helperSetState
(
pHelper
,
TSDB_HELPER_TABLE_SET
);
...
...
@@ -245,7 +309,7 @@ void tsdbSetHelperTable(SRWHelper *pHelper, STable *pTable, STsdbRepo *pRepo) {
int
tsdbCommitTableData
(
SRWHelper
*
pHelper
,
SCommitIter
*
pCommitIter
,
SDataCols
*
pDataCols
,
TSKEY
maxKey
)
{
ASSERT
(
helperType
(
pHelper
)
==
TSDB_WRITE_HELPER
);
SCompIdx
*
pIdx
=
&
(
pHelper
->
pCompIdx
[
TABLE_TID
(
pCommitIter
->
pTable
)]
);
SCompIdx
*
pIdx
=
&
(
pHelper
->
curCompIdx
);
int
blkIdx
=
0
;
ASSERT
(
pIdx
->
offset
==
0
||
pIdx
->
uid
==
TABLE_UID
(
pCommitIter
->
pTable
));
...
...
@@ -271,44 +335,53 @@ int tsdbMoveLastBlockIfNeccessary(SRWHelper *pHelper) {
STsdbCfg
*
pCfg
=
&
pHelper
->
pRepo
->
config
;
ASSERT
(
helperType
(
pHelper
)
==
TSDB_WRITE_HELPER
);
SCompIdx
*
pIdx
=
pHelper
->
pCompIdx
+
pHelper
->
tableInfo
.
tid
;
SCompIdx
*
pIdx
=
&
(
pHelper
->
curCompIdx
)
;
SCompBlock
compBlock
=
{
0
};
if
(
TSDB_NLAST_FILE_OPENED
(
pHelper
)
&&
(
pHelper
->
hasOldLastBlock
))
{
if
(
tsdbLoadCompInfo
(
pHelper
,
NULL
)
<
0
)
return
-
1
;
SCompBlock
*
pCompBlock
=
blockAtIdx
(
pHelper
,
pIdx
->
numOfBlocks
-
1
);
ASSERT
(
pCompBlock
->
last
);
if
(
tsdbLoadBlockData
(
pHelper
,
pCompBlock
,
NULL
)
<
0
)
return
-
1
;
ASSERT
(
pHelper
->
pDataCols
[
0
]
->
numOfRows
==
pCompBlock
->
numOfRows
&&
pHelper
->
pDataCols
[
0
]
->
numOfRows
<
pCfg
->
minRowsPerFileBlock
);
if
(
tsdbWriteBlockToFile
(
pHelper
,
helperNewLastF
(
pHelper
),
pHelper
->
pDataCols
[
0
],
&
compBlock
,
true
,
true
)
<
0
)
return
-
1
;
if
(
tsdbUpdateSuperBlock
(
pHelper
,
&
compBlock
,
pIdx
->
numOfBlocks
-
1
)
<
0
)
return
-
1
;
#if 0
if (pCompBlock->numOfSubBlocks > 1) {
if (tsdbLoadBlockData(pHelper, pCompBlock, NULL) < 0) return -1;
ASSERT(pHelper->pDataCols[0]->numOfRows == pCompBlock->numOfRows &&
pHelper->pDataCols[0]->numOfRows < pCfg->minRowsPerFileBlock);
if
(
tsdbWriteBlockToFile
(
pHelper
,
&
(
pHelper
->
files
.
nLastF
),
pHelper
->
pDataCols
[
0
],
&
compBlock
,
true
,
true
)
<
0
)
if (tsdbWriteBlockToFile(pHelper,
helperNewLastF(pHelper
), pHelper->pDataCols[0], &compBlock, true, true) < 0)
return -1;
if (tsdbUpdateSuperBlock(pHelper, &compBlock, pIdx->numOfBlocks - 1) < 0) return -1;
} else {
if
(
lseek
(
pHelper
->
files
.
lastF
.
fd
,
pCompBlock
->
offset
,
SEEK_SET
)
<
0
)
{
tsdbError
(
"vgId:%d failed to lseek file %s since %s"
,
REPO_ID
(
pHelper
->
pRepo
),
pHelper
->
files
.
lastF
.
fname
,
if (lseek(
helperLastF(pHelper)->
fd, pCompBlock->offset, SEEK_SET) < 0) {
tsdbError("vgId:%d failed to lseek file %s since %s", REPO_ID(pHelper->pRepo),
helperLastF(pHelper)->
fname,
strerror(errno));
terrno = TAOS_SYSTEM_ERROR(errno);
return -1;
}
pCompBlock
->
offset
=
lseek
(
pHelper
->
files
.
nLastF
.
fd
,
0
,
SEEK_END
);
pCompBlock->offset = lseek(
helperNewLastF(pHelper)->
fd, 0, SEEK_END);
if (pCompBlock->offset < 0) {
tsdbError
(
"vgId:%d failed to lseek file %s since %s"
,
REPO_ID
(
pHelper
->
pRepo
),
pHelper
->
files
.
nLastF
.
fname
,
tsdbError("vgId:%d failed to lseek file %s since %s", REPO_ID(pHelper->pRepo),
helperNewLastF(pHelper)->
fname,
strerror(errno));
terrno = TAOS_SYSTEM_ERROR(errno);
return -1;
}
if
(
tsendfile
(
pHelper
->
files
.
nLastF
.
fd
,
pHelper
->
files
.
lastF
.
fd
,
NULL
,
pCompBlock
->
len
)
<
pCompBlock
->
len
)
{
if (tsendfile(
helperNewLastF(pHelper)->fd, helperLastF(pHelper)->
fd, NULL, pCompBlock->len) < pCompBlock->len) {
tsdbError("vgId:%d failed to sendfile from file %s to file %s since %s", REPO_ID(pHelper->pRepo),
pHelper
->
files
.
lastF
.
fname
,
pHelper
->
files
.
nLastF
.
fname
,
strerror
(
errno
));
helperLastF(pHelper)->fname, helperNewLastF(pHelper)->
fname, strerror(errno));
terrno = TAOS_SYSTEM_ERROR(errno);
return -1;
}
}
#endif
pHelper
->
hasOldLastBlock
=
false
;
}
...
...
@@ -317,164 +390,178 @@ int tsdbMoveLastBlockIfNeccessary(SRWHelper *pHelper) {
}
int
tsdbWriteCompInfo
(
SRWHelper
*
pHelper
)
{
SCompIdx
*
pIdx
=
&
(
pHelper
->
curCompIdx
);
off_t
offset
=
0
;
SCompIdx
*
pIdx
=
pHelper
->
pCompIdx
+
pHelper
->
tableInfo
.
tid
;
if
(
!
helperHasState
(
pHelper
,
TSDB_HELPER_INFO_LOAD
))
{
if
(
pIdx
->
offset
>
0
)
{
offset
=
lseek
(
pHelper
->
files
.
nHeadF
.
fd
,
0
,
SEEK_END
);
if
(
offset
<
0
)
{
tsdbError
(
"vgId:%d failed to lseed file %s since %s"
,
REPO_ID
(
pHelper
->
pRepo
),
pHelper
->
files
.
nHeadF
.
fname
,
strerror
(
errno
));
terrno
=
TAOS_SYSTEM_ERROR
(
errno
);
return
-
1
;
}
SFile
*
pFile
=
helperNewHeadF
(
pHelper
);
pIdx
->
offset
=
offset
;
ASSERT
(
pIdx
->
offset
>=
TSDB_FILE_HEAD_SIZE
);
if
(
tsendfile
(
pHelper
->
files
.
nHeadF
.
fd
,
pHelper
->
files
.
headF
.
fd
,
NULL
,
pIdx
->
len
)
<
pIdx
->
len
)
{
tsdbError
(
"vgId:%d failed to send %d bytes from file %s to %s since %s"
,
REPO_ID
(
pHelper
->
pRepo
),
pIdx
->
len
,
pHelper
->
files
.
headF
.
fname
,
pHelper
->
files
.
nHeadF
.
fname
,
strerror
(
errno
));
terrno
=
TAOS_SYSTEM_ERROR
(
errno
);
return
-
1
;
}
}
}
else
{
if
(
pIdx
->
len
>
0
)
{
if
(
!
helperHasState
(
pHelper
,
TSDB_HELPER_INFO_LOAD
))
{
if
(
tsdbLoadCompInfo
(
pHelper
,
NULL
)
<
0
)
return
-
1
;
}
else
{
pHelper
->
pCompInfo
->
delimiter
=
TSDB_FILE_DELIMITER
;
pHelper
->
pCompInfo
->
uid
=
pHelper
->
tableInfo
.
uid
;
pHelper
->
pCompInfo
->
checksum
=
0
;
pHelper
->
pCompInfo
->
tid
=
pHelper
->
tableInfo
.
tid
;
ASSERT
(
pIdx
->
len
>
sizeof
(
SCompInfo
)
+
sizeof
(
TSCKSUM
)
&&
(
pIdx
->
len
-
sizeof
(
SCompInfo
)
-
sizeof
(
TSCKSUM
))
%
sizeof
(
SCompBlock
)
==
0
);
taosCalcChecksumAppend
(
0
,
(
uint8_t
*
)
pHelper
->
pCompInfo
,
pIdx
->
len
);
offset
=
lseek
(
pHelper
->
files
.
nHeadF
.
fd
,
0
,
SEEK_END
);
}
pFile
->
info
.
magic
=
taosCalcChecksum
(
pFile
->
info
.
magic
,
(
uint8_t
*
)
POINTER_SHIFT
(
pHelper
->
pCompInfo
,
pIdx
->
len
-
sizeof
(
TSCKSUM
)),
sizeof
(
TSCKSUM
));
offset
=
lseek
(
pFile
->
fd
,
0
,
SEEK_END
);
if
(
offset
<
0
)
{
tsdbError
(
"vgId:%d failed to lseek file %s since %s"
,
REPO_ID
(
pHelper
->
pRepo
),
pHelper
->
files
.
nHeadF
.
fname
,
strerror
(
errno
));
tsdbError
(
"vgId:%d failed to lseek file %s since %s"
,
REPO_ID
(
pHelper
->
pRepo
),
pFile
->
fname
,
strerror
(
errno
));
terrno
=
TAOS_SYSTEM_ERROR
(
errno
);
return
-
1
;
}
pIdx
->
offset
=
offset
;
pIdx
->
uid
=
pHelper
->
tableInfo
.
uid
;
pIdx
->
tid
=
pHelper
->
tableInfo
.
tid
;
ASSERT
(
pIdx
->
offset
>=
TSDB_FILE_HEAD_SIZE
);
if
(
twrite
(
pHelper
->
files
.
nHeadF
.
fd
,
(
void
*
)(
pHelper
->
pCompInfo
),
pIdx
->
len
)
<
pIdx
->
len
)
{
if
(
twrite
(
pFile
->
fd
,
(
void
*
)(
pHelper
->
pCompInfo
),
pIdx
->
len
)
<
pIdx
->
len
)
{
tsdbError
(
"vgId:%d failed to write %d bytes to file %s since %s"
,
REPO_ID
(
pHelper
->
pRepo
),
pIdx
->
len
,
pHelper
->
files
.
nHeadF
.
fname
,
strerror
(
errno
));
pFile
->
fname
,
strerror
(
errno
));
terrno
=
TAOS_SYSTEM_ERROR
(
errno
);
return
-
1
;
}
#ifdef TSDB_IDX
pFile
=
helperNewIdxF
(
pHelper
);
#endif
if
(
tsizeof
(
pHelper
->
pWIdx
)
<
pFile
->
info
.
len
+
sizeof
(
SCompIdx
)
+
12
)
{
pHelper
->
pWIdx
=
trealloc
(
pHelper
->
pWIdx
,
tsizeof
(
pHelper
->
pWIdx
)
==
0
?
1024
:
tsizeof
(
pHelper
->
pWIdx
)
*
2
);
if
(
pHelper
->
pWIdx
==
NULL
)
{
terrno
=
TSDB_CODE_TDB_OUT_OF_MEMORY
;
return
-
1
;
}
}
void
*
pBuf
=
POINTER_SHIFT
(
pHelper
->
pWIdx
,
pFile
->
info
.
len
);
pFile
->
info
.
len
+=
tsdbEncodeSCompIdx
(
&
pBuf
,
&
(
pHelper
->
curCompIdx
));
}
return
0
;
}
int
tsdbWriteCompIdx
(
SRWHelper
*
pHelper
)
{
STsdbCfg
*
pCfg
=
&
pHelper
->
pRepo
->
config
;
ASSERT
(
helperType
(
pHelper
)
==
TSDB_WRITE_HELPER
);
off_t
offset
=
lseek
(
pHelper
->
files
.
nHeadF
.
fd
,
0
,
SEEK_END
);
if
(
offset
<
0
)
{
tsdbError
(
"vgId:%d failed to lseek file %s to end since %s"
,
REPO_ID
(
pHelper
->
pRepo
),
pHelper
->
files
.
nHeadF
.
fname
,
strerror
(
errno
));
terrno
=
TAOS_SYSTEM_ERROR
(
errno
);
return
-
1
;
}
off_t
offset
=
0
;
SFile
*
pFile
=
&
(
pHelper
->
files
.
nHeadF
);
pFile
->
info
.
offset
=
offset
;
#ifdef TSDB_IDX
SFile
*
pFile
=
helperNewIdxF
(
pHelper
);
#else
SFile
*
pFile
=
helperNewHeadF
(
pHelper
);
#endif
void
*
buf
=
pHelper
->
pBuffer
;
for
(
uint32_t
i
=
0
;
i
<
pCfg
->
maxTables
;
i
++
)
{
SCompIdx
*
pCompIdx
=
pHelper
->
pCompIdx
+
i
;
if
(
pCompIdx
->
offset
>
0
)
{
int
drift
=
POINTER_DISTANCE
(
buf
,
pHelper
->
pBuffer
);
if
(
tsizeof
(
pHelper
->
pBuffer
)
-
drift
<
128
)
{
pHelper
->
pBuffer
=
trealloc
(
pHelper
->
pBuffer
,
tsizeof
(
pHelper
->
pBuffer
)
*
2
);
if
(
pHelper
->
pBuffer
==
NULL
)
{
pFile
->
info
.
len
+=
sizeof
(
TSCKSUM
);
if
(
tsizeof
(
pHelper
->
pWIdx
)
<
pFile
->
info
.
len
)
{
pHelper
->
pWIdx
=
trealloc
(
pHelper
->
pWIdx
,
pFile
->
info
.
len
);
if
(
pHelper
->
pWIdx
==
NULL
)
{
terrno
=
TSDB_CODE_TDB_OUT_OF_MEMORY
;
return
-
1
;
}
}
buf
=
POINTER_SHIFT
(
pHelper
->
pBuffer
,
drift
);
taosEncodeVariantU32
(
&
buf
,
i
);
tsdbEncodeSCompIdx
(
&
buf
,
pCompIdx
);
}
taosCalcChecksumAppend
(
0
,
(
uint8_t
*
)
pHelper
->
pWIdx
,
pFile
->
info
.
len
);
pFile
->
info
.
magic
=
taosCalcChecksum
(
pFile
->
info
.
magic
,
(
uint8_t
*
)
POINTER_SHIFT
(
pHelper
->
pWIdx
,
pFile
->
info
.
len
-
sizeof
(
TSCKSUM
)),
sizeof
(
TSCKSUM
));
offset
=
lseek
(
pFile
->
fd
,
0
,
SEEK_END
);
if
(
offset
<
0
)
{
tsdbError
(
"vgId:%d failed to lseek file %s since %s"
,
REPO_ID
(
pHelper
->
pRepo
),
pFile
->
fname
,
strerror
(
errno
));
terrno
=
TAOS_SYSTEM_ERROR
(
errno
);
return
-
1
;
}
int
tsize
=
(
char
*
)
buf
-
(
char
*
)
pHelper
->
pBuffer
+
sizeof
(
TSCKSUM
);
taosCalcChecksumAppend
(
0
,
(
uint8_t
*
)
pHelper
->
pBuffer
,
tsize
);
pFile
->
info
.
offset
=
offset
;
if
(
twrite
(
p
Helper
->
files
.
nHeadF
.
fd
,
(
void
*
)
pHelper
->
pBuffer
,
tsize
)
<
tsize
)
{
tsdbError
(
"vgId:%d failed to write %d bytes to file %s since %s"
,
REPO_ID
(
pHelper
->
pRepo
),
tsize
,
p
Helper
->
files
.
nHeadF
.
fname
,
strerror
(
errno
));
if
(
twrite
(
p
File
->
fd
,
(
void
*
)
pHelper
->
pWIdx
,
pFile
->
info
.
len
)
<
pFile
->
info
.
len
)
{
tsdbError
(
"vgId:%d failed to write %d bytes to file %s since %s"
,
REPO_ID
(
pHelper
->
pRepo
),
pFile
->
info
.
len
,
p
File
->
fname
,
strerror
(
errno
));
terrno
=
TAOS_SYSTEM_ERROR
(
errno
);
return
-
1
;
}
pFile
->
info
.
len
=
tsize
;
return
0
;
}
int
tsdbLoadCompIdx
(
SRWHelper
*
pHelper
,
void
*
target
)
{
STsdbCfg
*
pCfg
=
&
(
pHelper
->
pRepo
->
config
);
ASSERT
(
pHelper
->
state
==
TSDB_HELPER_FILE_SET_AND_OPEN
);
#ifdef TSDB_IDX
SFile
*
pFile
=
helperIdxF
(
pHelper
);
#else
SFile
*
pFile
=
helperHeadF
(
pHelper
);
#endif
int
fd
=
pFile
->
fd
;
if
(
!
helperHasState
(
pHelper
,
TSDB_HELPER_IDX_LOAD
))
{
// If not load from file, just load it in object
SFile
*
pFile
=
&
(
pHelper
->
files
.
headF
);
int
fd
=
pFile
->
fd
;
memset
(
pHelper
->
pCompIdx
,
0
,
tsizeof
(
pHelper
->
pCompIdx
));
if
(
pFile
->
info
.
offset
>
0
)
{
ASSERT
(
pFile
->
info
.
offset
>
TSDB_FILE_HEAD_SIZE
);
if
(
pFile
->
info
.
len
>
0
)
{
if
((
pHelper
->
pBuffer
=
trealloc
(
pHelper
->
pBuffer
,
pFile
->
info
.
len
))
==
NULL
)
{
terrno
=
TSDB_CODE_TDB_OUT_OF_MEMORY
;
return
-
1
;
}
if
(
lseek
(
fd
,
pFile
->
info
.
offset
,
SEEK_SET
)
<
0
)
{
tsdbError
(
"vgId:%d failed to lseek file %s to %u since %s"
,
REPO_ID
(
pHelper
->
pRepo
),
pFile
->
fname
,
pFile
->
info
.
offset
,
strerror
(
errno
));
tsdbError
(
"vgId:%d failed to lseek file %s since %s"
,
REPO_ID
(
pHelper
->
pRepo
),
pFile
->
fname
,
strerror
(
errno
));
terrno
=
TAOS_SYSTEM_ERROR
(
errno
);
return
-
1
;
}
if
((
pHelper
->
pBuffer
=
trealloc
(
pHelper
->
pBuffer
,
pFile
->
info
.
len
))
==
NULL
)
{
terrno
=
TSDB_CODE_TDB_OUT_OF_MEMORY
;
return
-
1
;
}
if
(
tread
(
fd
,
(
void
*
)(
pHelper
->
pBuffer
),
pFile
->
info
.
len
)
<
pFile
->
info
.
len
)
{
tsdbError
(
"vgId:%d failed to read %d bytes from file %s since %s"
,
REPO_ID
(
pHelper
->
pRepo
),
pFile
->
info
.
len
,
pFile
->
fname
,
strerror
(
errno
));
terrno
=
TAOS_SYSTEM_ERROR
(
errno
);
return
-
1
;
}
if
(
!
taosCheckChecksumWhole
((
uint8_t
*
)(
pHelper
->
pBuffer
),
pFile
->
info
.
len
))
{
tsdbError
(
"vgId:%d file %s SCompIdx part is corrupted.
offset %u
len %u"
,
REPO_ID
(
pHelper
->
pRepo
),
pFile
->
fname
,
pFile
->
info
.
offset
,
pFile
->
info
.
len
);
tsdbError
(
"vgId:%d file %s SCompIdx part is corrupted. len %u"
,
REPO_ID
(
pHelper
->
pRepo
),
pFile
->
fname
,
pFile
->
info
.
len
);
terrno
=
TSDB_CODE_TDB_FILE_CORRUPTED
;
return
-
1
;
}
// Decode it
pHelper
->
idxH
.
numOfIdx
=
0
;
void
*
ptr
=
pHelper
->
pBuffer
;
while
(
POINTER_DISTANCE
(
ptr
,
pHelper
->
pBuffer
)
<
(
pFile
->
info
.
len
-
sizeof
(
TSCKSUM
)))
{
uint32_t
tid
=
0
;
if
((
ptr
=
taosDecodeVariantU32
(
ptr
,
&
tid
))
==
NULL
)
return
-
1
;
ASSERT
(
tid
>
0
&&
tid
<
pCfg
->
maxTables
);
size_t
tlen
=
tsizeof
(
pHelper
->
idxH
.
pIdxArray
);
pHelper
->
idxH
.
numOfIdx
++
;
if
((
ptr
=
tsdbDecodeSCompIdx
(
ptr
,
pHelper
->
pCompIdx
+
tid
))
==
NULL
)
return
-
1
;
ASSERT
(
POINTER_DISTANCE
(
ptr
,
pHelper
->
pBuffer
)
<=
pFile
->
info
.
len
-
sizeof
(
TSCKSUM
));
if
(
tlen
<
pHelper
->
idxH
.
numOfIdx
*
sizeof
(
SCompIdx
))
{
pHelper
->
idxH
.
pIdxArray
=
(
SCompIdx
*
)
trealloc
(
pHelper
->
idxH
.
pIdxArray
,
(
tlen
==
0
)
?
1024
:
tlen
*
2
);
if
(
pHelper
->
idxH
.
pIdxArray
==
NULL
)
{
terrno
=
TSDB_CODE_TDB_OUT_OF_MEMORY
;
return
-
1
;
}
}
if
(
lseek
(
fd
,
TSDB_FILE_HEAD_SIZE
,
SEEK_SET
)
<
0
)
{
terrno
=
TAOS_SYSTEM_ERROR
(
errno
);
ptr
=
tsdbDecodeSCompIdx
(
ptr
,
&
(
pHelper
->
idxH
.
pIdxArray
[
pHelper
->
idxH
.
numOfIdx
-
1
]));
if
(
ptr
==
NULL
)
{
tsdbError
(
"vgId:%d file %s SCompIdx part is corrupted. len %u"
,
REPO_ID
(
pHelper
->
pRepo
),
pFile
->
fname
,
pFile
->
info
.
len
);
terrno
=
TSDB_CODE_TDB_FILE_CORRUPTED
;
return
-
1
;
}
ASSERT
(
pHelper
->
idxH
.
numOfIdx
==
1
||
pHelper
->
idxH
.
pIdxArray
[
pHelper
->
idxH
.
numOfIdx
-
1
].
tid
>
pHelper
->
idxH
.
pIdxArray
[
pHelper
->
idxH
.
numOfIdx
-
2
].
tid
);
ASSERT
(
POINTER_DISTANCE
(
ptr
,
pHelper
->
pBuffer
)
<=
pFile
->
info
.
len
-
sizeof
(
TSCKSUM
));
}
}
}
helperSetState
(
pHelper
,
TSDB_HELPER_IDX_LOAD
);
if
(
helperType
(
pHelper
)
==
TSDB_WRITE_HELPER
)
{
pFile
->
info
.
len
=
0
;
}
// Copy the memory for outside usage
if
(
target
)
memcpy
(
target
,
pHelper
->
pCompIdx
,
tsizeof
(
pHelper
->
pCompIdx
));
if
(
target
&&
pHelper
->
idxH
.
numOfIdx
>
0
)
memcpy
(
target
,
pHelper
->
idxH
.
pIdxArray
,
sizeof
(
SCompIdx
)
*
pHelper
->
idxH
.
numOfIdx
);
return
0
;
}
...
...
@@ -482,15 +569,15 @@ int tsdbLoadCompIdx(SRWHelper *pHelper, void *target) {
int
tsdbLoadCompInfo
(
SRWHelper
*
pHelper
,
void
*
target
)
{
ASSERT
(
helperHasState
(
pHelper
,
TSDB_HELPER_TABLE_SET
));
SCompIdx
*
pIdx
=
pHelper
->
pCompIdx
+
pHelper
->
tableInfo
.
tid
;
SCompIdx
*
pIdx
=
&
(
pHelper
->
curCompIdx
)
;
int
fd
=
pHelper
->
files
.
headF
.
fd
;
int
fd
=
helperHeadF
(
pHelper
)
->
fd
;
if
(
!
helperHasState
(
pHelper
,
TSDB_HELPER_INFO_LOAD
))
{
if
(
pIdx
->
offset
>
0
)
{
ASSERT
(
pIdx
->
uid
==
pHelper
->
tableInfo
.
uid
);
if
(
lseek
(
fd
,
pIdx
->
offset
,
SEEK_SET
)
<
0
)
{
tsdbError
(
"vgId:%d failed to lseek file %s since %s"
,
REPO_ID
(
pHelper
->
pRepo
),
pHelper
->
files
.
headF
.
fname
,
tsdbError
(
"vgId:%d failed to lseek file %s since %s"
,
REPO_ID
(
pHelper
->
pRepo
),
helperHeadF
(
pHelper
)
->
fname
,
strerror
(
errno
));
terrno
=
TAOS_SYSTEM_ERROR
(
errno
);
return
-
1
;
...
...
@@ -499,18 +586,18 @@ int tsdbLoadCompInfo(SRWHelper *pHelper, void *target) {
pHelper
->
pCompInfo
=
trealloc
((
void
*
)
pHelper
->
pCompInfo
,
pIdx
->
len
);
if
(
tread
(
fd
,
(
void
*
)(
pHelper
->
pCompInfo
),
pIdx
->
len
)
<
pIdx
->
len
)
{
tsdbError
(
"vgId:%d failed to read %d bytes from file %s since %s"
,
REPO_ID
(
pHelper
->
pRepo
),
pIdx
->
len
,
pHelper
->
files
.
headF
.
fname
,
strerror
(
errno
));
helperHeadF
(
pHelper
)
->
fname
,
strerror
(
errno
));
terrno
=
TAOS_SYSTEM_ERROR
(
errno
);
return
-
1
;
}
if
(
!
taosCheckChecksumWhole
((
uint8_t
*
)
pHelper
->
pCompInfo
,
pIdx
->
len
))
{
tsdbError
(
"vgId:%d file %s SCompInfo part is corrupted, tid %d uid %"
PRIu64
,
REPO_ID
(
pHelper
->
pRepo
),
pHelper
->
files
.
headF
.
fname
,
pHelper
->
tableInfo
.
tid
,
pHelper
->
tableInfo
.
uid
);
helperHeadF
(
pHelper
)
->
fname
,
pHelper
->
tableInfo
.
tid
,
pHelper
->
tableInfo
.
uid
);
terrno
=
TSDB_CODE_TDB_FILE_CORRUPTED
;
return
-
1
;
}
ASSERT
(
pIdx
->
uid
==
pHelper
->
pCompInfo
->
uid
);
ASSERT
(
pIdx
->
uid
==
pHelper
->
pCompInfo
->
uid
&&
pIdx
->
tid
==
pHelper
->
pCompInfo
->
tid
);
}
helperSetState
(
pHelper
,
TSDB_HELPER_INFO_LOAD
);
...
...
@@ -523,7 +610,7 @@ int tsdbLoadCompInfo(SRWHelper *pHelper, void *target) {
int
tsdbLoadCompData
(
SRWHelper
*
pHelper
,
SCompBlock
*
pCompBlock
,
void
*
target
)
{
ASSERT
(
pCompBlock
->
numOfSubBlocks
<=
1
);
SFile
*
pFile
=
(
pCompBlock
->
last
)
?
&
(
pHelper
->
files
.
lastF
)
:
&
(
pHelper
->
files
.
dataF
);
SFile
*
pFile
=
(
pCompBlock
->
last
)
?
helperLastF
(
pHelper
)
:
helperDataF
(
pHelper
);
if
(
lseek
(
pFile
->
fd
,
pCompBlock
->
offset
,
SEEK_SET
)
<
0
)
{
tsdbError
(
"vgId:%d failed to lseek file %s since %s"
,
REPO_ID
(
pHelper
->
pRepo
),
pFile
->
fname
,
strerror
(
errno
));
...
...
@@ -642,9 +729,9 @@ _err:
// ---------------------- INTERNAL FUNCTIONS ----------------------
static
bool
tsdbShouldCreateNewLast
(
SRWHelper
*
pHelper
)
{
ASSERT
(
pHelper
->
files
.
lastF
.
fd
>
0
);
ASSERT
(
helperLastF
(
pHelper
)
->
fd
>
0
);
struct
stat
st
;
if
(
fstat
(
pHelper
->
files
.
lastF
.
fd
,
&
st
)
<
0
)
return
true
;
if
(
fstat
(
helperLastF
(
pHelper
)
->
fd
,
&
st
)
<
0
)
return
true
;
if
(
st
.
st_size
>
32
*
1024
+
TSDB_FILE_HEAD_SIZE
)
return
true
;
return
false
;
}
...
...
@@ -729,6 +816,8 @@ static int tsdbWriteBlockToFile(SRWHelper *pHelper, SFile *pFile, SDataCols *pDa
ASSERT
(
flen
>
0
);
flen
+=
sizeof
(
TSCKSUM
);
taosCalcChecksumAppend
(
0
,
(
uint8_t
*
)
tptr
,
flen
);
pFile
->
info
.
magic
=
taosCalcChecksum
(
pFile
->
info
.
magic
,
(
uint8_t
*
)
POINTER_SHIFT
(
tptr
,
flen
-
sizeof
(
TSCKSUM
)),
sizeof
(
TSCKSUM
));
if
(
ncol
!=
0
)
{
pCompCol
->
offset
=
toffset
;
...
...
@@ -747,6 +836,8 @@ static int tsdbWriteBlockToFile(SRWHelper *pHelper, SFile *pFile, SDataCols *pDa
pCompData
->
numOfCols
=
nColsNotAllNull
;
taosCalcChecksumAppend
(
0
,
(
uint8_t
*
)
pCompData
,
tsize
);
pFile
->
info
.
magic
=
taosCalcChecksum
(
pFile
->
info
.
magic
,
(
uint8_t
*
)
POINTER_SHIFT
(
pCompData
,
tsize
-
sizeof
(
TSCKSUM
)),
sizeof
(
TSCKSUM
));
// Write the whole block to file
if
(
twrite
(
pFile
->
fd
,
(
void
*
)
pCompData
,
lsize
)
<
lsize
)
{
...
...
@@ -804,7 +895,7 @@ static int tsdbAdjustInfoSizeIfNeeded(SRWHelper *pHelper, size_t esize) {
}
static
int
tsdbInsertSuperBlock
(
SRWHelper
*
pHelper
,
SCompBlock
*
pCompBlock
,
int
blkIdx
)
{
SCompIdx
*
pIdx
=
pHelper
->
pCompIdx
+
pHelper
->
tableInfo
.
tid
;
SCompIdx
*
pIdx
=
&
(
pHelper
->
curCompIdx
)
;
ASSERT
(
blkIdx
>=
0
&&
blkIdx
<=
pIdx
->
numOfBlocks
);
ASSERT
(
pCompBlock
->
numOfSubBlocks
==
1
);
...
...
@@ -851,7 +942,7 @@ _err:
static
int
tsdbAddSubBlock
(
SRWHelper
*
pHelper
,
SCompBlock
*
pCompBlock
,
int
blkIdx
,
int
rowsAdded
)
{
ASSERT
(
pCompBlock
->
numOfSubBlocks
==
0
);
SCompIdx
*
pIdx
=
pHelper
->
pCompIdx
+
pHelper
->
tableInfo
.
tid
;
SCompIdx
*
pIdx
=
&
(
pHelper
->
curCompIdx
)
;
ASSERT
(
blkIdx
>=
0
&&
blkIdx
<
pIdx
->
numOfBlocks
);
SCompBlock
*
pSCompBlock
=
pHelper
->
pCompInfo
->
blocks
+
blkIdx
;
...
...
@@ -935,7 +1026,7 @@ _err:
static
int
tsdbUpdateSuperBlock
(
SRWHelper
*
pHelper
,
SCompBlock
*
pCompBlock
,
int
blkIdx
)
{
ASSERT
(
pCompBlock
->
numOfSubBlocks
==
1
);
SCompIdx
*
pIdx
=
pHelper
->
pCompIdx
+
pHelper
->
tableInfo
.
tid
;
SCompIdx
*
pIdx
=
&
(
pHelper
->
curCompIdx
)
;
ASSERT
(
blkIdx
>=
0
&&
blkIdx
<
pIdx
->
numOfBlocks
);
...
...
@@ -971,24 +1062,21 @@ static int tsdbUpdateSuperBlock(SRWHelper *pHelper, SCompBlock *pCompBlock, int
}
static
void
tsdbResetHelperFileImpl
(
SRWHelper
*
pHelper
)
{
pHelper
->
idxH
.
numOfIdx
=
0
;
pHelper
->
idxH
.
curIdx
=
0
;
memset
((
void
*
)
&
pHelper
->
files
,
0
,
sizeof
(
pHelper
->
files
));
pHelper
->
files
.
fid
=
-
1
;
pHelper
->
files
.
headF
.
fd
=
-
1
;
pHelper
->
files
.
dataF
.
fd
=
-
1
;
pHelper
->
files
.
lastF
.
fd
=
-
1
;
pHelper
->
files
.
nHeadF
.
fd
=
-
1
;
pHelper
->
files
.
nLastF
.
fd
=
-
1
;
helperHeadF
(
pHelper
)
->
fd
=
-
1
;
helperDataF
(
pHelper
)
->
fd
=
-
1
;
helperLastF
(
pHelper
)
->
fd
=
-
1
;
helperNewHeadF
(
pHelper
)
->
fd
=
-
1
;
helperNewLastF
(
pHelper
)
->
fd
=
-
1
;
#ifdef TSDB_IDX
helperIdxF
(
pHelper
)
->
fd
=
-
1
;
helperNewIdxF
(
pHelper
)
->
fd
=
-
1
;
#endif
}
static
int
tsdbInitHelperFile
(
SRWHelper
*
pHelper
)
{
STsdbCfg
*
pCfg
=
&
pHelper
->
pRepo
->
config
;
size_t
tsize
=
sizeof
(
SCompIdx
)
*
pCfg
->
maxTables
+
sizeof
(
TSCKSUM
);
pHelper
->
pCompIdx
=
(
SCompIdx
*
)
tmalloc
(
tsize
);
if
(
pHelper
->
pCompIdx
==
NULL
)
{
terrno
=
TSDB_CODE_TDB_OUT_OF_MEMORY
;
return
-
1
;
}
tsdbResetHelperFileImpl
(
pHelper
);
return
0
;
}
...
...
@@ -996,7 +1084,8 @@ static int tsdbInitHelperFile(SRWHelper *pHelper) {
static
void
tsdbDestroyHelperFile
(
SRWHelper
*
pHelper
)
{
tsdbCloseHelperFile
(
pHelper
,
false
);
tsdbResetHelperFileImpl
(
pHelper
);
tzfree
(
pHelper
->
pCompIdx
);
tzfree
(
pHelper
->
idxH
.
pIdxArray
);
tzfree
(
pHelper
->
pWIdx
);
}
// ---------- Operations on Helper Table part
...
...
@@ -1154,7 +1243,7 @@ static int tsdbLoadBlockDataColsImpl(SRWHelper *pHelper, SCompBlock *pCompBlock,
ASSERT
(
pCompBlock
->
numOfSubBlocks
<=
1
);
ASSERT
(
colIds
[
0
]
==
0
);
SFile
*
pFile
=
(
pCompBlock
->
last
)
?
&
(
pHelper
->
files
.
lastF
)
:
&
(
pHelper
->
files
.
dataF
);
SFile
*
pFile
=
(
pCompBlock
->
last
)
?
helperLastF
(
pHelper
)
:
helperDataF
(
pHelper
);
SCompCol
compCol
=
{
0
};
// If only load timestamp column, no need to load SCompData part
...
...
@@ -1215,7 +1304,7 @@ _err:
static
int
tsdbLoadBlockDataImpl
(
SRWHelper
*
pHelper
,
SCompBlock
*
pCompBlock
,
SDataCols
*
pDataCols
)
{
ASSERT
(
pCompBlock
->
numOfSubBlocks
<=
1
);
SFile
*
pFile
=
(
pCompBlock
->
last
)
?
&
(
pHelper
->
files
.
lastF
)
:
&
(
pHelper
->
files
.
dataF
);
SFile
*
pFile
=
(
pCompBlock
->
last
)
?
helperLastF
(
pHelper
)
:
helperDataF
(
pHelper
);
pHelper
->
pBuffer
=
trealloc
(
pHelper
->
pBuffer
,
pCompBlock
->
len
);
if
(
pHelper
->
pBuffer
==
NULL
)
{
...
...
@@ -1314,6 +1403,7 @@ _err:
static
int
tsdbEncodeSCompIdx
(
void
**
buf
,
SCompIdx
*
pIdx
)
{
int
tlen
=
0
;
tlen
+=
taosEncodeVariantI32
(
buf
,
pIdx
->
tid
);
tlen
+=
taosEncodeVariantU32
(
buf
,
pIdx
->
len
);
tlen
+=
taosEncodeVariantU32
(
buf
,
pIdx
->
offset
);
tlen
+=
taosEncodeFixedU8
(
buf
,
pIdx
->
hasLast
);
...
...
@@ -1329,6 +1419,7 @@ static void *tsdbDecodeSCompIdx(void *buf, SCompIdx *pIdx) {
uint32_t
numOfBlocks
=
0
;
uint64_t
value
=
0
;
if
((
buf
=
taosDecodeVariantI32
(
buf
,
&
(
pIdx
->
tid
)))
==
NULL
)
return
NULL
;
if
((
buf
=
taosDecodeVariantU32
(
buf
,
&
(
pIdx
->
len
)))
==
NULL
)
return
NULL
;
if
((
buf
=
taosDecodeVariantU32
(
buf
,
&
(
pIdx
->
offset
)))
==
NULL
)
return
NULL
;
if
((
buf
=
taosDecodeFixedU8
(
buf
,
&
(
hasLast
)))
==
NULL
)
return
NULL
;
...
...
@@ -1346,7 +1437,7 @@ static void *tsdbDecodeSCompIdx(void *buf, SCompIdx *pIdx) {
static
int
tsdbProcessAppendCommit
(
SRWHelper
*
pHelper
,
SCommitIter
*
pCommitIter
,
SDataCols
*
pDataCols
,
TSKEY
maxKey
)
{
STsdbCfg
*
pCfg
=
&
(
pHelper
->
pRepo
->
config
);
STable
*
pTable
=
pCommitIter
->
pTable
;
SCompIdx
*
pIdx
=
pHelper
->
pCompIdx
+
TABLE_TID
(
pTable
);
SCompIdx
*
pIdx
=
&
(
pHelper
->
curCompIdx
);
TSKEY
keyFirst
=
tsdbNextIterKey
(
pCommitIter
->
pIter
);
int
defaultRowsInBlock
=
pCfg
->
maxRowsPerFileBlock
*
4
/
5
;
SCompBlock
compBlock
=
{
0
};
...
...
@@ -1362,7 +1453,7 @@ static int tsdbProcessAppendCommit(SRWHelper *pHelper, SCommitIter *pCommitIter,
ASSERT
(
rowsRead
>
0
&&
rowsRead
==
pDataCols
->
numOfRows
);
if
(
rowsRead
+
pCompBlock
->
numOfRows
<
pCfg
->
minRowsPerFileBlock
&&
pCompBlock
->
numOfSubBlocks
<
TSDB_MAX_SUBBLOCKS
&&
!
TSDB_NLAST_FILE_OPENED
(
pHelper
))
{
if
(
tsdbWriteBlockToFile
(
pHelper
,
&
(
pHelper
->
files
.
lastF
),
pDataCols
,
&
compBlock
,
true
,
false
)
<
0
)
return
-
1
;
if
(
tsdbWriteBlockToFile
(
pHelper
,
helperLastF
(
pHelper
),
pDataCols
,
&
compBlock
,
true
,
false
)
<
0
)
return
-
1
;
if
(
tsdbAddSubBlock
(
pHelper
,
&
compBlock
,
pIdx
->
numOfBlocks
-
1
,
rowsRead
)
<
0
)
return
-
1
;
}
else
{
if
(
tsdbLoadBlockData
(
pHelper
,
pCompBlock
,
NULL
)
<
0
)
return
-
1
;
...
...
@@ -1393,7 +1484,7 @@ static int tsdbProcessMergeCommit(SRWHelper *pHelper, SCommitIter *pCommitIter,
int
*
blkIdx
)
{
STsdbCfg
*
pCfg
=
&
(
pHelper
->
pRepo
->
config
);
STable
*
pTable
=
pCommitIter
->
pTable
;
SCompIdx
*
pIdx
=
pHelper
->
pCompIdx
+
TABLE_TID
(
pTable
);
SCompIdx
*
pIdx
=
&
(
pHelper
->
curCompIdx
);
SCompBlock
compBlock
=
{
0
};
TSKEY
keyFirst
=
tsdbNextIterKey
(
pCommitIter
->
pIter
);
int
defaultRowsInBlock
=
pCfg
->
maxRowsPerFileBlock
*
4
/
5
;
...
...
@@ -1427,7 +1518,7 @@ static int tsdbProcessMergeCommit(SRWHelper *pHelper, SCommitIter *pCommitIter,
int
rowsRead
=
tsdbLoadDataFromCache
(
pTable
,
pCommitIter
->
pIter
,
maxKey
,
rows1
,
pDataCols
,
pDataCols0
->
cols
[
0
].
pData
,
pDataCols0
->
numOfRows
);
ASSERT
(
rowsRead
==
rows2
&&
rowsRead
==
pDataCols
->
numOfRows
);
if
(
tsdbWriteBlockToFile
(
pHelper
,
&
(
pHelper
->
files
.
lastF
),
pDataCols
,
&
compBlock
,
true
,
false
)
<
0
)
return
-
1
;
if
(
tsdbWriteBlockToFile
(
pHelper
,
helperLastF
(
pHelper
),
pDataCols
,
&
compBlock
,
true
,
false
)
<
0
)
return
-
1
;
if
(
tsdbAddSubBlock
(
pHelper
,
&
compBlock
,
tblkIdx
,
rowsRead
)
<
0
)
return
-
1
;
tblkIdx
++
;
}
else
{
...
...
@@ -1466,7 +1557,7 @@ static int tsdbProcessMergeCommit(SRWHelper *pHelper, SCommitIter *pCommitIter,
if
(
rowsRead
==
0
)
break
;
ASSERT
(
rowsRead
==
pDataCols
->
numOfRows
);
if
(
tsdbWriteBlockToFile
(
pHelper
,
&
(
pHelper
->
files
.
dataF
),
pDataCols
,
&
compBlock
,
false
,
true
)
<
0
)
return
-
1
;
if
(
tsdbWriteBlockToFile
(
pHelper
,
helperDataF
(
pHelper
),
pDataCols
,
&
compBlock
,
false
,
true
)
<
0
)
return
-
1
;
if
(
tsdbInsertSuperBlock
(
pHelper
,
&
compBlock
,
tblkIdx
)
<
0
)
return
-
1
;
tblkIdx
++
;
}
...
...
@@ -1493,7 +1584,7 @@ static int tsdbProcessMergeCommit(SRWHelper *pHelper, SCommitIter *pCommitIter,
int
rowsRead
=
tsdbLoadDataFromCache
(
pTable
,
pCommitIter
->
pIter
,
keyLimit
,
rows
,
pDataCols
,
pDataCols0
->
cols
[
0
].
pData
,
pDataCols0
->
numOfRows
);
ASSERT
(
rowsRead
==
rows
&&
rowsRead
==
pDataCols
->
numOfRows
);
if
(
tsdbWriteBlockToFile
(
pHelper
,
&
(
pHelper
->
files
.
dataF
),
pDataCols
,
&
compBlock
,
false
,
false
)
<
0
)
if
(
tsdbWriteBlockToFile
(
pHelper
,
helperDataF
(
pHelper
),
pDataCols
,
&
compBlock
,
false
,
false
)
<
0
)
return
-
1
;
if
(
tsdbAddSubBlock
(
pHelper
,
&
compBlock
,
tblkIdx
,
rowsRead
)
<
0
)
return
-
1
;
tblkIdx
++
;
...
...
@@ -1506,7 +1597,7 @@ static int tsdbProcessMergeCommit(SRWHelper *pHelper, SCommitIter *pCommitIter,
tsdbLoadAndMergeFromCache
(
pDataCols0
,
&
dIter
,
pCommitIter
,
pDataCols
,
keyLimit
,
defaultRowsInBlock
);
if
(
rowsRead
==
0
)
break
;
if
(
tsdbWriteBlockToFile
(
pHelper
,
&
(
pHelper
->
files
.
dataF
),
pDataCols
,
&
compBlock
,
false
,
true
)
<
0
)
if
(
tsdbWriteBlockToFile
(
pHelper
,
helperDataF
(
pHelper
),
pDataCols
,
&
compBlock
,
false
,
true
)
<
0
)
return
-
1
;
if
(
round
==
0
)
{
if
(
tsdbUpdateSuperBlock
(
pHelper
,
&
compBlock
,
tblkIdx
)
<
0
)
return
-
1
;
...
...
@@ -1577,10 +1668,10 @@ static int tsdbWriteBlockToProperFile(SRWHelper *pHelper, SDataCols *pDataCols,
ASSERT
(
pDataCols
->
numOfRows
>
0
);
if
(
pDataCols
->
numOfRows
>=
pCfg
->
minRowsPerFileBlock
)
{
pFile
=
&
(
pHelper
->
files
.
dataF
);
pFile
=
helperDataF
(
pHelper
);
}
else
{
isLast
=
true
;
pFile
=
TSDB_NLAST_FILE_OPENED
(
pHelper
)
?
&
(
pHelper
->
files
.
nLastF
)
:
&
(
pHelper
->
files
.
lastF
);
pFile
=
TSDB_NLAST_FILE_OPENED
(
pHelper
)
?
helperNewLastF
(
pHelper
)
:
helperLastF
(
pHelper
);
}
ASSERT
(
pFile
->
fd
>
0
);
...
...
src/tsdb/src/tsdbRead.c
浏览文件 @
0e7cfbb0
...
...
@@ -132,6 +132,7 @@ static void doMergeTwoLevelData(STsdbQueryHandle* pQueryHandle, STableCheckInfo*
static
int32_t
binarySearchForKey
(
char
*
pValue
,
int
num
,
TSKEY
key
,
int
order
);
static
int
tsdbReadRowsFromCache
(
STableCheckInfo
*
pCheckInfo
,
TSKEY
maxKey
,
int
maxRowsToRead
,
STimeWindow
*
win
,
STsdbQueryHandle
*
pQueryHandle
);
static
int
tsdbCheckInfoCompar
(
const
void
*
key1
,
const
void
*
key2
);
static
void
tsdbInitDataBlockLoadInfo
(
SDataBlockLoadInfo
*
pBlockLoadInfo
)
{
pBlockLoadInfo
->
slot
=
-
1
;
...
...
@@ -237,6 +238,7 @@ TsdbQueryHandleT* tsdbQueryTables(TSDB_REPO_T* tsdb, STsdbQueryCond* pCond, STab
}
}
taosArraySort
(
pQueryHandle
->
pTableCheckInfo
,
tsdbCheckInfoCompar
);
pQueryHandle
->
defaultLoadColumn
=
getDefaultLoadColumns
(
pQueryHandle
,
true
);
tsdbDebug
(
"%p total numOfTable:%zu in query, %p"
,
pQueryHandle
,
taosArrayGetSize
(
pQueryHandle
->
pTableCheckInfo
),
pQueryHandle
->
qinfo
);
...
...
@@ -554,7 +556,9 @@ static int32_t getFileCompInfo(STsdbQueryHandle* pQueryHandle, int32_t* numOfBlo
STableCheckInfo
*
pCheckInfo
=
taosArrayGet
(
pQueryHandle
->
pTableCheckInfo
,
i
);
pCheckInfo
->
numOfBlocks
=
0
;
SCompIdx
*
compIndex
=
&
pQueryHandle
->
rhelper
.
pCompIdx
[
pCheckInfo
->
tableId
.
tid
];
tsdbSetHelperTable
(
&
pQueryHandle
->
rhelper
,
pCheckInfo
->
pTableObj
,
pQueryHandle
->
pTsdb
);
SCompIdx
*
compIndex
=
&
pQueryHandle
->
rhelper
.
curCompIdx
;
// no data block in this file, try next file
if
(
compIndex
->
len
==
0
||
compIndex
->
numOfBlocks
==
0
||
compIndex
->
uid
!=
pCheckInfo
->
tableId
.
uid
)
{
...
...
@@ -571,8 +575,6 @@ static int32_t getFileCompInfo(STsdbQueryHandle* pQueryHandle, int32_t* numOfBlo
pCheckInfo
->
compSize
=
compIndex
->
len
;
}
tsdbSetHelperTable
(
&
pQueryHandle
->
rhelper
,
pCheckInfo
->
pTableObj
,
pQueryHandle
->
pTsdb
);
tsdbLoadCompInfo
(
&
(
pQueryHandle
->
rhelper
),
(
void
*
)(
pCheckInfo
->
pCompInfo
));
SCompInfo
*
pCompInfo
=
pCheckInfo
->
pCompInfo
;
...
...
@@ -2431,3 +2433,13 @@ void tsdbDestroyTableGroup(STableGroupInfo *pGroupList) {
taosArrayDestroy
(
pGroupList
->
pGroupList
);
}
static
int
tsdbCheckInfoCompar
(
const
void
*
key1
,
const
void
*
key2
)
{
if
(((
STableCheckInfo
*
)
key1
)
->
tableId
.
tid
<
((
STableCheckInfo
*
)
key2
)
->
tableId
.
tid
)
{
return
-
1
;
}
else
if
(((
STableCheckInfo
*
)
key1
)
->
tableId
.
tid
>
((
STableCheckInfo
*
)
key2
)
->
tableId
.
tid
)
{
return
1
;
}
else
{
ASSERT
(
false
);
return
0
;
}
}
\ No newline at end of file
src/util/inc/tkvstore.h
浏览文件 @
0e7cfbb0
...
...
@@ -29,6 +29,7 @@ typedef struct {
int64_t
tombSize
;
int64_t
nRecords
;
int64_t
nDels
;
uint32_t
magic
;
}
SStoreInfo
;
typedef
struct
{
...
...
@@ -45,6 +46,8 @@ typedef struct {
SStoreInfo
info
;
}
SKVStore
;
#define KVSTORE_MAGIC(s) (s)->info.magic
int
tdCreateKVStore
(
char
*
fname
);
int
tdDestroyKVStore
(
char
*
fname
);
SKVStore
*
tdOpenKVStore
(
char
*
fname
,
iterFunc
iFunc
,
afterFunc
aFunc
,
void
*
appH
);
...
...
src/util/src/tkvstore.c
浏览文件 @
0e7cfbb0
...
...
@@ -34,6 +34,7 @@
#define TD_KVSTORE_MAINOR_VERSION 0
#define TD_KVSTORE_SNAP_SUFFIX ".snap"
#define TD_KVSTORE_NEW_SUFFIX ".new"
#define TD_KVSTORE_INIT_MAGIC 0xFFFFFFFF
typedef
struct
{
uint64_t
uid
;
...
...
@@ -140,6 +141,7 @@ SKVStore *tdOpenKVStore(char *fname, iterFunc iFunc, afterFunc aFunc, void *appH
if
(
tdLoadKVStoreHeader
(
pStore
->
fd
,
pStore
->
fname
,
&
info
)
<
0
)
goto
_err
;
pStore
->
info
.
size
=
TD_KVSTORE_HEADER_SIZE
;
pStore
->
info
.
magic
=
info
.
magic
;
if
(
tdRestoreKVStore
(
pStore
)
<
0
)
goto
_err
;
...
...
@@ -251,6 +253,8 @@ int tdUpdateKVStoreRecord(SKVStore *pStore, uint64_t uid, void *cont, int contLe
return
-
1
;
}
pStore
->
info
.
magic
=
taosCalcChecksum
(
pStore
->
info
.
magic
,
(
uint8_t
*
)
POINTER_SHIFT
(
cont
,
contLen
-
sizeof
(
TSCKSUM
)),
sizeof
(
TSCKSUM
));
pStore
->
info
.
size
+=
(
sizeof
(
SKVRecord
)
+
contLen
);
SKVRecord
*
pRecord
=
taosHashGet
(
pStore
->
map
,
(
void
*
)
&
uid
,
sizeof
(
uid
));
if
(
pRecord
!=
NULL
)
{
// just to insert
...
...
@@ -288,6 +292,7 @@ int tdDropKVStoreRecord(SKVStore *pStore, uint64_t uid) {
return
-
1
;
}
pStore
->
info
.
magic
=
taosCalcChecksum
(
pStore
->
info
.
magic
,
(
uint8_t
*
)
buf
,
POINTER_DISTANCE
(
pBuf
,
buf
));
pStore
->
info
.
size
+=
POINTER_DISTANCE
(
pBuf
,
buf
);
pStore
->
info
.
nDels
++
;
pStore
->
info
.
nRecords
--
;
...
...
@@ -371,7 +376,7 @@ static int tdUpdateKVStoreHeader(int fd, char *fname, SStoreInfo *pInfo) {
}
static
int
tdInitKVStoreHeader
(
int
fd
,
char
*
fname
)
{
SStoreInfo
info
=
{
TD_KVSTORE_HEADER_SIZE
,
0
,
0
,
0
};
SStoreInfo
info
=
{
TD_KVSTORE_HEADER_SIZE
,
0
,
0
,
0
,
TD_KVSTORE_INIT_MAGIC
};
return
tdUpdateKVStoreHeader
(
fd
,
fname
,
&
info
);
}
...
...
@@ -382,6 +387,7 @@ static int tdEncodeStoreInfo(void **buf, SStoreInfo *pInfo) {
tlen
+=
taosEncodeVariantI64
(
buf
,
pInfo
->
tombSize
);
tlen
+=
taosEncodeVariantI64
(
buf
,
pInfo
->
nRecords
);
tlen
+=
taosEncodeVariantI64
(
buf
,
pInfo
->
nDels
);
tlen
+=
taosEncodeFixedU32
(
buf
,
pInfo
->
magic
);
return
tlen
;
}
...
...
@@ -391,6 +397,7 @@ static void *tdDecodeStoreInfo(void *buf, SStoreInfo *pInfo) {
buf
=
taosDecodeVariantI64
(
buf
,
&
(
pInfo
->
tombSize
));
buf
=
taosDecodeVariantI64
(
buf
,
&
(
pInfo
->
nRecords
));
buf
=
taosDecodeVariantI64
(
buf
,
&
(
pInfo
->
nDels
));
buf
=
taosDecodeFixedU32
(
buf
,
&
(
pInfo
->
magic
));
return
buf
;
}
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录