Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
jobily
TDengine
提交
7d30a6e2
T
TDengine
项目概览
jobily
/
TDengine
9 个月 前同步成功
通知
1
Star
0
Fork
0
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
0
列表
看板
标记
里程碑
合并请求
0
DevOps
流水线
流水线任务
计划
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
0
Issue
0
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
DevOps
DevOps
流水线
流水线任务
计划
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
流水线任务
提交
Issue看板
前往新版Gitcode,体验更适合开发者的 AI 搜索 >>
提交
7d30a6e2
编写于
6月 30, 2023
作者:
dengyihao
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
vnode snapshot read
上级
93391f73
变更
4
隐藏空白更改
内联
并排
Showing
4 changed file
with
76 addition
and
14 deletion
+76
-14
include/libs/stream/tstream.h
include/libs/stream/tstream.h
+5
-0
source/libs/stream/inc/streamBackendRocksdb.h
source/libs/stream/inc/streamBackendRocksdb.h
+1
-1
source/libs/stream/src/streamBackendRocksdb.c
source/libs/stream/src/streamBackendRocksdb.c
+61
-12
source/libs/stream/src/streamMeta.c
source/libs/stream/src/streamMeta.c
+9
-1
未找到文件。
include/libs/stream/tstream.h
浏览文件 @
7d30a6e2
...
...
@@ -347,6 +347,11 @@ typedef struct SStreamMeta {
void
*
streamBackend
;
int64_t
streamBackendRid
;
int64_t
checkpointTs
;
SArray
*
checkpointSaved
;
SArray
*
checkpointInUse
;
int32_t
checkpointCap
;
SRWLatch
checkpointDirLock
;
}
SStreamMeta
;
int32_t
tEncodeStreamEpInfo
(
SEncoder
*
pEncoder
,
const
SStreamChildEpInfo
*
pInfo
);
...
...
source/libs/stream/inc/streamBackendRocksdb.h
浏览文件 @
7d30a6e2
...
...
@@ -46,7 +46,7 @@ typedef struct {
void
*
streamBackendInit
(
const
char
*
path
);
void
streamBackendCleanup
(
void
*
arg
);
int32_t
streamBackendDoCheckpoint
(
int64_t
rid
,
const
char
*
cpP
ath
);
int32_t
streamBackendDoCheckpoint
(
void
*
pMeta
,
const
char
*
p
ath
);
SListNode
*
streamBackendAddCompare
(
void
*
backend
,
void
*
arg
);
void
streamBackendDelCompare
(
void
*
backend
,
void
*
arg
);
...
...
source/libs/stream/src/streamBackendRocksdb.c
浏览文件 @
7d30a6e2
...
...
@@ -195,16 +195,69 @@ void streamBackendCleanup(void* arg) {
qDebug
(
"destroy stream backend backend:%p"
,
pHandle
);
return
;
}
int32_t
streamBackendDoCheckpoint
(
int64_t
backendRid
,
const
char
*
path
)
{
/*
* checkpointSave |--cp1--|--cp2--|--cp3--|--cp4--|--cp5--|
* checkpointInUse: |--cp2--|--cp4--|, checkpointDir in checkpointInUse do replicate trans, cannot del until
* replication is finished
*/
int32_t
delObsoleteCheckpoint
(
void
*
arg
,
const
char
*
path
)
{
SStreamMeta
*
pMeta
=
arg
;
taosWLockLatch
(
&
pMeta
->
checkpointDirLock
);
int64_t
checkpointId
=
pMeta
->
checkpointTs
;
taosArrayPush
(
pMeta
->
checkpointSaved
,
&
checkpointId
);
SArray
*
checkpointDel
=
taosArrayInit
(
10
,
sizeof
(
int64_t
));
SArray
*
checkpointDup
=
taosArrayInit
(
10
,
sizeof
(
int64_t
));
int64_t
minId
=
0
;
if
(
taosArrayGetSize
(
pMeta
->
checkpointInUse
)
>=
1
)
{
minId
=
*
(
int64_t
*
)
taosArrayGet
(
pMeta
->
checkpointInUse
,
0
);
for
(
int
i
=
0
;
i
<
taosArrayGetSize
(
pMeta
->
checkpointSaved
);
i
++
)
{
int64_t
id
=
*
(
int64_t
*
)
taosArrayGet
(
pMeta
->
checkpointSaved
,
i
);
if
(
id
>=
minId
)
{
taosArrayPush
(
checkpointDup
,
&
id
);
}
else
{
taosArrayPush
(
checkpointDel
,
&
id
);
}
}
}
else
{
for
(
int
i
=
taosArrayGetSize
(
pMeta
->
checkpointSaved
);
i
>=
0
;
i
--
)
{
int64_t
id
=
*
(
int64_t
*
)
taosArrayGet
(
pMeta
->
checkpointSaved
,
i
);
if
(
taosArrayGetSize
(
checkpointDup
)
<
pMeta
->
checkpointCap
)
{
taosArrayPush
(
checkpointDup
,
&
id
);
}
else
{
taosArrayPush
(
checkpointDel
,
&
id
);
}
}
}
taosArrayDestroy
(
pMeta
->
checkpointSaved
);
pMeta
->
checkpointSaved
=
checkpointDup
;
taosWUnLockLatch
(
&
pMeta
->
checkpointDirLock
);
for
(
int
i
=
0
;
i
<
taosArrayGetSize
(
checkpointDel
);
i
++
)
{
int64_t
id
=
*
(
int64_t
*
)
taosArrayGet
(
checkpointDel
,
i
);
char
tbuf
[
256
]
=
{
0
};
sprintf
(
tbuf
,
"%s/checkpoint_%"
PRId64
""
,
path
,
id
);
if
(
taosIsDir
(
tbuf
))
{
taosRemoveDir
(
tbuf
);
}
}
return
0
;
}
int32_t
streamBackendDoCheckpoint
(
void
*
arg
,
const
char
*
path
)
{
SStreamMeta
*
pMeta
=
arg
;
int64_t
backendRid
=
pMeta
->
streamBackendRid
;
int64_t
checkpointId
=
pMeta
->
checkpointTs
;
int64_t
st
=
taosGetTimestampMs
();
int32_t
code
=
-
1
;
SBackendHandle
*
pHandle
=
taosAcquireRef
(
streamBackendId
,
backendRid
);
static
int
checkpointSuffix
=
0
;
char
newDir
[
256
]
=
{
0
};
char
oldDir
[
256
]
=
{
0
};
sprintf
(
oldDir
,
"%s/checkpoint_%d"
,
path
,
checkpointSuffix
);
sprintf
(
newDir
,
"%s/checkpoint_%d"
,
path
,
1
-
checkpointSuffix
);
char
checkpointDir
[
256
]
=
{
0
};
sprintf
(
checkpointDir
,
"%s/checkpoint_%"
PRId64
""
,
path
,
checkpointId
);
if
(
pHandle
==
NULL
)
{
return
-
1
;
...
...
@@ -220,7 +273,7 @@ int32_t streamBackendDoCheckpoint(int64_t backendRid, const char* path) {
goto
_ERROR
;
}
rocksdb_checkpoint_create
(
cp
,
new
Dir
,
64
<<
20
,
&
err
);
rocksdb_checkpoint_create
(
cp
,
checkpoint
Dir
,
64
<<
20
,
&
err
);
if
(
err
!=
NULL
)
{
qError
(
"stream backend:%p failed to do checkpoint at:%s, reason:%s"
,
pHandle
,
path
,
err
);
taosMemoryFreeClear
(
err
);
...
...
@@ -231,11 +284,7 @@ int32_t streamBackendDoCheckpoint(int64_t backendRid, const char* path) {
}
rocksdb_checkpoint_object_destroy
(
cp
);
}
if
(
taosIsDir
(
oldDir
))
{
taosRemoveDir
(
oldDir
);
}
taosRenameFile
(
newDir
,
oldDir
);
delObsoleteCheckpoint
(
arg
,
path
);
_ERROR:
taosReleaseRef
(
streamBackendId
,
backendRid
);
return
code
;
...
...
source/libs/stream/src/streamMeta.c
浏览文件 @
7d30a6e2
...
...
@@ -93,6 +93,10 @@ SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandF
goto
_err
;
}
pMeta
->
streamBackendRid
=
taosAddRef
(
streamBackendId
,
pMeta
->
streamBackend
);
pMeta
->
checkpointSaved
=
taosArrayInit
(
4
,
sizeof
(
int64_t
));
pMeta
->
checkpointInUse
=
taosArrayInit
(
4
,
sizeof
(
int64_t
));
pMeta
->
checkpointCap
=
4
;
taosInitRWLatch
(
&
pMeta
->
checkpointDirLock
);
taosMemoryFree
(
streamPath
);
...
...
@@ -108,6 +112,7 @@ _err:
if
(
pMeta
->
pCheckpointDb
)
tdbTbClose
(
pMeta
->
pCheckpointDb
);
if
(
pMeta
->
db
)
tdbClose
(
pMeta
->
db
);
taosMemoryFree
(
pMeta
);
qError
(
"failed to open stream meta"
);
return
NULL
;
}
...
...
@@ -138,6 +143,9 @@ void streamMetaClose(SStreamMeta* pMeta) {
taosRemoveRef
(
streamBackendId
,
pMeta
->
streamBackendRid
);
pMeta
->
pTaskList
=
taosArrayDestroy
(
pMeta
->
pTaskList
);
taosMemoryFree
(
pMeta
->
path
);
taosArrayDestroy
(
pMeta
->
checkpointSaved
);
taosArrayDestroy
(
pMeta
->
checkpointInUse
);
taosMemoryFree
(
pMeta
);
}
...
...
@@ -419,6 +427,6 @@ int32_t streamDoCheckpoint(SStreamMeta* pMeta) {
qError
(
"failed to create chechpoint %s, reason:%s"
,
buf
,
tstrerror
(
code
));
return
code
;
}
code
=
streamBackendDoCheckpoint
(
pMeta
->
streamBackendRid
,
buf
);
code
=
streamBackendDoCheckpoint
(
(
void
*
)
pMeta
,
buf
);
return
code
;
}
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录