Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
ecc43b66
T
TDengine
项目概览
taosdata
/
TDengine
1 年多 前同步成功
通知
1185
Star
22017
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
ecc43b66
编写于
6月 21, 2022
作者:
M
Minghao Li
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
refactor(sync): delete some assert
上级
b61a5cd1
变更
20
展开全部
隐藏空白更改
内联
并排
Showing
20 changed file
with
332 addition
and
270 deletion
+332
-270
source/libs/sync/inc/syncInt.h
source/libs/sync/inc/syncInt.h
+1
-0
source/libs/sync/src/syncAppendEntries.c
source/libs/sync/src/syncAppendEntries.c
+33
-10
source/libs/sync/src/syncAppendEntriesReply.c
source/libs/sync/src/syncAppendEntriesReply.c
+1
-1
source/libs/sync/src/syncCommit.c
source/libs/sync/src/syncCommit.c
+1
-1
source/libs/sync/src/syncElection.c
source/libs/sync/src/syncElection.c
+5
-5
source/libs/sync/src/syncEnv.c
source/libs/sync/src/syncEnv.c
+4
-4
source/libs/sync/src/syncIO.c
source/libs/sync/src/syncIO.c
+24
-24
source/libs/sync/src/syncIndexMgr.c
source/libs/sync/src/syncIndexMgr.c
+4
-4
source/libs/sync/src/syncMain.c
source/libs/sync/src/syncMain.c
+86
-48
source/libs/sync/src/syncMessage.c
source/libs/sync/src/syncMessage.c
+85
-85
source/libs/sync/src/syncRaftCfg.c
source/libs/sync/src/syncRaftCfg.c
+24
-24
source/libs/sync/src/syncRaftEntry.c
source/libs/sync/src/syncRaftEntry.c
+8
-8
source/libs/sync/src/syncRaftLog.c
source/libs/sync/src/syncRaftLog.c
+8
-8
source/libs/sync/src/syncRaftStore.c
source/libs/sync/src/syncRaftStore.c
+19
-19
source/libs/sync/src/syncReplication.c
source/libs/sync/src/syncReplication.c
+7
-7
source/libs/sync/src/syncRequestVote.c
source/libs/sync/src/syncRequestVote.c
+1
-1
source/libs/sync/src/syncRequestVoteReply.c
source/libs/sync/src/syncRequestVoteReply.c
+3
-3
source/libs/sync/src/syncRespMgr.c
source/libs/sync/src/syncRespMgr.c
+1
-1
source/libs/sync/src/syncUtil.c
source/libs/sync/src/syncUtil.c
+4
-4
source/libs/sync/src/syncVoteMgr.c
source/libs/sync/src/syncVoteMgr.c
+13
-13
未找到文件。
source/libs/sync/inc/syncInt.h
浏览文件 @
ecc43b66
...
...
@@ -197,6 +197,7 @@ int32_t syncNodeSendMsgByInfo(const SNodeInfo* nodeInfo, SSyncNode* pSyncNode, S
cJSON
*
syncNode2Json
(
const
SSyncNode
*
pSyncNode
);
char
*
syncNode2Str
(
const
SSyncNode
*
pSyncNode
);
void
syncNodeEventLog
(
const
SSyncNode
*
pSyncNode
,
char
*
str
);
void
syncNodeErrorLog
(
const
SSyncNode
*
pSyncNode
,
char
*
str
);
char
*
syncNode2SimpleStr
(
const
SSyncNode
*
pSyncNode
);
bool
syncNodeInConfig
(
SSyncNode
*
pSyncNode
,
const
SSyncCfg
*
config
);
void
syncNodeDoConfigChange
(
SSyncNode
*
pSyncNode
,
SSyncCfg
*
newConfig
,
SyncIndex
lastConfigChangeIndex
);
...
...
source/libs/sync/src/syncAppendEntries.c
浏览文件 @
ecc43b66
...
...
@@ -99,19 +99,25 @@ int32_t syncNodeOnAppendEntriesCb(SSyncNode* ths, SyncAppendEntries* pMsg) {
if
(
pMsg
->
term
>
ths
->
pRaftStore
->
currentTerm
)
{
syncNodeUpdateTerm
(
ths
,
pMsg
->
term
);
}
assert
(
pMsg
->
term
<=
ths
->
pRaftStore
->
currentTerm
);
ASSERT
(
pMsg
->
term
<=
ths
->
pRaftStore
->
currentTerm
);
// reset elect timer
if
(
pMsg
->
term
==
ths
->
pRaftStore
->
currentTerm
)
{
ths
->
leaderCache
=
pMsg
->
srcId
;
syncNodeResetElectTimer
(
ths
);
}
assert
(
pMsg
->
dataLen
>=
0
);
ASSERT
(
pMsg
->
dataLen
>=
0
);
SyncTerm
localPreLogTerm
=
0
;
if
(
pMsg
->
prevLogIndex
>=
SYNC_INDEX_BEGIN
&&
pMsg
->
prevLogIndex
<=
ths
->
pLogStore
->
getLastIndex
(
ths
->
pLogStore
))
{
SSyncRaftEntry
*
pEntry
=
ths
->
pLogStore
->
getEntry
(
ths
->
pLogStore
,
pMsg
->
prevLogIndex
);
assert
(
pEntry
!=
NULL
);
if
(
pEntry
==
NULL
)
{
char
logBuf
[
128
];
snprintf
(
logBuf
,
sizeof
(
logBuf
),
"getEntry error, index:%ld, since %s"
,
pMsg
->
prevLogIndex
,
terrstr
());
syncNodeErrorLog
(
ths
,
logBuf
);
return
-
1
;
}
localPreLogTerm
=
pEntry
->
term
;
syncEntryDestory
(
pEntry
);
}
...
...
@@ -160,7 +166,7 @@ int32_t syncNodeOnAppendEntriesCb(SSyncNode* ths, SyncAppendEntries* pMsg) {
// accept request
if
(
pMsg
->
term
==
ths
->
pRaftStore
->
currentTerm
&&
ths
->
state
==
TAOS_SYNC_STATE_FOLLOWER
&&
logOK
)
{
// preIndex = -1, or has preIndex entry in local log
assert
(
pMsg
->
prevLogIndex
<=
ths
->
pLogStore
->
getLastIndex
(
ths
->
pLogStore
));
ASSERT
(
pMsg
->
prevLogIndex
<=
ths
->
pLogStore
->
getLastIndex
(
ths
->
pLogStore
));
// has extra entries (> preIndex) in local log
bool
hasExtraEntries
=
pMsg
->
prevLogIndex
<
ths
->
pLogStore
->
getLastIndex
(
ths
->
pLogStore
);
...
...
@@ -179,13 +185,21 @@ int32_t syncNodeOnAppendEntriesCb(SSyncNode* ths, SyncAppendEntries* pMsg) {
SyncIndex
extraIndex
=
pMsg
->
prevLogIndex
+
1
;
SSyncRaftEntry
*
pExtraEntry
=
ths
->
pLogStore
->
getEntry
(
ths
->
pLogStore
,
extraIndex
);
assert
(
pExtraEntry
!=
NULL
);
if
(
pExtraEntry
==
NULL
)
{
char
logBuf
[
128
];
snprintf
(
logBuf
,
sizeof
(
logBuf
),
"getEntry error2, index:%ld, since %s"
,
extraIndex
,
terrstr
());
syncNodeErrorLog
(
ths
,
logBuf
);
return
-
1
;
}
SSyncRaftEntry
*
pAppendEntry
=
syncEntryDeserialize
(
pMsg
->
data
,
pMsg
->
dataLen
);
assert
(
pAppendEntry
!=
NULL
);
if
(
pAppendEntry
==
NULL
)
{
syncNodeErrorLog
(
ths
,
"syncEntryDeserialize pAppendEntry error"
);
return
-
1
;
}
// log not match, conflict
assert
(
extraIndex
==
pAppendEntry
->
index
);
ASSERT
(
extraIndex
==
pAppendEntry
->
index
);
if
(
pExtraEntry
->
term
!=
pAppendEntry
->
term
)
{
conflict
=
true
;
}
...
...
@@ -201,7 +215,12 @@ int32_t syncNodeOnAppendEntriesCb(SSyncNode* ths, SyncAppendEntries* pMsg) {
for
(
SyncIndex
index
=
delEnd
;
index
>=
delBegin
;
--
index
)
{
if
(
ths
->
pFsm
->
FpRollBackCb
!=
NULL
)
{
SSyncRaftEntry
*
pRollBackEntry
=
ths
->
pLogStore
->
getEntry
(
ths
->
pLogStore
,
index
);
assert
(
pRollBackEntry
!=
NULL
);
if
(
pRollBackEntry
==
NULL
)
{
char
logBuf
[
128
];
snprintf
(
logBuf
,
sizeof
(
logBuf
),
"getEntry error3, index:%ld, since %s"
,
index
,
terrstr
());
syncNodeErrorLog
(
ths
,
logBuf
);
return
-
1
;
}
// if (pRollBackEntry->msgType != TDMT_SYNC_NOOP) {
if
(
syncUtilUserRollback
(
pRollBackEntry
->
msgType
))
{
...
...
@@ -257,7 +276,10 @@ int32_t syncNodeOnAppendEntriesCb(SSyncNode* ths, SyncAppendEntries* pMsg) {
}
else
if
(
!
hasExtraEntries
&&
hasAppendEntries
)
{
SSyncRaftEntry
*
pAppendEntry
=
syncEntryDeserialize
(
pMsg
->
data
,
pMsg
->
dataLen
);
assert
(
pAppendEntry
!=
NULL
);
if
(
pAppendEntry
==
NULL
)
{
syncNodeErrorLog
(
ths
,
"syncEntryDeserialize pAppendEntry2 error"
);
return
-
1
;
}
// append new entries
ths
->
pLogStore
->
appendEntry
(
ths
->
pLogStore
,
pAppendEntry
);
...
...
@@ -287,7 +309,8 @@ int32_t syncNodeOnAppendEntriesCb(SSyncNode* ths, SyncAppendEntries* pMsg) {
// do nothing
}
else
{
assert
(
0
);
syncNodeLog3
(
""
,
ths
);
ASSERT
(
0
);
}
SyncAppendEntriesReply
*
pReply
=
syncAppendEntriesReplyBuild
(
ths
->
vgId
);
...
...
source/libs/sync/src/syncAppendEntriesReply.c
浏览文件 @
ecc43b66
...
...
@@ -67,7 +67,7 @@ int32_t syncNodeOnAppendEntriesReplyCb(SSyncNode* ths, SyncAppendEntriesReply* p
return
ret
;
}
assert
(
pMsg
->
term
==
ths
->
pRaftStore
->
currentTerm
);
ASSERT
(
pMsg
->
term
==
ths
->
pRaftStore
->
currentTerm
);
if
(
pMsg
->
success
)
{
// nextIndex' = [nextIndex EXCEPT ![i][j] = m.mmatchIndex + 1]
...
...
source/libs/sync/src/syncCommit.c
浏览文件 @
ecc43b66
...
...
@@ -75,7 +75,7 @@ void syncMaybeAdvanceCommitIndex(SSyncNode* pSyncNode) {
if
(
agree
)
{
// term
SSyncRaftEntry
*
pEntry
=
pSyncNode
->
pLogStore
->
getEntry
(
pSyncNode
->
pLogStore
,
index
);
assert
(
pEntry
!=
NULL
);
ASSERT
(
pEntry
!=
NULL
);
// cannot commit, even if quorum agree. need check term!
if
(
pEntry
->
term
==
pSyncNode
->
pRaftStore
->
currentTerm
)
{
...
...
source/libs/sync/src/syncElection.c
浏览文件 @
ecc43b66
...
...
@@ -32,7 +32,7 @@
// /\ UNCHANGED <<serverVars, candidateVars, leaderVars, logVars>>
//
int32_t
syncNodeRequestVotePeers
(
SSyncNode
*
pSyncNode
)
{
assert
(
pSyncNode
->
state
==
TAOS_SYNC_STATE_CANDIDATE
);
ASSERT
(
pSyncNode
->
state
==
TAOS_SYNC_STATE_CANDIDATE
);
int32_t
ret
=
0
;
for
(
int
i
=
0
;
i
<
pSyncNode
->
peersNum
;
++
i
)
{
...
...
@@ -44,7 +44,7 @@ int32_t syncNodeRequestVotePeers(SSyncNode* pSyncNode) {
pMsg
->
lastLogTerm
=
pSyncNode
->
pLogStore
->
getLastTerm
(
pSyncNode
->
pLogStore
);
ret
=
syncNodeRequestVote
(
pSyncNode
,
&
pSyncNode
->
peersId
[
i
],
pMsg
);
assert
(
ret
==
0
);
ASSERT
(
ret
==
0
);
syncRequestVoteDestroy
(
pMsg
);
}
return
ret
;
...
...
@@ -75,7 +75,7 @@ int32_t syncNodeElect(SSyncNode* pSyncNode) {
if
(
pSyncNode
->
state
==
TAOS_SYNC_STATE_FOLLOWER
)
{
syncNodeFollower2Candidate
(
pSyncNode
);
}
assert
(
pSyncNode
->
state
==
TAOS_SYNC_STATE_CANDIDATE
);
ASSERT
(
pSyncNode
->
state
==
TAOS_SYNC_STATE_CANDIDATE
);
// start election
raftStoreNextTerm
(
pSyncNode
->
pRaftStore
);
...
...
@@ -86,7 +86,7 @@ int32_t syncNodeElect(SSyncNode* pSyncNode) {
syncNodeVoteForSelf
(
pSyncNode
);
if
(
voteGrantedMajority
(
pSyncNode
->
pVotesGranted
))
{
// only myself, to leader
assert
(
!
pSyncNode
->
pVotesGranted
->
toLeader
);
ASSERT
(
!
pSyncNode
->
pVotesGranted
->
toLeader
);
syncNodeCandidate2Leader
(
pSyncNode
);
pSyncNode
->
pVotesGranted
->
toLeader
=
true
;
return
ret
;
...
...
@@ -98,7 +98,7 @@ int32_t syncNodeElect(SSyncNode* pSyncNode) {
ret
=
syncNodeRequestVotePeers
(
pSyncNode
);
}
assert
(
ret
==
0
);
ASSERT
(
ret
==
0
);
syncNodeResetElectTimer
(
pSyncNode
);
return
ret
;
...
...
source/libs/sync/src/syncEnv.c
浏览文件 @
ecc43b66
...
...
@@ -14,7 +14,7 @@
*/
#include "syncEnv.h"
// #include <
assert
.h>
// #include <
ASSERT
.h>
SSyncEnv
*
gSyncEnv
=
NULL
;
...
...
@@ -40,7 +40,7 @@ int32_t syncEnvStart() {
taosSeedRand
(
seed
);
// gSyncEnv = doSyncEnvStart(gSyncEnv);
gSyncEnv
=
doSyncEnvStart
();
assert
(
gSyncEnv
!=
NULL
);
ASSERT
(
gSyncEnv
!=
NULL
);
sTrace
(
"sync env start ok"
);
return
ret
;
}
...
...
@@ -86,7 +86,7 @@ static void syncEnvTick(void *param, void *tmrId) {
static
SSyncEnv
*
doSyncEnvStart
()
{
SSyncEnv
*
pSyncEnv
=
(
SSyncEnv
*
)
taosMemoryMalloc
(
sizeof
(
SSyncEnv
));
assert
(
pSyncEnv
!=
NULL
);
ASSERT
(
pSyncEnv
!=
NULL
);
memset
(
pSyncEnv
,
0
,
sizeof
(
SSyncEnv
));
pSyncEnv
->
envTickTimerCounter
=
0
;
...
...
@@ -103,7 +103,7 @@ static SSyncEnv *doSyncEnvStart() {
}
static
int32_t
doSyncEnvStop
(
SSyncEnv
*
pSyncEnv
)
{
assert
(
pSyncEnv
==
gSyncEnv
);
ASSERT
(
pSyncEnv
==
gSyncEnv
);
if
(
pSyncEnv
!=
NULL
)
{
atomic_store_8
(
&
(
pSyncEnv
->
isStart
),
0
);
taosTmrCleanUp
(
pSyncEnv
->
pTimerManager
);
...
...
source/libs/sync/src/syncIO.c
浏览文件 @
ecc43b66
...
...
@@ -30,7 +30,7 @@ static int32_t syncIODestroy(SSyncIO *io);
static
int32_t
syncIOStartInternal
(
SSyncIO
*
io
);
static
int32_t
syncIOStopInternal
(
SSyncIO
*
io
);
static
void
*
syncIOConsumerFunc
(
void
*
param
);
static
void
*
syncIOConsumerFunc
(
void
*
param
);
static
void
syncIOProcessRequest
(
void
*
pParent
,
SRpcMsg
*
pMsg
,
SEpSet
*
pEpSet
);
static
void
syncIOProcessReply
(
void
*
pParent
,
SRpcMsg
*
pMsg
,
SEpSet
*
pEpSet
);
static
int32_t
syncIOAuth
(
void
*
parent
,
char
*
meterId
,
char
*
spi
,
char
*
encrypt
,
char
*
secret
,
char
*
ckey
);
...
...
@@ -47,11 +47,11 @@ static void syncIOTickPing(void *param, void *tmrId);
int32_t
syncIOStart
(
char
*
host
,
uint16_t
port
)
{
int32_t
ret
=
0
;
gSyncIO
=
syncIOCreate
(
host
,
port
);
assert
(
gSyncIO
!=
NULL
);
ASSERT
(
gSyncIO
!=
NULL
);
taosSeedRand
(
taosGetTimestampSec
());
ret
=
syncIOStartInternal
(
gSyncIO
);
assert
(
ret
==
0
);
ASSERT
(
ret
==
0
);
sTrace
(
"syncIOStart ok, gSyncIO:%p"
,
gSyncIO
);
return
ret
;
...
...
@@ -59,16 +59,16 @@ int32_t syncIOStart(char *host, uint16_t port) {
int32_t
syncIOStop
()
{
int32_t
ret
=
syncIOStopInternal
(
gSyncIO
);
assert
(
ret
==
0
);
ASSERT
(
ret
==
0
);
ret
=
syncIODestroy
(
gSyncIO
);
assert
(
ret
==
0
);
ASSERT
(
ret
==
0
);
return
ret
;
}
int32_t
syncIOSendMsg
(
const
SEpSet
*
pEpSet
,
SRpcMsg
*
pMsg
)
{
assert
(
pEpSet
->
inUse
==
0
);
assert
(
pEpSet
->
numOfEps
==
1
);
ASSERT
(
pEpSet
->
inUse
==
0
);
ASSERT
(
pEpSet
->
numOfEps
==
1
);
int32_t
ret
=
0
;
{
...
...
@@ -107,25 +107,25 @@ int32_t syncIOEqMsg(const SMsgCb *msgcb, SRpcMsg *pMsg) {
int32_t
syncIOQTimerStart
()
{
int32_t
ret
=
syncIOStartQ
(
gSyncIO
);
assert
(
ret
==
0
);
ASSERT
(
ret
==
0
);
return
ret
;
}
int32_t
syncIOQTimerStop
()
{
int32_t
ret
=
syncIOStopQ
(
gSyncIO
);
assert
(
ret
==
0
);
ASSERT
(
ret
==
0
);
return
ret
;
}
int32_t
syncIOPingTimerStart
()
{
int32_t
ret
=
syncIOStartPing
(
gSyncIO
);
assert
(
ret
==
0
);
ASSERT
(
ret
==
0
);
return
ret
;
}
int32_t
syncIOPingTimerStop
()
{
int32_t
ret
=
syncIOStopPing
(
gSyncIO
);
assert
(
ret
==
0
);
ASSERT
(
ret
==
0
);
return
ret
;
}
...
...
@@ -151,7 +151,7 @@ static SSyncIO *syncIOCreate(char *host, uint16_t port) {
static
int32_t
syncIODestroy
(
SSyncIO
*
io
)
{
int32_t
ret
=
0
;
int8_t
start
=
atomic_load_8
(
&
io
->
isStart
);
assert
(
start
==
0
);
ASSERT
(
start
==
0
);
if
(
io
->
serverRpc
!=
NULL
)
{
rpcClose
(
io
->
serverRpc
);
...
...
@@ -242,9 +242,9 @@ static int32_t syncIOStopInternal(SSyncIO *io) {
}
static
void
*
syncIOConsumerFunc
(
void
*
param
)
{
SSyncIO
*
io
=
param
;
SSyncIO
*
io
=
param
;
STaosQall
*
qall
;
SRpcMsg
*
pRpcMsg
,
rpcMsg
;
SRpcMsg
*
pRpcMsg
,
rpcMsg
;
qall
=
taosAllocateQall
();
while
(
1
)
{
...
...
@@ -264,7 +264,7 @@ static void *syncIOConsumerFunc(void *param) {
if
(
pRpcMsg
->
msgType
==
TDMT_SYNC_PING
)
{
if
(
io
->
FpOnSyncPing
!=
NULL
)
{
SyncPing
*
pSyncMsg
=
syncPingFromRpcMsg2
(
pRpcMsg
);
assert
(
pSyncMsg
!=
NULL
);
ASSERT
(
pSyncMsg
!=
NULL
);
io
->
FpOnSyncPing
(
io
->
pSyncNode
,
pSyncMsg
);
syncPingDestroy
(
pSyncMsg
);
}
...
...
@@ -272,7 +272,7 @@ static void *syncIOConsumerFunc(void *param) {
}
else
if
(
pRpcMsg
->
msgType
==
TDMT_SYNC_PING_REPLY
)
{
if
(
io
->
FpOnSyncPingReply
!=
NULL
)
{
SyncPingReply
*
pSyncMsg
=
syncPingReplyFromRpcMsg2
(
pRpcMsg
);
assert
(
pSyncMsg
!=
NULL
);
ASSERT
(
pSyncMsg
!=
NULL
);
io
->
FpOnSyncPingReply
(
io
->
pSyncNode
,
pSyncMsg
);
syncPingReplyDestroy
(
pSyncMsg
);
}
...
...
@@ -280,7 +280,7 @@ static void *syncIOConsumerFunc(void *param) {
}
else
if
(
pRpcMsg
->
msgType
==
TDMT_SYNC_CLIENT_REQUEST
)
{
if
(
io
->
FpOnSyncClientRequest
!=
NULL
)
{
SyncClientRequest
*
pSyncMsg
=
syncClientRequestFromRpcMsg2
(
pRpcMsg
);
assert
(
pSyncMsg
!=
NULL
);
ASSERT
(
pSyncMsg
!=
NULL
);
io
->
FpOnSyncClientRequest
(
io
->
pSyncNode
,
pSyncMsg
);
syncClientRequestDestroy
(
pSyncMsg
);
}
...
...
@@ -288,7 +288,7 @@ static void *syncIOConsumerFunc(void *param) {
}
else
if
(
pRpcMsg
->
msgType
==
TDMT_SYNC_REQUEST_VOTE
)
{
if
(
io
->
FpOnSyncRequestVote
!=
NULL
)
{
SyncRequestVote
*
pSyncMsg
=
syncRequestVoteFromRpcMsg2
(
pRpcMsg
);
assert
(
pSyncMsg
!=
NULL
);
ASSERT
(
pSyncMsg
!=
NULL
);
io
->
FpOnSyncRequestVote
(
io
->
pSyncNode
,
pSyncMsg
);
syncRequestVoteDestroy
(
pSyncMsg
);
}
...
...
@@ -296,7 +296,7 @@ static void *syncIOConsumerFunc(void *param) {
}
else
if
(
pRpcMsg
->
msgType
==
TDMT_SYNC_REQUEST_VOTE_REPLY
)
{
if
(
io
->
FpOnSyncRequestVoteReply
!=
NULL
)
{
SyncRequestVoteReply
*
pSyncMsg
=
syncRequestVoteReplyFromRpcMsg2
(
pRpcMsg
);
assert
(
pSyncMsg
!=
NULL
);
ASSERT
(
pSyncMsg
!=
NULL
);
io
->
FpOnSyncRequestVoteReply
(
io
->
pSyncNode
,
pSyncMsg
);
syncRequestVoteReplyDestroy
(
pSyncMsg
);
}
...
...
@@ -304,7 +304,7 @@ static void *syncIOConsumerFunc(void *param) {
}
else
if
(
pRpcMsg
->
msgType
==
TDMT_SYNC_APPEND_ENTRIES
)
{
if
(
io
->
FpOnSyncAppendEntries
!=
NULL
)
{
SyncAppendEntries
*
pSyncMsg
=
syncAppendEntriesFromRpcMsg2
(
pRpcMsg
);
assert
(
pSyncMsg
!=
NULL
);
ASSERT
(
pSyncMsg
!=
NULL
);
io
->
FpOnSyncAppendEntries
(
io
->
pSyncNode
,
pSyncMsg
);
syncAppendEntriesDestroy
(
pSyncMsg
);
}
...
...
@@ -312,7 +312,7 @@ static void *syncIOConsumerFunc(void *param) {
}
else
if
(
pRpcMsg
->
msgType
==
TDMT_SYNC_APPEND_ENTRIES_REPLY
)
{
if
(
io
->
FpOnSyncAppendEntriesReply
!=
NULL
)
{
SyncAppendEntriesReply
*
pSyncMsg
=
syncAppendEntriesReplyFromRpcMsg2
(
pRpcMsg
);
assert
(
pSyncMsg
!=
NULL
);
ASSERT
(
pSyncMsg
!=
NULL
);
io
->
FpOnSyncAppendEntriesReply
(
io
->
pSyncNode
,
pSyncMsg
);
syncAppendEntriesReplyDestroy
(
pSyncMsg
);
}
...
...
@@ -320,7 +320,7 @@ static void *syncIOConsumerFunc(void *param) {
}
else
if
(
pRpcMsg
->
msgType
==
TDMT_SYNC_TIMEOUT
)
{
if
(
io
->
FpOnSyncTimeout
!=
NULL
)
{
SyncTimeout
*
pSyncMsg
=
syncTimeoutFromRpcMsg2
(
pRpcMsg
);
assert
(
pSyncMsg
!=
NULL
);
ASSERT
(
pSyncMsg
!=
NULL
);
io
->
FpOnSyncTimeout
(
io
->
pSyncNode
,
pSyncMsg
);
syncTimeoutDestroy
(
pSyncMsg
);
}
...
...
@@ -328,7 +328,7 @@ static void *syncIOConsumerFunc(void *param) {
}
else
if
(
pRpcMsg
->
msgType
==
TDMT_SYNC_SNAPSHOT_SEND
)
{
if
(
io
->
FpOnSyncSnapshotSend
!=
NULL
)
{
SyncSnapshotSend
*
pSyncMsg
=
syncSnapshotSendFromRpcMsg2
(
pRpcMsg
);
assert
(
pSyncMsg
!=
NULL
);
ASSERT
(
pSyncMsg
!=
NULL
);
io
->
FpOnSyncSnapshotSend
(
io
->
pSyncNode
,
pSyncMsg
);
syncSnapshotSendDestroy
(
pSyncMsg
);
}
...
...
@@ -336,7 +336,7 @@ static void *syncIOConsumerFunc(void *param) {
}
else
if
(
pRpcMsg
->
msgType
==
TDMT_SYNC_SNAPSHOT_RSP
)
{
if
(
io
->
FpOnSyncSnapshotRsp
!=
NULL
)
{
SyncSnapshotRsp
*
pSyncMsg
=
syncSnapshotRspFromRpcMsg2
(
pRpcMsg
);
assert
(
pSyncMsg
!=
NULL
);
ASSERT
(
pSyncMsg
!=
NULL
);
io
->
FpOnSyncSnapshotRsp
(
io
->
pSyncNode
,
pSyncMsg
);
syncSnapshotRspDestroy
(
pSyncMsg
);
}
...
...
source/libs/sync/src/syncIndexMgr.c
浏览文件 @
ecc43b66
...
...
@@ -20,7 +20,7 @@
SSyncIndexMgr
*
syncIndexMgrCreate
(
SSyncNode
*
pSyncNode
)
{
SSyncIndexMgr
*
pSyncIndexMgr
=
taosMemoryMalloc
(
sizeof
(
SSyncIndexMgr
));
assert
(
pSyncIndexMgr
!=
NULL
);
ASSERT
(
pSyncIndexMgr
!=
NULL
);
memset
(
pSyncIndexMgr
,
0
,
sizeof
(
SSyncIndexMgr
));
pSyncIndexMgr
->
replicas
=
&
(
pSyncNode
->
replicasId
);
...
...
@@ -63,7 +63,7 @@ void syncIndexMgrSetIndex(SSyncIndexMgr *pSyncIndexMgr, const SRaftId *pRaftId,
}
// maybe config change
//
assert
(0);
//
ASSERT
(0);
char
host
[
128
];
uint16_t
port
;
...
...
@@ -169,7 +169,7 @@ void syncIndexMgrSetTerm(SSyncIndexMgr *pSyncIndexMgr, const SRaftId *pRaftId, S
}
// maybe config change
//
assert
(0);
//
ASSERT
(0);
char
host
[
128
];
uint16_t
port
;
syncUtilU642Addr
(
pRaftId
->
addr
,
host
,
sizeof
(
host
),
&
port
);
...
...
@@ -183,5 +183,5 @@ SyncTerm syncIndexMgrGetTerm(SSyncIndexMgr *pSyncIndexMgr, const SRaftId *pRaftI
return
term
;
}
}
assert
(
0
);
ASSERT
(
0
);
}
\ No newline at end of file
source/libs/sync/src/syncMain.c
浏览文件 @
ecc43b66
...
...
@@ -75,7 +75,7 @@ int32_t syncInit() {
void
syncCleanUp
()
{
int32_t
ret
=
syncEnvStop
();
assert
(
ret
==
0
);
ASSERT
(
ret
==
0
);
if
(
tsNodeRefId
!=
-
1
)
{
taosCloseRef
(
tsNodeRefId
);
...
...
@@ -85,7 +85,7 @@ void syncCleanUp() {
int64_t
syncOpen
(
const
SSyncInfo
*
pSyncInfo
)
{
SSyncNode
*
pSyncNode
=
syncNodeOpen
(
pSyncInfo
);
assert
(
pSyncNode
!=
NULL
);
ASSERT
(
pSyncNode
!=
NULL
);
if
(
gRaftDetailLog
)
{
syncNodeLog2
(
"syncNodeOpen open success"
,
pSyncNode
);
...
...
@@ -318,7 +318,7 @@ bool syncCanLeaderTransfer(int64_t rid) {
if
(
pSyncNode
==
NULL
)
{
return
false
;
}
assert
(
rid
==
pSyncNode
->
rid
);
ASSERT
(
rid
==
pSyncNode
->
rid
);
if
(
pSyncNode
->
replicaNum
==
1
)
{
taosReleaseRef
(
tsNodeRefId
,
pSyncNode
->
rid
);
...
...
@@ -355,7 +355,7 @@ ESyncState syncGetMyRole(int64_t rid) {
if
(
pSyncNode
==
NULL
)
{
return
TAOS_SYNC_STATE_ERROR
;
}
assert
(
rid
==
pSyncNode
->
rid
);
ASSERT
(
rid
==
pSyncNode
->
rid
);
ESyncState
state
=
pSyncNode
->
state
;
taosReleaseRef
(
tsNodeRefId
,
pSyncNode
->
rid
);
...
...
@@ -367,7 +367,7 @@ bool syncIsReady(int64_t rid) {
if
(
pSyncNode
==
NULL
)
{
return
false
;
}
assert
(
rid
==
pSyncNode
->
rid
);
ASSERT
(
rid
==
pSyncNode
->
rid
);
bool
b
=
(
pSyncNode
->
state
==
TAOS_SYNC_STATE_LEADER
)
&&
pSyncNode
->
restoreFinish
;
taosReleaseRef
(
tsNodeRefId
,
pSyncNode
->
rid
);
return
b
;
...
...
@@ -378,7 +378,7 @@ bool syncIsRestoreFinish(int64_t rid) {
if
(
pSyncNode
==
NULL
)
{
return
false
;
}
assert
(
rid
==
pSyncNode
->
rid
);
ASSERT
(
rid
==
pSyncNode
->
rid
);
bool
b
=
pSyncNode
->
restoreFinish
;
taosReleaseRef
(
tsNodeRefId
,
pSyncNode
->
rid
);
...
...
@@ -390,7 +390,7 @@ int32_t syncGetSnapshotMeta(int64_t rid, struct SSnapshotMeta* sMeta) {
if
(
pSyncNode
==
NULL
)
{
return
-
1
;
}
assert
(
rid
==
pSyncNode
->
rid
);
ASSERT
(
rid
==
pSyncNode
->
rid
);
sMeta
->
lastConfigIndex
=
pSyncNode
->
pRaftCfg
->
lastConfigIndex
;
sTrace
(
"vgId:%d, get snapshot meta, lastConfigIndex:%"
PRId64
,
pSyncNode
->
vgId
,
pSyncNode
->
pRaftCfg
->
lastConfigIndex
);
...
...
@@ -404,7 +404,7 @@ int32_t syncGetSnapshotMetaByIndex(int64_t rid, SyncIndex snapshotIndex, struct
if
(
pSyncNode
==
NULL
)
{
return
-
1
;
}
assert
(
rid
==
pSyncNode
->
rid
);
ASSERT
(
rid
==
pSyncNode
->
rid
);
ASSERT
(
pSyncNode
->
pRaftCfg
->
configIndexCount
>=
1
);
SyncIndex
lastIndex
=
(
pSyncNode
->
pRaftCfg
->
configIndexArr
)[
0
];
...
...
@@ -448,7 +448,7 @@ SyncTerm syncGetMyTerm(int64_t rid) {
if
(
pSyncNode
==
NULL
)
{
return
TAOS_SYNC_STATE_ERROR
;
}
assert
(
rid
==
pSyncNode
->
rid
);
ASSERT
(
rid
==
pSyncNode
->
rid
);
SyncTerm
term
=
pSyncNode
->
pRaftStore
->
currentTerm
;
taosReleaseRef
(
tsNodeRefId
,
pSyncNode
->
rid
);
...
...
@@ -460,7 +460,7 @@ SyncGroupId syncGetVgId(int64_t rid) {
if
(
pSyncNode
==
NULL
)
{
return
TAOS_SYNC_STATE_ERROR
;
}
assert
(
rid
==
pSyncNode
->
rid
);
ASSERT
(
rid
==
pSyncNode
->
rid
);
SyncGroupId
vgId
=
pSyncNode
->
vgId
;
taosReleaseRef
(
tsNodeRefId
,
pSyncNode
->
rid
);
...
...
@@ -473,7 +473,7 @@ void syncGetEpSet(int64_t rid, SEpSet* pEpSet) {
memset
(
pEpSet
,
0
,
sizeof
(
*
pEpSet
));
return
;
}
assert
(
rid
==
pSyncNode
->
rid
);
ASSERT
(
rid
==
pSyncNode
->
rid
);
pEpSet
->
numOfEps
=
0
;
for
(
int
i
=
0
;
i
<
pSyncNode
->
pRaftCfg
->
cfg
.
replicaNum
;
++
i
)
{
snprintf
(
pEpSet
->
eps
[
i
].
fqdn
,
sizeof
(
pEpSet
->
eps
[
i
].
fqdn
),
"%s"
,
(
pSyncNode
->
pRaftCfg
->
cfg
.
nodeInfo
)[
i
].
nodeFqdn
);
...
...
@@ -494,7 +494,7 @@ int32_t syncGetRespRpc(int64_t rid, uint64_t index, SRpcMsg* msg) {
if
(
pSyncNode
==
NULL
)
{
return
TAOS_SYNC_STATE_ERROR
;
}
assert
(
rid
==
pSyncNode
->
rid
);
ASSERT
(
rid
==
pSyncNode
->
rid
);
SRespStub
stub
;
int32_t
ret
=
syncRespMgrGet
(
pSyncNode
->
pSyncRespMgr
,
index
,
&
stub
);
...
...
@@ -511,7 +511,7 @@ int32_t syncGetAndDelRespRpc(int64_t rid, uint64_t index, SRpcHandleInfo* pInfo)
if
(
pSyncNode
==
NULL
)
{
return
TAOS_SYNC_STATE_ERROR
;
}
assert
(
rid
==
pSyncNode
->
rid
);
ASSERT
(
rid
==
pSyncNode
->
rid
);
SRespStub
stub
;
int32_t
ret
=
syncRespMgrGetAndDel
(
pSyncNode
->
pSyncRespMgr
,
index
,
&
stub
);
...
...
@@ -530,7 +530,7 @@ void syncSetMsgCb(int64_t rid, const SMsgCb* msgcb) {
sTrace
(
"syncSetQ get pSyncNode is NULL, rid:%ld"
,
rid
);
return
;
}
assert
(
rid
==
pSyncNode
->
rid
);
ASSERT
(
rid
==
pSyncNode
->
rid
);
pSyncNode
->
msgcb
=
msgcb
;
taosReleaseRef
(
tsNodeRefId
,
pSyncNode
->
rid
);
...
...
@@ -542,7 +542,7 @@ char* sync2SimpleStr(int64_t rid) {
sTrace
(
"syncSetRpc get pSyncNode is NULL, rid:%ld"
,
rid
);
return
NULL
;
}
assert
(
rid
==
pSyncNode
->
rid
);
ASSERT
(
rid
==
pSyncNode
->
rid
);
char
*
s
=
syncNode2SimpleStr
(
pSyncNode
);
taosReleaseRef
(
tsNodeRefId
,
pSyncNode
->
rid
);
...
...
@@ -554,7 +554,7 @@ void setPingTimerMS(int64_t rid, int32_t pingTimerMS) {
if
(
pSyncNode
==
NULL
)
{
return
;
}
assert
(
rid
==
pSyncNode
->
rid
);
ASSERT
(
rid
==
pSyncNode
->
rid
);
pSyncNode
->
pingBaseLine
=
pingTimerMS
;
pSyncNode
->
pingTimerMS
=
pingTimerMS
;
...
...
@@ -566,7 +566,7 @@ void setElectTimerMS(int64_t rid, int32_t electTimerMS) {
if
(
pSyncNode
==
NULL
)
{
return
;
}
assert
(
rid
==
pSyncNode
->
rid
);
ASSERT
(
rid
==
pSyncNode
->
rid
);
pSyncNode
->
electBaseLine
=
electTimerMS
;
taosReleaseRef
(
tsNodeRefId
,
pSyncNode
->
rid
);
...
...
@@ -577,7 +577,7 @@ void setHeartbeatTimerMS(int64_t rid, int32_t hbTimerMS) {
if
(
pSyncNode
==
NULL
)
{
return
;
}
assert
(
rid
==
pSyncNode
->
rid
);
ASSERT
(
rid
==
pSyncNode
->
rid
);
pSyncNode
->
hbBaseLine
=
hbTimerMS
;
pSyncNode
->
heartbeatTimerMS
=
hbTimerMS
;
...
...
@@ -592,7 +592,7 @@ int32_t syncPropose(int64_t rid, const SRpcMsg* pMsg, bool isWeak) {
terrno
=
TSDB_CODE_SYN_INTERNAL_ERROR
;
return
-
1
;
}
assert
(
rid
==
pSyncNode
->
rid
);
ASSERT
(
rid
==
pSyncNode
->
rid
);
ret
=
syncNodePropose
(
pSyncNode
,
pMsg
,
isWeak
);
taosReleaseRef
(
tsNodeRefId
,
pSyncNode
->
rid
);
...
...
@@ -662,7 +662,7 @@ SSyncNode* syncNodeOpen(const SSyncInfo* pOldSyncInfo) {
SSyncInfo
*
pSyncInfo
=
(
SSyncInfo
*
)
pOldSyncInfo
;
SSyncNode
*
pSyncNode
=
(
SSyncNode
*
)
taosMemoryMalloc
(
sizeof
(
SSyncNode
));
assert
(
pSyncNode
!=
NULL
);
ASSERT
(
pSyncNode
!=
NULL
);
memset
(
pSyncNode
,
0
,
sizeof
(
SSyncNode
));
int32_t
ret
=
0
;
...
...
@@ -682,12 +682,12 @@ SSyncNode* syncNodeOpen(const SSyncInfo* pOldSyncInfo) {
meta
.
snapshotEnable
=
pSyncInfo
->
snapshotEnable
;
meta
.
lastConfigIndex
=
SYNC_INDEX_INVALID
;
ret
=
raftCfgCreateFile
((
SSyncCfg
*
)
&
(
pSyncInfo
->
syncCfg
),
meta
,
pSyncNode
->
configPath
);
assert
(
ret
==
0
);
ASSERT
(
ret
==
0
);
}
else
{
// update syncCfg by raft_config.json
pSyncNode
->
pRaftCfg
=
raftCfgOpen
(
pSyncNode
->
configPath
);
assert
(
pSyncNode
->
pRaftCfg
!=
NULL
);
ASSERT
(
pSyncNode
->
pRaftCfg
!=
NULL
);
pSyncInfo
->
syncCfg
=
pSyncNode
->
pRaftCfg
->
cfg
;
if
(
gRaftDetailLog
)
{
...
...
@@ -712,7 +712,7 @@ SSyncNode* syncNodeOpen(const SSyncInfo* pOldSyncInfo) {
// init raft config
pSyncNode
->
pRaftCfg
=
raftCfgOpen
(
pSyncNode
->
configPath
);
assert
(
pSyncNode
->
pRaftCfg
!=
NULL
);
ASSERT
(
pSyncNode
->
pRaftCfg
!=
NULL
);
// init internal
pSyncNode
->
myNodeInfo
=
pSyncNode
->
pRaftCfg
->
cfg
.
nodeInfo
[
pSyncNode
->
pRaftCfg
->
cfg
.
myIndex
];
...
...
@@ -771,23 +771,23 @@ SSyncNode* syncNodeOpen(const SSyncInfo* pOldSyncInfo) {
// init TLA+ server vars
pSyncNode
->
state
=
TAOS_SYNC_STATE_FOLLOWER
;
pSyncNode
->
pRaftStore
=
raftStoreOpen
(
pSyncNode
->
raftStorePath
);
assert
(
pSyncNode
->
pRaftStore
!=
NULL
);
ASSERT
(
pSyncNode
->
pRaftStore
!=
NULL
);
// init TLA+ candidate vars
pSyncNode
->
pVotesGranted
=
voteGrantedCreate
(
pSyncNode
);
assert
(
pSyncNode
->
pVotesGranted
!=
NULL
);
ASSERT
(
pSyncNode
->
pVotesGranted
!=
NULL
);
pSyncNode
->
pVotesRespond
=
votesRespondCreate
(
pSyncNode
);
assert
(
pSyncNode
->
pVotesRespond
!=
NULL
);
ASSERT
(
pSyncNode
->
pVotesRespond
!=
NULL
);
// init TLA+ leader vars
pSyncNode
->
pNextIndex
=
syncIndexMgrCreate
(
pSyncNode
);
assert
(
pSyncNode
->
pNextIndex
!=
NULL
);
ASSERT
(
pSyncNode
->
pNextIndex
!=
NULL
);
pSyncNode
->
pMatchIndex
=
syncIndexMgrCreate
(
pSyncNode
);
assert
(
pSyncNode
->
pMatchIndex
!=
NULL
);
ASSERT
(
pSyncNode
->
pMatchIndex
!=
NULL
);
// init TLA+ log vars
pSyncNode
->
pLogStore
=
logStoreCreate
(
pSyncNode
);
assert
(
pSyncNode
->
pLogStore
!=
NULL
);
ASSERT
(
pSyncNode
->
pLogStore
!=
NULL
);
pSyncNode
->
commitIndex
=
SYNC_INDEX_INVALID
;
// timer ms init
...
...
@@ -845,7 +845,7 @@ SSyncNode* syncNodeOpen(const SSyncInfo* pOldSyncInfo) {
// tools
pSyncNode
->
pSyncRespMgr
=
syncRespMgrCreate
(
pSyncNode
,
0
);
assert
(
pSyncNode
->
pSyncRespMgr
!=
NULL
);
ASSERT
(
pSyncNode
->
pSyncRespMgr
!=
NULL
);
// restore state
pSyncNode
->
restoreFinish
=
false
;
...
...
@@ -893,7 +893,7 @@ void syncNodeStart(SSyncNode* pSyncNode) {
// int32_t ret = 0;
// ret = syncNodeStartPingTimer(pSyncNode);
//
assert
(ret == 0);
//
ASSERT
(ret == 0);
if
(
gRaftDetailLog
)
{
syncNodeLog2
(
"==state change become leader immediately=="
,
pSyncNode
);
...
...
@@ -915,10 +915,10 @@ void syncNodeClose(SSyncNode* pSyncNode) {
syncNodeEventLog
(
pSyncNode
,
"sync close"
);
int32_t
ret
;
assert
(
pSyncNode
!=
NULL
);
ASSERT
(
pSyncNode
!=
NULL
);
ret
=
raftStoreClose
(
pSyncNode
->
pRaftStore
);
assert
(
ret
==
0
);
ASSERT
(
ret
==
0
);
syncRespMgrDestroy
(
pSyncNode
->
pSyncRespMgr
);
voteGrantedDestroy
(
pSyncNode
->
pVotesGranted
);
...
...
@@ -980,7 +980,7 @@ int32_t syncNodePingSelf(SSyncNode* pSyncNode) {
int32_t
ret
=
0
;
SyncPing
*
pMsg
=
syncPingBuild3
(
&
pSyncNode
->
myRaftId
,
&
pSyncNode
->
myRaftId
,
pSyncNode
->
vgId
);
ret
=
syncNodePing
(
pSyncNode
,
&
pMsg
->
destId
,
pMsg
);
assert
(
ret
==
0
);
ASSERT
(
ret
==
0
);
syncPingDestroy
(
pMsg
);
return
ret
;
...
...
@@ -992,7 +992,7 @@ int32_t syncNodePingPeers(SSyncNode* pSyncNode) {
SRaftId
*
destId
=
&
(
pSyncNode
->
peersId
[
i
]);
SyncPing
*
pMsg
=
syncPingBuild3
(
&
pSyncNode
->
myRaftId
,
destId
,
pSyncNode
->
vgId
);
ret
=
syncNodePing
(
pSyncNode
,
destId
,
pMsg
);
assert
(
ret
==
0
);
ASSERT
(
ret
==
0
);
syncPingDestroy
(
pMsg
);
}
return
ret
;
...
...
@@ -1004,7 +1004,7 @@ int32_t syncNodePingAll(SSyncNode* pSyncNode) {
SRaftId
*
destId
=
&
(
pSyncNode
->
replicasId
[
i
]);
SyncPing
*
pMsg
=
syncPingBuild3
(
&
pSyncNode
->
myRaftId
,
destId
,
pSyncNode
->
vgId
);
ret
=
syncNodePing
(
pSyncNode
,
destId
,
pMsg
);
assert
(
ret
==
0
);
ASSERT
(
ret
==
0
);
syncPingDestroy
(
pMsg
);
}
return
ret
;
...
...
@@ -1337,6 +1337,44 @@ void syncNodeEventLog(const SSyncNode* pSyncNode, char* str) {
}
}
void
syncNodeErrorLog
(
const
SSyncNode
*
pSyncNode
,
char
*
str
)
{
int32_t
userStrLen
=
strlen
(
str
);
SSnapshot
snapshot
=
{.
data
=
NULL
,
.
lastApplyIndex
=
-
1
,
.
lastApplyTerm
=
0
};
if
(
pSyncNode
->
pFsm
->
FpGetSnapshotInfo
!=
NULL
)
{
pSyncNode
->
pFsm
->
FpGetSnapshotInfo
(
pSyncNode
->
pFsm
,
&
snapshot
);
}
SyncIndex
logLastIndex
=
pSyncNode
->
pLogStore
->
syncLogLastIndex
(
pSyncNode
->
pLogStore
);
SyncIndex
logBeginIndex
=
pSyncNode
->
pLogStore
->
syncLogBeginIndex
(
pSyncNode
->
pLogStore
);
if
(
userStrLen
<
256
)
{
char
logBuf
[
128
+
256
];
snprintf
(
logBuf
,
sizeof
(
logBuf
),
"vgId:%d, sync %s %s, term:%lu, commit:%ld, beginlog:%ld, lastlog:%ld, lastsnapshot:%ld, standby:%d, "
"replica-num:%d, "
"lconfig:%ld, changing:%d"
,
pSyncNode
->
vgId
,
syncUtilState2String
(
pSyncNode
->
state
),
str
,
pSyncNode
->
pRaftStore
->
currentTerm
,
pSyncNode
->
commitIndex
,
logBeginIndex
,
logLastIndex
,
snapshot
.
lastApplyIndex
,
pSyncNode
->
pRaftCfg
->
isStandBy
,
pSyncNode
->
replicaNum
,
pSyncNode
->
pRaftCfg
->
lastConfigIndex
,
pSyncNode
->
changing
);
sError
(
"%s"
,
logBuf
);
}
else
{
int
len
=
128
+
userStrLen
;
char
*
s
=
(
char
*
)
taosMemoryMalloc
(
len
);
snprintf
(
s
,
len
,
"vgId:%d, sync %s %s, term:%lu, commit:%ld, beginlog:%ld, lastlog:%ld, lastsnapshot:%ld, standby:%d, "
"replica-num:%d, "
"lconfig:%ld, changing:%d"
,
pSyncNode
->
vgId
,
syncUtilState2String
(
pSyncNode
->
state
),
str
,
pSyncNode
->
pRaftStore
->
currentTerm
,
pSyncNode
->
commitIndex
,
logBeginIndex
,
logLastIndex
,
snapshot
.
lastApplyIndex
,
pSyncNode
->
pRaftCfg
->
isStandBy
,
pSyncNode
->
replicaNum
,
pSyncNode
->
pRaftCfg
->
lastConfigIndex
,
pSyncNode
->
changing
);
sError
(
"%s"
,
s
);
taosMemoryFree
(
s
);
}
}
char
*
syncNode2SimpleStr
(
const
SSyncNode
*
pSyncNode
)
{
int
len
=
256
;
char
*
s
=
(
char
*
)
taosMemoryMalloc
(
len
);
...
...
@@ -1702,8 +1740,8 @@ void syncNodeBecomeLeader(SSyncNode* pSyncNode, const char* debugStr) {
}
void
syncNodeCandidate2Leader
(
SSyncNode
*
pSyncNode
)
{
assert
(
pSyncNode
->
state
==
TAOS_SYNC_STATE_CANDIDATE
);
assert
(
voteGrantedMajority
(
pSyncNode
->
pVotesGranted
));
ASSERT
(
pSyncNode
->
state
==
TAOS_SYNC_STATE_CANDIDATE
);
ASSERT
(
voteGrantedMajority
(
pSyncNode
->
pVotesGranted
));
syncNodeBecomeLeader
(
pSyncNode
,
"candidate to leader"
);
syncNodeLog2
(
"==state change syncNodeCandidate2Leader=="
,
pSyncNode
);
...
...
@@ -1715,21 +1753,21 @@ void syncNodeCandidate2Leader(SSyncNode* pSyncNode) {
}
void
syncNodeFollower2Candidate
(
SSyncNode
*
pSyncNode
)
{
assert
(
pSyncNode
->
state
==
TAOS_SYNC_STATE_FOLLOWER
);
ASSERT
(
pSyncNode
->
state
==
TAOS_SYNC_STATE_FOLLOWER
);
pSyncNode
->
state
=
TAOS_SYNC_STATE_CANDIDATE
;
syncNodeLog2
(
"==state change syncNodeFollower2Candidate=="
,
pSyncNode
);
}
void
syncNodeLeader2Follower
(
SSyncNode
*
pSyncNode
)
{
assert
(
pSyncNode
->
state
==
TAOS_SYNC_STATE_LEADER
);
ASSERT
(
pSyncNode
->
state
==
TAOS_SYNC_STATE_LEADER
);
syncNodeBecomeFollower
(
pSyncNode
,
"leader to follower"
);
syncNodeLog2
(
"==state change syncNodeLeader2Follower=="
,
pSyncNode
);
}
void
syncNodeCandidate2Follower
(
SSyncNode
*
pSyncNode
)
{
assert
(
pSyncNode
->
state
==
TAOS_SYNC_STATE_CANDIDATE
);
ASSERT
(
pSyncNode
->
state
==
TAOS_SYNC_STATE_CANDIDATE
);
syncNodeBecomeFollower
(
pSyncNode
,
"candidate to follower"
);
syncNodeLog2
(
"==state change syncNodeCandidate2Follower=="
,
pSyncNode
);
...
...
@@ -1740,8 +1778,8 @@ void syncNodeCandidate2Follower(SSyncNode* pSyncNode) {
// just called by syncNodeVoteForSelf
// need assert
void
syncNodeVoteForTerm
(
SSyncNode
*
pSyncNode
,
SyncTerm
term
,
SRaftId
*
pRaftId
)
{
assert
(
term
==
pSyncNode
->
pRaftStore
->
currentTerm
);
assert
(
!
raftStoreHasVoted
(
pSyncNode
->
pRaftStore
));
ASSERT
(
term
==
pSyncNode
->
pRaftStore
->
currentTerm
);
ASSERT
(
!
raftStoreHasVoted
(
pSyncNode
->
pRaftStore
));
raftStoreVote
(
pSyncNode
->
pRaftStore
,
pRaftId
);
}
...
...
@@ -2062,17 +2100,17 @@ static void syncNodeEqHeartbeatTimer(void* param, void* tmrId) {
static
int32_t
syncNodeEqNoop
(
SSyncNode
*
ths
)
{
int32_t
ret
=
0
;
assert
(
ths
->
state
==
TAOS_SYNC_STATE_LEADER
);
ASSERT
(
ths
->
state
==
TAOS_SYNC_STATE_LEADER
);
SyncIndex
index
=
ths
->
pLogStore
->
getLastIndex
(
ths
->
pLogStore
)
+
1
;
SyncTerm
term
=
ths
->
pRaftStore
->
currentTerm
;
SSyncRaftEntry
*
pEntry
=
syncEntryBuildNoop
(
term
,
index
,
ths
->
vgId
);
assert
(
pEntry
!=
NULL
);
ASSERT
(
pEntry
!=
NULL
);
uint32_t
entryLen
;
char
*
serialized
=
syncEntrySerialize
(
pEntry
,
&
entryLen
);
SyncClientRequest
*
pSyncMsg
=
syncClientRequestBuild
(
entryLen
);
assert
(
pSyncMsg
->
dataLen
==
entryLen
);
ASSERT
(
pSyncMsg
->
dataLen
==
entryLen
);
memcpy
(
pSyncMsg
->
data
,
serialized
,
entryLen
);
SRpcMsg
rpcMsg
=
{
0
};
...
...
@@ -2095,7 +2133,7 @@ static int32_t syncNodeAppendNoop(SSyncNode* ths) {
SyncIndex
index
=
ths
->
pLogStore
->
getLastIndex
(
ths
->
pLogStore
)
+
1
;
SyncTerm
term
=
ths
->
pRaftStore
->
currentTerm
;
SSyncRaftEntry
*
pEntry
=
syncEntryBuildNoop
(
term
,
index
,
ths
->
vgId
);
assert
(
pEntry
!=
NULL
);
ASSERT
(
pEntry
!=
NULL
);
if
(
ths
->
state
==
TAOS_SYNC_STATE_LEADER
)
{
// ths->pLogStore->appendEntry(ths->pLogStore, pEntry);
...
...
@@ -2158,7 +2196,7 @@ int32_t syncNodeOnClientRequestCb(SSyncNode* ths, SyncClientRequest* pMsg) {
SyncIndex
index
=
ths
->
pLogStore
->
syncLogWriteIndex
(
ths
->
pLogStore
);
SyncTerm
term
=
ths
->
pRaftStore
->
currentTerm
;
SSyncRaftEntry
*
pEntry
=
syncEntryBuild2
((
SyncClientRequest
*
)
pMsg
,
term
,
index
);
assert
(
pEntry
!=
NULL
);
ASSERT
(
pEntry
!=
NULL
);
if
(
ths
->
state
==
TAOS_SYNC_STATE_LEADER
)
{
// ths->pLogStore->appendEntry(ths->pLogStore, pEntry);
...
...
source/libs/sync/src/syncMessage.c
浏览文件 @
ecc43b66
此差异已折叠。
点击以展开。
source/libs/sync/src/syncRaftCfg.c
浏览文件 @
ecc43b66
...
...
@@ -24,29 +24,29 @@ SRaftCfg *raftCfgOpen(const char *path) {
snprintf
(
pCfg
->
path
,
sizeof
(
pCfg
->
path
),
"%s"
,
path
);
pCfg
->
pFile
=
taosOpenFile
(
pCfg
->
path
,
TD_FILE_READ
|
TD_FILE_WRITE
);
assert
(
pCfg
->
pFile
!=
NULL
);
ASSERT
(
pCfg
->
pFile
!=
NULL
);
taosLSeekFile
(
pCfg
->
pFile
,
0
,
SEEK_SET
);
char
buf
[
1024
]
=
{
0
};
int
len
=
taosReadFile
(
pCfg
->
pFile
,
buf
,
sizeof
(
buf
));
assert
(
len
>
0
);
ASSERT
(
len
>
0
);
int32_t
ret
=
raftCfgFromStr
(
buf
,
pCfg
);
assert
(
ret
==
0
);
ASSERT
(
ret
==
0
);
return
pCfg
;
}
int32_t
raftCfgClose
(
SRaftCfg
*
pRaftCfg
)
{
int64_t
ret
=
taosCloseFile
(
&
(
pRaftCfg
->
pFile
));
assert
(
ret
==
0
);
ASSERT
(
ret
==
0
);
taosMemoryFree
(
pRaftCfg
);
return
0
;
}
int32_t
raftCfgPersist
(
SRaftCfg
*
pRaftCfg
)
{
assert
(
pRaftCfg
!=
NULL
);
ASSERT
(
pRaftCfg
!=
NULL
);
char
*
s
=
raftCfg2Str
(
pRaftCfg
);
taosLSeekFile
(
pRaftCfg
->
pFile
,
0
,
SEEK_SET
);
...
...
@@ -61,10 +61,10 @@ int32_t raftCfgPersist(SRaftCfg *pRaftCfg) {
snprintf
(
buf
,
sizeof
(
buf
),
"%s"
,
s
);
int64_t
ret
=
taosWriteFile
(
pRaftCfg
->
pFile
,
buf
,
sizeof
(
buf
));
assert
(
ret
==
sizeof
(
buf
));
ASSERT
(
ret
==
sizeof
(
buf
));
// int64_t ret = taosWriteFile(pRaftCfg->pFile, s, strlen(s) + 1);
//
assert
(ret == strlen(s) + 1);
//
ASSERT
(ret == strlen(s) + 1);
taosMemoryFree
(
s
);
taosFsyncFile
(
pRaftCfg
->
pFile
);
...
...
@@ -135,27 +135,27 @@ int32_t syncCfgFromJson(const cJSON *pRoot, SSyncCfg *pSyncCfg) {
const
cJSON
*
pJson
=
pRoot
;
cJSON
*
pReplicaNum
=
cJSON_GetObjectItem
(
pJson
,
"replicaNum"
);
assert
(
cJSON_IsNumber
(
pReplicaNum
));
ASSERT
(
cJSON_IsNumber
(
pReplicaNum
));
pSyncCfg
->
replicaNum
=
cJSON_GetNumberValue
(
pReplicaNum
);
cJSON
*
pMyIndex
=
cJSON_GetObjectItem
(
pJson
,
"myIndex"
);
assert
(
cJSON_IsNumber
(
pMyIndex
));
ASSERT
(
cJSON_IsNumber
(
pMyIndex
));
pSyncCfg
->
myIndex
=
cJSON_GetNumberValue
(
pMyIndex
);
cJSON
*
pNodeInfoArr
=
cJSON_GetObjectItem
(
pJson
,
"nodeInfo"
);
int
arraySize
=
cJSON_GetArraySize
(
pNodeInfoArr
);
assert
(
arraySize
==
pSyncCfg
->
replicaNum
);
ASSERT
(
arraySize
==
pSyncCfg
->
replicaNum
);
for
(
int
i
=
0
;
i
<
arraySize
;
++
i
)
{
cJSON
*
pNodeInfo
=
cJSON_GetArrayItem
(
pNodeInfoArr
,
i
);
assert
(
pNodeInfo
!=
NULL
);
ASSERT
(
pNodeInfo
!=
NULL
);
cJSON
*
pNodePort
=
cJSON_GetObjectItem
(
pNodeInfo
,
"nodePort"
);
assert
(
cJSON_IsNumber
(
pNodePort
));
ASSERT
(
cJSON_IsNumber
(
pNodePort
));
((
pSyncCfg
->
nodeInfo
)[
i
]).
nodePort
=
cJSON_GetNumberValue
(
pNodePort
);
cJSON
*
pNodeFqdn
=
cJSON_GetObjectItem
(
pNodeInfo
,
"nodeFqdn"
);
assert
(
cJSON_IsString
(
pNodeFqdn
));
ASSERT
(
cJSON_IsString
(
pNodeFqdn
));
snprintf
(((
pSyncCfg
->
nodeInfo
)[
i
]).
nodeFqdn
,
sizeof
(((
pSyncCfg
->
nodeInfo
)[
i
]).
nodeFqdn
),
"%s"
,
pNodeFqdn
->
valuestring
);
}
...
...
@@ -165,10 +165,10 @@ int32_t syncCfgFromJson(const cJSON *pRoot, SSyncCfg *pSyncCfg) {
int32_t
syncCfgFromStr
(
const
char
*
s
,
SSyncCfg
*
pSyncCfg
)
{
cJSON
*
pRoot
=
cJSON_Parse
(
s
);
assert
(
pRoot
!=
NULL
);
ASSERT
(
pRoot
!=
NULL
);
int32_t
ret
=
syncCfgFromJson
(
pRoot
,
pSyncCfg
);
assert
(
ret
==
0
);
ASSERT
(
ret
==
0
);
cJSON_Delete
(
pRoot
);
return
0
;
...
...
@@ -207,10 +207,10 @@ char *raftCfg2Str(SRaftCfg *pRaftCfg) {
}
int32_t
raftCfgCreateFile
(
SSyncCfg
*
pCfg
,
SRaftCfgMeta
meta
,
const
char
*
path
)
{
assert
(
pCfg
!=
NULL
);
ASSERT
(
pCfg
!=
NULL
);
TdFilePtr
pFile
=
taosOpenFile
(
path
,
TD_FILE_CREATE
|
TD_FILE_WRITE
);
assert
(
pFile
!=
NULL
);
ASSERT
(
pFile
!=
NULL
);
SRaftCfg
raftCfg
;
raftCfg
.
cfg
=
*
pCfg
;
...
...
@@ -227,10 +227,10 @@ int32_t raftCfgCreateFile(SSyncCfg *pCfg, SRaftCfgMeta meta, const char *path) {
ASSERT
(
strlen
(
s
)
+
1
<=
CONFIG_FILE_LEN
);
snprintf
(
buf
,
sizeof
(
buf
),
"%s"
,
s
);
int64_t
ret
=
taosWriteFile
(
pFile
,
buf
,
sizeof
(
buf
));
assert
(
ret
==
sizeof
(
buf
));
ASSERT
(
ret
==
sizeof
(
buf
));
// int64_t ret = taosWriteFile(pFile, s, strlen(s) + 1);
//
assert
(ret == strlen(s) + 1);
//
ASSERT
(ret == strlen(s) + 1);
taosMemoryFree
(
s
);
taosCloseFile
(
&
pFile
);
...
...
@@ -255,15 +255,15 @@ int32_t raftCfgFromJson(const cJSON *pRoot, SRaftCfg *pRaftCfg) {
cJSON
*
pIndexArr
=
cJSON_GetObjectItem
(
pJson
,
"configIndexArr"
);
int
arraySize
=
cJSON_GetArraySize
(
pIndexArr
);
assert
(
arraySize
==
pRaftCfg
->
configIndexCount
);
ASSERT
(
arraySize
==
pRaftCfg
->
configIndexCount
);
memset
(
pRaftCfg
->
configIndexArr
,
0
,
sizeof
(
pRaftCfg
->
configIndexArr
));
for
(
int
i
=
0
;
i
<
arraySize
;
++
i
)
{
cJSON
*
pIndexObj
=
cJSON_GetArrayItem
(
pIndexArr
,
i
);
assert
(
pIndexObj
!=
NULL
);
ASSERT
(
pIndexObj
!=
NULL
);
cJSON
*
pIndex
=
cJSON_GetObjectItem
(
pIndexObj
,
"index"
);
assert
(
cJSON_IsString
(
pIndex
));
ASSERT
(
cJSON_IsString
(
pIndex
));
(
pRaftCfg
->
configIndexArr
)[
i
]
=
atoll
(
pIndex
->
valuestring
);
}
...
...
@@ -276,10 +276,10 @@ int32_t raftCfgFromJson(const cJSON *pRoot, SRaftCfg *pRaftCfg) {
int32_t
raftCfgFromStr
(
const
char
*
s
,
SRaftCfg
*
pRaftCfg
)
{
cJSON
*
pRoot
=
cJSON_Parse
(
s
);
assert
(
pRoot
!=
NULL
);
ASSERT
(
pRoot
!=
NULL
);
int32_t
ret
=
raftCfgFromJson
(
pRoot
,
pRaftCfg
);
assert
(
ret
==
0
);
ASSERT
(
ret
==
0
);
cJSON_Delete
(
pRoot
);
return
0
;
...
...
source/libs/sync/src/syncRaftEntry.c
浏览文件 @
ecc43b66
...
...
@@ -19,7 +19,7 @@
SSyncRaftEntry
*
syncEntryBuild
(
uint32_t
dataLen
)
{
uint32_t
bytes
=
sizeof
(
SSyncRaftEntry
)
+
dataLen
;
SSyncRaftEntry
*
pEntry
=
taosMemoryMalloc
(
bytes
);
assert
(
pEntry
!=
NULL
);
ASSERT
(
pEntry
!=
NULL
);
memset
(
pEntry
,
0
,
bytes
);
pEntry
->
bytes
=
bytes
;
pEntry
->
dataLen
=
dataLen
;
...
...
@@ -29,14 +29,14 @@ SSyncRaftEntry* syncEntryBuild(uint32_t dataLen) {
// step 4. SyncClientRequest => SSyncRaftEntry, add term, index
SSyncRaftEntry
*
syncEntryBuild2
(
SyncClientRequest
*
pMsg
,
SyncTerm
term
,
SyncIndex
index
)
{
SSyncRaftEntry
*
pEntry
=
syncEntryBuild3
(
pMsg
,
term
,
index
);
assert
(
pEntry
!=
NULL
);
ASSERT
(
pEntry
!=
NULL
);
return
pEntry
;
}
SSyncRaftEntry
*
syncEntryBuild3
(
SyncClientRequest
*
pMsg
,
SyncTerm
term
,
SyncIndex
index
)
{
SSyncRaftEntry
*
pEntry
=
syncEntryBuild
(
pMsg
->
dataLen
);
assert
(
pEntry
!=
NULL
);
ASSERT
(
pEntry
!=
NULL
);
pEntry
->
msgType
=
pMsg
->
msgType
;
pEntry
->
originalRpcType
=
pMsg
->
originalRpcType
;
...
...
@@ -63,7 +63,7 @@ SSyncRaftEntry* syncEntryBuildNoop(SyncTerm term, SyncIndex index, int32_t vgId)
memcpy
(
rpcMsg
.
pCont
,
&
head
,
sizeof
(
head
));
SSyncRaftEntry
*
pEntry
=
syncEntryBuild
(
rpcMsg
.
contLen
);
assert
(
pEntry
!=
NULL
);
ASSERT
(
pEntry
!=
NULL
);
pEntry
->
msgType
=
TDMT_SYNC_CLIENT_REQUEST
;
pEntry
->
originalRpcType
=
TDMT_SYNC_NOOP
;
...
...
@@ -72,7 +72,7 @@ SSyncRaftEntry* syncEntryBuildNoop(SyncTerm term, SyncIndex index, int32_t vgId)
pEntry
->
term
=
term
;
pEntry
->
index
=
index
;
assert
(
pEntry
->
dataLen
==
rpcMsg
.
contLen
);
ASSERT
(
pEntry
->
dataLen
==
rpcMsg
.
contLen
);
memcpy
(
pEntry
->
data
,
rpcMsg
.
pCont
,
rpcMsg
.
contLen
);
rpcFreeCont
(
rpcMsg
.
pCont
);
...
...
@@ -88,7 +88,7 @@ void syncEntryDestory(SSyncRaftEntry* pEntry) {
// step 5. SSyncRaftEntry => bin, to raft log
char
*
syncEntrySerialize
(
const
SSyncRaftEntry
*
pEntry
,
uint32_t
*
len
)
{
char
*
buf
=
taosMemoryMalloc
(
pEntry
->
bytes
);
assert
(
buf
!=
NULL
);
ASSERT
(
buf
!=
NULL
);
memcpy
(
buf
,
pEntry
,
pEntry
->
bytes
);
if
(
len
!=
NULL
)
{
*
len
=
pEntry
->
bytes
;
...
...
@@ -100,9 +100,9 @@ char* syncEntrySerialize(const SSyncRaftEntry* pEntry, uint32_t* len) {
SSyncRaftEntry
*
syncEntryDeserialize
(
const
char
*
buf
,
uint32_t
len
)
{
uint32_t
bytes
=
*
((
uint32_t
*
)
buf
);
SSyncRaftEntry
*
pEntry
=
taosMemoryMalloc
(
bytes
);
assert
(
pEntry
!=
NULL
);
ASSERT
(
pEntry
!=
NULL
);
memcpy
(
pEntry
,
buf
,
len
);
assert
(
len
==
pEntry
->
bytes
);
ASSERT
(
len
==
pEntry
->
bytes
);
return
pEntry
;
}
...
...
source/libs/sync/src/syncRaftLog.c
浏览文件 @
ecc43b66
...
...
@@ -245,10 +245,10 @@ static int32_t raftLogGetLastEntry(SSyncLogStore* pLogStore, SSyncRaftEntry** pp
//-------------------------------
SSyncLogStore
*
logStoreCreate
(
SSyncNode
*
pSyncNode
)
{
SSyncLogStore
*
pLogStore
=
taosMemoryMalloc
(
sizeof
(
SSyncLogStore
));
assert
(
pLogStore
!=
NULL
);
ASSERT
(
pLogStore
!=
NULL
);
pLogStore
->
data
=
taosMemoryMalloc
(
sizeof
(
SSyncLogStoreData
));
assert
(
pLogStore
->
data
!=
NULL
);
ASSERT
(
pLogStore
->
data
!=
NULL
);
SSyncLogStoreData
*
pData
=
pLogStore
->
data
;
pData
->
pSyncNode
=
pSyncNode
;
...
...
@@ -301,7 +301,7 @@ int32_t logStoreAppendEntry(SSyncLogStore* pLogStore, SSyncRaftEntry* pEntry) {
SWal
*
pWal
=
pData
->
pWal
;
SyncIndex
lastIndex
=
logStoreLastIndex
(
pLogStore
);
assert
(
pEntry
->
index
==
lastIndex
+
1
);
ASSERT
(
pEntry
->
index
==
lastIndex
+
1
);
int
code
=
0
;
SSyncLogMeta
syncMeta
;
...
...
@@ -347,10 +347,10 @@ SSyncRaftEntry* logStoreGetEntry(SSyncLogStore* pLogStore, SyncIndex index) {
linuxErrMsg
);
ASSERT
(
0
);
}
//
assert
(walReadWithHandle(pWalHandle, index) == 0);
//
ASSERT
(walReadWithHandle(pWalHandle, index) == 0);
SSyncRaftEntry
*
pEntry
=
syncEntryBuild
(
pWalHandle
->
pHead
->
head
.
bodyLen
);
assert
(
pEntry
!=
NULL
);
ASSERT
(
pEntry
!=
NULL
);
pEntry
->
msgType
=
TDMT_SYNC_CLIENT_REQUEST
;
pEntry
->
originalRpcType
=
pWalHandle
->
pHead
->
head
.
msgType
;
...
...
@@ -358,7 +358,7 @@ SSyncRaftEntry* logStoreGetEntry(SSyncLogStore* pLogStore, SyncIndex index) {
pEntry
->
isWeak
=
pWalHandle
->
pHead
->
head
.
syncMeta
.
isWeek
;
pEntry
->
term
=
pWalHandle
->
pHead
->
head
.
syncMeta
.
term
;
pEntry
->
index
=
index
;
assert
(
pEntry
->
dataLen
==
pWalHandle
->
pHead
->
head
.
bodyLen
);
ASSERT
(
pEntry
->
dataLen
==
pWalHandle
->
pHead
->
head
.
bodyLen
);
memcpy
(
pEntry
->
data
,
pWalHandle
->
pHead
->
head
.
body
,
pWalHandle
->
pHead
->
head
.
bodyLen
);
// need to hold, do not new every time!!
...
...
@@ -373,7 +373,7 @@ SSyncRaftEntry* logStoreGetEntry(SSyncLogStore* pLogStore, SyncIndex index) {
int32_t
logStoreTruncate
(
SSyncLogStore
*
pLogStore
,
SyncIndex
fromIndex
)
{
SSyncLogStoreData
*
pData
=
pLogStore
->
data
;
SWal
*
pWal
=
pData
->
pWal
;
//
assert
(walRollback(pWal, fromIndex) == 0);
//
ASSERT
(walRollback(pWal, fromIndex) == 0);
int32_t
code
=
walRollback
(
pWal
,
fromIndex
);
if
(
code
!=
0
)
{
int32_t
err
=
terrno
;
...
...
@@ -407,7 +407,7 @@ SyncTerm logStoreLastTerm(SSyncLogStore* pLogStore) {
int32_t
logStoreUpdateCommitIndex
(
SSyncLogStore
*
pLogStore
,
SyncIndex
index
)
{
SSyncLogStoreData
*
pData
=
pLogStore
->
data
;
SWal
*
pWal
=
pData
->
pWal
;
//
assert
(walCommit(pWal, index) == 0);
//
ASSERT
(walCommit(pWal, index) == 0);
int32_t
code
=
walCommit
(
pWal
,
index
);
if
(
code
!=
0
)
{
int32_t
err
=
terrno
;
...
...
source/libs/sync/src/syncRaftStore.c
浏览文件 @
ecc43b66
...
...
@@ -39,40 +39,40 @@ SRaftStore *raftStoreOpen(const char *path) {
if
(
!
raftStoreFileExist
(
pRaftStore
->
path
))
{
ret
=
raftStoreInit
(
pRaftStore
);
assert
(
ret
==
0
);
ASSERT
(
ret
==
0
);
}
pRaftStore
->
pFile
=
taosOpenFile
(
path
,
TD_FILE_READ
|
TD_FILE_WRITE
);
assert
(
pRaftStore
->
pFile
!=
NULL
);
ASSERT
(
pRaftStore
->
pFile
!=
NULL
);
int
len
=
taosReadFile
(
pRaftStore
->
pFile
,
storeBuf
,
RAFT_STORE_BLOCK_SIZE
);
assert
(
len
==
RAFT_STORE_BLOCK_SIZE
);
ASSERT
(
len
==
RAFT_STORE_BLOCK_SIZE
);
ret
=
raftStoreDeserialize
(
pRaftStore
,
storeBuf
,
len
);
assert
(
ret
==
0
);
ASSERT
(
ret
==
0
);
return
pRaftStore
;
}
static
int32_t
raftStoreInit
(
SRaftStore
*
pRaftStore
)
{
assert
(
pRaftStore
!=
NULL
);
ASSERT
(
pRaftStore
!=
NULL
);
pRaftStore
->
pFile
=
taosOpenFile
(
pRaftStore
->
path
,
TD_FILE_CREATE
|
TD_FILE_WRITE
);
assert
(
pRaftStore
->
pFile
!=
NULL
);
ASSERT
(
pRaftStore
->
pFile
!=
NULL
);
pRaftStore
->
currentTerm
=
0
;
pRaftStore
->
voteFor
.
addr
=
0
;
pRaftStore
->
voteFor
.
vgId
=
0
;
int32_t
ret
=
raftStorePersist
(
pRaftStore
);
assert
(
ret
==
0
);
ASSERT
(
ret
==
0
);
taosCloseFile
(
&
pRaftStore
->
pFile
);
return
0
;
}
int32_t
raftStoreClose
(
SRaftStore
*
pRaftStore
)
{
assert
(
pRaftStore
!=
NULL
);
ASSERT
(
pRaftStore
!=
NULL
);
taosCloseFile
(
&
pRaftStore
->
pFile
);
taosMemoryFree
(
pRaftStore
);
...
...
@@ -81,17 +81,17 @@ int32_t raftStoreClose(SRaftStore *pRaftStore) {
}
int32_t
raftStorePersist
(
SRaftStore
*
pRaftStore
)
{
assert
(
pRaftStore
!=
NULL
);
ASSERT
(
pRaftStore
!=
NULL
);
int32_t
ret
;
char
storeBuf
[
RAFT_STORE_BLOCK_SIZE
]
=
{
0
};
ret
=
raftStoreSerialize
(
pRaftStore
,
storeBuf
,
sizeof
(
storeBuf
));
assert
(
ret
==
0
);
ASSERT
(
ret
==
0
);
taosLSeekFile
(
pRaftStore
->
pFile
,
0
,
SEEK_SET
);
ret
=
taosWriteFile
(
pRaftStore
->
pFile
,
storeBuf
,
sizeof
(
storeBuf
));
assert
(
ret
==
RAFT_STORE_BLOCK_SIZE
);
ASSERT
(
ret
==
RAFT_STORE_BLOCK_SIZE
);
taosFsyncFile
(
pRaftStore
->
pFile
);
return
0
;
...
...
@@ -103,7 +103,7 @@ static bool raftStoreFileExist(char *path) {
}
int32_t
raftStoreSerialize
(
SRaftStore
*
pRaftStore
,
char
*
buf
,
size_t
len
)
{
assert
(
pRaftStore
!=
NULL
);
ASSERT
(
pRaftStore
!=
NULL
);
cJSON
*
pRoot
=
cJSON_CreateObject
();
...
...
@@ -125,7 +125,7 @@ int32_t raftStoreSerialize(SRaftStore *pRaftStore, char *buf, size_t len) {
char
*
serialized
=
cJSON_Print
(
pRoot
);
int
len2
=
strlen
(
serialized
);
assert
(
len2
<
len
);
ASSERT
(
len2
<
len
);
memset
(
buf
,
0
,
len
);
snprintf
(
buf
,
len
,
"%s"
,
serialized
);
taosMemoryFree
(
serialized
);
...
...
@@ -135,17 +135,17 @@ int32_t raftStoreSerialize(SRaftStore *pRaftStore, char *buf, size_t len) {
}
int32_t
raftStoreDeserialize
(
SRaftStore
*
pRaftStore
,
char
*
buf
,
size_t
len
)
{
assert
(
pRaftStore
!=
NULL
);
ASSERT
(
pRaftStore
!=
NULL
);
assert
(
len
>
0
&&
len
<=
RAFT_STORE_BLOCK_SIZE
);
ASSERT
(
len
>
0
&&
len
<=
RAFT_STORE_BLOCK_SIZE
);
cJSON
*
pRoot
=
cJSON_Parse
(
buf
);
cJSON
*
pCurrentTerm
=
cJSON_GetObjectItem
(
pRoot
,
"current_term"
);
assert
(
cJSON_IsString
(
pCurrentTerm
));
ASSERT
(
cJSON_IsString
(
pCurrentTerm
));
sscanf
(
pCurrentTerm
->
valuestring
,
"%lu"
,
&
(
pRaftStore
->
currentTerm
));
cJSON
*
pVoteForAddr
=
cJSON_GetObjectItem
(
pRoot
,
"vote_for_addr"
);
assert
(
cJSON_IsString
(
pVoteForAddr
));
ASSERT
(
cJSON_IsString
(
pVoteForAddr
));
sscanf
(
pVoteForAddr
->
valuestring
,
"%lu"
,
&
(
pRaftStore
->
voteFor
.
addr
));
cJSON
*
pVoteForVgid
=
cJSON_GetObjectItem
(
pRoot
,
"vote_for_vgid"
);
...
...
@@ -161,7 +161,7 @@ bool raftStoreHasVoted(SRaftStore *pRaftStore) {
}
void
raftStoreVote
(
SRaftStore
*
pRaftStore
,
SRaftId
*
pRaftId
)
{
assert
(
!
syncUtilEmptyId
(
pRaftId
));
ASSERT
(
!
syncUtilEmptyId
(
pRaftId
));
pRaftStore
->
voteFor
=
*
pRaftId
;
raftStorePersist
(
pRaftStore
);
}
...
...
@@ -216,7 +216,7 @@ cJSON *raftStore2Json(SRaftStore *pRaftStore) {
char
*
raftStore2Str
(
SRaftStore
*
pRaftStore
)
{
cJSON
*
pJson
=
raftStore2Json
(
pRaftStore
);
char
*
serialized
=
cJSON_Print
(
pJson
);
char
*
serialized
=
cJSON_Print
(
pJson
);
cJSON_Delete
(
pJson
);
return
serialized
;
}
...
...
source/libs/sync/src/syncReplication.c
浏览文件 @
ecc43b66
...
...
@@ -49,7 +49,7 @@
// /\ UNCHANGED <<serverVars, candidateVars, leaderVars, logVars>>
//
int32_t
syncNodeAppendEntriesPeers
(
SSyncNode
*
pSyncNode
)
{
assert
(
pSyncNode
->
state
==
TAOS_SYNC_STATE_LEADER
);
ASSERT
(
pSyncNode
->
state
==
TAOS_SYNC_STATE_LEADER
);
syncIndexMgrLog2
(
"==syncNodeAppendEntriesPeers== pNextIndex"
,
pSyncNode
->
pNextIndex
);
syncIndexMgrLog2
(
"==syncNodeAppendEntriesPeers== pMatchIndex"
,
pSyncNode
->
pMatchIndex
);
...
...
@@ -68,7 +68,7 @@ int32_t syncNodeAppendEntriesPeers(SSyncNode* pSyncNode) {
SyncTerm
preLogTerm
=
0
;
if
(
preLogIndex
>=
SYNC_INDEX_BEGIN
)
{
SSyncRaftEntry
*
pPreEntry
=
pSyncNode
->
pLogStore
->
getEntry
(
pSyncNode
->
pLogStore
,
preLogIndex
);
assert
(
pPreEntry
!=
NULL
);
ASSERT
(
pPreEntry
!=
NULL
);
preLogTerm
=
pPreEntry
->
term
;
syncEntryDestory
(
pPreEntry
);
...
...
@@ -81,12 +81,12 @@ int32_t syncNodeAppendEntriesPeers(SSyncNode* pSyncNode) {
SSyncRaftEntry
*
pEntry
=
pSyncNode
->
pLogStore
->
getEntry
(
pSyncNode
->
pLogStore
,
nextIndex
);
if
(
pEntry
!=
NULL
)
{
pMsg
=
syncAppendEntriesBuild
(
pEntry
->
bytes
,
pSyncNode
->
vgId
);
assert
(
pMsg
!=
NULL
);
ASSERT
(
pMsg
!=
NULL
);
// add pEntry into msg
uint32_t
len
;
char
*
serialized
=
syncEntrySerialize
(
pEntry
,
&
len
);
assert
(
len
==
pEntry
->
bytes
);
ASSERT
(
len
==
pEntry
->
bytes
);
memcpy
(
pMsg
->
data
,
serialized
,
len
);
taosMemoryFree
(
serialized
);
...
...
@@ -95,10 +95,10 @@ int32_t syncNodeAppendEntriesPeers(SSyncNode* pSyncNode) {
}
else
{
// maybe overflow, send empty record
pMsg
=
syncAppendEntriesBuild
(
0
,
pSyncNode
->
vgId
);
assert
(
pMsg
!=
NULL
);
ASSERT
(
pMsg
!=
NULL
);
}
assert
(
pMsg
!=
NULL
);
ASSERT
(
pMsg
!=
NULL
);
pMsg
->
srcId
=
pSyncNode
->
myRaftId
;
pMsg
->
destId
=
*
pDestId
;
pMsg
->
term
=
pSyncNode
->
pRaftStore
->
currentTerm
;
...
...
@@ -157,7 +157,7 @@ int32_t syncNodeAppendEntriesPeersSnapshot(SSyncNode* pSyncNode) {
// add pEntry into msg
uint32_t
len
;
char
*
serialized
=
syncEntrySerialize
(
pEntry
,
&
len
);
assert
(
len
==
pEntry
->
bytes
);
ASSERT
(
len
==
pEntry
->
bytes
);
memcpy
(
pMsg
->
data
,
serialized
,
len
);
taosMemoryFree
(
serialized
);
...
...
source/libs/sync/src/syncRequestVote.c
浏览文件 @
ecc43b66
...
...
@@ -52,7 +52,7 @@ int32_t syncNodeOnRequestVoteCb(SSyncNode* ths, SyncRequestVote* pMsg) {
if
(
pMsg
->
term
>
ths
->
pRaftStore
->
currentTerm
)
{
syncNodeUpdateTerm
(
ths
,
pMsg
->
term
);
}
assert
(
pMsg
->
term
<=
ths
->
pRaftStore
->
currentTerm
);
ASSERT
(
pMsg
->
term
<=
ths
->
pRaftStore
->
currentTerm
);
bool
logOK
=
(
pMsg
->
lastLogTerm
>
ths
->
pLogStore
->
getLastTerm
(
ths
->
pLogStore
))
||
((
pMsg
->
lastLogTerm
==
ths
->
pLogStore
->
getLastTerm
(
ths
->
pLogStore
))
&&
...
...
source/libs/sync/src/syncRequestVoteReply.c
浏览文件 @
ecc43b66
...
...
@@ -50,7 +50,7 @@ int32_t syncNodeOnRequestVoteReplyCb(SSyncNode* ths, SyncRequestVoteReply* pMsg)
return
ret
;
}
//
assert
(!(pMsg->term > ths->pRaftStore->currentTerm));
//
ASSERT
(!(pMsg->term > ths->pRaftStore->currentTerm));
// no need this code, because if I receive reply.term, then I must have sent for that term.
// if (pMsg->term > ths->pRaftStore->currentTerm) {
// syncNodeUpdateTerm(ths, pMsg->term);
...
...
@@ -65,7 +65,7 @@ int32_t syncNodeOnRequestVoteReplyCb(SSyncNode* ths, SyncRequestVoteReply* pMsg)
return
ret
;
}
assert
(
pMsg
->
term
==
ths
->
pRaftStore
->
currentTerm
);
ASSERT
(
pMsg
->
term
==
ths
->
pRaftStore
->
currentTerm
);
// This tallies votes even when the current state is not Candidate,
// but they won't be looked at, so it doesn't matter.
...
...
@@ -115,7 +115,7 @@ int32_t syncNodeOnRequestVoteReplySnapshotCb(SSyncNode* ths, SyncRequestVoteRepl
return
ret
;
}
//
assert
(!(pMsg->term > ths->pRaftStore->currentTerm));
//
ASSERT
(!(pMsg->term > ths->pRaftStore->currentTerm));
// no need this code, because if I receive reply.term, then I must have sent for that term.
// if (pMsg->term > ths->pRaftStore->currentTerm) {
// syncNodeUpdateTerm(ths, pMsg->term);
...
...
source/libs/sync/src/syncRespMgr.c
浏览文件 @
ecc43b66
...
...
@@ -22,7 +22,7 @@ SSyncRespMgr *syncRespMgrCreate(void *data, int64_t ttl) {
pObj
->
pRespHash
=
taosHashInit
(
sizeof
(
uint64_t
),
taosGetDefaultHashFunction
(
TSDB_DATA_TYPE_BINARY
),
true
,
HASH_NO_LOCK
);
assert
(
pObj
->
pRespHash
!=
NULL
);
ASSERT
(
pObj
->
pRespHash
!=
NULL
);
pObj
->
ttl
=
ttl
;
pObj
->
data
=
data
;
pObj
->
seqNum
=
0
;
...
...
source/libs/sync/src/syncUtil.c
浏览文件 @
ecc43b66
...
...
@@ -86,7 +86,7 @@ void syncUtilraftId2EpSet(const SRaftId* raftId, SEpSet* pEpSet) {
void
syncUtilnodeInfo2raftId
(
const
SNodeInfo
*
pNodeInfo
,
SyncGroupId
vgId
,
SRaftId
*
raftId
)
{
uint32_t
ipv4
=
taosGetIpv4FromFqdn
(
pNodeInfo
->
nodeFqdn
);
assert
(
ipv4
!=
0xFFFFFFFF
);
ASSERT
(
ipv4
!=
0xFFFFFFFF
);
char
ipbuf
[
128
]
=
{
0
};
tinet_ntoa
(
ipbuf
,
ipv4
);
raftId
->
addr
=
syncUtilAddr2U64
(
ipbuf
,
pNodeInfo
->
nodePort
);
...
...
@@ -124,7 +124,7 @@ void syncUtilbufCopyDeep(const SSyncBuffer* src, SSyncBuffer* dest) {
int32_t
syncUtilRand
(
int32_t
max
)
{
return
taosRand
()
%
max
;
}
int32_t
syncUtilElectRandomMS
(
int32_t
min
,
int32_t
max
)
{
assert
(
min
>
0
&&
max
>
0
&&
max
>=
min
);
ASSERT
(
min
>
0
&&
max
>
0
&&
max
>=
min
);
return
min
+
syncUtilRand
(
max
-
min
);
}
...
...
@@ -201,7 +201,7 @@ bool syncUtilCanPrint(char c) {
char
*
syncUtilprintBin
(
char
*
ptr
,
uint32_t
len
)
{
char
*
s
=
taosMemoryMalloc
(
len
+
1
);
assert
(
s
!=
NULL
);
ASSERT
(
s
!=
NULL
);
memset
(
s
,
0
,
len
+
1
);
memcpy
(
s
,
ptr
,
len
);
...
...
@@ -216,7 +216,7 @@ char* syncUtilprintBin(char* ptr, uint32_t len) {
char
*
syncUtilprintBin2
(
char
*
ptr
,
uint32_t
len
)
{
uint32_t
len2
=
len
*
4
+
1
;
char
*
s
=
taosMemoryMalloc
(
len2
);
assert
(
s
!=
NULL
);
ASSERT
(
s
!=
NULL
);
memset
(
s
,
0
,
len2
);
char
*
p
=
s
;
...
...
source/libs/sync/src/syncVoteMgr.c
浏览文件 @
ecc43b66
...
...
@@ -24,7 +24,7 @@ static void voteGrantedClearVotes(SVotesGranted *pVotesGranted) {
SVotesGranted
*
voteGrantedCreate
(
SSyncNode
*
pSyncNode
)
{
SVotesGranted
*
pVotesGranted
=
taosMemoryMalloc
(
sizeof
(
SVotesGranted
));
assert
(
pVotesGranted
!=
NULL
);
ASSERT
(
pVotesGranted
!=
NULL
);
memset
(
pVotesGranted
,
0
,
sizeof
(
SVotesGranted
));
pVotesGranted
->
replicas
=
&
(
pSyncNode
->
replicasId
);
...
...
@@ -62,9 +62,9 @@ bool voteGrantedMajority(SVotesGranted *pVotesGranted) {
}
void
voteGrantedVote
(
SVotesGranted
*
pVotesGranted
,
SyncRequestVoteReply
*
pMsg
)
{
assert
(
pMsg
->
voteGranted
==
true
);
assert
(
pMsg
->
term
==
pVotesGranted
->
term
);
assert
(
syncUtilSameId
(
&
pVotesGranted
->
pSyncNode
->
myRaftId
,
&
pMsg
->
destId
));
ASSERT
(
pMsg
->
voteGranted
==
true
);
ASSERT
(
pMsg
->
term
==
pVotesGranted
->
term
);
ASSERT
(
syncUtilSameId
(
&
pVotesGranted
->
pSyncNode
->
myRaftId
,
&
pMsg
->
destId
));
int
j
=
-
1
;
for
(
int
i
=
0
;
i
<
pVotesGranted
->
replicaNum
;
++
i
)
{
...
...
@@ -73,14 +73,14 @@ void voteGrantedVote(SVotesGranted *pVotesGranted, SyncRequestVoteReply *pMsg) {
break
;
}
}
assert
(
j
!=
-
1
);
assert
(
j
>=
0
&&
j
<
pVotesGranted
->
replicaNum
);
ASSERT
(
j
!=
-
1
);
ASSERT
(
j
>=
0
&&
j
<
pVotesGranted
->
replicaNum
);
if
(
pVotesGranted
->
isGranted
[
j
]
!=
true
)
{
++
(
pVotesGranted
->
votes
);
pVotesGranted
->
isGranted
[
j
]
=
true
;
}
assert
(
pVotesGranted
->
votes
<=
pVotesGranted
->
replicaNum
);
ASSERT
(
pVotesGranted
->
votes
<=
pVotesGranted
->
replicaNum
);
}
void
voteGrantedReset
(
SVotesGranted
*
pVotesGranted
,
SyncTerm
term
)
{
...
...
@@ -127,7 +127,7 @@ cJSON *voteGranted2Json(SVotesGranted *pVotesGranted) {
char
*
voteGranted2Str
(
SVotesGranted
*
pVotesGranted
)
{
cJSON
*
pJson
=
voteGranted2Json
(
pVotesGranted
);
char
*
serialized
=
cJSON_Print
(
pJson
);
char
*
serialized
=
cJSON_Print
(
pJson
);
cJSON_Delete
(
pJson
);
return
serialized
;
}
...
...
@@ -162,7 +162,7 @@ void voteGrantedLog2(char *s, SVotesGranted *pObj) {
// SVotesRespond -----------------------------
SVotesRespond
*
votesRespondCreate
(
SSyncNode
*
pSyncNode
)
{
SVotesRespond
*
pVotesRespond
=
taosMemoryMalloc
(
sizeof
(
SVotesRespond
));
assert
(
pVotesRespond
!=
NULL
);
ASSERT
(
pVotesRespond
!=
NULL
);
memset
(
pVotesRespond
,
0
,
sizeof
(
SVotesRespond
));
pVotesRespond
->
replicas
=
&
(
pSyncNode
->
replicasId
);
...
...
@@ -198,15 +198,15 @@ bool votesResponded(SVotesRespond *pVotesRespond, const SRaftId *pRaftId) {
}
void
votesRespondAdd
(
SVotesRespond
*
pVotesRespond
,
const
SyncRequestVoteReply
*
pMsg
)
{
assert
(
pVotesRespond
->
term
==
pMsg
->
term
);
ASSERT
(
pVotesRespond
->
term
==
pMsg
->
term
);
for
(
int
i
=
0
;
i
<
pVotesRespond
->
replicaNum
;
++
i
)
{
if
(
syncUtilSameId
(
&
((
*
(
pVotesRespond
->
replicas
))[
i
]),
&
pMsg
->
srcId
))
{
//
assert
(pVotesRespond->isRespond[i] == false);
//
ASSERT
(pVotesRespond->isRespond[i] == false);
pVotesRespond
->
isRespond
[
i
]
=
true
;
return
;
}
}
assert
(
0
);
ASSERT
(
0
);
}
void
votesRespondReset
(
SVotesRespond
*
pVotesRespond
,
SyncTerm
term
)
{
...
...
@@ -256,7 +256,7 @@ cJSON *votesRespond2Json(SVotesRespond *pVotesRespond) {
char
*
votesRespond2Str
(
SVotesRespond
*
pVotesRespond
)
{
cJSON
*
pJson
=
votesRespond2Json
(
pVotesRespond
);
char
*
serialized
=
cJSON_Print
(
pJson
);
char
*
serialized
=
cJSON_Print
(
pJson
);
cJSON_Delete
(
pJson
);
return
serialized
;
}
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录