Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
e79e50ae
T
TDengine
项目概览
taosdata
/
TDengine
1 年多 前同步成功
通知
1185
Star
22017
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
e79e50ae
编写于
11月 01, 2022
作者:
S
Shengliang Guan
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
enh: refact syncMsg code
上级
fbb1fa53
变更
10
隐藏空白更改
内联
并排
Showing
10 changed file
with
102 addition
and
396 deletion
+102
-396
include/dnode/mnode/mnode.h
include/dnode/mnode/mnode.h
+0
-1
include/libs/sync/sync.h
include/libs/sync/sync.h
+2
-10
source/dnode/mgmt/mgmt_mnode/src/mmWorker.c
source/dnode/mgmt/mgmt_mnode/src/mmWorker.c
+1
-19
source/dnode/mgmt/mgmt_vnode/src/vmWorker.c
source/dnode/mgmt/mgmt_vnode/src/vmWorker.c
+1
-17
source/dnode/mnode/impl/src/mndMain.c
source/dnode/mnode/impl/src/mndMain.c
+5
-115
source/dnode/vnode/inc/vnode.h
source/dnode/vnode/inc/vnode.h
+0
-1
source/dnode/vnode/src/vnd/vnodeSync.c
source/dnode/vnode/src/vnd/vnodeSync.c
+4
-130
source/libs/sync/inc/syncInt.h
source/libs/sync/inc/syncInt.h
+0
-1
source/libs/sync/src/syncEnv.c
source/libs/sync/src/syncEnv.c
+1
-0
source/libs/sync/src/syncMain.c
source/libs/sync/src/syncMain.c
+88
-102
未找到文件。
include/dnode/mnode/mnode.h
浏览文件 @
e79e50ae
...
...
@@ -99,7 +99,6 @@ int32_t mndGetLoad(SMnode *pMnode, SMnodeLoad *pLoad);
*/
int32_t
mndProcessRpcMsg
(
SRpcMsg
*
pMsg
);
int32_t
mndProcessSyncMsg
(
SRpcMsg
*
pMsg
);
int32_t
mndProcessSyncCtrlMsg
(
SRpcMsg
*
pMsg
);
int32_t
mndPreProcessQueryMsg
(
SRpcMsg
*
pMsg
);
void
mndPostProcessQueryMsg
(
SRpcMsg
*
pMsg
);
...
...
include/libs/sync/sync.h
浏览文件 @
e79e50ae
...
...
@@ -220,21 +220,13 @@ const char* syncStr(ESyncState state);
bool
syncIsRestoreFinish
(
int64_t
rid
);
int32_t
syncGetSnapshotByIndex
(
int64_t
rid
,
SyncIndex
index
,
SSnapshot
*
pSnapshot
);
int32_t
syncReconfig
(
int64_t
rid
,
SSyncCfg
*
pNewCfg
);
// build SRpcMsg, need to call syncPropose with SRpcMsg
int32_t
syncReconfigBuild
(
int64_t
rid
,
const
SSyncCfg
*
pNewCfg
,
SRpcMsg
*
pRpcMsg
);
int32_t
syncReconfig
(
int64_t
rid
,
SSyncCfg
*
pCfg
);
int32_t
syncLeaderTransfer
(
int64_t
rid
);
int32_t
syncLeaderTransferTo
(
int64_t
rid
,
SNodeInfo
newLeader
);
int32_t
syncBeginSnapshot
(
int64_t
rid
,
int64_t
lastApplyIndex
);
int32_t
syncEndSnapshot
(
int64_t
rid
);
int32_t
syncStepDown
(
int64_t
rid
,
SyncTerm
newTerm
);
SSyncNode
*
syncNodeAcquire
(
int64_t
rid
);
void
syncNodeRelease
(
SSyncNode
*
pNode
);
int32_t
syncProcessMsg
(
int64_t
rid
,
SRpcMsg
*
pMsg
);
#ifdef __cplusplus
}
...
...
source/dnode/mgmt/mgmt_mnode/src/mmWorker.c
浏览文件 @
e79e50ae
...
...
@@ -67,24 +67,6 @@ static void mmProcessRpcMsg(SQueueInfo *pInfo, SRpcMsg *pMsg) {
taosFreeQitem
(
pMsg
);
}
static
void
mmProcessSyncCtrlMsg
(
SQueueInfo
*
pInfo
,
SRpcMsg
*
pMsg
)
{
SMnodeMgmt
*
pMgmt
=
pInfo
->
ahandle
;
pMsg
->
info
.
node
=
pMgmt
->
pMnode
;
const
STraceId
*
trace
=
&
pMsg
->
info
.
traceId
;
dGTrace
(
"msg:%p, get from mnode-sync-ctrl queue"
,
pMsg
);
SMsgHead
*
pHead
=
pMsg
->
pCont
;
pHead
->
contLen
=
ntohl
(
pHead
->
contLen
);
pHead
->
vgId
=
ntohl
(
pHead
->
vgId
);
int32_t
code
=
mndProcessSyncCtrlMsg
(
pMsg
);
dGTrace
(
"msg:%p, is freed, code:0x%x"
,
pMsg
,
code
);
rpcFreeCont
(
pMsg
->
pCont
);
taosFreeQitem
(
pMsg
);
}
static
void
mmProcessSyncMsg
(
SQueueInfo
*
pInfo
,
SRpcMsg
*
pMsg
)
{
SMnodeMgmt
*
pMgmt
=
pInfo
->
ahandle
;
pMsg
->
info
.
node
=
pMgmt
->
pMnode
;
...
...
@@ -252,7 +234,7 @@ int32_t mmStartWorker(SMnodeMgmt *pMgmt) {
.
min
=
1
,
.
max
=
1
,
.
name
=
"mnode-sync-ctrl"
,
.
fp
=
(
FItem
)
mmProcessSync
Ctrl
Msg
,
.
fp
=
(
FItem
)
mmProcessSyncMsg
,
.
param
=
pMgmt
,
};
if
(
tSingleWorkerInit
(
&
pMgmt
->
syncCtrlWorker
,
&
scCfg
)
!=
0
)
{
...
...
source/dnode/mgmt/mgmt_vnode/src/vmWorker.c
浏览文件 @
e79e50ae
...
...
@@ -133,22 +133,6 @@ static void vmProcessSyncQueue(SQueueInfo *pInfo, STaosQall *qall, int32_t numOf
}
}
static
void
vmProcessSyncCtrlQueue
(
SQueueInfo
*
pInfo
,
STaosQall
*
qall
,
int32_t
numOfMsgs
)
{
SVnodeObj
*
pVnode
=
pInfo
->
ahandle
;
SRpcMsg
*
pMsg
=
NULL
;
for
(
int32_t
i
=
0
;
i
<
numOfMsgs
;
++
i
)
{
if
(
taosGetQitem
(
qall
,
(
void
**
)
&
pMsg
)
==
0
)
continue
;
const
STraceId
*
trace
=
&
pMsg
->
info
.
traceId
;
dGTrace
(
"vgId:%d, msg:%p get from vnode-sync queue"
,
pVnode
->
vgId
,
pMsg
);
int32_t
code
=
vnodeProcessSyncCtrlMsg
(
pVnode
->
pImpl
,
pMsg
,
NULL
);
// no response here
dGTrace
(
"vgId:%d, msg:%p is freed, code:0x%x"
,
pVnode
->
vgId
,
pMsg
,
code
);
rpcFreeCont
(
pMsg
->
pCont
);
taosFreeQitem
(
pMsg
);
}
}
static
int32_t
vmPutMsgToQueue
(
SVnodeMgmt
*
pMgmt
,
SRpcMsg
*
pMsg
,
EQueueType
qtype
)
{
const
STraceId
*
trace
=
&
pMsg
->
info
.
traceId
;
SMsgHead
*
pHead
=
pMsg
->
pCont
;
...
...
@@ -317,7 +301,7 @@ int32_t vmGetQueueSize(SVnodeMgmt *pMgmt, int32_t vgId, EQueueType qtype) {
int32_t
vmAllocQueue
(
SVnodeMgmt
*
pMgmt
,
SVnodeObj
*
pVnode
)
{
pVnode
->
pWriteQ
=
tWWorkerAllocQueue
(
&
pMgmt
->
writePool
,
pVnode
->
pImpl
,
(
FItems
)
vnodeProposeWriteMsg
);
pVnode
->
pSyncQ
=
tWWorkerAllocQueue
(
&
pMgmt
->
syncPool
,
pVnode
,
(
FItems
)
vmProcessSyncQueue
);
pVnode
->
pSyncCtrlQ
=
tWWorkerAllocQueue
(
&
pMgmt
->
syncCtrlPool
,
pVnode
,
(
FItems
)
vmProcessSync
Ctrl
Queue
);
pVnode
->
pSyncCtrlQ
=
tWWorkerAllocQueue
(
&
pMgmt
->
syncCtrlPool
,
pVnode
,
(
FItems
)
vmProcessSyncQueue
);
pVnode
->
pApplyQ
=
tWWorkerAllocQueue
(
&
pMgmt
->
applyPool
,
pVnode
->
pImpl
,
(
FItems
)
vnodeApplyWriteMsg
);
pVnode
->
pQueryQ
=
tQWorkerAllocQueue
(
&
pMgmt
->
queryPool
,
pVnode
,
(
FItem
)
vmProcessQueryQueue
);
pVnode
->
pStreamQ
=
tQWorkerAllocQueue
(
&
pMgmt
->
streamPool
,
pVnode
,
(
FItem
)
vmProcessStreamQueue
);
...
...
source/dnode/mnode/impl/src/mndMain.c
浏览文件 @
e79e50ae
...
...
@@ -474,128 +474,18 @@ void mndStop(SMnode *pMnode) {
mndCleanupTimer
(
pMnode
);
}
int32_t
mndProcessSyncCtrlMsg
(
SRpcMsg
*
pMsg
)
{
SMnode
*
pMnode
=
pMsg
->
info
.
node
;
SSyncMgmt
*
pMgmt
=
&
pMnode
->
syncMgmt
;
int32_t
code
=
0
;
mInfo
(
"vgId:%d, process sync ctrl msg"
,
1
);
if
(
!
syncIsInit
())
{
mError
(
"failed to process sync msg:%p type:%s since syncEnv stop"
,
pMsg
,
TMSG_INFO
(
pMsg
->
msgType
));
terrno
=
TSDB_CODE_SYN_INTERNAL_ERROR
;
return
-
1
;
}
SSyncNode
*
pSyncNode
=
syncNodeAcquire
(
pMgmt
->
sync
);
if
(
pSyncNode
==
NULL
)
{
mError
(
"failed to process sync msg:%p type:%s since syncNode is null"
,
pMsg
,
TMSG_INFO
(
pMsg
->
msgType
));
terrno
=
TSDB_CODE_SYN_INTERNAL_ERROR
;
return
-
1
;
}
if
(
pMsg
->
msgType
==
TDMT_SYNC_HEARTBEAT
)
{
SyncHeartbeat
*
pSyncMsg
=
syncHeartbeatFromRpcMsg2
(
pMsg
);
code
=
syncNodeOnHeartbeat
(
pSyncNode
,
pSyncMsg
);
syncHeartbeatDestroy
(
pSyncMsg
);
}
else
if
(
pMsg
->
msgType
==
TDMT_SYNC_HEARTBEAT_REPLY
)
{
SyncHeartbeatReply
*
pSyncMsg
=
syncHeartbeatReplyFromRpcMsg2
(
pMsg
);
code
=
syncNodeOnHeartbeatReply
(
pSyncNode
,
pSyncMsg
);
syncHeartbeatReplyDestroy
(
pSyncMsg
);
}
syncNodeRelease
(
pSyncNode
);
if
(
code
!=
0
)
{
terrno
=
TSDB_CODE_SYN_INTERNAL_ERROR
;
}
return
code
;
}
int32_t
mndProcessSyncMsg
(
SRpcMsg
*
pMsg
)
{
SMnode
*
pMnode
=
pMsg
->
info
.
node
;
SSyncMgmt
*
pMgmt
=
&
pMnode
->
syncMgmt
;
int32_t
code
=
0
;
if
(
!
syncIsInit
())
{
mError
(
"failed to process sync msg:%p type:%s since syncEnv stop"
,
pMsg
,
TMSG_INFO
(
pMsg
->
msgType
));
terrno
=
TSDB_CODE_SYN_INTERNAL_ERROR
;
return
-
1
;
}
SSyncNode
*
pSyncNode
=
syncNodeAcquire
(
pMgmt
->
sync
);
if
(
pSyncNode
==
NULL
)
{
mError
(
"failed to process sync msg:%p type:%s since syncNode is null"
,
pMsg
,
TMSG_INFO
(
pMsg
->
msgType
));
terrno
=
TSDB_CODE_SYN_INTERNAL_ERROR
;
return
-
1
;
}
if
(
pMsg
->
msgType
==
TDMT_SYNC_TIMEOUT
)
{
SyncTimeout
*
pSyncMsg
=
syncTimeoutFromRpcMsg2
(
pMsg
);
code
=
syncNodeOnTimer
(
pSyncNode
,
pSyncMsg
);
syncTimeoutDestroy
(
pSyncMsg
);
}
else
if
(
pMsg
->
msgType
==
TDMT_SYNC_PING
)
{
SyncPing
*
pSyncMsg
=
syncPingFromRpcMsg2
(
pMsg
);
code
=
syncNodeOnPing
(
pSyncNode
,
pSyncMsg
);
syncPingDestroy
(
pSyncMsg
);
}
else
if
(
pMsg
->
msgType
==
TDMT_SYNC_PING_REPLY
)
{
SyncPingReply
*
pSyncMsg
=
syncPingReplyFromRpcMsg2
(
pMsg
);
code
=
syncNodeOnPingReply
(
pSyncNode
,
pSyncMsg
);
syncPingReplyDestroy
(
pSyncMsg
);
}
else
if
(
pMsg
->
msgType
==
TDMT_SYNC_CLIENT_REQUEST
)
{
SyncClientRequest
*
pSyncMsg
=
syncClientRequestFromRpcMsg2
(
pMsg
);
code
=
syncNodeOnClientRequest
(
pSyncNode
,
pSyncMsg
,
NULL
);
syncClientRequestDestroy
(
pSyncMsg
);
}
else
if
(
pMsg
->
msgType
==
TDMT_SYNC_REQUEST_VOTE
)
{
SyncRequestVote
*
pSyncMsg
=
syncRequestVoteFromRpcMsg2
(
pMsg
);
code
=
syncNodeOnRequestVote
(
pSyncNode
,
pSyncMsg
);
syncRequestVoteDestroy
(
pSyncMsg
);
}
else
if
(
pMsg
->
msgType
==
TDMT_SYNC_REQUEST_VOTE_REPLY
)
{
SyncRequestVoteReply
*
pSyncMsg
=
syncRequestVoteReplyFromRpcMsg2
(
pMsg
);
code
=
syncNodeOnRequestVoteReply
(
pSyncNode
,
pSyncMsg
);
syncRequestVoteReplyDestroy
(
pSyncMsg
);
}
else
if
(
pMsg
->
msgType
==
TDMT_SYNC_APPEND_ENTRIES
)
{
SyncAppendEntries
*
pSyncMsg
=
syncAppendEntriesFromRpcMsg2
(
pMsg
);
code
=
syncNodeOnAppendEntries
(
pSyncNode
,
pSyncMsg
);
syncAppendEntriesDestroy
(
pSyncMsg
);
}
else
if
(
pMsg
->
msgType
==
TDMT_SYNC_APPEND_ENTRIES_REPLY
)
{
SyncAppendEntriesReply
*
pSyncMsg
=
syncAppendEntriesReplyFromRpcMsg2
(
pMsg
);
code
=
syncNodeOnAppendEntriesReply
(
pSyncNode
,
pSyncMsg
);
syncAppendEntriesReplyDestroy
(
pSyncMsg
);
}
else
if
(
pMsg
->
msgType
==
TDMT_SYNC_SNAPSHOT_SEND
)
{
SyncSnapshotSend
*
pSyncMsg
=
syncSnapshotSendFromRpcMsg2
(
pMsg
);
code
=
syncNodeOnSnapshot
(
pSyncNode
,
pSyncMsg
);
syncSnapshotSendDestroy
(
pSyncMsg
);
}
else
if
(
pMsg
->
msgType
==
TDMT_SYNC_SNAPSHOT_RSP
)
{
SyncSnapshotRsp
*
pSyncMsg
=
syncSnapshotRspFromRpcMsg2
(
pMsg
);
code
=
syncNodeOnSnapshotReply
(
pSyncNode
,
pSyncMsg
);
syncSnapshotRspDestroy
(
pSyncMsg
);
}
else
if
(
pMsg
->
msgType
==
TDMT_SYNC_LOCAL_CMD
)
{
SyncLocalCmd
*
pSyncMsg
=
syncLocalCmdFromRpcMsg2
(
pMsg
);
code
=
syncNodeOnLocalCmd
(
pSyncNode
,
pSyncMsg
);
syncLocalCmdDestroy
(
pSyncMsg
);
}
else
{
mError
(
"failed to process msg:%p since invalid type:%s"
,
pMsg
,
TMSG_INFO
(
pMsg
->
msgType
));
code
=
-
1
;
}
syncNodeRelease
(
pSyncNode
);
const
STraceId
*
trace
=
&
pMsg
->
info
.
traceId
;
mGTrace
(
"vgId:1, sync msg:%p will be processed, type:%s"
,
pMsg
,
TMSG_INFO
(
pMsg
->
msgType
));
int32_t
code
=
syncProcessMsg
(
pMgmt
->
sync
,
pMsg
);
if
(
code
!=
0
)
{
terrno
=
TSDB_CODE_SYN_INTERNAL_ERROR
;
mGError
(
"vgId:1, failed to process sync msg:%p type:%s since %s"
,
pMsg
,
TMSG_INFO
(
pMsg
->
msgType
),
terrstr
())
;
}
return
code
;
}
...
...
source/dnode/vnode/inc/vnode.h
浏览文件 @
e79e50ae
...
...
@@ -83,7 +83,6 @@ int32_t vnodePreprocessQueryMsg(SVnode *pVnode, SRpcMsg *pMsg);
int32_t
vnodeProcessWriteMsg
(
SVnode
*
pVnode
,
SRpcMsg
*
pMsg
,
int64_t
version
,
SRpcMsg
*
pRsp
);
int32_t
vnodeProcessSyncMsg
(
SVnode
*
pVnode
,
SRpcMsg
*
pMsg
,
SRpcMsg
**
pRsp
);
int32_t
vnodeProcessSyncCtrlMsg
(
SVnode
*
pVnode
,
SRpcMsg
*
pMsg
,
SRpcMsg
**
pRsp
);
int32_t
vnodeProcessQueryMsg
(
SVnode
*
pVnode
,
SRpcMsg
*
pMsg
);
int32_t
vnodeProcessFetchMsg
(
SVnode
*
pVnode
,
SRpcMsg
*
pMsg
,
SQueueInfo
*
pInfo
);
void
vnodeProposeWriteMsg
(
SQueueInfo
*
pInfo
,
STaosQall
*
qall
,
int32_t
numOfMsgs
);
...
...
source/dnode/vnode/src/vnd/vnodeSync.c
浏览文件 @
e79e50ae
...
...
@@ -230,142 +230,16 @@ void vnodeApplyWriteMsg(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs) {
}
}
int32_t
vnodeProcessSyncCtrlMsg
(
SVnode
*
pVnode
,
SRpcMsg
*
pMsg
,
SRpcMsg
**
pRsp
)
{
int32_t
code
=
0
;
const
STraceId
*
trace
=
&
pMsg
->
info
.
traceId
;
if
(
!
syncIsInit
())
{
vGError
(
"vgId:%d, msg:%p failed to process since sync env not start"
,
pVnode
->
config
.
vgId
,
pMsg
);
terrno
=
TSDB_CODE_APP_ERROR
;
return
-
1
;
}
SSyncNode
*
pSyncNode
=
syncNodeAcquire
(
pVnode
->
sync
);
if
(
pSyncNode
==
NULL
)
{
vGError
(
"vgId:%d, msg:%p failed to process since invalid sync node"
,
pVnode
->
config
.
vgId
,
pMsg
);
terrno
=
TSDB_CODE_SYN_INTERNAL_ERROR
;
return
-
1
;
}
vGTrace
(
"vgId:%d, sync msg:%p will be processed, type:%s"
,
pVnode
->
config
.
vgId
,
pMsg
,
TMSG_INFO
(
pMsg
->
msgType
));
if
(
pMsg
->
msgType
==
TDMT_SYNC_HEARTBEAT
)
{
SyncHeartbeat
*
pSyncMsg
=
syncHeartbeatFromRpcMsg2
(
pMsg
);
code
=
syncNodeOnHeartbeat
(
pSyncNode
,
pSyncMsg
);
syncHeartbeatDestroy
(
pSyncMsg
);
}
else
if
(
pMsg
->
msgType
==
TDMT_SYNC_HEARTBEAT_REPLY
)
{
SyncHeartbeatReply
*
pSyncMsg
=
syncHeartbeatReplyFromRpcMsg2
(
pMsg
);
code
=
syncNodeOnHeartbeatReply
(
pSyncNode
,
pSyncMsg
);
syncHeartbeatReplyDestroy
(
pSyncMsg
);
}
else
{
vGError
(
"vgId:%d, msg:%p failed to process since error msg type:%d"
,
pVnode
->
config
.
vgId
,
pMsg
,
pMsg
->
msgType
);
code
=
-
1
;
}
vTrace
(
"vgId:%d, sync msg:%p is processed, type:%s code:0x%x"
,
pVnode
->
config
.
vgId
,
pMsg
,
TMSG_INFO
(
pMsg
->
msgType
),
code
);
syncNodeRelease
(
pSyncNode
);
if
(
code
!=
0
&&
terrno
==
0
)
{
terrno
=
TSDB_CODE_SYN_INTERNAL_ERROR
;
}
return
code
;
}
int32_t
vnodeProcessSyncMsg
(
SVnode
*
pVnode
,
SRpcMsg
*
pMsg
,
SRpcMsg
**
pRsp
)
{
int32_t
code
=
0
;
const
STraceId
*
trace
=
&
pMsg
->
info
.
traceId
;
if
(
!
syncIsInit
())
{
vGError
(
"vgId:%d, msg:%p failed to process since sync env not start"
,
pVnode
->
config
.
vgId
,
pMsg
);
terrno
=
TSDB_CODE_APP_ERROR
;
return
-
1
;
}
SSyncNode
*
pSyncNode
=
syncNodeAcquire
(
pVnode
->
sync
);
if
(
pSyncNode
==
NULL
)
{
vGError
(
"vgId:%d, msg:%p failed to process since invalid sync node"
,
pVnode
->
config
.
vgId
,
pMsg
);
terrno
=
TSDB_CODE_SYN_INTERNAL_ERROR
;
return
-
1
;
}
vGTrace
(
"vgId:%d, sync msg:%p will be processed, type:%s"
,
pVnode
->
config
.
vgId
,
pMsg
,
TMSG_INFO
(
pMsg
->
msgType
));
if
(
pMsg
->
msgType
==
TDMT_SYNC_TIMEOUT
)
{
SyncTimeout
*
pSyncMsg
=
syncTimeoutFromRpcMsg2
(
pMsg
);
ASSERT
(
pSyncMsg
!=
NULL
);
code
=
syncNodeOnTimer
(
pSyncNode
,
pSyncMsg
);
syncTimeoutDestroy
(
pSyncMsg
);
}
else
if
(
pMsg
->
msgType
==
TDMT_SYNC_PING
)
{
SyncPing
*
pSyncMsg
=
syncPingFromRpcMsg2
(
pMsg
);
ASSERT
(
pSyncMsg
!=
NULL
);
code
=
syncNodeOnPing
(
pSyncNode
,
pSyncMsg
);
syncPingDestroy
(
pSyncMsg
);
}
else
if
(
pMsg
->
msgType
==
TDMT_SYNC_PING_REPLY
)
{
SyncPingReply
*
pSyncMsg
=
syncPingReplyFromRpcMsg2
(
pMsg
);
ASSERT
(
pSyncMsg
!=
NULL
);
code
=
syncNodeOnPingReply
(
pSyncNode
,
pSyncMsg
);
syncPingReplyDestroy
(
pSyncMsg
);
}
else
if
(
pMsg
->
msgType
==
TDMT_SYNC_CLIENT_REQUEST
)
{
SyncClientRequest
*
pSyncMsg
=
syncClientRequestFromRpcMsg2
(
pMsg
);
ASSERT
(
pSyncMsg
!=
NULL
);
code
=
syncNodeOnClientRequest
(
pSyncNode
,
pSyncMsg
,
NULL
);
syncClientRequestDestroy
(
pSyncMsg
);
}
else
if
(
pMsg
->
msgType
==
TDMT_SYNC_REQUEST_VOTE
)
{
SyncRequestVote
*
pSyncMsg
=
syncRequestVoteFromRpcMsg2
(
pMsg
);
ASSERT
(
pSyncMsg
!=
NULL
);
code
=
syncNodeOnRequestVote
(
pSyncNode
,
pSyncMsg
);
syncRequestVoteDestroy
(
pSyncMsg
);
}
else
if
(
pMsg
->
msgType
==
TDMT_SYNC_REQUEST_VOTE_REPLY
)
{
SyncRequestVoteReply
*
pSyncMsg
=
syncRequestVoteReplyFromRpcMsg2
(
pMsg
);
ASSERT
(
pSyncMsg
!=
NULL
);
code
=
syncNodeOnRequestVoteReply
(
pSyncNode
,
pSyncMsg
);
syncRequestVoteReplyDestroy
(
pSyncMsg
);
}
else
if
(
pMsg
->
msgType
==
TDMT_SYNC_APPEND_ENTRIES
)
{
SyncAppendEntries
*
pSyncMsg
=
syncAppendEntriesFromRpcMsg2
(
pMsg
);
ASSERT
(
pSyncMsg
!=
NULL
);
code
=
syncNodeOnAppendEntries
(
pSyncNode
,
pSyncMsg
);
syncAppendEntriesDestroy
(
pSyncMsg
);
}
else
if
(
pMsg
->
msgType
==
TDMT_SYNC_APPEND_ENTRIES_REPLY
)
{
SyncAppendEntriesReply
*
pSyncMsg
=
syncAppendEntriesReplyFromRpcMsg2
(
pMsg
);
ASSERT
(
pSyncMsg
!=
NULL
);
code
=
syncNodeOnAppendEntriesReply
(
pSyncNode
,
pSyncMsg
);
syncAppendEntriesReplyDestroy
(
pSyncMsg
);
}
else
if
(
pMsg
->
msgType
==
TDMT_SYNC_SNAPSHOT_SEND
)
{
SyncSnapshotSend
*
pSyncMsg
=
syncSnapshotSendFromRpcMsg2
(
pMsg
);
code
=
syncNodeOnSnapshot
(
pSyncNode
,
pSyncMsg
);
syncSnapshotSendDestroy
(
pSyncMsg
);
}
else
if
(
pMsg
->
msgType
==
TDMT_SYNC_SNAPSHOT_RSP
)
{
SyncSnapshotRsp
*
pSyncMsg
=
syncSnapshotRspFromRpcMsg2
(
pMsg
);
code
=
syncNodeOnSnapshotReply
(
pSyncNode
,
pSyncMsg
);
syncSnapshotRspDestroy
(
pSyncMsg
);
}
else
if
(
pMsg
->
msgType
==
TDMT_SYNC_LOCAL_CMD
)
{
SyncLocalCmd
*
pSyncMsg
=
syncLocalCmdFromRpcMsg2
(
pMsg
);
code
=
syncNodeOnLocalCmd
(
pSyncNode
,
pSyncMsg
);
syncLocalCmdDestroy
(
pSyncMsg
);
}
else
{
vGError
(
"vgId:%d, msg:%p failed to process since error msg type:%d"
,
pVnode
->
config
.
vgId
,
pMsg
,
pMsg
->
msgType
);
code
=
-
1
;
int32_t
code
=
syncProcessMsg
(
pVnode
->
sync
,
pMsg
);
if
(
code
!=
0
)
{
vGError
(
"vgId:%d, failed to process sync msg:%p type:%s since %s"
,
pVnode
->
config
.
vgId
,
pMsg
,
TMSG_INFO
(
pMsg
->
msgType
),
terrstr
());
}
vTrace
(
"vgId:%d, sync msg:%p is processed, type:%s code:0x%x"
,
pVnode
->
config
.
vgId
,
pMsg
,
TMSG_INFO
(
pMsg
->
msgType
),
code
);
syncNodeRelease
(
pSyncNode
);
if
(
code
!=
0
&&
terrno
==
0
)
{
terrno
=
TSDB_CODE_SYN_INTERNAL_ERROR
;
}
return
code
;
}
...
...
source/libs/sync/inc/syncInt.h
浏览文件 @
e79e50ae
...
...
@@ -302,7 +302,6 @@ int32_t syncGetSnapshotMeta(int64_t rid, struct SSnapshotMeta* sMeta);
int32_t
syncGetSnapshotMetaByIndex
(
int64_t
rid
,
SyncIndex
snapshotIndex
,
struct
SSnapshotMeta
*
sMeta
);
bool
syncNodeCanChange
(
SSyncNode
*
pSyncNode
);
bool
syncNodeCheckNewConfig
(
SSyncNode
*
pSyncNode
,
const
SSyncCfg
*
pNewCfg
);
int32_t
syncNodeLeaderTransfer
(
SSyncNode
*
pSyncNode
);
int32_t
syncNodeLeaderTransferTo
(
SSyncNode
*
pSyncNode
,
SNodeInfo
newLeader
);
...
...
source/libs/sync/src/syncEnv.c
浏览文件 @
e79e50ae
...
...
@@ -80,6 +80,7 @@ SSyncNode *syncNodeAcquire(int64_t rid) {
SSyncNode
*
pNode
=
taosAcquireRef
(
gNodeRefId
,
rid
);
if
(
pNode
==
NULL
)
{
sTrace
(
"failed to acquire node from refId:%"
PRId64
,
rid
);
terrno
=
TSDB_CODE_SYN_INTERNAL_ERROR
;
}
return
pNode
;
...
...
source/libs/sync/src/syncMain.c
浏览文件 @
e79e50ae
...
...
@@ -13,6 +13,7 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#define _DEFAULT_SOURCE
#include "sync.h"
#include "syncAppendEntries.h"
#include "syncAppendEntriesReply.h"
...
...
@@ -34,8 +35,6 @@
#include "syncUtil.h"
#include "syncVoteMgr.h"
// ------ local funciton ---------
// enqueue message ----
static
void
syncNodeEqPingTimer
(
void
*
param
,
void
*
tmrId
);
static
void
syncNodeEqElectTimer
(
void
*
param
,
void
*
tmrId
);
static
void
syncNodeEqHeartbeatTimer
(
void
*
param
,
void
*
tmrId
);
...
...
@@ -44,158 +43,145 @@ static int32_t syncNodeAppendNoop(SSyncNode* ths);
static
void
syncNodeEqPeerHeartbeatTimer
(
void
*
param
,
void
*
tmrId
);
static
bool
syncIsConfigChanged
(
const
SSyncCfg
*
pOldCfg
,
const
SSyncCfg
*
pNewCfg
);
// process message ----
int32_t
syncNodeOnPing
(
SSyncNode
*
ths
,
SyncPing
*
pMsg
);
int32_t
syncNodeOnPingReply
(
SSyncNode
*
ths
,
SyncPingReply
*
pMsg
);
int64_t
syncOpen
(
SSyncInfo
*
pSyncInfo
)
{
SSyncNode
*
pNode
=
syncNodeOpen
(
pSyncInfo
);
if
(
pNode
==
NULL
)
{
SSyncNode
*
p
Sync
Node
=
syncNodeOpen
(
pSyncInfo
);
if
(
p
Sync
Node
==
NULL
)
{
sError
(
"vgId:%d, failed to open sync node"
,
pSyncInfo
->
vgId
);
return
-
1
;
}
p
Node
->
rid
=
syncNodeAdd
(
p
Node
);
if
(
pNode
->
rid
<
0
)
{
syncNodeClose
(
pNode
);
p
SyncNode
->
rid
=
syncNodeAdd
(
pSync
Node
);
if
(
p
Sync
Node
->
rid
<
0
)
{
syncNodeClose
(
p
Sync
Node
);
return
-
1
;
}
return
pNode
->
rid
;
return
p
Sync
Node
->
rid
;
}
void
syncStart
(
int64_t
rid
)
{
SSyncNode
*
pNode
=
syncNodeAcquire
(
rid
);
if
(
pNode
!=
NULL
)
{
syncNodeStart
(
pNode
);
syncNodeRelease
(
pNode
);
SSyncNode
*
p
Sync
Node
=
syncNodeAcquire
(
rid
);
if
(
p
Sync
Node
!=
NULL
)
{
syncNodeStart
(
p
Sync
Node
);
syncNodeRelease
(
p
Sync
Node
);
}
}
void
syncStop
(
int64_t
rid
)
{
SSyncNode
*
pNode
=
syncNodeAcquire
(
rid
);
if
(
pNode
!=
NULL
)
{
syncNodeRelease
(
pNode
);
syncNodeRemove
(
rid
);
}
}
bool
syncNodeCheckNewConfig
(
SSyncNode
*
pSyncNode
,
const
SSyncCfg
*
pNewCfg
)
{
bool
IamInNew
=
syncNodeInConfig
(
pSyncNode
,
pNewCfg
);
if
(
!
IamInNew
)
{
return
false
;
}
if
(
pNewCfg
->
replicaNum
>
pSyncNode
->
replicaNum
+
1
)
{
return
false
;
}
if
(
pNewCfg
->
replicaNum
<
pSyncNode
->
replicaNum
-
1
)
{
return
false
;
}
return
true
;
}
int32_t
syncReconfigBuild
(
int64_t
rid
,
const
SSyncCfg
*
pNewCfg
,
SRpcMsg
*
pRpcMsg
)
{
SSyncNode
*
pSyncNode
=
syncNodeAcquire
(
rid
);
if
(
pSyncNode
==
NULL
)
{
terrno
=
TSDB_CODE_SYN_INTERNAL_ERROR
;
return
-
1
;
}
ASSERT
(
rid
==
pSyncNode
->
rid
);
int32_t
ret
=
0
;
if
(
!
syncNodeCheckNewConfig
(
pSyncNode
,
pNewCfg
))
{
if
(
pSyncNode
!=
NULL
)
{
syncNodeRelease
(
pSyncNode
);
terrno
=
TSDB_CODE_SYN_NEW_CONFIG_ERROR
;
sError
(
"invalid new config. vgId:%d"
,
pSyncNode
->
vgId
);
return
-
1
;
syncNodeRemove
(
rid
);
}
}
char
*
newconfig
=
syncCfg2Str
((
SSyncCfg
*
)
pNewCfg
);
pRpcMsg
->
msgType
=
TDMT_SYNC_CONFIG_CHANGE
;
pRpcMsg
->
info
.
noResp
=
1
;
pRpcMsg
->
contLen
=
strlen
(
newconfig
)
+
1
;
pRpcMsg
->
pCont
=
rpcMallocCont
(
pRpcMsg
->
contLen
);
snprintf
(
pRpcMsg
->
pCont
,
pRpcMsg
->
contLen
,
"%s"
,
newconfig
);
taosMemoryFree
(
newconfig
);
syncNodeRelease
(
pSyncNode
);
return
ret
;
static
bool
syncNodeCheckNewConfig
(
SSyncNode
*
pSyncNode
,
const
SSyncCfg
*
pCfg
)
{
if
(
!
syncNodeInConfig
(
pSyncNode
,
pCfg
))
return
false
;
return
abs
(
pCfg
->
replicaNum
-
pSyncNode
->
replicaNum
)
<=
1
;
}
int32_t
syncReconfig
(
int64_t
rid
,
SSyncCfg
*
pNewCfg
)
{
SSyncNode
*
pSyncNode
=
syncNodeAcquire
(
rid
);
if
(
pSyncNode
==
NULL
)
{
terrno
=
TSDB_CODE_SYN_INTERNAL_ERROR
;
return
-
1
;
}
ASSERT
(
rid
==
pSyncNode
->
rid
);
if
(
pSyncNode
==
NULL
)
return
-
1
;
if
(
!
syncNodeCheckNewConfig
(
pSyncNode
,
pNewCfg
))
{
syncNodeRelease
(
pSyncNode
);
terrno
=
TSDB_CODE_SYN_NEW_CONFIG_ERROR
;
sError
(
"
invalid new config. vgId:%d
"
,
pSyncNode
->
vgId
);
sError
(
"
vgId:%d, failed to reconfig since invalid new config
"
,
pSyncNode
->
vgId
);
return
-
1
;
}
#if 0
char* newconfig = syncCfg2Str((SSyncCfg*)pNewCfg);
int32_t ret = 0;
SRpcMsg rpcMsg = {0};
rpcMsg.msgType = TDMT_SYNC_CONFIG_CHANGE;
rpcMsg.info.noResp = 1;
rpcMsg.contLen = strlen(newconfig) + 1;
rpcMsg.pCont = rpcMallocCont(rpcMsg.contLen);
snprintf(rpcMsg.pCont, rpcMsg.contLen, "%s", newconfig);
taosMemoryFree(newconfig);
ret = syncNodePropose(pSyncNode, &rpcMsg, false);
syncNodeRelease(pSyncNode);
return ret;
#else
syncNodeUpdateNewConfigIndex
(
pSyncNode
,
pNewCfg
);
syncNodeDoConfigChange
(
pSyncNode
,
pNewCfg
,
SYNC_INDEX_INVALID
);
if
(
pSyncNode
->
state
==
TAOS_SYNC_STATE_LEADER
)
{
syncNodeStopHeartbeatTimer
(
pSyncNode
);
for
(
int32_t
i
=
0
;
i
<
TSDB_MAX_REPLICA
;
++
i
)
{
syncHbTimerInit
(
pSyncNode
,
&
(
pSyncNode
->
peerHeartbeatTimerArr
[
i
]),
(
pSyncNode
->
replicasId
)
[
i
]);
syncHbTimerInit
(
pSyncNode
,
&
pSyncNode
->
peerHeartbeatTimerArr
[
i
],
pSyncNode
->
replicasId
[
i
]);
}
syncNodeStartHeartbeatTimer
(
pSyncNode
);
syncNodeReplicate
(
pSyncNode
);
}
syncNodeRelease
(
pSyncNode
);
return
0
;
#endif
}
int32_t
syncLeaderTransfer
(
int64_t
rid
)
{
int32_t
syncProcessMsg
(
int64_t
rid
,
SRpcMsg
*
pMsg
)
{
int32_t
code
=
-
1
;
if
(
!
syncIsInit
())
return
code
;
SSyncNode
*
pSyncNode
=
syncNodeAcquire
(
rid
);
if
(
pSyncNode
==
NULL
)
{
terrno
=
TSDB_CODE_SYN_INTERNAL_ERROR
;
return
-
1
;
if
(
pSyncNode
==
NULL
)
return
code
;
if
(
pMsg
->
msgType
==
TDMT_SYNC_HEARTBEAT
)
{
SyncHeartbeat
*
pSyncMsg
=
syncHeartbeatFromRpcMsg2
(
pMsg
);
code
=
syncNodeOnHeartbeat
(
pSyncNode
,
pSyncMsg
);
syncHeartbeatDestroy
(
pSyncMsg
);
}
else
if
(
pMsg
->
msgType
==
TDMT_SYNC_HEARTBEAT_REPLY
)
{
SyncHeartbeatReply
*
pSyncMsg
=
syncHeartbeatReplyFromRpcMsg2
(
pMsg
);
code
=
syncNodeOnHeartbeatReply
(
pSyncNode
,
pSyncMsg
);
syncHeartbeatReplyDestroy
(
pSyncMsg
);
}
else
if
(
pMsg
->
msgType
==
TDMT_SYNC_TIMEOUT
)
{
SyncTimeout
*
pSyncMsg
=
syncTimeoutFromRpcMsg2
(
pMsg
);
code
=
syncNodeOnTimer
(
pSyncNode
,
pSyncMsg
);
syncTimeoutDestroy
(
pSyncMsg
);
}
else
if
(
pMsg
->
msgType
==
TDMT_SYNC_PING
)
{
SyncPing
*
pSyncMsg
=
syncPingFromRpcMsg2
(
pMsg
);
code
=
syncNodeOnPing
(
pSyncNode
,
pSyncMsg
);
syncPingDestroy
(
pSyncMsg
);
}
else
if
(
pMsg
->
msgType
==
TDMT_SYNC_PING_REPLY
)
{
SyncPingReply
*
pSyncMsg
=
syncPingReplyFromRpcMsg2
(
pMsg
);
code
=
syncNodeOnPingReply
(
pSyncNode
,
pSyncMsg
);
syncPingReplyDestroy
(
pSyncMsg
);
}
else
if
(
pMsg
->
msgType
==
TDMT_SYNC_CLIENT_REQUEST
)
{
SyncClientRequest
*
pSyncMsg
=
syncClientRequestFromRpcMsg2
(
pMsg
);
code
=
syncNodeOnClientRequest
(
pSyncNode
,
pSyncMsg
,
NULL
);
syncClientRequestDestroy
(
pSyncMsg
);
}
else
if
(
pMsg
->
msgType
==
TDMT_SYNC_REQUEST_VOTE
)
{
SyncRequestVote
*
pSyncMsg
=
syncRequestVoteFromRpcMsg2
(
pMsg
);
code
=
syncNodeOnRequestVote
(
pSyncNode
,
pSyncMsg
);
syncRequestVoteDestroy
(
pSyncMsg
);
}
else
if
(
pMsg
->
msgType
==
TDMT_SYNC_REQUEST_VOTE_REPLY
)
{
SyncRequestVoteReply
*
pSyncMsg
=
syncRequestVoteReplyFromRpcMsg2
(
pMsg
);
code
=
syncNodeOnRequestVoteReply
(
pSyncNode
,
pSyncMsg
);
syncRequestVoteReplyDestroy
(
pSyncMsg
);
}
else
if
(
pMsg
->
msgType
==
TDMT_SYNC_APPEND_ENTRIES
)
{
SyncAppendEntries
*
pSyncMsg
=
syncAppendEntriesFromRpcMsg2
(
pMsg
);
code
=
syncNodeOnAppendEntries
(
pSyncNode
,
pSyncMsg
);
syncAppendEntriesDestroy
(
pSyncMsg
);
}
else
if
(
pMsg
->
msgType
==
TDMT_SYNC_APPEND_ENTRIES_REPLY
)
{
SyncAppendEntriesReply
*
pSyncMsg
=
syncAppendEntriesReplyFromRpcMsg2
(
pMsg
);
code
=
syncNodeOnAppendEntriesReply
(
pSyncNode
,
pSyncMsg
);
syncAppendEntriesReplyDestroy
(
pSyncMsg
);
}
else
if
(
pMsg
->
msgType
==
TDMT_SYNC_SNAPSHOT_SEND
)
{
SyncSnapshotSend
*
pSyncMsg
=
syncSnapshotSendFromRpcMsg2
(
pMsg
);
code
=
syncNodeOnSnapshot
(
pSyncNode
,
pSyncMsg
);
syncSnapshotSendDestroy
(
pSyncMsg
);
}
else
if
(
pMsg
->
msgType
==
TDMT_SYNC_SNAPSHOT_RSP
)
{
SyncSnapshotRsp
*
pSyncMsg
=
syncSnapshotRspFromRpcMsg2
(
pMsg
);
code
=
syncNodeOnSnapshotReply
(
pSyncNode
,
pSyncMsg
);
syncSnapshotRspDestroy
(
pSyncMsg
);
}
else
if
(
pMsg
->
msgType
==
TDMT_SYNC_LOCAL_CMD
)
{
SyncLocalCmd
*
pSyncMsg
=
syncLocalCmdFromRpcMsg2
(
pMsg
);
code
=
syncNodeOnLocalCmd
(
pSyncNode
,
pSyncMsg
);
syncLocalCmdDestroy
(
pSyncMsg
);
}
else
{
sError
(
"vgId:%d, failed to process msg:%p since invalid type:%s"
,
pSyncNode
->
vgId
,
pMsg
,
TMSG_INFO
(
pMsg
->
msgType
));
code
=
-
1
;
}
ASSERT
(
rid
==
pSyncNode
->
rid
);
int32_t
ret
=
syncNodeLeaderTransfer
(
pSyncNode
);
syncNodeRelease
(
pSyncNode
);
return
ret
;
return
code
;
}
int32_t
syncLeaderTransfer
To
(
int64_t
rid
,
SNodeInfo
newLeader
)
{
int32_t
syncLeaderTransfer
(
int64_t
rid
)
{
SSyncNode
*
pSyncNode
=
syncNodeAcquire
(
rid
);
if
(
pSyncNode
==
NULL
)
{
terrno
=
TSDB_CODE_SYN_INTERNAL_ERROR
;
return
-
1
;
}
ASSERT
(
rid
==
pSyncNode
->
rid
);
if
(
pSyncNode
==
NULL
)
return
-
1
;
int32_t
ret
=
syncNodeLeaderTransfer
To
(
pSyncNode
,
newLeader
);
int32_t
ret
=
syncNodeLeaderTransfer
(
pSyncNode
);
syncNodeRelease
(
pSyncNode
);
return
ret
;
}
...
...
@@ -3675,4 +3661,4 @@ void syncLogRecvLocalCmd(SSyncNode* pSyncNode, const SyncLocalCmd* pMsg, const c
snprintf
(
logBuf
,
sizeof
(
logBuf
),
"recv sync-local-cmd {cmd:%d-%s, sd-new-term:%"
PRIu64
"}, %s"
,
pMsg
->
cmd
,
syncLocalCmdGetStr
(
pMsg
->
cmd
),
pMsg
->
sdNewTerm
,
s
);
syncNodeEventLog
(
pSyncNode
,
logBuf
);
}
\ No newline at end of file
}
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录