Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
4edef438
T
TDengine
项目概览
taosdata
/
TDengine
1 年多 前同步成功
通知
1185
Star
22017
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
4edef438
编写于
11月 26, 2022
作者:
B
Benguang Zhao
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
enh: transfer ownership of msgs while committing sync log entries
上级
a01f34fd
变更
9
隐藏空白更改
内联
并排
Showing
9 changed file
with
28 addition
and
25 deletion
+28
-25
include/libs/sync/sync.h
include/libs/sync/sync.h
+5
-5
source/dnode/mgmt/mgmt_mnode/src/mmWorker.c
source/dnode/mgmt/mgmt_mnode/src/mmWorker.c
+2
-0
source/dnode/mgmt/mgmt_qnode/src/qmWorker.c
source/dnode/mgmt/mgmt_qnode/src/qmWorker.c
+2
-0
source/dnode/mgmt/mgmt_snode/src/smWorker.c
source/dnode/mgmt/mgmt_snode/src/smWorker.c
+1
-0
source/dnode/mgmt/mgmt_vnode/src/vmWorker.c
source/dnode/mgmt/mgmt_vnode/src/vmWorker.c
+1
-1
source/dnode/mnode/impl/src/mndSync.c
source/dnode/mnode/impl/src/mndSync.c
+7
-5
source/dnode/vnode/src/vnd/vnodeSync.c
source/dnode/vnode/src/vnd/vnodeSync.c
+8
-13
source/libs/sync/src/syncPipeline.c
source/libs/sync/src/syncPipeline.c
+1
-0
source/libs/sync/src/syncRaftEntry.c
source/libs/sync/src/syncRaftEntry.c
+1
-1
未找到文件。
include/libs/sync/sync.h
浏览文件 @
4edef438
...
...
@@ -134,13 +134,13 @@ typedef struct SSnapshotMeta {
typedef
struct
SSyncFSM
{
void
*
data
;
void
(
*
FpCommitCb
)(
const
struct
SSyncFSM
*
pFsm
,
const
SRpcMsg
*
pMsg
,
const
SFsmCbMeta
*
pMeta
);
void
(
*
FpPreCommitCb
)(
const
struct
SSyncFSM
*
pFsm
,
const
SRpcMsg
*
pMsg
,
const
SFsmCbMeta
*
pMeta
);
void
(
*
FpRollBackCb
)(
const
struct
SSyncFSM
*
pFsm
,
const
SRpcMsg
*
pMsg
,
const
SFsmCbMeta
*
pMeta
);
void
(
*
FpCommitCb
)(
const
struct
SSyncFSM
*
pFsm
,
SRpcMsg
*
pMsg
,
const
SFsmCbMeta
*
pMeta
);
void
(
*
FpPreCommitCb
)(
const
struct
SSyncFSM
*
pFsm
,
SRpcMsg
*
pMsg
,
const
SFsmCbMeta
*
pMeta
);
void
(
*
FpRollBackCb
)(
const
struct
SSyncFSM
*
pFsm
,
SRpcMsg
*
pMsg
,
const
SFsmCbMeta
*
pMeta
);
void
(
*
FpRestoreFinishCb
)(
const
struct
SSyncFSM
*
pFsm
);
void
(
*
FpReConfigCb
)(
const
struct
SSyncFSM
*
pFsm
,
const
SRpcMsg
*
pMsg
,
const
SReConfigCbMeta
*
pMeta
);
void
(
*
FpLeaderTransferCb
)(
const
struct
SSyncFSM
*
pFsm
,
const
SRpcMsg
*
pMsg
,
const
SFsmCbMeta
*
pMeta
);
void
(
*
FpReConfigCb
)(
const
struct
SSyncFSM
*
pFsm
,
SRpcMsg
*
pMsg
,
const
SReConfigCbMeta
*
pMeta
);
void
(
*
FpLeaderTransferCb
)(
const
struct
SSyncFSM
*
pFsm
,
SRpcMsg
*
pMsg
,
const
SFsmCbMeta
*
pMeta
);
bool
(
*
FpApplyQueueEmptyCb
)(
const
struct
SSyncFSM
*
pFsm
);
int32_t
(
*
FpApplyQueueItems
)(
const
struct
SSyncFSM
*
pFsm
);
...
...
source/dnode/mgmt/mgmt_mnode/src/mmWorker.c
浏览文件 @
4edef438
...
...
@@ -162,11 +162,13 @@ int32_t mmPutMsgToQueue(SMnodeMgmt *pMgmt, EQueueType qtype, SRpcMsg *pRpc) {
SRpcMsg
*
pMsg
=
taosAllocateQitem
(
sizeof
(
SRpcMsg
),
RPC_QITEM
);
if
(
pMsg
==
NULL
)
return
-
1
;
memcpy
(
pMsg
,
pRpc
,
sizeof
(
SRpcMsg
));
pRpc
->
pCont
=
NULL
;
dTrace
(
"msg:%p, is created and will put into %s queue, type:%s"
,
pMsg
,
pWorker
->
name
,
TMSG_INFO
(
pRpc
->
msgType
));
int32_t
code
=
mmPutMsgToWorker
(
pMgmt
,
pWorker
,
pMsg
);
if
(
code
!=
0
)
{
dTrace
(
"msg:%p, is freed"
,
pMsg
);
rpcFreeCont
(
pMsg
->
pCont
);
taosFreeQitem
(
pMsg
);
}
return
code
;
...
...
source/dnode/mgmt/mgmt_qnode/src/qmWorker.c
浏览文件 @
4edef438
...
...
@@ -61,6 +61,7 @@ int32_t qmPutRpcMsgToQueue(SQnodeMgmt *pMgmt, EQueueType qtype, SRpcMsg *pRpc) {
SRpcMsg
*
pMsg
=
taosAllocateQitem
(
sizeof
(
SRpcMsg
),
RPC_QITEM
);
if
(
pMsg
==
NULL
)
return
-
1
;
memcpy
(
pMsg
,
pRpc
,
sizeof
(
SRpcMsg
));
pRpc
->
pCont
=
NULL
;
switch
(
qtype
)
{
case
QUERY_QUEUE
:
...
...
@@ -74,6 +75,7 @@ int32_t qmPutRpcMsgToQueue(SQnodeMgmt *pMgmt, EQueueType qtype, SRpcMsg *pRpc) {
return
0
;
default:
terrno
=
TSDB_CODE_INVALID_PARA
;
rpcFreeCont
(
pMsg
->
pCont
);
taosFreeQitem
(
pMsg
);
return
-
1
;
}
...
...
source/dnode/mgmt/mgmt_snode/src/smWorker.c
浏览文件 @
4edef438
...
...
@@ -151,6 +151,7 @@ int32_t smPutMsgToQueue(SSnodeMgmt *pMgmt, EQueueType qtype, SRpcMsg *pRpc) {
pHead
->
contLen
=
htonl
(
pHead
->
contLen
);
pHead
->
vgId
=
SNODE_HANDLE
;
memcpy
(
pMsg
,
pRpc
,
sizeof
(
SRpcMsg
));
pRpc
->
pCont
=
NULL
;
switch
(
qtype
)
{
case
STREAM_QUEUE
:
...
...
source/dnode/mgmt/mgmt_vnode/src/vmWorker.c
浏览文件 @
4edef438
...
...
@@ -246,12 +246,12 @@ int32_t vmPutRpcMsgToQueue(SVnodeMgmt *pMgmt, EQueueType qtype, SRpcMsg *pRpc) {
pHead
->
contLen
=
htonl
(
pHead
->
contLen
);
pHead
->
vgId
=
htonl
(
pHead
->
vgId
);
memcpy
(
pMsg
,
pRpc
,
sizeof
(
SRpcMsg
));
pRpc
->
pCont
=
NULL
;
int32_t
code
=
vmPutMsgToQueue
(
pMgmt
,
pMsg
,
qtype
);
if
(
code
!=
0
)
{
dTrace
(
"msg:%p, is freed"
,
pMsg
);
rpcFreeCont
(
pMsg
->
pCont
);
pRpc
->
pCont
=
NULL
;
taosFreeQitem
(
pMsg
);
}
...
...
source/dnode/mnode/impl/src/mndSync.c
浏览文件 @
4edef438
...
...
@@ -72,15 +72,11 @@ static int32_t mndSyncSendMsg(const SEpSet *pEpSet, SRpcMsg *pMsg) {
return
code
;
}
void
mnd
SyncCommitMsg
(
const
SSyncFSM
*
pFsm
,
const
SRpcMsg
*
pMsg
,
const
SFsmCbMeta
*
pMeta
)
{
void
mnd
ProcessWriteMsg
(
const
SSyncFSM
*
pFsm
,
SRpcMsg
*
pMsg
,
const
SFsmCbMeta
*
pMeta
)
{
SMnode
*
pMnode
=
pFsm
->
data
;
SSyncMgmt
*
pMgmt
=
&
pMnode
->
syncMgmt
;
SSdbRaw
*
pRaw
=
pMsg
->
pCont
;
// delete msg handle
SRpcMsg
rpcMsg
=
{
0
};
rpcMsg
.
info
=
pMsg
->
info
;
int32_t
transId
=
sdbGetIdFromRaw
(
pMnode
->
pSdb
,
pRaw
);
pMgmt
->
errCode
=
pMeta
->
code
;
mInfo
(
"trans:%d, is proposed, saved:%d code:0x%x, apply index:%"
PRId64
" term:%"
PRIu64
" config:%"
PRId64
...
...
@@ -120,6 +116,12 @@ void mndSyncCommitMsg(const SSyncFSM *pFsm, const SRpcMsg *pMsg, const SFsmCbMet
}
}
void
mndSyncCommitMsg
(
const
SSyncFSM
*
pFsm
,
SRpcMsg
*
pMsg
,
const
SFsmCbMeta
*
pMeta
)
{
mndProcessWriteMsg
(
pFsm
,
pMsg
,
pMeta
);
rpcFreeCont
(
pMsg
->
pCont
);
pMsg
->
pCont
=
NULL
;
}
int32_t
mndSyncGetSnapshot
(
const
SSyncFSM
*
pFsm
,
SSnapshot
*
pSnapshot
,
void
*
pReaderParam
,
void
**
ppReader
)
{
mInfo
(
"start to read snapshot from sdb in atomic way"
);
SMnode
*
pMnode
=
pFsm
->
data
;
...
...
source/dnode/vnode/src/vnd/vnodeSync.c
浏览文件 @
4edef438
...
...
@@ -295,36 +295,31 @@ static int32_t vnodeSyncGetSnapshot(const SSyncFSM *pFsm, SSnapshot *pSnapshot)
return
0
;
}
static
void
vnodeSyncApplyMsg
(
const
SSyncFSM
*
pFsm
,
const
SRpcMsg
*
pMsg
,
const
SFsmCbMeta
*
pMeta
)
{
static
void
vnodeSyncApplyMsg
(
const
SSyncFSM
*
pFsm
,
SRpcMsg
*
pMsg
,
const
SFsmCbMeta
*
pMeta
)
{
SVnode
*
pVnode
=
pFsm
->
data
;
SRpcMsg
rpcMsg
=
{.
msgType
=
pMsg
->
msgType
,
.
contLen
=
pMsg
->
contLen
};
rpcMsg
.
pCont
=
rpcMallocCont
(
rpcMsg
.
contLen
);
memcpy
(
rpcMsg
.
pCont
,
pMsg
->
pCont
,
pMsg
->
contLen
);
rpcMsg
.
info
=
pMsg
->
info
;
rpcMsg
.
info
.
conn
.
applyIndex
=
pMeta
->
index
;
rpcMsg
.
info
.
conn
.
applyTerm
=
pMeta
->
term
;
pMsg
->
info
.
conn
.
applyIndex
=
pMeta
->
index
;
pMsg
->
info
.
conn
.
applyTerm
=
pMeta
->
term
;
const
STraceId
*
trace
=
&
pMsg
->
info
.
traceId
;
vGTrace
(
"vgId:%d, commit-cb is excuted, fsm:%p, index:%"
PRId64
", term:%"
PRIu64
", msg-index:%"
PRId64
", weak:%d, code:%d, state:%d %s, type:%s"
,
pVnode
->
config
.
vgId
,
pFsm
,
pMeta
->
index
,
pMeta
->
term
,
rpcMsg
.
info
.
conn
.
applyIndex
,
pMeta
->
isWeak
,
pMeta
->
code
,
pVnode
->
config
.
vgId
,
pFsm
,
pMeta
->
index
,
pMeta
->
term
,
pMsg
->
info
.
conn
.
applyIndex
,
pMeta
->
isWeak
,
pMeta
->
code
,
pMeta
->
state
,
syncStr
(
pMeta
->
state
),
TMSG_INFO
(
pMsg
->
msgType
));
tmsgPutToQueue
(
&
pVnode
->
msgCb
,
APPLY_QUEUE
,
&
rpc
Msg
);
tmsgPutToQueue
(
&
pVnode
->
msgCb
,
APPLY_QUEUE
,
p
Msg
);
}
static
void
vnodeSyncCommitMsg
(
const
SSyncFSM
*
pFsm
,
const
SRpcMsg
*
pMsg
,
const
SFsmCbMeta
*
pMeta
)
{
static
void
vnodeSyncCommitMsg
(
const
SSyncFSM
*
pFsm
,
SRpcMsg
*
pMsg
,
const
SFsmCbMeta
*
pMeta
)
{
vnodeSyncApplyMsg
(
pFsm
,
pMsg
,
pMeta
);
}
static
void
vnodeSyncPreCommitMsg
(
const
SSyncFSM
*
pFsm
,
const
SRpcMsg
*
pMsg
,
const
SFsmCbMeta
*
pMeta
)
{
static
void
vnodeSyncPreCommitMsg
(
const
SSyncFSM
*
pFsm
,
SRpcMsg
*
pMsg
,
const
SFsmCbMeta
*
pMeta
)
{
if
(
pMeta
->
isWeak
==
1
)
{
vnodeSyncApplyMsg
(
pFsm
,
pMsg
,
pMeta
);
}
}
static
void
vnodeSyncRollBackMsg
(
const
SSyncFSM
*
pFsm
,
const
SRpcMsg
*
pMsg
,
const
SFsmCbMeta
*
pMeta
)
{
static
void
vnodeSyncRollBackMsg
(
const
SSyncFSM
*
pFsm
,
SRpcMsg
*
pMsg
,
const
SFsmCbMeta
*
pMeta
)
{
SVnode
*
pVnode
=
pFsm
->
data
;
vTrace
(
"vgId:%d, rollback-cb is excuted, fsm:%p, index:%"
PRId64
", weak:%d, code:%d, state:%d %s, type:%s"
,
pVnode
->
config
.
vgId
,
pFsm
,
pMeta
->
index
,
pMeta
->
isWeak
,
pMeta
->
code
,
pMeta
->
state
,
syncStr
(
pMeta
->
state
),
...
...
source/libs/sync/src/syncPipeline.c
浏览文件 @
4edef438
...
...
@@ -455,6 +455,7 @@ int32_t syncLogFsmExecute(SSyncNode* pNode, SSyncFSM* pFsm, ESyncState role, Syn
(
void
)
syncRespMgrGetAndDel
(
pNode
->
pSyncRespMgr
,
cbMeta
.
seqNum
,
&
rpcMsg
.
info
);
pFsm
->
FpCommitCb
(
pFsm
,
&
rpcMsg
,
&
cbMeta
);
ASSERT
(
rpcMsg
.
pCont
==
NULL
);
return
0
;
}
...
...
source/libs/sync/src/syncRaftEntry.c
浏览文件 @
4edef438
...
...
@@ -20,7 +20,7 @@
SSyncRaftEntry
*
syncEntryBuild
(
int32_t
dataLen
)
{
int32_t
bytes
=
sizeof
(
SSyncRaftEntry
)
+
dataLen
;
SSyncRaftEntry
*
pEntry
=
taosMemory
Malloc
(
bytes
);
SSyncRaftEntry
*
pEntry
=
taosMemory
Calloc
(
1
,
bytes
);
if
(
pEntry
==
NULL
)
{
terrno
=
TSDB_CODE_OUT_OF_MEMORY
;
return
NULL
;
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录