Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
慢慢CG
TDengine
提交
dd9f62cf
T
TDengine
项目概览
慢慢CG
/
TDengine
与 Fork 源项目一致
Fork自
taosdata / TDengine
通知
1
Star
0
Fork
0
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
0
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
0
Issue
0
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
dd9f62cf
编写于
7月 20, 2020
作者:
H
Hongze Cheng
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
remove idx file back
上级
9bb9d7a2
变更
4
隐藏空白更改
内联
并排
Showing
4 changed file
with
78 addition
and
32 deletion
+78
-32
src/tsdb/inc/tsdbMain.h
src/tsdb/inc/tsdbMain.h
+10
-1
src/tsdb/src/tsdbFile.c
src/tsdb/src/tsdbFile.c
+16
-10
src/tsdb/src/tsdbMemTable.c
src/tsdb/src/tsdbMemTable.c
+2
-0
src/tsdb/src/tsdbRWHelper.c
src/tsdb/src/tsdbRWHelper.c
+50
-21
未找到文件。
src/tsdb/inc/tsdbMain.h
浏览文件 @
dd9f62cf
...
@@ -132,12 +132,18 @@ typedef struct {
...
@@ -132,12 +132,18 @@ typedef struct {
// ------------------ tsdbFile.c
// ------------------ tsdbFile.c
extern
const
char
*
tsdbFileSuffix
[];
extern
const
char
*
tsdbFileSuffix
[];
typedef
enum
{
typedef
enum
{
#ifdef TSDB_IDX
TSDB_FILE_TYPE_IDX
=
0
,
TSDB_FILE_TYPE_IDX
=
0
,
TSDB_FILE_TYPE_HEAD
,
TSDB_FILE_TYPE_HEAD
,
#else
TSDB_FILE_TYPE_HEAD
=
0
,
#endif
TSDB_FILE_TYPE_DATA
,
TSDB_FILE_TYPE_DATA
,
TSDB_FILE_TYPE_LAST
,
TSDB_FILE_TYPE_LAST
,
TSDB_FILE_TYPE_MAX
,
TSDB_FILE_TYPE_MAX
,
#ifdef TSDB_IDX
TSDB_FILE_TYPE_NIDX
,
TSDB_FILE_TYPE_NIDX
,
#endif
TSDB_FILE_TYPE_NHEAD
,
TSDB_FILE_TYPE_NHEAD
,
TSDB_FILE_TYPE_NLAST
TSDB_FILE_TYPE_NLAST
}
TSDB_FILE_TYPE
;
}
TSDB_FILE_TYPE
;
...
@@ -147,6 +153,7 @@ typedef struct {
...
@@ -147,6 +153,7 @@ typedef struct {
uint32_t
len
;
uint32_t
len
;
uint32_t
totalBlocks
;
uint32_t
totalBlocks
;
uint32_t
totalSubBlocks
;
uint32_t
totalSubBlocks
;
uint32_t
offset
;
uint64_t
size
;
// total size of the file
uint64_t
size
;
// total size of the file
uint64_t
tombSize
;
// unused file size
uint64_t
tombSize
;
// unused file size
}
STsdbFileInfo
;
}
STsdbFileInfo
;
...
@@ -450,11 +457,13 @@ void tsdbRemoveFileGroup(STsdbRepo* pRepo, SFileGroup* pFGroup);
...
@@ -450,11 +457,13 @@ void tsdbRemoveFileGroup(STsdbRepo* pRepo, SFileGroup* pFGroup);
#define helperState(h) (h)->state
#define helperState(h) (h)->state
#define TSDB_NLAST_FILE_OPENED(h) ((h)->files.nLastF.fd > 0)
#define TSDB_NLAST_FILE_OPENED(h) ((h)->files.nLastF.fd > 0)
#define helperFileId(h) ((h)->files.fGroup.fileId)
#define helperFileId(h) ((h)->files.fGroup.fileId)
#ifdef TSDB_IDX
#define helperIdxF(h) (&((h)->files.fGroup.files[TSDB_FILE_TYPE_IDX]))
#define helperIdxF(h) (&((h)->files.fGroup.files[TSDB_FILE_TYPE_IDX]))
#define helperNewIdxF(h) (&((h)->files.nIdxF))
#endif
#define helperHeadF(h) (&((h)->files.fGroup.files[TSDB_FILE_TYPE_HEAD]))
#define helperHeadF(h) (&((h)->files.fGroup.files[TSDB_FILE_TYPE_HEAD]))
#define helperDataF(h) (&((h)->files.fGroup.files[TSDB_FILE_TYPE_DATA]))
#define helperDataF(h) (&((h)->files.fGroup.files[TSDB_FILE_TYPE_DATA]))
#define helperLastF(h) (&((h)->files.fGroup.files[TSDB_FILE_TYPE_LAST]))
#define helperLastF(h) (&((h)->files.fGroup.files[TSDB_FILE_TYPE_LAST]))
#define helperNewIdxF(h) (&((h)->files.nIdxF))
#define helperNewHeadF(h) (&((h)->files.nHeadF))
#define helperNewHeadF(h) (&((h)->files.nHeadF))
#define helperNewLastF(h) (&((h)->files.nLastF))
#define helperNewLastF(h) (&((h)->files.nLastF))
...
...
src/tsdb/src/tsdbFile.c
浏览文件 @
dd9f62cf
...
@@ -30,7 +30,11 @@
...
@@ -30,7 +30,11 @@
#include "ttime.h"
#include "ttime.h"
#include "tfile.h"
#include "tfile.h"
#ifdef TSDB_IDX
const
char
*
tsdbFileSuffix
[]
=
{
".idx"
,
".head"
,
".data"
,
".last"
,
""
,
".i"
,
".h"
,
".l"
};
const
char
*
tsdbFileSuffix
[]
=
{
".idx"
,
".head"
,
".data"
,
".last"
,
""
,
".i"
,
".h"
,
".l"
};
#else
const
char
*
tsdbFileSuffix
[]
=
{
".head"
,
".data"
,
".last"
,
""
,
".h"
,
".l"
};
#endif
static
int
tsdbInitFile
(
SFile
*
pFile
,
STsdbRepo
*
pRepo
,
int
fid
,
int
type
);
static
int
tsdbInitFile
(
SFile
*
pFile
,
STsdbRepo
*
pRepo
,
int
fid
,
int
type
);
static
void
tsdbDestroyFile
(
SFile
*
pFile
);
static
void
tsdbDestroyFile
(
SFile
*
pFile
);
...
@@ -108,7 +112,7 @@ int tsdbOpenFileH(STsdbRepo *pRepo) {
...
@@ -108,7 +112,7 @@ int tsdbOpenFileH(STsdbRepo *pRepo) {
memset
((
void
*
)(
&
fileGroup
),
0
,
sizeof
(
SFileGroup
));
memset
((
void
*
)(
&
fileGroup
),
0
,
sizeof
(
SFileGroup
));
fileGroup
.
fileId
=
fid
;
fileGroup
.
fileId
=
fid
;
for
(
int
type
=
TSDB_FILE_TYPE_IDX
;
type
<
TSDB_FILE_TYPE_MAX
;
type
++
)
{
for
(
int
type
=
0
;
type
<
TSDB_FILE_TYPE_MAX
;
type
++
)
{
if
(
tsdbInitFile
(
&
fileGroup
.
files
[
type
],
pRepo
,
fid
,
type
)
<
0
)
{
if
(
tsdbInitFile
(
&
fileGroup
.
files
[
type
],
pRepo
,
fid
,
type
)
<
0
)
{
tsdbError
(
"vgId:%d failed to init file fid %d type %d"
,
REPO_ID
(
pRepo
),
fid
,
type
);
tsdbError
(
"vgId:%d failed to init file fid %d type %d"
,
REPO_ID
(
pRepo
),
fid
,
type
);
goto
_err
;
goto
_err
;
...
@@ -126,7 +130,7 @@ int tsdbOpenFileH(STsdbRepo *pRepo) {
...
@@ -126,7 +130,7 @@ int tsdbOpenFileH(STsdbRepo *pRepo) {
return
0
;
return
0
;
_err:
_err:
for
(
int
type
=
TSDB_FILE_TYPE_IDX
;
type
<
TSDB_FILE_TYPE_MAX
;
type
++
)
tsdbDestroyFile
(
&
fileGroup
.
files
[
type
]);
for
(
int
type
=
0
;
type
<
TSDB_FILE_TYPE_MAX
;
type
++
)
tsdbDestroyFile
(
&
fileGroup
.
files
[
type
]);
tfree
(
tDataDir
);
tfree
(
tDataDir
);
if
(
dir
!=
NULL
)
closedir
(
dir
);
if
(
dir
!=
NULL
)
closedir
(
dir
);
...
@@ -139,7 +143,7 @@ void tsdbCloseFileH(STsdbRepo *pRepo) {
...
@@ -139,7 +143,7 @@ void tsdbCloseFileH(STsdbRepo *pRepo) {
for
(
int
i
=
0
;
i
<
pFileH
->
nFGroups
;
i
++
)
{
for
(
int
i
=
0
;
i
<
pFileH
->
nFGroups
;
i
++
)
{
SFileGroup
*
pFGroup
=
pFileH
->
pFGroup
+
i
;
SFileGroup
*
pFGroup
=
pFileH
->
pFGroup
+
i
;
for
(
int
type
=
TSDB_FILE_TYPE_IDX
;
type
<
TSDB_FILE_TYPE_MAX
;
type
++
)
{
for
(
int
type
=
0
;
type
<
TSDB_FILE_TYPE_MAX
;
type
++
)
{
tsdbDestroyFile
(
&
pFGroup
->
files
[
type
]);
tsdbDestroyFile
(
&
pFGroup
->
files
[
type
]);
}
}
}
}
...
@@ -156,7 +160,7 @@ SFileGroup *tsdbCreateFGroupIfNeed(STsdbRepo *pRepo, char *dataDir, int fid, int
...
@@ -156,7 +160,7 @@ SFileGroup *tsdbCreateFGroupIfNeed(STsdbRepo *pRepo, char *dataDir, int fid, int
SFileGroup
*
pGroup
=
tsdbSearchFGroup
(
pFileH
,
fid
,
TD_EQ
);
SFileGroup
*
pGroup
=
tsdbSearchFGroup
(
pFileH
,
fid
,
TD_EQ
);
if
(
pGroup
==
NULL
)
{
// if not exists, create one
if
(
pGroup
==
NULL
)
{
// if not exists, create one
pFGroup
->
fileId
=
fid
;
pFGroup
->
fileId
=
fid
;
for
(
int
type
=
TSDB_FILE_TYPE_IDX
;
type
<
TSDB_FILE_TYPE_MAX
;
type
++
)
{
for
(
int
type
=
0
;
type
<
TSDB_FILE_TYPE_MAX
;
type
++
)
{
if
(
tsdbCreateFile
(
&
pFGroup
->
files
[
type
],
pRepo
,
fid
,
type
)
<
0
)
if
(
tsdbCreateFile
(
&
pFGroup
->
files
[
type
],
pRepo
,
fid
,
type
)
<
0
)
goto
_err
;
goto
_err
;
}
}
...
@@ -169,7 +173,7 @@ SFileGroup *tsdbCreateFGroupIfNeed(STsdbRepo *pRepo, char *dataDir, int fid, int
...
@@ -169,7 +173,7 @@ SFileGroup *tsdbCreateFGroupIfNeed(STsdbRepo *pRepo, char *dataDir, int fid, int
return
pGroup
;
return
pGroup
;
_err:
_err:
for
(
int
type
=
TSDB_FILE_TYPE_IDX
;
type
<
TSDB_FILE_TYPE_MAX
;
type
++
)
tsdbDestroyFile
(
&
pGroup
->
files
[
type
]);
for
(
int
type
=
0
;
type
<
TSDB_FILE_TYPE_MAX
;
type
++
)
tsdbDestroyFile
(
&
pGroup
->
files
[
type
]);
return
NULL
;
return
NULL
;
}
}
...
@@ -325,10 +329,11 @@ int tsdbEncodeSFileInfo(void **buf, const STsdbFileInfo *pInfo) {
...
@@ -325,10 +329,11 @@ int tsdbEncodeSFileInfo(void **buf, const STsdbFileInfo *pInfo) {
int
tlen
=
0
;
int
tlen
=
0
;
tlen
+=
taosEncodeFixedU32
(
buf
,
pInfo
->
magic
);
tlen
+=
taosEncodeFixedU32
(
buf
,
pInfo
->
magic
);
tlen
+=
taosEncodeFixedU32
(
buf
,
pInfo
->
len
);
tlen
+=
taosEncodeFixedU32
(
buf
,
pInfo
->
len
);
tlen
+=
taosEncodeFixedU64
(
buf
,
pInfo
->
size
);
tlen
+=
taosEncodeFixedU64
(
buf
,
pInfo
->
tombSize
);
tlen
+=
taosEncodeFixedU32
(
buf
,
pInfo
->
totalBlocks
);
tlen
+=
taosEncodeFixedU32
(
buf
,
pInfo
->
totalBlocks
);
tlen
+=
taosEncodeFixedU32
(
buf
,
pInfo
->
totalSubBlocks
);
tlen
+=
taosEncodeFixedU32
(
buf
,
pInfo
->
totalSubBlocks
);
tlen
+=
taosEncodeFixedU32
(
buf
,
pInfo
->
offset
);
tlen
+=
taosEncodeFixedU64
(
buf
,
pInfo
->
size
);
tlen
+=
taosEncodeFixedU64
(
buf
,
pInfo
->
tombSize
);
return
tlen
;
return
tlen
;
}
}
...
@@ -336,10 +341,11 @@ int tsdbEncodeSFileInfo(void **buf, const STsdbFileInfo *pInfo) {
...
@@ -336,10 +341,11 @@ int tsdbEncodeSFileInfo(void **buf, const STsdbFileInfo *pInfo) {
void
*
tsdbDecodeSFileInfo
(
void
*
buf
,
STsdbFileInfo
*
pInfo
)
{
void
*
tsdbDecodeSFileInfo
(
void
*
buf
,
STsdbFileInfo
*
pInfo
)
{
buf
=
taosDecodeFixedU32
(
buf
,
&
(
pInfo
->
magic
));
buf
=
taosDecodeFixedU32
(
buf
,
&
(
pInfo
->
magic
));
buf
=
taosDecodeFixedU32
(
buf
,
&
(
pInfo
->
len
));
buf
=
taosDecodeFixedU32
(
buf
,
&
(
pInfo
->
len
));
buf
=
taosDecodeFixedU64
(
buf
,
&
(
pInfo
->
size
));
buf
=
taosDecodeFixedU64
(
buf
,
&
(
pInfo
->
tombSize
));
buf
=
taosDecodeFixedU32
(
buf
,
&
(
pInfo
->
totalBlocks
));
buf
=
taosDecodeFixedU32
(
buf
,
&
(
pInfo
->
totalBlocks
));
buf
=
taosDecodeFixedU32
(
buf
,
&
(
pInfo
->
totalSubBlocks
));
buf
=
taosDecodeFixedU32
(
buf
,
&
(
pInfo
->
totalSubBlocks
));
buf
=
taosDecodeFixedU32
(
buf
,
&
(
pInfo
->
offset
));
buf
=
taosDecodeFixedU64
(
buf
,
&
(
pInfo
->
size
));
buf
=
taosDecodeFixedU64
(
buf
,
&
(
pInfo
->
tombSize
));
return
buf
;
return
buf
;
}
}
...
@@ -358,7 +364,7 @@ void tsdbRemoveFileGroup(STsdbRepo *pRepo, SFileGroup *pFGroup) {
...
@@ -358,7 +364,7 @@ void tsdbRemoveFileGroup(STsdbRepo *pRepo, SFileGroup *pFGroup) {
pFileH
->
nFGroups
--
;
pFileH
->
nFGroups
--
;
ASSERT
(
pFileH
->
nFGroups
>=
0
);
ASSERT
(
pFileH
->
nFGroups
>=
0
);
for
(
int
type
=
TSDB_FILE_TYPE_IDX
;
type
<
TSDB_FILE_TYPE_MAX
;
type
++
)
{
for
(
int
type
=
0
;
type
<
TSDB_FILE_TYPE_MAX
;
type
++
)
{
if
(
remove
(
fileGroup
.
files
[
type
].
fname
)
<
0
)
{
if
(
remove
(
fileGroup
.
files
[
type
].
fname
)
<
0
)
{
tsdbError
(
"vgId:%d failed to remove file %s"
,
REPO_ID
(
pRepo
),
fileGroup
.
files
[
type
].
fname
);
tsdbError
(
"vgId:%d failed to remove file %s"
,
REPO_ID
(
pRepo
),
fileGroup
.
files
[
type
].
fname
);
}
}
...
...
src/tsdb/src/tsdbMemTable.c
浏览文件 @
dd9f62cf
...
@@ -628,7 +628,9 @@ static int tsdbCommitToFile(STsdbRepo *pRepo, int fid, SCommitIter *iters, SRWHe
...
@@ -628,7 +628,9 @@ static int tsdbCommitToFile(STsdbRepo *pRepo, int fid, SCommitIter *iters, SRWHe
tsdbCloseHelperFile
(
pHelper
,
0
);
tsdbCloseHelperFile
(
pHelper
,
0
);
pthread_rwlock_wrlock
(
&
(
pFileH
->
fhlock
));
pthread_rwlock_wrlock
(
&
(
pFileH
->
fhlock
));
#ifdef TSDB_IDX
pGroup
->
files
[
TSDB_FILE_TYPE_IDX
]
=
*
(
helperIdxF
(
pHelper
));
pGroup
->
files
[
TSDB_FILE_TYPE_IDX
]
=
*
(
helperIdxF
(
pHelper
));
#endif
pGroup
->
files
[
TSDB_FILE_TYPE_HEAD
]
=
*
(
helperHeadF
(
pHelper
));
pGroup
->
files
[
TSDB_FILE_TYPE_HEAD
]
=
*
(
helperHeadF
(
pHelper
));
pGroup
->
files
[
TSDB_FILE_TYPE_DATA
]
=
*
(
helperDataF
(
pHelper
));
pGroup
->
files
[
TSDB_FILE_TYPE_DATA
]
=
*
(
helperDataF
(
pHelper
));
pGroup
->
files
[
TSDB_FILE_TYPE_LAST
]
=
*
(
helperLastF
(
pHelper
));
pGroup
->
files
[
TSDB_FILE_TYPE_LAST
]
=
*
(
helperLastF
(
pHelper
));
...
...
src/tsdb/src/tsdbRWHelper.c
浏览文件 @
dd9f62cf
...
@@ -109,21 +109,27 @@ int tsdbSetAndOpenHelperFile(SRWHelper *pHelper, SFileGroup *pGroup) {
...
@@ -109,21 +109,27 @@ int tsdbSetAndOpenHelperFile(SRWHelper *pHelper, SFileGroup *pGroup) {
// Set the files
// Set the files
pHelper
->
files
.
fGroup
=
*
pGroup
;
pHelper
->
files
.
fGroup
=
*
pGroup
;
if
(
helperType
(
pHelper
)
==
TSDB_WRITE_HELPER
)
{
if
(
helperType
(
pHelper
)
==
TSDB_WRITE_HELPER
)
{
#ifdef TSDB_IDX
tsdbGetDataFileName
(
pHelper
->
pRepo
,
pGroup
->
fileId
,
TSDB_FILE_TYPE_NIDX
,
helperNewIdxF
(
pHelper
)
->
fname
);
tsdbGetDataFileName
(
pHelper
->
pRepo
,
pGroup
->
fileId
,
TSDB_FILE_TYPE_NIDX
,
helperNewIdxF
(
pHelper
)
->
fname
);
#endif
tsdbGetDataFileName
(
pHelper
->
pRepo
,
pGroup
->
fileId
,
TSDB_FILE_TYPE_NHEAD
,
helperNewHeadF
(
pHelper
)
->
fname
);
tsdbGetDataFileName
(
pHelper
->
pRepo
,
pGroup
->
fileId
,
TSDB_FILE_TYPE_NHEAD
,
helperNewHeadF
(
pHelper
)
->
fname
);
tsdbGetDataFileName
(
pHelper
->
pRepo
,
pGroup
->
fileId
,
TSDB_FILE_TYPE_NLAST
,
helperNewLastF
(
pHelper
)
->
fname
);
tsdbGetDataFileName
(
pHelper
->
pRepo
,
pGroup
->
fileId
,
TSDB_FILE_TYPE_NLAST
,
helperNewLastF
(
pHelper
)
->
fname
);
}
}
// Open the files
// Open the files
#ifdef TSDB_IDX
if
(
tsdbOpenFile
(
helperIdxF
(
pHelper
),
O_RDONLY
)
<
0
)
goto
_err
;
if
(
tsdbOpenFile
(
helperIdxF
(
pHelper
),
O_RDONLY
)
<
0
)
goto
_err
;
#endif
if
(
tsdbOpenFile
(
helperHeadF
(
pHelper
),
O_RDONLY
)
<
0
)
goto
_err
;
if
(
tsdbOpenFile
(
helperHeadF
(
pHelper
),
O_RDONLY
)
<
0
)
goto
_err
;
if
(
helperType
(
pHelper
)
==
TSDB_WRITE_HELPER
)
{
if
(
helperType
(
pHelper
)
==
TSDB_WRITE_HELPER
)
{
if
(
tsdbOpenFile
(
helperDataF
(
pHelper
),
O_RDWR
)
<
0
)
goto
_err
;
if
(
tsdbOpenFile
(
helperDataF
(
pHelper
),
O_RDWR
)
<
0
)
goto
_err
;
if
(
tsdbOpenFile
(
helperLastF
(
pHelper
),
O_RDWR
)
<
0
)
goto
_err
;
if
(
tsdbOpenFile
(
helperLastF
(
pHelper
),
O_RDWR
)
<
0
)
goto
_err
;
#ifdef TSDB_IDX
// Create and open .i file
// Create and open .i file
if
(
tsdbOpenFile
(
helperNewIdxF
(
pHelper
),
O_WRONLY
|
O_CREAT
)
<
0
)
return
-
1
;
if
(
tsdbOpenFile
(
helperNewIdxF
(
pHelper
),
O_WRONLY
|
O_CREAT
)
<
0
)
return
-
1
;
if
(
tsdbUpdateFileHeader
(
helperNewIdxF
(
pHelper
),
0
)
<
0
)
return
-
1
;
if
(
tsdbUpdateFileHeader
(
helperNewIdxF
(
pHelper
),
0
)
<
0
)
return
-
1
;
#endif
// Create and open .h
// Create and open .h
if
(
tsdbOpenFile
(
helperNewHeadF
(
pHelper
),
O_WRONLY
|
O_CREAT
)
<
0
)
return
-
1
;
if
(
tsdbOpenFile
(
helperNewHeadF
(
pHelper
),
O_WRONLY
|
O_CREAT
)
<
0
)
return
-
1
;
...
@@ -150,11 +156,13 @@ _err:
...
@@ -150,11 +156,13 @@ _err:
int
tsdbCloseHelperFile
(
SRWHelper
*
pHelper
,
bool
hasError
)
{
int
tsdbCloseHelperFile
(
SRWHelper
*
pHelper
,
bool
hasError
)
{
SFile
*
pFile
=
NULL
;
SFile
*
pFile
=
NULL
;
#ifdef TSDB_IDX
pFile
=
helperIdxF
(
pHelper
);
pFile
=
helperIdxF
(
pHelper
);
if
(
pFile
->
fd
>
0
)
{
if
(
pFile
->
fd
>
0
)
{
close
(
pFile
->
fd
);
close
(
pFile
->
fd
);
pFile
->
fd
=
-
1
;
pFile
->
fd
=
-
1
;
}
}
#endif
pFile
=
helperHeadF
(
pHelper
);
pFile
=
helperHeadF
(
pHelper
);
if
(
pFile
->
fd
>
0
)
{
if
(
pFile
->
fd
>
0
)
{
...
@@ -182,6 +190,7 @@ int tsdbCloseHelperFile(SRWHelper *pHelper, bool hasError) {
...
@@ -182,6 +190,7 @@ int tsdbCloseHelperFile(SRWHelper *pHelper, bool hasError) {
}
}
if
(
helperType
(
pHelper
)
==
TSDB_WRITE_HELPER
)
{
if
(
helperType
(
pHelper
)
==
TSDB_WRITE_HELPER
)
{
#ifdef TSDB_IDX
pFile
=
helperNewIdxF
(
pHelper
);
pFile
=
helperNewIdxF
(
pHelper
);
if
(
pFile
->
fd
>
0
)
{
if
(
pFile
->
fd
>
0
)
{
if
(
!
hasError
)
tsdbUpdateFileHeader
(
pFile
,
0
);
if
(
!
hasError
)
tsdbUpdateFileHeader
(
pFile
,
0
);
...
@@ -200,6 +209,7 @@ int tsdbCloseHelperFile(SRWHelper *pHelper, bool hasError) {
...
@@ -200,6 +209,7 @@ int tsdbCloseHelperFile(SRWHelper *pHelper, bool hasError) {
helperIdxF
(
pHelper
)
->
info
=
pFile
->
info
;
helperIdxF
(
pHelper
)
->
info
=
pFile
->
info
;
}
}
}
}
#endif
pFile
=
helperNewHeadF
(
pHelper
);
pFile
=
helperNewHeadF
(
pHelper
);
if
(
pFile
->
fd
>
0
)
{
if
(
pFile
->
fd
>
0
)
{
...
@@ -365,12 +375,13 @@ int tsdbMoveLastBlockIfNeccessary(SRWHelper *pHelper) {
...
@@ -365,12 +375,13 @@ int tsdbMoveLastBlockIfNeccessary(SRWHelper *pHelper) {
int
tsdbWriteCompInfo
(
SRWHelper
*
pHelper
)
{
int
tsdbWriteCompInfo
(
SRWHelper
*
pHelper
)
{
SCompIdx
*
pIdx
=
&
(
pHelper
->
curCompIdx
);
SCompIdx
*
pIdx
=
&
(
pHelper
->
curCompIdx
);
off_t
offset
=
0
;
off_t
offset
=
0
;
SFile
*
pFile
=
helperNewHeadF
(
pHelper
);
if
(
pIdx
->
len
>
0
)
{
if
(
pIdx
->
len
>
0
)
{
if
(
!
helperHasState
(
pHelper
,
TSDB_HELPER_INFO_LOAD
))
{
if
(
!
helperHasState
(
pHelper
,
TSDB_HELPER_INFO_LOAD
))
{
offset
=
lseek
(
helperNewHeadF
(
pHelper
)
->
fd
,
0
,
SEEK_END
);
offset
=
lseek
(
pFile
->
fd
,
0
,
SEEK_END
);
if
(
offset
<
0
)
{
if
(
offset
<
0
)
{
tsdbError
(
"vgId:%d failed to lseed file %s since %s"
,
REPO_ID
(
pHelper
->
pRepo
),
helperNewHeadF
(
pHelper
)
->
fname
,
tsdbError
(
"vgId:%d failed to lseed file %s since %s"
,
REPO_ID
(
pHelper
->
pRepo
),
pFile
->
fname
,
strerror
(
errno
));
strerror
(
errno
));
terrno
=
TAOS_SYSTEM_ERROR
(
errno
);
terrno
=
TAOS_SYSTEM_ERROR
(
errno
);
return
-
1
;
return
-
1
;
...
@@ -381,9 +392,9 @@ int tsdbWriteCompInfo(SRWHelper *pHelper) {
...
@@ -381,9 +392,9 @@ int tsdbWriteCompInfo(SRWHelper *pHelper) {
pIdx
->
tid
=
pHelper
->
tableInfo
.
tid
;
pIdx
->
tid
=
pHelper
->
tableInfo
.
tid
;
ASSERT
(
pIdx
->
offset
>=
TSDB_FILE_HEAD_SIZE
);
ASSERT
(
pIdx
->
offset
>=
TSDB_FILE_HEAD_SIZE
);
if
(
tsendfile
(
helperNewHeadF
(
pHelper
)
->
fd
,
helperHeadF
(
pHelper
)
->
fd
,
NULL
,
pIdx
->
len
)
<
pIdx
->
len
)
{
if
(
tsendfile
(
pFile
->
fd
,
helperHeadF
(
pHelper
)
->
fd
,
NULL
,
pIdx
->
len
)
<
pIdx
->
len
)
{
tsdbError
(
"vgId:%d failed to send %d bytes from file %s to %s since %s"
,
REPO_ID
(
pHelper
->
pRepo
),
pIdx
->
len
,
tsdbError
(
"vgId:%d failed to send %d bytes from file %s to %s since %s"
,
REPO_ID
(
pHelper
->
pRepo
),
pIdx
->
len
,
helperHeadF
(
pHelper
)
->
fname
,
helperNewHeadF
(
pHelper
)
->
fname
,
strerror
(
errno
));
helperHeadF
(
pHelper
)
->
fname
,
pFile
->
fname
,
strerror
(
errno
));
terrno
=
TAOS_SYSTEM_ERROR
(
errno
);
terrno
=
TAOS_SYSTEM_ERROR
(
errno
);
return
-
1
;
return
-
1
;
}
}
...
@@ -394,9 +405,9 @@ int tsdbWriteCompInfo(SRWHelper *pHelper) {
...
@@ -394,9 +405,9 @@ int tsdbWriteCompInfo(SRWHelper *pHelper) {
ASSERT
(
pIdx
->
len
>
sizeof
(
SCompInfo
)
+
sizeof
(
TSCKSUM
)
&&
ASSERT
(
pIdx
->
len
>
sizeof
(
SCompInfo
)
+
sizeof
(
TSCKSUM
)
&&
(
pIdx
->
len
-
sizeof
(
SCompInfo
)
-
sizeof
(
TSCKSUM
))
%
sizeof
(
SCompBlock
)
==
0
);
(
pIdx
->
len
-
sizeof
(
SCompInfo
)
-
sizeof
(
TSCKSUM
))
%
sizeof
(
SCompBlock
)
==
0
);
taosCalcChecksumAppend
(
0
,
(
uint8_t
*
)
pHelper
->
pCompInfo
,
pIdx
->
len
);
taosCalcChecksumAppend
(
0
,
(
uint8_t
*
)
pHelper
->
pCompInfo
,
pIdx
->
len
);
offset
=
lseek
(
helperNewHeadF
(
pHelper
)
->
fd
,
0
,
SEEK_END
);
offset
=
lseek
(
pFile
->
fd
,
0
,
SEEK_END
);
if
(
offset
<
0
)
{
if
(
offset
<
0
)
{
tsdbError
(
"vgId:%d failed to lseek file %s since %s"
,
REPO_ID
(
pHelper
->
pRepo
),
helperNewHeadF
(
pHelper
)
->
fname
,
tsdbError
(
"vgId:%d failed to lseek file %s since %s"
,
REPO_ID
(
pHelper
->
pRepo
),
pFile
->
fname
,
strerror
(
errno
));
strerror
(
errno
));
terrno
=
TAOS_SYSTEM_ERROR
(
errno
);
terrno
=
TAOS_SYSTEM_ERROR
(
errno
);
return
-
1
;
return
-
1
;
...
@@ -406,15 +417,19 @@ int tsdbWriteCompInfo(SRWHelper *pHelper) {
...
@@ -406,15 +417,19 @@ int tsdbWriteCompInfo(SRWHelper *pHelper) {
pIdx
->
tid
=
pHelper
->
tableInfo
.
tid
;
pIdx
->
tid
=
pHelper
->
tableInfo
.
tid
;
ASSERT
(
pIdx
->
offset
>=
TSDB_FILE_HEAD_SIZE
);
ASSERT
(
pIdx
->
offset
>=
TSDB_FILE_HEAD_SIZE
);
if
(
twrite
(
helperNewHeadF
(
pHelper
)
->
fd
,
(
void
*
)(
pHelper
->
pCompInfo
),
pIdx
->
len
)
<
pIdx
->
len
)
{
if
(
twrite
(
pFile
->
fd
,
(
void
*
)(
pHelper
->
pCompInfo
),
pIdx
->
len
)
<
pIdx
->
len
)
{
tsdbError
(
"vgId:%d failed to write %d bytes to file %s since %s"
,
REPO_ID
(
pHelper
->
pRepo
),
pIdx
->
len
,
tsdbError
(
"vgId:%d failed to write %d bytes to file %s since %s"
,
REPO_ID
(
pHelper
->
pRepo
),
pIdx
->
len
,
helperNewHeadF
(
pHelper
)
->
fname
,
strerror
(
errno
));
pFile
->
fname
,
strerror
(
errno
));
terrno
=
TAOS_SYSTEM_ERROR
(
errno
);
terrno
=
TAOS_SYSTEM_ERROR
(
errno
);
return
-
1
;
return
-
1
;
}
}
}
}
if
(
tsizeof
(
pHelper
->
pWIdx
)
<
helperNewIdxF
(
pHelper
)
->
info
.
len
+
sizeof
(
SCompIdx
)
+
12
)
{
#ifdef TSDB_IDX
pFile
=
helperNewIdxF
(
pHelper
);
#endif
if
(
tsizeof
(
pHelper
->
pWIdx
)
<
pFile
->
info
.
len
+
sizeof
(
SCompIdx
)
+
12
)
{
pHelper
->
pWIdx
=
trealloc
(
pHelper
->
pWIdx
,
tsizeof
(
pHelper
->
pWIdx
)
==
0
?
1024
:
tsizeof
(
pHelper
->
pWIdx
)
*
2
);
pHelper
->
pWIdx
=
trealloc
(
pHelper
->
pWIdx
,
tsizeof
(
pHelper
->
pWIdx
)
==
0
?
1024
:
tsizeof
(
pHelper
->
pWIdx
)
*
2
);
if
(
pHelper
->
pWIdx
==
NULL
)
{
if
(
pHelper
->
pWIdx
==
NULL
)
{
terrno
=
TSDB_CODE_TDB_OUT_OF_MEMORY
;
terrno
=
TSDB_CODE_TDB_OUT_OF_MEMORY
;
...
@@ -422,8 +437,8 @@ int tsdbWriteCompInfo(SRWHelper *pHelper) {
...
@@ -422,8 +437,8 @@ int tsdbWriteCompInfo(SRWHelper *pHelper) {
}
}
}
}
void
*
pBuf
=
POINTER_SHIFT
(
pHelper
->
pWIdx
,
helperNewIdxF
(
pHelper
)
->
info
.
len
);
void
*
pBuf
=
POINTER_SHIFT
(
pHelper
->
pWIdx
,
pFile
->
info
.
len
);
helperNewIdxF
(
pHelper
)
->
info
.
len
+=
tsdbEncodeSCompIdx
(
&
pBuf
,
&
(
pHelper
->
curCompIdx
));
pFile
->
info
.
len
+=
tsdbEncodeSCompIdx
(
&
pBuf
,
&
(
pHelper
->
curCompIdx
));
}
}
return
0
;
return
0
;
...
@@ -431,9 +446,13 @@ int tsdbWriteCompInfo(SRWHelper *pHelper) {
...
@@ -431,9 +446,13 @@ int tsdbWriteCompInfo(SRWHelper *pHelper) {
int
tsdbWriteCompIdx
(
SRWHelper
*
pHelper
)
{
int
tsdbWriteCompIdx
(
SRWHelper
*
pHelper
)
{
ASSERT
(
helperType
(
pHelper
)
==
TSDB_WRITE_HELPER
);
ASSERT
(
helperType
(
pHelper
)
==
TSDB_WRITE_HELPER
);
// STsdbCfg *pCfg = &pHelper->pRepo->config
;
off_t
offset
=
0
;
#ifdef TSDB_IDX
SFile
*
pFile
=
helperNewIdxF
(
pHelper
);
SFile
*
pFile
=
helperNewIdxF
(
pHelper
);
#else
SFile
*
pFile
=
helperNewHeadF
(
pHelper
);
#endif
pFile
->
info
.
len
+=
sizeof
(
TSCKSUM
);
pFile
->
info
.
len
+=
sizeof
(
TSCKSUM
);
if
(
tsizeof
(
pHelper
->
pWIdx
)
<
pFile
->
info
.
len
)
{
if
(
tsizeof
(
pHelper
->
pWIdx
)
<
pFile
->
info
.
len
)
{
...
@@ -445,6 +464,15 @@ int tsdbWriteCompIdx(SRWHelper *pHelper) {
...
@@ -445,6 +464,15 @@ int tsdbWriteCompIdx(SRWHelper *pHelper) {
}
}
taosCalcChecksumAppend
(
0
,
(
uint8_t
*
)
pHelper
->
pWIdx
,
pFile
->
info
.
len
);
taosCalcChecksumAppend
(
0
,
(
uint8_t
*
)
pHelper
->
pWIdx
,
pFile
->
info
.
len
);
offset
=
lseek
(
pFile
->
fd
,
0
,
SEEK_END
);
if
(
offset
<
0
)
{
tsdbError
(
"vgId:%d failed to lseek file %s since %s"
,
REPO_ID
(
pHelper
->
pRepo
),
pFile
->
fname
,
strerror
(
errno
));
terrno
=
TAOS_SYSTEM_ERROR
(
errno
);
return
-
1
;
}
pFile
->
info
.
offset
=
offset
;
if
(
twrite
(
pFile
->
fd
,
(
void
*
)
pHelper
->
pWIdx
,
pFile
->
info
.
len
)
<
pFile
->
info
.
len
)
{
if
(
twrite
(
pFile
->
fd
,
(
void
*
)
pHelper
->
pWIdx
,
pFile
->
info
.
len
)
<
pFile
->
info
.
len
)
{
tsdbError
(
"vgId:%d failed to write %d bytes to file %s since %s"
,
REPO_ID
(
pHelper
->
pRepo
),
pFile
->
info
.
len
,
tsdbError
(
"vgId:%d failed to write %d bytes to file %s since %s"
,
REPO_ID
(
pHelper
->
pRepo
),
pFile
->
info
.
len
,
pFile
->
fname
,
strerror
(
errno
));
pFile
->
fname
,
strerror
(
errno
));
...
@@ -457,8 +485,12 @@ int tsdbWriteCompIdx(SRWHelper *pHelper) {
...
@@ -457,8 +485,12 @@ int tsdbWriteCompIdx(SRWHelper *pHelper) {
int
tsdbLoadCompIdx
(
SRWHelper
*
pHelper
,
void
*
target
)
{
int
tsdbLoadCompIdx
(
SRWHelper
*
pHelper
,
void
*
target
)
{
ASSERT
(
pHelper
->
state
==
TSDB_HELPER_FILE_SET_AND_OPEN
);
ASSERT
(
pHelper
->
state
==
TSDB_HELPER_FILE_SET_AND_OPEN
);
#ifdef TSDB_IDX
SFile
*
pFile
=
helperIdxF
(
pHelper
);
SFile
*
pFile
=
helperIdxF
(
pHelper
);
int
fd
=
pFile
->
fd
;
#else
SFile
*
pFile
=
helperHeadF
(
pHelper
);
#endif
int
fd
=
pFile
->
fd
;
if
(
!
helperHasState
(
pHelper
,
TSDB_HELPER_IDX_LOAD
))
{
if
(
!
helperHasState
(
pHelper
,
TSDB_HELPER_IDX_LOAD
))
{
// If not load from file, just load it in object
// If not load from file, just load it in object
...
@@ -468,7 +500,7 @@ int tsdbLoadCompIdx(SRWHelper *pHelper, void *target) {
...
@@ -468,7 +500,7 @@ int tsdbLoadCompIdx(SRWHelper *pHelper, void *target) {
return
-
1
;
return
-
1
;
}
}
if
(
lseek
(
fd
,
TSDB_FILE_HEAD_SIZE
,
SEEK_SET
)
<
0
)
{
if
(
lseek
(
fd
,
pFile
->
info
.
offset
,
SEEK_SET
)
<
0
)
{
tsdbError
(
"vgId:%d failed to lseek file %s since %s"
,
REPO_ID
(
pHelper
->
pRepo
),
pFile
->
fname
,
strerror
(
errno
));
tsdbError
(
"vgId:%d failed to lseek file %s since %s"
,
REPO_ID
(
pHelper
->
pRepo
),
pFile
->
fname
,
strerror
(
errno
));
terrno
=
TAOS_SYSTEM_ERROR
(
errno
);
terrno
=
TAOS_SYSTEM_ERROR
(
errno
);
return
-
1
;
return
-
1
;
...
@@ -516,11 +548,6 @@ int tsdbLoadCompIdx(SRWHelper *pHelper, void *target) {
...
@@ -516,11 +548,6 @@ int tsdbLoadCompIdx(SRWHelper *pHelper, void *target) {
ASSERT
(
POINTER_DISTANCE
(
ptr
,
pHelper
->
pBuffer
)
<=
pFile
->
info
.
len
-
sizeof
(
TSCKSUM
));
ASSERT
(
POINTER_DISTANCE
(
ptr
,
pHelper
->
pBuffer
)
<=
pFile
->
info
.
len
-
sizeof
(
TSCKSUM
));
}
}
// if (lseek(fd, TSDB_FILE_HEAD_SIZE, SEEK_SET) < 0) {
// terrno = TAOS_SYSTEM_ERROR(errno);
// return -1;
// }
}
}
}
}
helperSetState
(
pHelper
,
TSDB_HELPER_IDX_LOAD
);
helperSetState
(
pHelper
,
TSDB_HELPER_IDX_LOAD
);
...
@@ -1031,13 +1058,15 @@ static void tsdbResetHelperFileImpl(SRWHelper *pHelper) {
...
@@ -1031,13 +1058,15 @@ static void tsdbResetHelperFileImpl(SRWHelper *pHelper) {
pHelper
->
idxH
.
numOfIdx
=
0
;
pHelper
->
idxH
.
numOfIdx
=
0
;
pHelper
->
idxH
.
curIdx
=
0
;
pHelper
->
idxH
.
curIdx
=
0
;
memset
((
void
*
)
&
pHelper
->
files
,
0
,
sizeof
(
pHelper
->
files
));
memset
((
void
*
)
&
pHelper
->
files
,
0
,
sizeof
(
pHelper
->
files
));
helperIdxF
(
pHelper
)
->
fd
=
-
1
;
helperHeadF
(
pHelper
)
->
fd
=
-
1
;
helperHeadF
(
pHelper
)
->
fd
=
-
1
;
helperDataF
(
pHelper
)
->
fd
=
-
1
;
helperDataF
(
pHelper
)
->
fd
=
-
1
;
helperLastF
(
pHelper
)
->
fd
=
-
1
;
helperLastF
(
pHelper
)
->
fd
=
-
1
;
helperNewIdxF
(
pHelper
)
->
fd
=
-
1
;
helperNewHeadF
(
pHelper
)
->
fd
=
-
1
;
helperNewHeadF
(
pHelper
)
->
fd
=
-
1
;
helperNewLastF
(
pHelper
)
->
fd
=
-
1
;
helperNewLastF
(
pHelper
)
->
fd
=
-
1
;
#ifdef TSDB_IDX
helperIdxF
(
pHelper
)
->
fd
=
-
1
;
helperNewIdxF
(
pHelper
)
->
fd
=
-
1
;
#endif
}
}
static
int
tsdbInitHelperFile
(
SRWHelper
*
pHelper
)
{
static
int
tsdbInitHelperFile
(
SRWHelper
*
pHelper
)
{
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录