Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
d8386754
T
TDengine
项目概览
taosdata
/
TDengine
1 年多 前同步成功
通知
1185
Star
22016
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
d8386754
编写于
4月 11, 2023
作者:
B
Benguang Zhao
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
enh: try to propose vnode commit at vnode closing
上级
3d962cd9
变更
5
隐藏空白更改
内联
并排
Showing
5 changed file
with
27 addition
and
17 deletion
+27
-17
source/dnode/mgmt/mgmt_vnode/src/vmInt.c
source/dnode/mgmt/mgmt_vnode/src/vmInt.c
+6
-0
source/dnode/vnode/inc/vnode.h
source/dnode/vnode/inc/vnode.h
+1
-1
source/dnode/vnode/src/inc/vnd.h
source/dnode/vnode/src/inc/vnd.h
+1
-2
source/dnode/vnode/src/vnd/vnodeCommit.c
source/dnode/vnode/src/vnd/vnodeCommit.c
+3
-2
source/dnode/vnode/src/vnd/vnodeSync.c
source/dnode/vnode/src/vnd/vnodeSync.c
+16
-12
未找到文件。
source/dnode/mgmt/mgmt_vnode/src/vmInt.c
浏览文件 @
d8386754
...
@@ -15,6 +15,7 @@
...
@@ -15,6 +15,7 @@
#define _DEFAULT_SOURCE
#define _DEFAULT_SOURCE
#include "vmInt.h"
#include "vmInt.h"
#include "vnd.h"
SVnodeObj
*
vmAcquireVnode
(
SVnodeMgmt
*
pMgmt
,
int32_t
vgId
)
{
SVnodeObj
*
vmAcquireVnode
(
SVnodeMgmt
*
pMgmt
,
int32_t
vgId
)
{
SVnodeObj
*
pVnode
=
NULL
;
SVnodeObj
*
pVnode
=
NULL
;
...
@@ -78,6 +79,11 @@ int32_t vmOpenVnode(SVnodeMgmt *pMgmt, SWrapperCfg *pCfg, SVnode *pImpl) {
...
@@ -78,6 +79,11 @@ int32_t vmOpenVnode(SVnodeMgmt *pMgmt, SWrapperCfg *pCfg, SVnode *pImpl) {
void
vmCloseVnode
(
SVnodeMgmt
*
pMgmt
,
SVnodeObj
*
pVnode
,
bool
commitAndRemoveWal
)
{
void
vmCloseVnode
(
SVnodeMgmt
*
pMgmt
,
SVnodeObj
*
pVnode
,
bool
commitAndRemoveWal
)
{
char
path
[
TSDB_FILENAME_LEN
]
=
{
0
};
char
path
[
TSDB_FILENAME_LEN
]
=
{
0
};
bool
atExit
=
true
;
if
(
vnodeIsLeader
(
pVnode
->
pImpl
))
{
vnodeProposeCommitOnNeed
(
pVnode
->
pImpl
,
atExit
);
}
taosThreadRwlockWrlock
(
&
pMgmt
->
lock
);
taosThreadRwlockWrlock
(
&
pMgmt
->
lock
);
taosHashRemove
(
pMgmt
->
hash
,
&
pVnode
->
vgId
,
sizeof
(
int32_t
));
taosHashRemove
(
pMgmt
->
hash
,
&
pVnode
->
vgId
,
sizeof
(
int32_t
));
...
...
source/dnode/vnode/inc/vnode.h
浏览文件 @
d8386754
...
@@ -92,7 +92,7 @@ int32_t vnodeProcessQueryMsg(SVnode *pVnode, SRpcMsg *pMsg);
...
@@ -92,7 +92,7 @@ int32_t vnodeProcessQueryMsg(SVnode *pVnode, SRpcMsg *pMsg);
int32_t
vnodeProcessFetchMsg
(
SVnode
*
pVnode
,
SRpcMsg
*
pMsg
,
SQueueInfo
*
pInfo
);
int32_t
vnodeProcessFetchMsg
(
SVnode
*
pVnode
,
SRpcMsg
*
pMsg
,
SQueueInfo
*
pInfo
);
void
vnodeProposeWriteMsg
(
SQueueInfo
*
pInfo
,
STaosQall
*
qall
,
int32_t
numOfMsgs
);
void
vnodeProposeWriteMsg
(
SQueueInfo
*
pInfo
,
STaosQall
*
qall
,
int32_t
numOfMsgs
);
void
vnodeApplyWriteMsg
(
SQueueInfo
*
pInfo
,
STaosQall
*
qall
,
int32_t
numOfMsgs
);
void
vnodeApplyWriteMsg
(
SQueueInfo
*
pInfo
,
STaosQall
*
qall
,
int32_t
numOfMsgs
);
void
vnodeProposeCommitOnNeed
(
SVnode
*
pVnode
);
void
vnodeProposeCommitOnNeed
(
SVnode
*
pVnode
,
bool
atExit
);
// meta
// meta
typedef
struct
SMeta
SMeta
;
// todo: remove
typedef
struct
SMeta
SMeta
;
// todo: remove
...
...
source/dnode/vnode/src/inc/vnd.h
浏览文件 @
d8386754
...
@@ -96,7 +96,7 @@ int32_t vnodeGetBatchMeta(SVnode* pVnode, SRpcMsg* pMsg);
...
@@ -96,7 +96,7 @@ int32_t vnodeGetBatchMeta(SVnode* pVnode, SRpcMsg* pMsg);
// vnodeCommit.c
// vnodeCommit.c
int32_t
vnodeBegin
(
SVnode
*
pVnode
);
int32_t
vnodeBegin
(
SVnode
*
pVnode
);
int32_t
vnodeShouldCommit
(
SVnode
*
pVnode
);
int32_t
vnodeShouldCommit
(
SVnode
*
pVnode
,
bool
atExit
);
void
vnodeUpdCommitSched
(
SVnode
*
pVnode
);
void
vnodeUpdCommitSched
(
SVnode
*
pVnode
);
void
vnodeRollback
(
SVnode
*
pVnode
);
void
vnodeRollback
(
SVnode
*
pVnode
);
int32_t
vnodeSaveInfo
(
const
char
*
dir
,
const
SVnodeInfo
*
pCfg
);
int32_t
vnodeSaveInfo
(
const
char
*
dir
,
const
SVnodeInfo
*
pCfg
);
...
@@ -115,7 +115,6 @@ void vnodeSyncClose(SVnode* pVnode);
...
@@ -115,7 +115,6 @@ void vnodeSyncClose(SVnode* pVnode);
void
vnodeRedirectRpcMsg
(
SVnode
*
pVnode
,
SRpcMsg
*
pMsg
,
int32_t
code
);
void
vnodeRedirectRpcMsg
(
SVnode
*
pVnode
,
SRpcMsg
*
pMsg
,
int32_t
code
);
bool
vnodeIsLeader
(
SVnode
*
pVnode
);
bool
vnodeIsLeader
(
SVnode
*
pVnode
);
bool
vnodeIsRoleLeader
(
SVnode
*
pVnode
);
bool
vnodeIsRoleLeader
(
SVnode
*
pVnode
);
int
vnodeShouldCommit
(
SVnode
*
pVnode
);
#ifdef __cplusplus
#ifdef __cplusplus
}
}
...
...
source/dnode/vnode/src/vnd/vnodeCommit.c
浏览文件 @
d8386754
...
@@ -149,7 +149,7 @@ void vnodeUpdCommitSched(SVnode *pVnode) {
...
@@ -149,7 +149,7 @@ void vnodeUpdCommitSched(SVnode *pVnode) {
pVnode
->
commitSched
.
maxWaitMs
=
tsVndCommitMaxIntervalMs
+
(
randNum
%
tsVndCommitMaxIntervalMs
);
pVnode
->
commitSched
.
maxWaitMs
=
tsVndCommitMaxIntervalMs
+
(
randNum
%
tsVndCommitMaxIntervalMs
);
}
}
int
vnodeShouldCommit
(
SVnode
*
pVnode
)
{
int
vnodeShouldCommit
(
SVnode
*
pVnode
,
bool
atExit
)
{
SVCommitSched
*
pSched
=
&
pVnode
->
commitSched
;
SVCommitSched
*
pSched
=
&
pVnode
->
commitSched
;
int64_t
nowMs
=
taosGetMonoTimestampMs
();
int64_t
nowMs
=
taosGetMonoTimestampMs
();
bool
diskAvail
=
osDataSpaceAvailable
();
bool
diskAvail
=
osDataSpaceAvailable
();
...
@@ -158,7 +158,8 @@ int vnodeShouldCommit(SVnode *pVnode) {
...
@@ -158,7 +158,8 @@ int vnodeShouldCommit(SVnode *pVnode) {
taosThreadMutexLock
(
&
pVnode
->
mutex
);
taosThreadMutexLock
(
&
pVnode
->
mutex
);
if
(
pVnode
->
inUse
&&
diskAvail
)
{
if
(
pVnode
->
inUse
&&
diskAvail
)
{
needCommit
=
needCommit
=
((
pVnode
->
inUse
->
size
>
pVnode
->
inUse
->
node
.
size
)
&&
(
pSched
->
commitMs
+
SYNC_VND_COMMIT_MIN_MS
<
nowMs
));
((
pVnode
->
inUse
->
size
>
pVnode
->
inUse
->
node
.
size
)
&&
(
pSched
->
commitMs
+
SYNC_VND_COMMIT_MIN_MS
<
nowMs
))
||
((
pVnode
->
inUse
->
size
>
0
)
&&
atExit
);
}
}
taosThreadMutexUnlock
(
&
pVnode
->
mutex
);
taosThreadMutexUnlock
(
&
pVnode
->
mutex
);
return
needCommit
;
return
needCommit
;
...
...
source/dnode/vnode/src/vnd/vnodeSync.c
浏览文件 @
d8386754
...
@@ -129,8 +129,8 @@ static int32_t inline vnodeProposeMsg(SVnode *pVnode, SRpcMsg *pMsg, bool isWeak
...
@@ -129,8 +129,8 @@ static int32_t inline vnodeProposeMsg(SVnode *pVnode, SRpcMsg *pMsg, bool isWeak
return
code
;
return
code
;
}
}
void
vnodeProposeCommitOnNeed
(
SVnode
*
pVnode
)
{
void
vnodeProposeCommitOnNeed
(
SVnode
*
pVnode
,
bool
atExit
)
{
if
(
!
vnodeShouldCommit
(
pVnode
))
{
if
(
!
vnodeShouldCommit
(
pVnode
,
atExit
))
{
return
;
return
;
}
}
...
@@ -145,18 +145,20 @@ void vnodeProposeCommitOnNeed(SVnode *pVnode) {
...
@@ -145,18 +145,20 @@ void vnodeProposeCommitOnNeed(SVnode *pVnode) {
rpcMsg
.
pCont
=
pHead
;
rpcMsg
.
pCont
=
pHead
;
rpcMsg
.
info
.
noResp
=
1
;
rpcMsg
.
info
.
noResp
=
1
;
vInfo
(
"vgId:%d, propose vnode commit"
,
pVnode
->
config
.
vgId
);
bool
isWeak
=
false
;
bool
isWeak
=
false
;
if
(
vnodeProposeMsg
(
pVnode
,
&
rpcMsg
,
isWeak
)
<
0
)
{
vTrace
(
"vgId:%d, failed to propose vnode commit since %s"
,
pVnode
->
config
.
vgId
,
terrstr
());
goto
_out
;
}
vInfo
(
"vgId:%d, proposed vnode commit"
,
pVnode
->
config
.
vgId
);
if
(
!
atExit
)
{
if
(
vnodeProposeMsg
(
pVnode
,
&
rpcMsg
,
isWeak
)
<
0
)
{
vTrace
(
"vgId:%d, failed to propose vnode commit since %s"
,
pVnode
->
config
.
vgId
,
terrstr
());
}
rpcFreeCont
(
rpcMsg
.
pCont
);
rpcMsg
.
pCont
=
NULL
;
}
else
{
tmsgPutToQueue
(
&
pVnode
->
msgCb
,
WRITE_QUEUE
,
&
rpcMsg
);
}
_out:
vnodeUpdCommitSched
(
pVnode
);
vnodeUpdCommitSched
(
pVnode
);
rpcFreeCont
(
rpcMsg
.
pCont
);
rpcMsg
.
pCont
=
NULL
;
}
}
#if BATCH_ENABLE
#if BATCH_ENABLE
...
@@ -236,7 +238,8 @@ void vnodeProposeWriteMsg(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs)
...
@@ -236,7 +238,8 @@ void vnodeProposeWriteMsg(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs)
continue
;
continue
;
}
}
vnodeProposeCommitOnNeed
(
pVnode
);
bool
atExit
=
false
;
vnodeProposeCommitOnNeed
(
pVnode
,
atExit
);
code
=
vnodePreProcessWriteMsg
(
pVnode
,
pMsg
);
code
=
vnodePreProcessWriteMsg
(
pVnode
,
pMsg
);
if
(
code
!=
0
)
{
if
(
code
!=
0
)
{
...
@@ -288,7 +291,8 @@ void vnodeProposeWriteMsg(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs)
...
@@ -288,7 +291,8 @@ void vnodeProposeWriteMsg(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs)
continue
;
continue
;
}
}
vnodeProposeCommitOnNeed
(
pVnode
);
bool
atExit
=
false
;
vnodeProposeCommitOnNeed
(
pVnode
,
atExit
);
code
=
vnodePreProcessWriteMsg
(
pVnode
,
pMsg
);
code
=
vnodePreProcessWriteMsg
(
pVnode
,
pMsg
);
if
(
code
!=
0
)
{
if
(
code
!=
0
)
{
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录