Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
284286cb
T
TDengine
项目概览
taosdata
/
TDengine
1 年多 前同步成功
通知
1185
Star
22016
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
未验证
提交
284286cb
编写于
1月 29, 2023
作者:
X
Xiaoyu Wang
提交者:
GitHub
1月 29, 2023
浏览文件
操作
浏览文件
下载
差异文件
Merge pull request #19621 from taosdata/FIX/TD-21218-main
enh: remove unused old routines in sync
上级
16bae8eb
e55a76e2
变更
7
显示空白变更内容
内联
并排
Showing
7 changed file
with
47 addition
and
1065 deletion
+47
-1065
source/libs/sync/src/syncAppendEntries.c
source/libs/sync/src/syncAppendEntries.c
+0
-292
source/libs/sync/src/syncAppendEntriesReply.c
source/libs/sync/src/syncAppendEntriesReply.c
+0
-60
source/libs/sync/src/syncCommit.c
source/libs/sync/src/syncCommit.c
+1
-219
source/libs/sync/src/syncElection.c
source/libs/sync/src/syncElection.c
+13
-5
source/libs/sync/src/syncMain.c
source/libs/sync/src/syncMain.c
+29
-350
source/libs/sync/src/syncPipeline.c
source/libs/sync/src/syncPipeline.c
+4
-1
source/libs/sync/src/syncReplication.c
source/libs/sync/src/syncReplication.c
+0
-138
未找到文件。
source/libs/sync/src/syncAppendEntries.c
浏览文件 @
284286cb
...
...
@@ -89,45 +89,6 @@
// /\ UNCHANGED <<candidateVars, leaderVars>>
//
int32_t
syncNodeFollowerCommit
(
SSyncNode
*
ths
,
SyncIndex
newCommitIndex
)
{
ASSERT
(
false
&&
"deprecated"
);
if
(
ths
->
state
!=
TAOS_SYNC_STATE_FOLLOWER
)
{
sNTrace
(
ths
,
"can not do follower commit"
);
return
-
1
;
}
// maybe update commit index, leader notice me
if
(
newCommitIndex
>
ths
->
commitIndex
)
{
// has commit entry in local
if
(
newCommitIndex
<=
ths
->
pLogStore
->
syncLogLastIndex
(
ths
->
pLogStore
))
{
// advance commit index to sanpshot first
SSnapshot
snapshot
;
ths
->
pFsm
->
FpGetSnapshotInfo
(
ths
->
pFsm
,
&
snapshot
);
if
(
snapshot
.
lastApplyIndex
>=
0
&&
snapshot
.
lastApplyIndex
>
ths
->
commitIndex
)
{
SyncIndex
commitBegin
=
ths
->
commitIndex
;
SyncIndex
commitEnd
=
snapshot
.
lastApplyIndex
;
ths
->
commitIndex
=
snapshot
.
lastApplyIndex
;
sNTrace
(
ths
,
"commit by snapshot from index:%"
PRId64
" to index:%"
PRId64
,
commitBegin
,
commitEnd
);
}
SyncIndex
beginIndex
=
ths
->
commitIndex
+
1
;
SyncIndex
endIndex
=
newCommitIndex
;
// update commit index
ths
->
commitIndex
=
newCommitIndex
;
// call back Wal
int32_t
code
=
ths
->
pLogStore
->
syncLogUpdateCommitIndex
(
ths
->
pLogStore
,
ths
->
commitIndex
);
ASSERT
(
code
==
0
);
code
=
syncNodeDoCommit
(
ths
,
beginIndex
,
endIndex
,
ths
->
state
);
ASSERT
(
code
==
0
);
}
}
return
0
;
}
SSyncRaftEntry
*
syncBuildRaftEntryFromAppendEntries
(
const
SyncAppendEntries
*
pMsg
)
{
SSyncRaftEntry
*
pEntry
=
taosMemoryMalloc
(
pMsg
->
dataLen
);
if
(
pEntry
==
NULL
)
{
...
...
@@ -232,256 +193,3 @@ _IGNORE:
rpcFreeCont
(
rpcRsp
.
pCont
);
return
0
;
}
int32_t
syncNodeOnAppendEntriesOld
(
SSyncNode
*
ths
,
const
SRpcMsg
*
pRpcMsg
)
{
SyncAppendEntries
*
pMsg
=
pRpcMsg
->
pCont
;
SRpcMsg
rpcRsp
=
{
0
};
// if already drop replica, do not process
if
(
!
syncNodeInRaftGroup
(
ths
,
&
(
pMsg
->
srcId
)))
{
syncLogRecvAppendEntries
(
ths
,
pMsg
,
"not in my config"
);
goto
_IGNORE
;
}
// prepare response msg
int32_t
code
=
syncBuildAppendEntriesReply
(
&
rpcRsp
,
ths
->
vgId
);
if
(
code
!=
0
)
{
syncLogRecvAppendEntries
(
ths
,
pMsg
,
"build rsp error"
);
goto
_IGNORE
;
}
SyncAppendEntriesReply
*
pReply
=
rpcRsp
.
pCont
;
pReply
->
srcId
=
ths
->
myRaftId
;
pReply
->
destId
=
pMsg
->
srcId
;
pReply
->
term
=
ths
->
raftStore
.
currentTerm
;
pReply
->
success
=
false
;
// pReply->matchIndex = ths->pLogStore->syncLogLastIndex(ths->pLogStore);
pReply
->
matchIndex
=
SYNC_INDEX_INVALID
;
pReply
->
lastSendIndex
=
pMsg
->
prevLogIndex
+
1
;
pReply
->
startTime
=
ths
->
startTime
;
if
(
pMsg
->
term
<
ths
->
raftStore
.
currentTerm
)
{
syncLogRecvAppendEntries
(
ths
,
pMsg
,
"reject, small term"
);
goto
_SEND_RESPONSE
;
}
if
(
pMsg
->
term
>
ths
->
raftStore
.
currentTerm
)
{
pReply
->
term
=
pMsg
->
term
;
}
syncNodeStepDown
(
ths
,
pMsg
->
term
);
syncNodeResetElectTimer
(
ths
);
SyncIndex
startIndex
=
ths
->
pLogStore
->
syncLogBeginIndex
(
ths
->
pLogStore
);
SyncIndex
lastIndex
=
ths
->
pLogStore
->
syncLogLastIndex
(
ths
->
pLogStore
);
if
(
pMsg
->
prevLogIndex
>
lastIndex
)
{
syncLogRecvAppendEntries
(
ths
,
pMsg
,
"reject, index not match"
);
goto
_SEND_RESPONSE
;
}
if
(
pMsg
->
prevLogIndex
>=
startIndex
)
{
SyncTerm
myPreLogTerm
=
syncNodeGetPreTerm
(
ths
,
pMsg
->
prevLogIndex
+
1
);
// ASSERT(myPreLogTerm != SYNC_TERM_INVALID);
if
(
myPreLogTerm
==
SYNC_TERM_INVALID
)
{
syncLogRecvAppendEntries
(
ths
,
pMsg
,
"reject, pre-term invalid"
);
goto
_SEND_RESPONSE
;
}
if
(
myPreLogTerm
!=
pMsg
->
prevLogTerm
)
{
syncLogRecvAppendEntries
(
ths
,
pMsg
,
"reject, pre-term not match"
);
goto
_SEND_RESPONSE
;
}
}
// accept
pReply
->
success
=
true
;
bool
hasAppendEntries
=
pMsg
->
dataLen
>
0
;
if
(
hasAppendEntries
)
{
SSyncRaftEntry
*
pAppendEntry
=
syncEntryBuildFromAppendEntries
(
pMsg
);
ASSERT
(
pAppendEntry
!=
NULL
);
SyncIndex
appendIndex
=
pMsg
->
prevLogIndex
+
1
;
LRUHandle
*
hLocal
=
NULL
;
LRUHandle
*
hAppend
=
NULL
;
int32_t
code
=
0
;
SSyncRaftEntry
*
pLocalEntry
=
NULL
;
SLRUCache
*
pCache
=
ths
->
pLogStore
->
pCache
;
hLocal
=
taosLRUCacheLookup
(
pCache
,
&
appendIndex
,
sizeof
(
appendIndex
));
if
(
hLocal
)
{
pLocalEntry
=
(
SSyncRaftEntry
*
)
taosLRUCacheValue
(
pCache
,
hLocal
);
code
=
0
;
ths
->
pLogStore
->
cacheHit
++
;
sNTrace
(
ths
,
"hit cache index:%"
PRId64
", bytes:%u, %p"
,
appendIndex
,
pLocalEntry
->
bytes
,
pLocalEntry
);
}
else
{
ths
->
pLogStore
->
cacheMiss
++
;
sNTrace
(
ths
,
"miss cache index:%"
PRId64
,
appendIndex
);
code
=
ths
->
pLogStore
->
syncLogGetEntry
(
ths
->
pLogStore
,
appendIndex
,
&
pLocalEntry
);
}
if
(
code
==
0
)
{
// get local entry success
if
(
pLocalEntry
->
term
==
pAppendEntry
->
term
)
{
// do nothing
sNTrace
(
ths
,
"log match, do nothing, index:%"
PRId64
,
appendIndex
);
}
else
{
// truncate
code
=
ths
->
pLogStore
->
syncLogTruncate
(
ths
->
pLogStore
,
appendIndex
);
if
(
code
!=
0
)
{
char
logBuf
[
128
];
snprintf
(
logBuf
,
sizeof
(
logBuf
),
"ignore, truncate error, append-index:%"
PRId64
,
appendIndex
);
syncLogRecvAppendEntries
(
ths
,
pMsg
,
logBuf
);
if
(
hLocal
)
{
taosLRUCacheRelease
(
ths
->
pLogStore
->
pCache
,
hLocal
,
false
);
}
else
{
syncEntryDestroy
(
pLocalEntry
);
}
if
(
hAppend
)
{
taosLRUCacheRelease
(
ths
->
pLogStore
->
pCache
,
hAppend
,
false
);
}
else
{
syncEntryDestroy
(
pAppendEntry
);
}
goto
_IGNORE
;
}
ASSERT
(
pAppendEntry
->
index
==
appendIndex
);
// append
code
=
ths
->
pLogStore
->
syncLogAppendEntry
(
ths
->
pLogStore
,
pAppendEntry
,
false
);
if
(
code
!=
0
)
{
char
logBuf
[
128
];
snprintf
(
logBuf
,
sizeof
(
logBuf
),
"ignore, append error, append-index:%"
PRId64
,
appendIndex
);
syncLogRecvAppendEntries
(
ths
,
pMsg
,
logBuf
);
if
(
hLocal
)
{
taosLRUCacheRelease
(
ths
->
pLogStore
->
pCache
,
hLocal
,
false
);
}
else
{
syncEntryDestroy
(
pLocalEntry
);
}
if
(
hAppend
)
{
taosLRUCacheRelease
(
ths
->
pLogStore
->
pCache
,
hAppend
,
false
);
}
else
{
syncEntryDestroy
(
pAppendEntry
);
}
goto
_IGNORE
;
}
syncCacheEntry
(
ths
->
pLogStore
,
pAppendEntry
,
&
hAppend
);
}
}
else
{
if
(
terrno
==
TSDB_CODE_WAL_LOG_NOT_EXIST
)
{
// log not exist
// truncate
code
=
ths
->
pLogStore
->
syncLogTruncate
(
ths
->
pLogStore
,
appendIndex
);
if
(
code
!=
0
)
{
char
logBuf
[
128
];
snprintf
(
logBuf
,
sizeof
(
logBuf
),
"ignore, log not exist, truncate error, append-index:%"
PRId64
,
appendIndex
);
syncLogRecvAppendEntries
(
ths
,
pMsg
,
logBuf
);
syncEntryDestroy
(
pLocalEntry
);
syncEntryDestroy
(
pAppendEntry
);
goto
_IGNORE
;
}
// append
code
=
ths
->
pLogStore
->
syncLogAppendEntry
(
ths
->
pLogStore
,
pAppendEntry
,
false
);
if
(
code
!=
0
)
{
char
logBuf
[
128
];
snprintf
(
logBuf
,
sizeof
(
logBuf
),
"ignore, log not exist, append error, append-index:%"
PRId64
,
appendIndex
);
syncLogRecvAppendEntries
(
ths
,
pMsg
,
logBuf
);
if
(
hLocal
)
{
taosLRUCacheRelease
(
ths
->
pLogStore
->
pCache
,
hLocal
,
false
);
}
else
{
syncEntryDestroy
(
pLocalEntry
);
}
if
(
hAppend
)
{
taosLRUCacheRelease
(
ths
->
pLogStore
->
pCache
,
hAppend
,
false
);
}
else
{
syncEntryDestroy
(
pAppendEntry
);
}
goto
_IGNORE
;
}
syncCacheEntry
(
ths
->
pLogStore
,
pAppendEntry
,
&
hAppend
);
}
else
{
// get local entry success
char
logBuf
[
128
];
snprintf
(
logBuf
,
sizeof
(
logBuf
),
"ignore, get local entry error, append-index:%"
PRId64
" err:%d"
,
appendIndex
,
terrno
);
syncLogRecvAppendEntries
(
ths
,
pMsg
,
logBuf
);
if
(
hLocal
)
{
taosLRUCacheRelease
(
ths
->
pLogStore
->
pCache
,
hLocal
,
false
);
}
else
{
syncEntryDestroy
(
pLocalEntry
);
}
if
(
hAppend
)
{
taosLRUCacheRelease
(
ths
->
pLogStore
->
pCache
,
hAppend
,
false
);
}
else
{
syncEntryDestroy
(
pAppendEntry
);
}
goto
_IGNORE
;
}
}
// update match index
pReply
->
matchIndex
=
pAppendEntry
->
index
;
if
(
hLocal
)
{
taosLRUCacheRelease
(
ths
->
pLogStore
->
pCache
,
hLocal
,
false
);
}
else
{
syncEntryDestroy
(
pLocalEntry
);
}
if
(
hAppend
)
{
taosLRUCacheRelease
(
ths
->
pLogStore
->
pCache
,
hAppend
,
false
);
}
else
{
syncEntryDestroy
(
pAppendEntry
);
}
}
else
{
// no append entries, do nothing
// maybe has extra entries, no harm
// update match index
pReply
->
matchIndex
=
pMsg
->
prevLogIndex
;
}
// maybe update commit index, leader notice me
syncNodeFollowerCommit
(
ths
,
pMsg
->
commitIndex
);
syncLogRecvAppendEntries
(
ths
,
pMsg
,
"accept"
);
goto
_SEND_RESPONSE
;
_IGNORE:
rpcFreeCont
(
rpcRsp
.
pCont
);
return
0
;
_SEND_RESPONSE:
// msg event log
syncLogSendAppendEntriesReply
(
ths
,
pReply
,
""
);
// send response
syncNodeSendMsgById
(
&
pReply
->
destId
,
ths
,
&
rpcRsp
);
return
0
;
}
source/libs/sync/src/syncAppendEntriesReply.c
浏览文件 @
284286cb
...
...
@@ -89,63 +89,3 @@ int32_t syncNodeOnAppendEntriesReply(SSyncNode* ths, const SRpcMsg* pRpcMsg) {
}
return
0
;
}
int32_t
syncNodeOnAppendEntriesReplyOld
(
SSyncNode
*
ths
,
SyncAppendEntriesReply
*
pMsg
)
{
int32_t
ret
=
0
;
// if already drop replica, do not process
if
(
!
syncNodeInRaftGroup
(
ths
,
&
(
pMsg
->
srcId
)))
{
syncLogRecvAppendEntriesReply
(
ths
,
pMsg
,
"not in my config"
);
return
0
;
}
// drop stale response
if
(
pMsg
->
term
<
ths
->
raftStore
.
currentTerm
)
{
syncLogRecvAppendEntriesReply
(
ths
,
pMsg
,
"drop stale response"
);
return
0
;
}
if
(
ths
->
state
==
TAOS_SYNC_STATE_LEADER
)
{
if
(
pMsg
->
term
>
ths
->
raftStore
.
currentTerm
)
{
syncLogRecvAppendEntriesReply
(
ths
,
pMsg
,
"error term"
);
syncNodeStepDown
(
ths
,
pMsg
->
term
);
return
-
1
;
}
ASSERT
(
pMsg
->
term
==
ths
->
raftStore
.
currentTerm
);
if
(
pMsg
->
success
)
{
SyncIndex
oldMatchIndex
=
syncIndexMgrGetIndex
(
ths
->
pMatchIndex
,
&
(
pMsg
->
srcId
));
if
(
pMsg
->
matchIndex
>
oldMatchIndex
)
{
syncIndexMgrSetIndex
(
ths
->
pMatchIndex
,
&
(
pMsg
->
srcId
),
pMsg
->
matchIndex
);
syncMaybeAdvanceCommitIndex
(
ths
);
// maybe update minMatchIndex
ths
->
minMatchIndex
=
syncMinMatchIndex
(
ths
);
}
syncIndexMgrSetIndex
(
ths
->
pNextIndex
,
&
(
pMsg
->
srcId
),
pMsg
->
matchIndex
+
1
);
}
else
{
SyncIndex
nextIndex
=
syncIndexMgrGetIndex
(
ths
->
pNextIndex
,
&
(
pMsg
->
srcId
));
if
(
nextIndex
>
SYNC_INDEX_BEGIN
)
{
--
nextIndex
;
}
syncIndexMgrSetIndex
(
ths
->
pNextIndex
,
&
(
pMsg
->
srcId
),
nextIndex
);
}
// send next append entries
SPeerState
*
pState
=
syncNodeGetPeerState
(
ths
,
&
(
pMsg
->
srcId
));
ASSERT
(
pState
!=
NULL
);
if
(
pMsg
->
lastSendIndex
==
pState
->
lastSendIndex
)
{
int64_t
timeNow
=
taosGetTimestampMs
();
int64_t
elapsed
=
timeNow
-
pState
->
lastSendTime
;
sNTrace
(
ths
,
"sync-append-entries rtt elapsed:%"
PRId64
", index:%"
PRId64
,
elapsed
,
pState
->
lastSendIndex
);
syncNodeReplicateOne
(
ths
,
&
(
pMsg
->
srcId
),
true
);
}
}
syncLogRecvAppendEntriesReply
(
ths
,
pMsg
,
"process"
);
return
0
;
}
source/libs/sync/src/syncCommit.c
浏览文件 @
284286cb
...
...
@@ -43,148 +43,6 @@
// IN commitIndex' = [commitIndex EXCEPT ![i] = newCommitIndex]
// /\ UNCHANGED <<messages, serverVars, candidateVars, leaderVars, log>>
//
void
syncOneReplicaAdvance
(
SSyncNode
*
pSyncNode
)
{
ASSERT
(
false
&&
"deprecated"
);
if
(
pSyncNode
==
NULL
)
{
sError
(
"pSyncNode is NULL"
);
return
;
}
if
(
pSyncNode
->
state
!=
TAOS_SYNC_STATE_LEADER
)
{
sNError
(
pSyncNode
,
"not leader, can not advance commit index"
);
return
;
}
if
(
pSyncNode
->
replicaNum
!=
1
)
{
sNError
(
pSyncNode
,
"not one replica, can not advance commit index"
);
return
;
}
// advance commit index to snapshot first
SSnapshot
snapshot
;
pSyncNode
->
pFsm
->
FpGetSnapshotInfo
(
pSyncNode
->
pFsm
,
&
snapshot
);
if
(
snapshot
.
lastApplyIndex
>
0
&&
snapshot
.
lastApplyIndex
>
pSyncNode
->
commitIndex
)
{
SyncIndex
commitBegin
=
pSyncNode
->
commitIndex
;
SyncIndex
commitEnd
=
snapshot
.
lastApplyIndex
;
pSyncNode
->
commitIndex
=
snapshot
.
lastApplyIndex
;
sNTrace
(
pSyncNode
,
"commit by snapshot from index:%"
PRId64
" to index:%"
PRId64
,
commitBegin
,
commitEnd
);
}
// advance commit index as large as possible
SyncIndex
lastIndex
=
syncNodeGetLastIndex
(
pSyncNode
);
if
(
lastIndex
>
pSyncNode
->
commitIndex
)
{
sNTrace
(
pSyncNode
,
"commit by wal from index:%"
PRId64
" to index:%"
PRId64
,
pSyncNode
->
commitIndex
+
1
,
lastIndex
);
pSyncNode
->
commitIndex
=
lastIndex
;
}
// call back Wal
SyncIndex
walCommitVer
=
logStoreWalCommitVer
(
pSyncNode
->
pLogStore
);
if
(
pSyncNode
->
commitIndex
>
walCommitVer
)
{
pSyncNode
->
pLogStore
->
syncLogUpdateCommitIndex
(
pSyncNode
->
pLogStore
,
pSyncNode
->
commitIndex
);
}
}
void
syncMaybeAdvanceCommitIndex
(
SSyncNode
*
pSyncNode
)
{
ASSERTS
(
false
,
"deprecated"
);
if
(
pSyncNode
==
NULL
)
{
sError
(
"pSyncNode is NULL"
);
return
;
}
if
(
pSyncNode
->
state
!=
TAOS_SYNC_STATE_LEADER
)
{
sNError
(
pSyncNode
,
"not leader, can not advance commit index"
);
return
;
}
// advance commit index to sanpshot first
SSnapshot
snapshot
;
pSyncNode
->
pFsm
->
FpGetSnapshotInfo
(
pSyncNode
->
pFsm
,
&
snapshot
);
if
(
snapshot
.
lastApplyIndex
>
0
&&
snapshot
.
lastApplyIndex
>
pSyncNode
->
commitIndex
)
{
SyncIndex
commitBegin
=
pSyncNode
->
commitIndex
;
SyncIndex
commitEnd
=
snapshot
.
lastApplyIndex
;
pSyncNode
->
commitIndex
=
snapshot
.
lastApplyIndex
;
sNTrace
(
pSyncNode
,
"commit by snapshot from index:%"
PRId64
" to index:%"
PRId64
,
commitBegin
,
commitEnd
);
}
// update commit index
SyncIndex
newCommitIndex
=
pSyncNode
->
commitIndex
;
for
(
SyncIndex
index
=
syncNodeGetLastIndex
(
pSyncNode
);
index
>
pSyncNode
->
commitIndex
;
--
index
)
{
bool
agree
=
syncAgree
(
pSyncNode
,
index
);
if
(
agree
)
{
// term
SSyncRaftEntry
*
pEntry
=
NULL
;
SLRUCache
*
pCache
=
pSyncNode
->
pLogStore
->
pCache
;
LRUHandle
*
h
=
taosLRUCacheLookup
(
pCache
,
&
index
,
sizeof
(
index
));
if
(
h
)
{
pEntry
=
(
SSyncRaftEntry
*
)
taosLRUCacheValue
(
pCache
,
h
);
pSyncNode
->
pLogStore
->
cacheHit
++
;
sNTrace
(
pSyncNode
,
"hit cache index:%"
PRId64
", bytes:%u, %p"
,
index
,
pEntry
->
bytes
,
pEntry
);
}
else
{
pSyncNode
->
pLogStore
->
cacheMiss
++
;
sNTrace
(
pSyncNode
,
"miss cache index:%"
PRId64
,
index
);
int32_t
code
=
pSyncNode
->
pLogStore
->
syncLogGetEntry
(
pSyncNode
->
pLogStore
,
index
,
&
pEntry
);
if
(
code
!=
0
)
{
sNError
(
pSyncNode
,
"advance commit index error, read wal index:%"
PRId64
,
index
);
return
;
}
}
// cannot commit, even if quorum agree. need check term!
if
(
pEntry
->
term
<=
pSyncNode
->
raftStore
.
currentTerm
)
{
// update commit index
newCommitIndex
=
index
;
if
(
h
)
{
taosLRUCacheRelease
(
pCache
,
h
,
false
);
}
else
{
syncEntryDestroy
(
pEntry
);
}
break
;
}
else
{
sNTrace
(
pSyncNode
,
"can not commit due to term not equal, index:%"
PRId64
", term:%"
PRIu64
,
pEntry
->
index
,
pEntry
->
term
);
}
if
(
h
)
{
taosLRUCacheRelease
(
pCache
,
h
,
false
);
}
else
{
syncEntryDestroy
(
pEntry
);
}
}
}
// advance commit index as large as possible
SyncIndex
walCommitVer
=
logStoreWalCommitVer
(
pSyncNode
->
pLogStore
);
if
(
walCommitVer
>
newCommitIndex
)
{
newCommitIndex
=
walCommitVer
;
}
// maybe execute fsm
if
(
newCommitIndex
>
pSyncNode
->
commitIndex
)
{
SyncIndex
beginIndex
=
pSyncNode
->
commitIndex
+
1
;
SyncIndex
endIndex
=
newCommitIndex
;
// update commit index
pSyncNode
->
commitIndex
=
newCommitIndex
;
// call back Wal
pSyncNode
->
pLogStore
->
syncLogUpdateCommitIndex
(
pSyncNode
->
pLogStore
,
pSyncNode
->
commitIndex
);
// execute fsm
if
(
pSyncNode
!=
NULL
&&
pSyncNode
->
pFsm
!=
NULL
)
{
int32_t
code
=
syncNodeDoCommit
(
pSyncNode
,
beginIndex
,
endIndex
,
pSyncNode
->
state
);
if
(
code
!=
0
)
{
sNError
(
pSyncNode
,
"advance commit index error, do commit begin:%"
PRId64
", end:%"
PRId64
,
beginIndex
,
endIndex
);
return
;
}
}
}
}
bool
syncAgreeIndex
(
SSyncNode
*
pSyncNode
,
SRaftId
*
pRaftId
,
SyncIndex
index
)
{
// I am leader, I agree
...
...
@@ -210,83 +68,7 @@ static inline int64_t syncNodeAbs64(int64_t a, int64_t b) {
return
c
;
}
int32_t
syncNodeDynamicQuorum
(
const
SSyncNode
*
pSyncNode
)
{
return
pSyncNode
->
quorum
;
#if 0
int32_t quorum = 1; // self
int64_t timeNow = taosGetTimestampMs();
for (int i = 0; i < pSyncNode->peersNum; ++i) {
int64_t peerStartTime = syncIndexMgrGetStartTime(pSyncNode->pNextIndex, &(pSyncNode->peersId)[i]);
int64_t peerRecvTime = syncIndexMgrGetRecvTime(pSyncNode->pNextIndex, &(pSyncNode->peersId)[i]);
SyncIndex peerMatchIndex = syncIndexMgrGetIndex(pSyncNode->pMatchIndex, &(pSyncNode->peersId)[i]);
int64_t recvTimeDiff = TABS(peerRecvTime - timeNow);
int64_t startTimeDiff = TABS(peerStartTime - pSyncNode->startTime);
int64_t logDiff = TABS(peerMatchIndex - syncNodeGetLastIndex(pSyncNode));
/*
int64_t recvTimeDiff = syncNodeAbs64(peerRecvTime, timeNow);
int64_t startTimeDiff = syncNodeAbs64(peerStartTime, pSyncNode->startTime);
int64_t logDiff = syncNodeAbs64(peerMatchIndex, syncNodeGetLastIndex(pSyncNode));
*/
int32_t addQuorum = 0;
if (recvTimeDiff < SYNC_MAX_RECV_TIME_RANGE_MS) {
if (startTimeDiff < SYNC_MAX_START_TIME_RANGE_MS) {
addQuorum = 1;
} else {
if (logDiff < SYNC_ADD_QUORUM_COUNT) {
addQuorum = 1;
} else {
addQuorum = 0;
}
}
} else {
addQuorum = 0;
}
/*
if (recvTimeDiff < SYNC_MAX_RECV_TIME_RANGE_MS) {
addQuorum = 1;
} else {
addQuorum = 0;
}
if (startTimeDiff > SYNC_MAX_START_TIME_RANGE_MS) {
addQuorum = 0;
}
*/
quorum += addQuorum;
}
ASSERT(quorum <= pSyncNode->replicaNum);
if (quorum < pSyncNode->quorum) {
quorum = pSyncNode->quorum;
}
return quorum;
#endif
}
/*
bool syncAgree(SSyncNode* pSyncNode, SyncIndex index) {
int agreeCount = 0;
for (int i = 0; i < pSyncNode->replicaNum; ++i) {
if (syncAgreeIndex(pSyncNode, &(pSyncNode->replicasId[i]), index)) {
++agreeCount;
}
if (agreeCount >= syncNodeDynamicQuorum(pSyncNode)) {
return true;
}
}
return false;
}
*/
int32_t
syncNodeDynamicQuorum
(
const
SSyncNode
*
pSyncNode
)
{
return
pSyncNode
->
quorum
;
}
bool
syncNodeAgreedUpon
(
SSyncNode
*
pNode
,
SyncIndex
index
)
{
int
count
=
0
;
...
...
source/libs/sync/src/syncElection.c
浏览文件 @
284286cb
...
...
@@ -43,7 +43,10 @@ static int32_t syncNodeRequestVotePeers(SSyncNode* pNode) {
for
(
int
i
=
0
;
i
<
pNode
->
peersNum
;
++
i
)
{
SRpcMsg
rpcMsg
=
{
0
};
ret
=
syncBuildRequestVote
(
&
rpcMsg
,
pNode
->
vgId
);
ASSERT
(
ret
==
0
);
if
(
ret
<
0
)
{
sError
(
"vgId:%d, failed to build request-vote msg since %s"
,
pNode
->
vgId
,
terrstr
());
continue
;
}
SyncRequestVote
*
pMsg
=
rpcMsg
.
pCont
;
pMsg
->
srcId
=
pNode
->
myRaftId
;
...
...
@@ -51,13 +54,18 @@ static int32_t syncNodeRequestVotePeers(SSyncNode* pNode) {
pMsg
->
term
=
pNode
->
raftStore
.
currentTerm
;
ret
=
syncNodeGetLastIndexTerm
(
pNode
,
&
pMsg
->
lastLogIndex
,
&
pMsg
->
lastLogTerm
);
ASSERT
(
ret
==
0
);
if
(
ret
<
0
)
{
sError
(
"vgId:%d, failed to get index and term of last log since %s"
,
pNode
->
vgId
,
terrstr
());
continue
;
}
ret
=
syncNodeSendMsgById
(
&
pNode
->
peersId
[
i
],
pNode
,
&
rpcMsg
);
ASSERT
(
ret
==
0
);
if
(
ret
<
0
)
{
sError
(
"vgId:%d, failed to send msg to peerId:%"
PRId64
,
pNode
->
vgId
,
pNode
->
peersId
[
i
].
addr
);
continue
;
}
return
ret
;
}
return
0
;
}
int32_t
syncNodeElect
(
SSyncNode
*
pSyncNode
)
{
...
...
source/libs/sync/src/syncMain.c
浏览文件 @
284286cb
...
...
@@ -586,78 +586,6 @@ SSyncState syncGetState(int64_t rid) {
return
state
;
}
#if 0
int32_t syncGetSnapshotByIndex(int64_t rid, SyncIndex index, SSnapshot* pSnapshot) {
if (index < SYNC_INDEX_BEGIN) {
return -1;
}
SSyncNode* pSyncNode = syncNodeAcquire(rid);
if (pSyncNode == NULL) {
return -1;
}
ASSERT(rid == pSyncNode->rid);
SSyncRaftEntry* pEntry = NULL;
int32_t code = pSyncNode->pLogStore->syncLogGetEntry(pSyncNode->pLogStore, index, &pEntry);
if (code != 0) {
if (pEntry != NULL) {
syncEntryDestroy(pEntry);
}
syncNodeRelease(pSyncNode);
return -1;
}
ASSERT(pEntry != NULL);
pSnapshot->data = NULL;
pSnapshot->lastApplyIndex = index;
pSnapshot->lastApplyTerm = pEntry->term;
pSnapshot->lastConfigIndex = syncNodeGetSnapshotConfigIndex(pSyncNode, index);
syncEntryDestroy(pEntry);
syncNodeRelease(pSyncNode);
return 0;
}
int32_t syncGetSnapshotMeta(int64_t rid, struct SSnapshotMeta* sMeta) {
SSyncNode* pSyncNode = syncNodeAcquire(rid);
if (pSyncNode == NULL) {
return -1;
}
ASSERT(rid == pSyncNode->rid);
sMeta->lastConfigIndex = pSyncNode->raftCfg.lastConfigIndex;
sTrace("vgId:%d, get snapshot meta, lastConfigIndex:%" PRId64, pSyncNode->vgId, pSyncNode->raftCfg.lastConfigIndex);
syncNodeRelease(pSyncNode);
return 0;
}
int32_t syncGetSnapshotMetaByIndex(int64_t rid, SyncIndex snapshotIndex, struct SSnapshotMeta* sMeta) {
SSyncNode* pSyncNode = syncNodeAcquire(rid);
if (pSyncNode == NULL) {
return -1;
}
ASSERT(rid == pSyncNode->rid);
ASSERT(pSyncNode->raftCfg.configIndexCount >= 1);
SyncIndex lastIndex = (pSyncNode->raftCfg.configIndexArr)[0];
for (int32_t i = 0; i < pSyncNode->raftCfg.configIndexCount; ++i) {
if ((pSyncNode->raftCfg.configIndexArr)[i] > lastIndex &&
(pSyncNode->raftCfg.configIndexArr)[i] <= snapshotIndex) {
lastIndex = (pSyncNode->raftCfg.configIndexArr)[i];
}
}
sMeta->lastConfigIndex = lastIndex;
sTrace("vgId:%d, get snapshot meta by index:%" PRId64 " lcindex:%" PRId64, pSyncNode->vgId, snapshotIndex,
sMeta->lastConfigIndex);
syncNodeRelease(pSyncNode);
return 0;
}
#endif
SyncIndex
syncNodeGetSnapshotConfigIndex
(
SSyncNode
*
pSyncNode
,
SyncIndex
snapshotLastApplyIndex
)
{
ASSERT
(
pSyncNode
->
raftCfg
.
configIndexCount
>=
1
);
SyncIndex
lastIndex
=
(
pSyncNode
->
raftCfg
.
configIndexArr
)[
0
];
...
...
@@ -1031,9 +959,12 @@ SSyncNode* syncNodeOpen(SSyncInfo* pSyncInfo) {
pSyncNode
->
commitIndex
=
commitIndex
;
sInfo
(
"vgId:%d, sync node commitIndex initialized as %"
PRId64
,
pSyncNode
->
vgId
,
pSyncNode
->
commitIndex
);
// restore log store on need
if
(
syncNodeLogStoreRestoreOnNeed
(
pSyncNode
)
<
0
)
{
sError
(
"vgId:%d, failed to restore log store since %s."
,
pSyncNode
->
vgId
,
terrstr
());
goto
_error
;
}
// timer ms init
pSyncNode
->
pingBaseLine
=
PING_TIMER_MS
;
pSyncNode
->
electBaseLine
=
tsElectInterval
;
...
...
@@ -1096,10 +1027,16 @@ SSyncNode* syncNodeOpen(SSyncInfo* pSyncInfo) {
pSyncNode
->
changing
=
false
;
// replication mgr
syncNodeLogReplMgrInit
(
pSyncNode
);
if
(
syncNodeLogReplMgrInit
(
pSyncNode
)
<
0
)
{
sError
(
"vgId:%d, failed to init repl mgr since %s."
,
pSyncNode
->
vgId
,
terrstr
());
goto
_error
;
}
// peer state
syncNodePeerStateInit
(
pSyncNode
);
if
(
syncNodePeerStateInit
(
pSyncNode
)
<
0
)
{
sError
(
"vgId:%d, failed to init peer stat since %s."
,
pSyncNode
->
vgId
,
terrstr
());
goto
_error
;
}
//
// min match index
...
...
@@ -1194,27 +1131,10 @@ int32_t syncNodeStart(SSyncNode* pSyncNode) {
int32_t
ret
=
0
;
ret
=
syncNodeStartPingTimer
(
pSyncNode
);
ASSERT
(
ret
==
0
);
return
ret
;
}
void
syncNodeStartOld
(
SSyncNode
*
pSyncNode
)
{
// start raft
if
(
pSyncNode
->
replicaNum
==
1
)
{
raftStoreNextTerm
(
pSyncNode
);
syncNodeBecomeLeader
(
pSyncNode
,
"one replica start"
);
// Raft 3.6.2 Committing entries from previous terms
syncNodeAppendNoop
(
pSyncNode
);
syncMaybeAdvanceCommitIndex
(
pSyncNode
);
}
else
{
syncNodeBecomeFollower
(
pSyncNode
,
"first start"
);
if
(
ret
!=
0
)
{
sError
(
"vgId:%d, failed to start ping timer since %s"
,
pSyncNode
->
vgId
,
terrstr
());
}
int32_t
ret
=
0
;
ret
=
syncNodeStartPingTimer
(
pSyncNode
);
ASSERT
(
ret
==
0
);
return
ret
;
}
int32_t
syncNodeStartStandBy
(
SSyncNode
*
pSyncNode
)
{
...
...
@@ -1225,11 +1145,16 @@ int32_t syncNodeStartStandBy(SSyncNode* pSyncNode) {
// reset elect timer, long enough
int32_t
electMS
=
TIMER_MAX_MS
;
int32_t
ret
=
syncNodeRestartElectTimer
(
pSyncNode
,
electMS
);
ASSERT
(
ret
==
0
);
if
(
ret
<
0
)
{
sError
(
"vgId:%d, failed to restart elect timer since %s"
,
pSyncNode
->
vgId
,
terrstr
());
return
-
1
;
}
ret
=
0
;
ret
=
syncNodeStartPingTimer
(
pSyncNode
);
ASSERT
(
ret
==
0
);
if
(
ret
<
0
)
{
sError
(
"vgId:%d, failed to start ping timer since %s"
,
pSyncNode
->
vgId
,
terrstr
());
return
-
1
;
}
return
ret
;
}
...
...
@@ -1818,12 +1743,6 @@ void syncNodeBecomeLeader(SSyncNode* pSyncNode, const char* debugStr) {
pSyncNode
->
leaderCache
=
pSyncNode
->
myRaftId
;
for
(
int32_t
i
=
0
;
i
<
pSyncNode
->
pNextIndex
->
replicaNum
;
++
i
)
{
// maybe overwrite myself, no harm
// just do it!
// pSyncNode->pNextIndex->index[i] = pSyncNode->pLogStore->getLastIndex(pSyncNode->pLogStore) + 1;
// maybe wal is deleted
SyncIndex
lastIndex
;
SyncTerm
lastTerm
;
int32_t
code
=
syncNodeGetLastIndexTerm
(
pSyncNode
,
&
lastIndex
,
&
lastTerm
);
...
...
@@ -1885,7 +1804,11 @@ void syncNodeBecomeLeader(SSyncNode* pSyncNode, const char* debugStr) {
void
syncNodeCandidate2Leader
(
SSyncNode
*
pSyncNode
)
{
ASSERT
(
pSyncNode
->
state
==
TAOS_SYNC_STATE_CANDIDATE
);
ASSERT
(
voteGrantedMajority
(
pSyncNode
->
pVotesGranted
));
bool
granted
=
voteGrantedMajority
(
pSyncNode
->
pVotesGranted
);
if
(
!
granted
)
{
sError
(
"vgId:%d, not granted by majority."
,
pSyncNode
->
vgId
);
return
;
}
syncNodeBecomeLeader
(
pSyncNode
,
"candidate to leader"
);
sNTrace
(
pSyncNode
,
"state change syncNodeCandidate2Leader"
);
...
...
@@ -1901,20 +1824,6 @@ void syncNodeCandidate2Leader(SSyncNode* pSyncNode) {
pSyncNode
->
vgId
,
pSyncNode
->
raftStore
.
currentTerm
,
pSyncNode
->
commitIndex
,
lastIndex
);
}
void
syncNodeCandidate2LeaderOld
(
SSyncNode
*
pSyncNode
)
{
ASSERT
(
pSyncNode
->
state
==
TAOS_SYNC_STATE_CANDIDATE
);
ASSERT
(
voteGrantedMajority
(
pSyncNode
->
pVotesGranted
));
syncNodeBecomeLeader
(
pSyncNode
,
"candidate to leader"
);
// Raft 3.6.2 Committing entries from previous terms
syncNodeAppendNoop
(
pSyncNode
);
syncMaybeAdvanceCommitIndex
(
pSyncNode
);
if
(
pSyncNode
->
replicaNum
>
1
)
{
syncNodeReplicate
(
pSyncNode
);
}
}
bool
syncNodeIsMnode
(
SSyncNode
*
pSyncNode
)
{
return
(
pSyncNode
->
vgId
==
1
);
}
int32_t
syncNodePeerStateInit
(
SSyncNode
*
pSyncNode
)
{
...
...
@@ -1960,7 +1869,8 @@ void syncNodeCandidate2Follower(SSyncNode* pSyncNode) {
// need assert
void
syncNodeVoteForTerm
(
SSyncNode
*
pSyncNode
,
SyncTerm
term
,
SRaftId
*
pRaftId
)
{
ASSERT
(
term
==
pSyncNode
->
raftStore
.
currentTerm
);
ASSERT
(
!
raftStoreHasVoted
(
pSyncNode
));
bool
voted
=
raftStoreHasVoted
(
pSyncNode
);
ASSERT
(
!
voted
);
raftStoreVote
(
pSyncNode
,
pRaftId
);
}
...
...
@@ -2638,24 +2548,6 @@ int32_t syncNodeOnLocalCmd(SSyncNode* ths, const SRpcMsg* pRpcMsg) {
return
0
;
}
int32_t
syncNodeOnLocalCmdOld
(
SSyncNode
*
ths
,
const
SRpcMsg
*
pRpcMsg
)
{
ASSERT
(
false
&&
"deprecated"
);
SyncLocalCmd
*
pMsg
=
pRpcMsg
->
pCont
;
syncLogRecvLocalCmd
(
ths
,
pMsg
,
""
);
if
(
pMsg
->
cmd
==
SYNC_LOCAL_CMD_STEP_DOWN
)
{
syncNodeStepDown
(
ths
,
pMsg
->
currentTerm
);
}
else
if
(
pMsg
->
cmd
==
SYNC_LOCAL_CMD_FOLLOWER_CMT
)
{
syncNodeFollowerCommit
(
ths
,
pMsg
->
commitIndex
);
}
else
{
sError
(
"error local cmd"
);
}
return
0
;
}
// TLA+ Spec
// ClientRequest(i, v) ==
// /\ state[i] = Leader
...
...
@@ -2700,96 +2592,6 @@ int32_t syncNodeOnClientRequest(SSyncNode* ths, SRpcMsg* pMsg, SyncIndex* pRetIn
}
}
int32_t
syncNodeOnClientRequestOld
(
SSyncNode
*
ths
,
SRpcMsg
*
pMsg
,
SyncIndex
*
pRetIndex
)
{
sNTrace
(
ths
,
"on client request"
);
int32_t
ret
=
0
;
int32_t
code
=
0
;
SyncIndex
index
=
ths
->
pLogStore
->
syncLogWriteIndex
(
ths
->
pLogStore
);
SyncTerm
term
=
ths
->
raftStore
.
currentTerm
;
SSyncRaftEntry
*
pEntry
;
if
(
pMsg
->
msgType
==
TDMT_SYNC_CLIENT_REQUEST
)
{
pEntry
=
syncEntryBuildFromClientRequest
(
pMsg
->
pCont
,
term
,
index
);
}
else
{
pEntry
=
syncEntryBuildFromRpcMsg
(
pMsg
,
term
,
index
);
}
LRUHandle
*
h
=
NULL
;
if
(
ths
->
state
==
TAOS_SYNC_STATE_LEADER
)
{
// append entry
code
=
ths
->
pLogStore
->
syncLogAppendEntry
(
ths
->
pLogStore
,
pEntry
,
false
);
if
(
code
!=
0
)
{
if
(
ths
->
replicaNum
==
1
)
{
if
(
h
)
{
taosLRUCacheRelease
(
ths
->
pLogStore
->
pCache
,
h
,
false
);
}
else
{
syncEntryDestroy
(
pEntry
);
}
return
-
1
;
}
else
{
// del resp mgr, call FpCommitCb
SFsmCbMeta
cbMeta
=
{
.
index
=
pEntry
->
index
,
.
lastConfigIndex
=
SYNC_INDEX_INVALID
,
.
isWeak
=
pEntry
->
isWeak
,
.
code
=
-
1
,
.
state
=
ths
->
state
,
.
seqNum
=
pEntry
->
seqNum
,
.
term
=
pEntry
->
term
,
.
currentTerm
=
ths
->
raftStore
.
currentTerm
,
.
flag
=
0
,
};
ths
->
pFsm
->
FpCommitCb
(
ths
->
pFsm
,
pMsg
,
&
cbMeta
);
if
(
h
)
{
taosLRUCacheRelease
(
ths
->
pLogStore
->
pCache
,
h
,
false
);
}
else
{
syncEntryDestroy
(
pEntry
);
}
return
-
1
;
}
}
syncCacheEntry
(
ths
->
pLogStore
,
pEntry
,
&
h
);
// if mulit replica, start replicate right now
if
(
ths
->
replicaNum
>
1
)
{
syncNodeReplicate
(
ths
);
}
// if only myself, maybe commit right now
if
(
ths
->
replicaNum
==
1
)
{
if
(
syncNodeIsMnode
(
ths
))
{
syncMaybeAdvanceCommitIndex
(
ths
);
}
else
{
syncOneReplicaAdvance
(
ths
);
}
}
}
if
(
pRetIndex
!=
NULL
)
{
if
(
ret
==
0
&&
pEntry
!=
NULL
)
{
*
pRetIndex
=
pEntry
->
index
;
}
else
{
*
pRetIndex
=
SYNC_INDEX_INVALID
;
}
}
if
(
h
)
{
taosLRUCacheRelease
(
ths
->
pLogStore
->
pCache
,
h
,
false
);
}
else
{
syncEntryDestroy
(
pEntry
);
}
return
ret
;
}
const
char
*
syncStr
(
ESyncState
state
)
{
switch
(
state
)
{
case
TAOS_SYNC_STATE_FOLLOWER
:
...
...
@@ -2894,129 +2696,6 @@ bool syncNodeIsOptimizedOneReplica(SSyncNode* ths, SRpcMsg* pMsg) {
return
(
ths
->
replicaNum
==
1
&&
syncUtilUserCommit
(
pMsg
->
msgType
)
&&
ths
->
vgId
!=
1
);
}
int32_t
syncNodeDoCommit
(
SSyncNode
*
ths
,
SyncIndex
beginIndex
,
SyncIndex
endIndex
,
uint64_t
flag
)
{
ASSERT
(
false
);
if
(
beginIndex
>
endIndex
)
{
return
0
;
}
if
(
ths
==
NULL
)
{
return
-
1
;
}
if
(
ths
->
pFsm
!=
NULL
&&
ths
->
pFsm
->
FpGetSnapshotInfo
!=
NULL
)
{
// advance commit index to sanpshot first
SSnapshot
snapshot
=
{
0
};
ths
->
pFsm
->
FpGetSnapshotInfo
(
ths
->
pFsm
,
&
snapshot
);
if
(
snapshot
.
lastApplyIndex
>=
0
&&
snapshot
.
lastApplyIndex
>=
beginIndex
)
{
sNTrace
(
ths
,
"commit by snapshot from index:%"
PRId64
" to index:%"
PRId64
,
beginIndex
,
snapshot
.
lastApplyIndex
);
// update begin index
beginIndex
=
snapshot
.
lastApplyIndex
+
1
;
}
}
int32_t
code
=
0
;
ESyncState
state
=
flag
;
sNTrace
(
ths
,
"commit by wal from index:%"
PRId64
" to index:%"
PRId64
,
beginIndex
,
endIndex
);
// execute fsm
if
(
ths
->
pFsm
!=
NULL
)
{
for
(
SyncIndex
i
=
beginIndex
;
i
<=
endIndex
;
++
i
)
{
if
(
i
!=
SYNC_INDEX_INVALID
)
{
SSyncRaftEntry
*
pEntry
;
SLRUCache
*
pCache
=
ths
->
pLogStore
->
pCache
;
LRUHandle
*
h
=
taosLRUCacheLookup
(
pCache
,
&
i
,
sizeof
(
i
));
if
(
h
)
{
pEntry
=
(
SSyncRaftEntry
*
)
taosLRUCacheValue
(
pCache
,
h
);
ths
->
pLogStore
->
cacheHit
++
;
sNTrace
(
ths
,
"hit cache index:%"
PRId64
", bytes:%u, %p"
,
i
,
pEntry
->
bytes
,
pEntry
);
}
else
{
ths
->
pLogStore
->
cacheMiss
++
;
sNTrace
(
ths
,
"miss cache index:%"
PRId64
,
i
);
code
=
ths
->
pLogStore
->
syncLogGetEntry
(
ths
->
pLogStore
,
i
,
&
pEntry
);
// ASSERT(code == 0);
// ASSERT(pEntry != NULL);
if
(
code
!=
0
||
pEntry
==
NULL
)
{
sNError
(
ths
,
"get log entry error"
);
sFatal
(
"vgId:%d, get log entry %"
PRId64
" error when commit since %s"
,
ths
->
vgId
,
i
,
terrstr
());
continue
;
}
}
SRpcMsg
rpcMsg
=
{
0
};
syncEntry2OriginalRpc
(
pEntry
,
&
rpcMsg
);
sTrace
(
"do commit index:%"
PRId64
", type:%s"
,
i
,
TMSG_INFO
(
pEntry
->
msgType
));
// user commit
if
((
ths
->
pFsm
->
FpCommitCb
!=
NULL
)
&&
syncUtilUserCommit
(
pEntry
->
originalRpcType
))
{
bool
internalExecute
=
true
;
if
((
ths
->
replicaNum
==
1
)
&&
ths
->
restoreFinish
&&
ths
->
vgId
!=
1
)
{
internalExecute
=
false
;
}
sNTrace
(
ths
,
"user commit index:%"
PRId64
", internal:%d, type:%s"
,
i
,
internalExecute
,
TMSG_INFO
(
pEntry
->
msgType
));
// execute fsm in apply thread, or execute outside syncPropose
if
(
internalExecute
)
{
SFsmCbMeta
cbMeta
=
{
.
index
=
pEntry
->
index
,
.
lastConfigIndex
=
syncNodeGetSnapshotConfigIndex
(
ths
,
pEntry
->
index
),
.
isWeak
=
pEntry
->
isWeak
,
.
code
=
0
,
.
state
=
ths
->
state
,
.
seqNum
=
pEntry
->
seqNum
,
.
term
=
pEntry
->
term
,
.
currentTerm
=
ths
->
raftStore
.
currentTerm
,
.
flag
=
flag
,
};
syncRespMgrGetAndDel
(
ths
->
pSyncRespMgr
,
cbMeta
.
seqNum
,
&
rpcMsg
.
info
);
ths
->
pFsm
->
FpCommitCb
(
ths
->
pFsm
,
&
rpcMsg
,
&
cbMeta
);
}
}
#if 0
// execute in pre-commit
// leader transfer
if (pEntry->originalRpcType == TDMT_SYNC_LEADER_TRANSFER) {
code = syncDoLeaderTransfer(ths, &rpcMsg, pEntry);
ASSERT(code == 0);
}
#endif
// restore finish
// if only snapshot, a noop entry will be append, so syncLogLastIndex is always ok
if
(
pEntry
->
index
==
ths
->
pLogStore
->
syncLogLastIndex
(
ths
->
pLogStore
))
{
if
(
ths
->
restoreFinish
==
false
)
{
if
(
ths
->
pFsm
->
FpRestoreFinishCb
!=
NULL
)
{
ths
->
pFsm
->
FpRestoreFinishCb
(
ths
->
pFsm
);
}
ths
->
restoreFinish
=
true
;
int64_t
restoreDelay
=
taosGetTimestampMs
()
-
ths
->
leaderTime
;
sNTrace
(
ths
,
"restore finish, index:%"
PRId64
", elapsed:%"
PRId64
" ms"
,
pEntry
->
index
,
restoreDelay
);
}
}
rpcFreeCont
(
rpcMsg
.
pCont
);
if
(
h
)
{
taosLRUCacheRelease
(
pCache
,
h
,
false
);
}
else
{
syncEntryDestroy
(
pEntry
);
}
}
}
}
return
0
;
}
bool
syncNodeInRaftGroup
(
SSyncNode
*
ths
,
SRaftId
*
pRaftId
)
{
for
(
int32_t
i
=
0
;
i
<
ths
->
replicaNum
;
++
i
)
{
if
(
syncUtilSameId
(
&
((
ths
->
replicasId
)[
i
]),
pRaftId
))
{
...
...
source/libs/sync/src/syncPipeline.c
浏览文件 @
284286cb
...
...
@@ -945,8 +945,11 @@ int32_t syncNodeLogReplMgrInit(SSyncNode* pNode) {
for
(
int
i
=
0
;
i
<
TSDB_MAX_REPLICA
;
i
++
)
{
ASSERT
(
pNode
->
logReplMgrs
[
i
]
==
NULL
);
pNode
->
logReplMgrs
[
i
]
=
syncLogReplMgrCreate
();
if
(
pNode
->
logReplMgrs
[
i
]
==
NULL
)
{
terrno
=
TSDB_CODE_OUT_OF_MEMORY
;
return
-
1
;
}
pNode
->
logReplMgrs
[
i
]
->
peerId
=
i
;
ASSERTS
(
pNode
->
logReplMgrs
[
i
]
!=
NULL
,
"Out of memory."
);
}
return
0
;
}
...
...
source/libs/sync/src/syncReplication.c
浏览文件 @
284286cb
...
...
@@ -48,92 +48,6 @@
int32_t
syncNodeMaybeSendAppendEntries
(
SSyncNode
*
pSyncNode
,
const
SRaftId
*
destRaftId
,
SRpcMsg
*
pRpcMsg
);
int32_t
syncNodeReplicateOne
(
SSyncNode
*
pSyncNode
,
SRaftId
*
pDestId
,
bool
snapshot
)
{
ASSERT
(
false
&&
"deprecated"
);
// next index
SyncIndex
nextIndex
=
syncIndexMgrGetIndex
(
pSyncNode
->
pNextIndex
,
pDestId
);
if
(
snapshot
)
{
// maybe start snapshot
SyncIndex
logStartIndex
=
pSyncNode
->
pLogStore
->
syncLogBeginIndex
(
pSyncNode
->
pLogStore
);
SyncIndex
logEndIndex
=
pSyncNode
->
pLogStore
->
syncLogEndIndex
(
pSyncNode
->
pLogStore
);
if
(
nextIndex
<
logStartIndex
||
nextIndex
-
1
>
logEndIndex
)
{
sNTrace
(
pSyncNode
,
"maybe start snapshot for next-index:%"
PRId64
", start:%"
PRId64
", end:%"
PRId64
,
nextIndex
,
logStartIndex
,
logEndIndex
);
// start snapshot
int32_t
code
=
syncNodeStartSnapshot
(
pSyncNode
,
pDestId
);
}
}
// pre index, pre term
SyncIndex
preLogIndex
=
syncNodeGetPreIndex
(
pSyncNode
,
nextIndex
);
SyncTerm
preLogTerm
=
syncNodeGetPreTerm
(
pSyncNode
,
nextIndex
);
// prepare entry
SRpcMsg
rpcMsg
=
{
0
};
SyncAppendEntries
*
pMsg
=
NULL
;
SSyncRaftEntry
*
pEntry
=
NULL
;
SLRUCache
*
pCache
=
pSyncNode
->
pLogStore
->
pCache
;
LRUHandle
*
h
=
taosLRUCacheLookup
(
pCache
,
&
nextIndex
,
sizeof
(
nextIndex
));
int32_t
code
=
0
;
if
(
h
)
{
pEntry
=
(
SSyncRaftEntry
*
)
taosLRUCacheValue
(
pCache
,
h
);
code
=
0
;
pSyncNode
->
pLogStore
->
cacheHit
++
;
sNTrace
(
pSyncNode
,
"hit cache index:%"
PRId64
", bytes:%u, %p"
,
nextIndex
,
pEntry
->
bytes
,
pEntry
);
}
else
{
pSyncNode
->
pLogStore
->
cacheMiss
++
;
sNTrace
(
pSyncNode
,
"miss cache index:%"
PRId64
,
nextIndex
);
code
=
pSyncNode
->
pLogStore
->
syncLogGetEntry
(
pSyncNode
->
pLogStore
,
nextIndex
,
&
pEntry
);
}
if
(
code
==
0
)
{
ASSERT
(
pEntry
!=
NULL
);
code
=
syncBuildAppendEntries
(
&
rpcMsg
,
(
int32_t
)(
pEntry
->
bytes
),
pSyncNode
->
vgId
);
ASSERT
(
code
==
0
);
pMsg
=
rpcMsg
.
pCont
;
memcpy
(
pMsg
->
data
,
pEntry
,
pEntry
->
bytes
);
}
else
{
if
(
terrno
==
TSDB_CODE_WAL_LOG_NOT_EXIST
)
{
// no entry in log
code
=
syncBuildAppendEntries
(
&
rpcMsg
,
0
,
pSyncNode
->
vgId
);
ASSERT
(
code
==
0
);
pMsg
=
rpcMsg
.
pCont
;
}
else
{
sNError
(
pSyncNode
,
"replicate to dnode:%d error, next-index:%"
PRId64
,
DID
(
pDestId
),
nextIndex
);
return
-
1
;
}
}
if
(
h
)
{
taosLRUCacheRelease
(
pCache
,
h
,
false
);
}
else
{
syncEntryDestroy
(
pEntry
);
}
// prepare msg
ASSERT
(
pMsg
!=
NULL
);
pMsg
->
srcId
=
pSyncNode
->
myRaftId
;
pMsg
->
destId
=
*
pDestId
;
pMsg
->
term
=
pSyncNode
->
raftStore
.
currentTerm
;
pMsg
->
prevLogIndex
=
preLogIndex
;
pMsg
->
prevLogTerm
=
preLogTerm
;
pMsg
->
commitIndex
=
pSyncNode
->
commitIndex
;
pMsg
->
privateTerm
=
0
;
// pMsg->privateTerm = syncIndexMgrGetTerm(pSyncNode->pNextIndex, pDestId);
// send msg
syncNodeMaybeSendAppendEntries
(
pSyncNode
,
pDestId
,
&
rpcMsg
);
return
0
;
}
int32_t
syncNodeReplicate
(
SSyncNode
*
pNode
)
{
SSyncLogBuffer
*
pBuf
=
pNode
->
pLogBuf
;
taosThreadMutexLock
(
&
pBuf
->
mutex
);
...
...
@@ -156,25 +70,6 @@ int32_t syncNodeReplicateWithoutLock(SSyncNode* pNode) {
return
0
;
}
int32_t
syncNodeReplicateOld
(
SSyncNode
*
pSyncNode
)
{
if
(
pSyncNode
->
state
!=
TAOS_SYNC_STATE_LEADER
)
{
return
-
1
;
}
sNTrace
(
pSyncNode
,
"do replicate"
);
int32_t
ret
=
0
;
for
(
int
i
=
0
;
i
<
pSyncNode
->
peersNum
;
++
i
)
{
SRaftId
*
pDestId
=
&
(
pSyncNode
->
peersId
[
i
]);
ret
=
syncNodeReplicateOne
(
pSyncNode
,
pDestId
,
true
);
if
(
ret
!=
0
)
{
sError
(
"vgId:%d, do append entries error for dnode:%d"
,
pSyncNode
->
vgId
,
DID
(
pDestId
));
}
}
return
0
;
}
int32_t
syncNodeSendAppendEntries
(
SSyncNode
*
pSyncNode
,
const
SRaftId
*
destRaftId
,
SRpcMsg
*
pRpcMsg
)
{
SyncAppendEntries
*
pMsg
=
pRpcMsg
->
pCont
;
pMsg
->
destId
=
*
destRaftId
;
...
...
@@ -182,39 +77,6 @@ int32_t syncNodeSendAppendEntries(SSyncNode* pSyncNode, const SRaftId* destRaftI
return
0
;
}
int32_t
syncNodeSendAppendEntriesOld
(
SSyncNode
*
pSyncNode
,
const
SRaftId
*
destRaftId
,
SRpcMsg
*
pRpcMsg
)
{
int32_t
ret
=
0
;
SyncAppendEntries
*
pMsg
=
pRpcMsg
->
pCont
;
if
(
pMsg
==
NULL
)
{
sError
(
"vgId:%d, sync-append-entries msg is NULL"
,
pSyncNode
->
vgId
);
return
0
;
}
SPeerState
*
pState
=
syncNodeGetPeerState
(
pSyncNode
,
destRaftId
);
if
(
pState
==
NULL
)
{
sError
(
"vgId:%d, replica maybe dropped"
,
pSyncNode
->
vgId
);
return
0
;
}
// save index, otherwise pMsg will be free by rpc
SyncIndex
saveLastSendIndex
=
pState
->
lastSendIndex
;
bool
update
=
false
;
if
(
pMsg
->
dataLen
>
0
)
{
saveLastSendIndex
=
pMsg
->
prevLogIndex
+
1
;
update
=
true
;
}
syncLogSendAppendEntries
(
pSyncNode
,
pMsg
,
""
);
syncNodeSendMsgById
(
destRaftId
,
pSyncNode
,
pRpcMsg
);
if
(
update
)
{
pState
->
lastSendIndex
=
saveLastSendIndex
;
pState
->
lastSendTime
=
taosGetTimestampMs
();
}
return
ret
;
}
int32_t
syncNodeMaybeSendAppendEntries
(
SSyncNode
*
pSyncNode
,
const
SRaftId
*
destRaftId
,
SRpcMsg
*
pRpcMsg
)
{
int32_t
ret
=
0
;
SyncAppendEntries
*
pMsg
=
pRpcMsg
->
pCont
;
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录