Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
683c4d58
T
TDengine
项目概览
taosdata
/
TDengine
1 年多 前同步成功
通知
1185
Star
22017
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
683c4d58
编写于
5月 27, 2021
作者:
K
Kaili Xu
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
[TD-4324]<fix> skip expired files during init and sync
上级
93e481f7
变更
5
隐藏空白更改
内联
并排
Showing
5 changed file
with
115 addition
and
16 deletion
+115
-16
src/tsdb/inc/tsdbCommit.h
src/tsdb/inc/tsdbCommit.h
+1
-0
src/tsdb/inc/tsdbint.h
src/tsdb/inc/tsdbint.h
+1
-0
src/tsdb/src/tsdbCommit.c
src/tsdb/src/tsdbCommit.c
+1
-2
src/tsdb/src/tsdbFS.c
src/tsdb/src/tsdbFS.c
+78
-5
src/tsdb/src/tsdbSync.c
src/tsdb/src/tsdbSync.c
+34
-9
未找到文件。
src/tsdb/inc/tsdbCommit.h
浏览文件 @
683c4d58
...
...
@@ -33,6 +33,7 @@ void tsdbGetRtnSnap(STsdbRepo *pRepo, SRtn *pRtn);
int
tsdbEncodeKVRecord
(
void
**
buf
,
SKVRecord
*
pRecord
);
void
*
tsdbDecodeKVRecord
(
void
*
buf
,
SKVRecord
*
pRecord
);
void
*
tsdbCommitData
(
STsdbRepo
*
pRepo
);
int
tsdbApplyRtn
(
STsdbRepo
*
pRepo
);
static
FORCE_INLINE
int
tsdbGetFidLevel
(
int
fid
,
SRtn
*
pRtn
)
{
if
(
fid
>=
pRtn
->
maxFid
)
{
...
...
src/tsdb/inc/tsdbint.h
浏览文件 @
683c4d58
...
...
@@ -83,6 +83,7 @@ struct STsdbRepo {
SMemTable
*
mem
;
SMemTable
*
imem
;
STsdbFS
*
fs
;
SRtn
rtn
;
tsem_t
readyToCommit
;
pthread_mutex_t
mutex
;
bool
repoLocked
;
...
...
src/tsdb/src/tsdbCommit.c
浏览文件 @
683c4d58
...
...
@@ -86,7 +86,6 @@ static void tsdbCloseCommitFile(SCommitH *pCommith, bool hasError);
static
bool
tsdbCanAddSubBlock
(
SCommitH
*
pCommith
,
SBlock
*
pBlock
,
SMergeInfo
*
pInfo
);
static
void
tsdbLoadAndMergeFromCache
(
SDataCols
*
pDataCols
,
int
*
iter
,
SCommitIter
*
pCommitIter
,
SDataCols
*
pTarget
,
TSKEY
maxKey
,
int
maxRows
,
int8_t
update
);
static
int
tsdbApplyRtn
(
STsdbRepo
*
pRepo
);
static
int
tsdbApplyRtnOnFSet
(
STsdbRepo
*
pRepo
,
SDFileSet
*
pSet
,
SRtn
*
pRtn
);
void
*
tsdbCommitData
(
STsdbRepo
*
pRepo
)
{
...
...
@@ -1428,7 +1427,7 @@ static bool tsdbCanAddSubBlock(SCommitH *pCommith, SBlock *pBlock, SMergeInfo *p
return
false
;
}
static
int
tsdbApplyRtn
(
STsdbRepo
*
pRepo
)
{
int
tsdbApplyRtn
(
STsdbRepo
*
pRepo
)
{
SRtn
rtn
;
SFSIter
fsiter
;
STsdbFS
*
pfs
=
REPO_FS
(
pRepo
);
...
...
src/tsdb/src/tsdbFS.c
浏览文件 @
683c4d58
...
...
@@ -33,7 +33,9 @@ static int tsdbScanDataDir(STsdbRepo *pRepo);
static
bool
tsdbIsTFileInFS
(
STsdbFS
*
pfs
,
const
TFILE
*
pf
);
static
int
tsdbRestoreCurrent
(
STsdbRepo
*
pRepo
);
static
int
tsdbComparTFILE
(
const
void
*
arg1
,
const
void
*
arg2
);
static
void
tsdbScanAndTryFixDFilesHeader
(
STsdbRepo
*
pRepo
);
static
void
tsdbScanAndTryFixDFilesHeader
(
STsdbRepo
*
pRepo
,
int32_t
*
nExpired
);
static
int
tsdbProcessExpiredFS
(
STsdbRepo
*
pRepo
);
static
int
tsdbCreateMeta
(
STsdbRepo
*
pRepo
);
// ================== CURRENT file header info
static
int
tsdbEncodeFSHeader
(
void
**
buf
,
SFSHeader
*
pHeader
)
{
...
...
@@ -212,6 +214,8 @@ STsdbFS *tsdbNewFS(STsdbCfg *pCfg) {
return
NULL
;
}
pfs
->
intxn
=
false
;
pfs
->
nstatus
=
tsdbNewFSStatus
(
maxFSet
);
if
(
pfs
->
nstatus
==
NULL
)
{
tsdbFreeFS
(
pfs
);
...
...
@@ -234,22 +238,84 @@ void *tsdbFreeFS(STsdbFS *pfs) {
return
NULL
;
}
static
int
tsdbProcessExpiredFS
(
STsdbRepo
*
pRepo
)
{
tsdbStartFSTxn
(
pRepo
,
0
,
0
);
if
(
tsdbCreateMeta
(
pRepo
)
<
0
)
{
tsdbError
(
"vgId:%d failed to apply rtn since %s"
,
REPO_ID
(
pRepo
),
tstrerror
(
terrno
));
return
-
1
;
}
if
(
tsdbApplyRtn
(
pRepo
)
<
0
)
{
tsdbEndFSTxnWithError
(
REPO_FS
(
pRepo
));
tsdbError
(
"vgId:%d failed to apply rtn since %s"
,
REPO_ID
(
pRepo
),
tstrerror
(
terrno
));
return
-
1
;
}
if
(
tsdbEndFSTxn
(
pRepo
)
<
0
)
{
tsdbError
(
"vgId:%d failed to end fs txn since %s"
,
REPO_ID
(
pRepo
),
tstrerror
(
terrno
));
return
-
1
;
}
return
0
;
}
static
int
tsdbCreateMeta
(
STsdbRepo
*
pRepo
)
{
STsdbFS
*
pfs
=
REPO_FS
(
pRepo
);
SMFile
*
pOMFile
=
pfs
->
cstatus
->
pmf
;
SMFile
mf
;
SDiskID
did
;
if
(
pOMFile
!=
NULL
)
{
// keep the old meta file
tsdbUpdateMFile
(
pfs
,
pOMFile
);
return
0
;
}
// Create a new meta file
did
.
level
=
TFS_PRIMARY_LEVEL
;
did
.
id
=
TFS_PRIMARY_ID
;
tsdbInitMFile
(
&
mf
,
did
,
REPO_ID
(
pRepo
),
FS_TXN_VERSION
(
REPO_FS
(
pRepo
)));
if
(
tsdbCreateMFile
(
&
mf
,
true
)
<
0
)
{
tsdbError
(
"vgId:%d failed to create META file since %s"
,
REPO_ID
(
pRepo
),
tstrerror
(
terrno
));
return
-
1
;
}
tsdbInfo
(
"vgId:%d meta file %s is created"
,
REPO_ID
(
pRepo
),
TSDB_FILE_FULL_NAME
(
&
mf
));
if
(
tsdbUpdateMFileHeader
(
&
mf
)
<
0
)
{
tsdbError
(
"vgId:%d failed to update META file header since %s, revert it"
,
REPO_ID
(
pRepo
),
tstrerror
(
terrno
));
tsdbApplyMFileChange
(
&
mf
,
pOMFile
);
return
-
1
;
}
TSDB_FILE_FSYNC
(
&
mf
);
tsdbCloseMFile
(
&
mf
);
tsdbUpdateMFile
(
pfs
,
&
mf
);
return
0
;
}
int
tsdbOpenFS
(
STsdbRepo
*
pRepo
)
{
STsdbFS
*
pfs
=
REPO_FS
(
pRepo
);
char
current
[
TSDB_FILENAME_LEN
]
=
"
\0
"
;
int
nExpired
=
0
;
ASSERT
(
pfs
!=
NULL
);
tsdbGetTxnFname
(
REPO_ID
(
pRepo
),
TSDB_TXN_CURR_FILE
,
current
);
tsdbGetRtnSnap
(
pRepo
,
&
pRepo
->
rtn
);
if
(
access
(
current
,
F_OK
)
==
0
)
{
if
(
tsdbOpenFSFromCurrent
(
pRepo
)
<
0
)
{
tsdbError
(
"vgId:%d failed to open FS since %s"
,
REPO_ID
(
pRepo
),
tstrerror
(
terrno
));
return
-
1
;
}
tsdbScanAndTryFixDFilesHeader
(
pRepo
);
tsdbScanAndTryFixDFilesHeader
(
pRepo
,
&
nExpired
);
if
(
nExpired
>
0
)
{
tsdbProcessExpiredFS
(
pRepo
);
}
}
else
{
// should skip expired fileset inside of the function
if
(
tsdbRestoreCurrent
(
pRepo
)
<
0
)
{
tsdbError
(
"vgId:%d failed to restore current file since %s"
,
REPO_ID
(
pRepo
),
tstrerror
(
terrno
));
return
-
1
;
...
...
@@ -1110,6 +1176,11 @@ static int tsdbRestoreDFileSet(STsdbRepo *pRepo) {
ASSERT
(
tvid
==
REPO_ID
(
pRepo
));
if
(
tfid
<
pRepo
->
rtn
.
minFid
)
{
// skip file expired
++
index
;
continue
;
}
if
(
ftype
==
0
)
{
fset
.
fid
=
tfid
;
}
else
{
...
...
@@ -1206,7 +1277,7 @@ static int tsdbComparTFILE(const void *arg1, const void *arg2) {
}
}
static
void
tsdbScanAndTryFixDFilesHeader
(
STsdbRepo
*
pRepo
)
{
static
void
tsdbScanAndTryFixDFilesHeader
(
STsdbRepo
*
pRepo
,
int32_t
*
nExpired
)
{
STsdbFS
*
pfs
=
REPO_FS
(
pRepo
);
SFSStatus
*
pStatus
=
pfs
->
cstatus
;
SDFInfo
info
;
...
...
@@ -1214,7 +1285,9 @@ static void tsdbScanAndTryFixDFilesHeader(STsdbRepo *pRepo) {
for
(
size_t
i
=
0
;
i
<
taosArrayGetSize
(
pStatus
->
df
);
i
++
)
{
SDFileSet
fset
;
tsdbInitDFileSetEx
(
&
fset
,
(
SDFileSet
*
)
taosArrayGet
(
pStatus
->
df
,
i
));
if
(
fset
.
fid
<
pRepo
->
rtn
.
minFid
)
{
++*
nExpired
;
}
tsdbDebug
(
"vgId:%d scan DFileSet %d header"
,
REPO_ID
(
pRepo
),
fset
.
fid
);
if
(
tsdbOpenDFileSet
(
&
fset
,
O_RDWR
)
<
0
)
{
...
...
src/tsdb/src/tsdbSync.c
浏览文件 @
683c4d58
...
...
@@ -424,24 +424,42 @@ static int32_t tsdbSyncRecvDFileSetArray(SSyncH *pSynch) {
}
if
(
tsdbSendDecision
(
pSynch
,
false
)
<
0
)
{
tsdbError
(
"vgId:%d, filed to send decision since %s"
,
REPO_ID
(
pRepo
),
tstrerror
(
terrno
));
tsdbError
(
"vgId:%d, f
a
iled to send decision since %s"
,
REPO_ID
(
pRepo
),
tstrerror
(
terrno
));
return
-
1
;
}
}
else
{
// Need to copy from remote
tsdbInfo
(
"vgId:%d, fileset:%d will be received"
,
REPO_ID
(
pRepo
),
pSynch
->
pdf
->
fid
);
// Notify remote to send there file here
if
(
tsdbSendDecision
(
pSynch
,
true
)
<
0
)
{
tsdbError
(
"vgId:%d, failed to send decision since %s"
,
REPO_ID
(
pRepo
),
tstrerror
(
terrno
));
return
-
1
;
int
fidLevel
=
tsdbGetFidLevel
(
pSynch
->
pdf
->
fid
,
&
(
pSynch
->
rtn
));
if
(
fidLevel
<
0
)
{
// expired fileset
tsdbInfo
(
"vgId:%d, fileset:%d will be skipped as expired"
,
REPO_ID
(
pRepo
),
pSynch
->
pdf
->
fid
);
if
(
tsdbSendDecision
(
pSynch
,
false
)
<
0
)
{
tsdbError
(
"vgId:%d, failed to send decision since %s"
,
REPO_ID
(
pRepo
),
tstrerror
(
terrno
));
return
-
1
;
}
// Move forward
if
(
tsdbRecvDFileSetInfo
(
pSynch
)
<
0
)
{
tsdbError
(
"vgId:%d, failed to recv fileset since %s"
,
REPO_ID
(
pRepo
),
tstrerror
(
terrno
));
return
-
1
;
}
if
(
pLSet
)
{
pLSet
=
tsdbFSIterNext
(
&
fsiter
);
}
// Next loop
continue
;
}
else
{
tsdbInfo
(
"vgId:%d, fileset:%d will be received"
,
REPO_ID
(
pRepo
),
pSynch
->
pdf
->
fid
);
// Notify remote to send there file here
if
(
tsdbSendDecision
(
pSynch
,
true
)
<
0
)
{
tsdbError
(
"vgId:%d, failed to send decision since %s"
,
REPO_ID
(
pRepo
),
tstrerror
(
terrno
));
return
-
1
;
}
}
// Create local files and copy from remote
SDiskID
did
;
SDFileSet
fset
;
tfsAllocDisk
(
tsdbGetFidLevel
(
pSynch
->
pdf
->
fid
,
&
(
pSynch
->
rtn
))
,
&
(
did
.
level
),
&
(
did
.
id
));
tfsAllocDisk
(
fidLevel
,
&
(
did
.
level
),
&
(
did
.
id
));
if
(
did
.
level
==
TFS_UNDECIDED_LEVEL
)
{
terrno
=
TSDB_CODE_TDB_NO_AVAIL_DISK
;
tsdbError
(
"vgId:%d, failed allc disk since %s"
,
REPO_ID
(
pRepo
),
tstrerror
(
terrno
));
...
...
@@ -548,12 +566,19 @@ static int32_t tsdbSyncSendDFileSet(SSyncH *pSynch, SDFileSet *pSet) {
STsdbRepo
*
pRepo
=
pSynch
->
pRepo
;
bool
toSend
=
false
;
// skip expired fileset
if
(
pSet
&&
tsdbGetFidLevel
(
pSet
->
fid
,
&
(
pSynch
->
rtn
))
<
0
)
{
tsdbInfo
(
"vgId:%d, don't sync send since fileset:%d smaller than minFid:%d"
,
REPO_ID
(
pRepo
),
pSet
->
fid
,
pSynch
->
rtn
.
minFid
);
return
0
;
}
if
(
tsdbSendDFileSetInfo
(
pSynch
,
pSet
)
<
0
)
{
tsdbError
(
"vgId:%d, failed to send fileset:%d info since %s"
,
REPO_ID
(
pRepo
),
pSet
?
pSet
->
fid
:
-
1
,
tstrerror
(
terrno
));
return
-
1
;
}
// No file any more, no need to send file, just return
// No file any more, no need to send file, just return
if
(
pSet
==
NULL
)
{
return
0
;
}
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录