Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
b20794f3
T
TDengine
项目概览
taosdata
/
TDengine
1 年多 前同步成功
通知
1185
Star
22016
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
b20794f3
编写于
5月 21, 2022
作者:
M
Minghao Li
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
enh(sync) sync/mnode integration
上级
a1cd6839
变更
3
显示空白变更内容
内联
并排
Showing
3 changed file
with
116 addition
and
1 deletion
+116
-1
source/dnode/mgmt/mgmt_mnode/src/mmHandle.c
source/dnode/mgmt/mgmt_mnode/src/mmHandle.c
+10
-0
source/dnode/mgmt/mgmt_mnode/src/mmWorker.c
source/dnode/mgmt/mgmt_mnode/src/mmWorker.c
+8
-1
source/dnode/mnode/impl/src/mnode.c
source/dnode/mnode/impl/src/mnode.c
+98
-0
未找到文件。
source/dnode/mgmt/mgmt_mnode/src/mmHandle.c
浏览文件 @
b20794f3
...
...
@@ -237,6 +237,16 @@ SArray *mmGetMsgHandles() {
if
(
dmSetMgmtHandle
(
pArray
,
TDMT_VND_SYNC_VNODE_RSP
,
mmPutNodeMsgToWriteQueue
,
0
)
==
NULL
)
goto
_OVER
;
if
(
dmSetMgmtHandle
(
pArray
,
TDMT_VND_COMPACT_VNODE_RSP
,
mmPutNodeMsgToWriteQueue
,
0
)
==
NULL
)
goto
_OVER
;
if
(
dmSetMgmtHandle
(
pArray
,
TDMT_VND_SYNC_TIMEOUT
,
mmPutNodeMsgToSyncQueue
,
1
)
==
NULL
)
goto
_OVER
;
if
(
dmSetMgmtHandle
(
pArray
,
TDMT_VND_SYNC_PING
,
mmPutNodeMsgToSyncQueue
,
1
)
==
NULL
)
goto
_OVER
;
if
(
dmSetMgmtHandle
(
pArray
,
TDMT_VND_SYNC_PING_REPLY
,
mmPutNodeMsgToSyncQueue
,
1
)
==
NULL
)
goto
_OVER
;
if
(
dmSetMgmtHandle
(
pArray
,
TDMT_VND_SYNC_CLIENT_REQUEST
,
mmPutNodeMsgToSyncQueue
,
1
)
==
NULL
)
goto
_OVER
;
if
(
dmSetMgmtHandle
(
pArray
,
TDMT_VND_SYNC_CLIENT_REQUEST_REPLY
,
mmPutNodeMsgToSyncQueue
,
1
)
==
NULL
)
goto
_OVER
;
if
(
dmSetMgmtHandle
(
pArray
,
TDMT_VND_SYNC_REQUEST_VOTE
,
mmPutNodeMsgToSyncQueue
,
1
)
==
NULL
)
goto
_OVER
;
if
(
dmSetMgmtHandle
(
pArray
,
TDMT_VND_SYNC_REQUEST_VOTE_REPLY
,
mmPutNodeMsgToSyncQueue
,
1
)
==
NULL
)
goto
_OVER
;
if
(
dmSetMgmtHandle
(
pArray
,
TDMT_VND_SYNC_APPEND_ENTRIES
,
mmPutNodeMsgToSyncQueue
,
1
)
==
NULL
)
goto
_OVER
;
if
(
dmSetMgmtHandle
(
pArray
,
TDMT_VND_SYNC_APPEND_ENTRIES_REPLY
,
mmPutNodeMsgToSyncQueue
,
1
)
==
NULL
)
goto
_OVER
;
code
=
0
;
_OVER:
...
...
source/dnode/mgmt/mgmt_mnode/src/mmWorker.c
浏览文件 @
b20794f3
...
...
@@ -56,6 +56,13 @@ static void mmProcessQueue(SQueueInfo *pInfo, SRpcMsg *pMsg) {
taosFreeQitem
(
pMsg
);
}
static
void
mmProcessSyncQueue
(
SQueueInfo
*
pInfo
,
SRpcMsg
*
pMsg
)
{
SMnodeMgmt
*
pMgmt
=
pInfo
->
ahandle
;
pMsg
->
info
.
node
=
pMgmt
->
pMnode
;
mndProcessSyncMsg
(
pMsg
);
return
;
}
static
void
mmProcessApplyQueue
(
SQueueInfo
*
pInfo
,
SRpcMsg
*
pMsg
)
{
SMnodeMgmt
*
pMgmt
=
pInfo
->
ahandle
;
...
...
@@ -201,7 +208,7 @@ int32_t mmStartWorker(SMnodeMgmt *pMgmt) {
.
min
=
1
,
.
max
=
1
,
.
name
=
"mnode-sync"
,
.
fp
=
(
FItem
)
mmProcessQueue
,
.
fp
=
(
FItem
)
mmProcess
Sync
Queue
,
.
param
=
pMgmt
,
};
if
(
tSingleWorkerInit
(
&
pMgmt
->
syncWorker
,
&
sCfg
)
!=
0
)
{
...
...
source/dnode/mnode/impl/src/mnode.c
浏览文件 @
b20794f3
...
...
@@ -356,6 +356,104 @@ int32_t mndProcessApplyMsg(SRpcMsg *pMsg) {
return
code
;
}
#include "syncTools.h"
int32_t
mndProcessSyncMsg
(
SRpcMsg
*
pMsg
)
{
SMnode
*
pMnode
=
pMsg
->
info
.
node
;
void
*
ahandle
=
pMsg
->
info
.
ahandle
;
int32_t
ret
=
TAOS_SYNC_PROPOSE_OTHER_ERROR
;
if
(
syncEnvIsStart
())
{
SSyncNode
*
pSyncNode
=
syncNodeAcquire
(
pMnode
->
syncMgmt
.
sync
);
assert
(
pSyncNode
!=
NULL
);
ESyncState
state
=
syncGetMyRole
(
pMnode
->
syncMgmt
.
sync
);
SyncTerm
currentTerm
=
syncGetMyTerm
(
pMnode
->
syncMgmt
.
sync
);
SMsgHead
*
pHead
=
pMsg
->
pCont
;
char
logBuf
[
512
];
char
*
syncNodeStr
=
sync2SimpleStr
(
pMnode
->
syncMgmt
.
sync
);
snprintf
(
logBuf
,
sizeof
(
logBuf
),
"==vnodeProcessSyncReq== msgType:%d, syncNode: %s"
,
pMsg
->
msgType
,
syncNodeStr
);
syncRpcMsgLog2
(
logBuf
,
pMsg
);
taosMemoryFree
(
syncNodeStr
);
SRpcMsg
*
pRpcMsg
=
pMsg
;
if
(
pRpcMsg
->
msgType
==
TDMT_VND_SYNC_TIMEOUT
)
{
SyncTimeout
*
pSyncMsg
=
syncTimeoutFromRpcMsg2
(
pRpcMsg
);
assert
(
pSyncMsg
!=
NULL
);
ret
=
syncNodeOnTimeoutCb
(
pSyncNode
,
pSyncMsg
);
syncTimeoutDestroy
(
pSyncMsg
);
}
else
if
(
pRpcMsg
->
msgType
==
TDMT_VND_SYNC_PING
)
{
SyncPing
*
pSyncMsg
=
syncPingFromRpcMsg2
(
pRpcMsg
);
assert
(
pSyncMsg
!=
NULL
);
ret
=
syncNodeOnPingCb
(
pSyncNode
,
pSyncMsg
);
syncPingDestroy
(
pSyncMsg
);
}
else
if
(
pRpcMsg
->
msgType
==
TDMT_VND_SYNC_PING_REPLY
)
{
SyncPingReply
*
pSyncMsg
=
syncPingReplyFromRpcMsg2
(
pRpcMsg
);
assert
(
pSyncMsg
!=
NULL
);
ret
=
syncNodeOnPingReplyCb
(
pSyncNode
,
pSyncMsg
);
syncPingReplyDestroy
(
pSyncMsg
);
}
else
if
(
pRpcMsg
->
msgType
==
TDMT_VND_SYNC_CLIENT_REQUEST
)
{
SyncClientRequest
*
pSyncMsg
=
syncClientRequestFromRpcMsg2
(
pRpcMsg
);
assert
(
pSyncMsg
!=
NULL
);
ret
=
syncNodeOnClientRequestCb
(
pSyncNode
,
pSyncMsg
);
syncClientRequestDestroy
(
pSyncMsg
);
}
else
if
(
pRpcMsg
->
msgType
==
TDMT_VND_SYNC_REQUEST_VOTE
)
{
SyncRequestVote
*
pSyncMsg
=
syncRequestVoteFromRpcMsg2
(
pRpcMsg
);
assert
(
pSyncMsg
!=
NULL
);
ret
=
syncNodeOnRequestVoteCb
(
pSyncNode
,
pSyncMsg
);
syncRequestVoteDestroy
(
pSyncMsg
);
}
else
if
(
pRpcMsg
->
msgType
==
TDMT_VND_SYNC_REQUEST_VOTE_REPLY
)
{
SyncRequestVoteReply
*
pSyncMsg
=
syncRequestVoteReplyFromRpcMsg2
(
pRpcMsg
);
assert
(
pSyncMsg
!=
NULL
);
ret
=
syncNodeOnRequestVoteReplyCb
(
pSyncNode
,
pSyncMsg
);
syncRequestVoteReplyDestroy
(
pSyncMsg
);
}
else
if
(
pRpcMsg
->
msgType
==
TDMT_VND_SYNC_APPEND_ENTRIES
)
{
SyncAppendEntries
*
pSyncMsg
=
syncAppendEntriesFromRpcMsg2
(
pRpcMsg
);
assert
(
pSyncMsg
!=
NULL
);
ret
=
syncNodeOnAppendEntriesCb
(
pSyncNode
,
pSyncMsg
);
syncAppendEntriesDestroy
(
pSyncMsg
);
}
else
if
(
pRpcMsg
->
msgType
==
TDMT_VND_SYNC_APPEND_ENTRIES_REPLY
)
{
SyncAppendEntriesReply
*
pSyncMsg
=
syncAppendEntriesReplyFromRpcMsg2
(
pRpcMsg
);
assert
(
pSyncMsg
!=
NULL
);
ret
=
syncNodeOnAppendEntriesReplyCb
(
pSyncNode
,
pSyncMsg
);
syncAppendEntriesReplyDestroy
(
pSyncMsg
);
}
else
{
mError
(
"==mndProcessSyncMsg== error msg type:%d"
,
pRpcMsg
->
msgType
);
ret
=
TAOS_SYNC_PROPOSE_OTHER_ERROR
;
}
syncNodeRelease
(
pSyncNode
);
}
else
{
mError
(
"==mndProcessSyncMsg== error syncEnv stop"
);
ret
=
TAOS_SYNC_PROPOSE_OTHER_ERROR
;
}
return
ret
;
return
0
;
}
int32_t
mndProcessMsg
(
SRpcMsg
*
pMsg
)
{
SMnode
*
pMnode
=
pMsg
->
info
.
node
;
void
*
ahandle
=
pMsg
->
info
.
ahandle
;
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录