Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
efc7b1b9
TDengine
项目概览
taosdata
/
TDengine
1 年多 前同步成功
通知
1185
Star
22016
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
未验证
提交
efc7b1b9
编写于
6月 18, 2022
作者:
L
Li Minghao
提交者:
GitHub
6月 18, 2022
浏览文件
操作
浏览文件
下载
差异文件
Merge pull request #13972 from taosdata/feature/3.0_mhli
refactor(sync) refactor trace log
上级
c98965bd
b2477629
变更
12
展开全部
隐藏空白更改
内联
并排
Showing
12 changed file
with
277 addition
and
854 deletion
+277
-854
source/libs/sync/inc/syncInt.h
source/libs/sync/inc/syncInt.h
+1
-0
source/libs/sync/inc/syncRaftCfg.h
source/libs/sync/inc/syncRaftCfg.h
+2
-0
source/libs/sync/inc/syncSnapshot.h
source/libs/sync/inc/syncSnapshot.h
+9
-7
source/libs/sync/src/syncAppendEntries.c
source/libs/sync/src/syncAppendEntries.c
+8
-358
source/libs/sync/src/syncAppendEntriesReply.c
source/libs/sync/src/syncAppendEntriesReply.c
+3
-25
source/libs/sync/src/syncCommit.c
source/libs/sync/src/syncCommit.c
+4
-3
source/libs/sync/src/syncMain.c
source/libs/sync/src/syncMain.c
+113
-141
source/libs/sync/src/syncRaftCfg.c
source/libs/sync/src/syncRaftCfg.c
+29
-0
source/libs/sync/src/syncRaftLog.c
source/libs/sync/src/syncRaftLog.c
+8
-13
source/libs/sync/src/syncRespMgr.c
source/libs/sync/src/syncRespMgr.c
+15
-17
source/libs/sync/src/syncSnapshot.c
source/libs/sync/src/syncSnapshot.c
+83
-290
source/libs/sync/test/syncRaftCfgTest.cpp
source/libs/sync/test/syncRaftCfgTest.cpp
+2
-0
未找到文件。
source/libs/sync/inc/syncInt.h
浏览文件 @
efc7b1b9
...
...
@@ -195,6 +195,7 @@ int32_t syncNodeSendMsgById(const SRaftId* destRaftId, SSyncNode* pSyncNode, SRp
int32_t
syncNodeSendMsgByInfo
(
const
SNodeInfo
*
nodeInfo
,
SSyncNode
*
pSyncNode
,
SRpcMsg
*
pMsg
);
cJSON
*
syncNode2Json
(
const
SSyncNode
*
pSyncNode
);
char
*
syncNode2Str
(
const
SSyncNode
*
pSyncNode
);
void
syncNodeEventLog
(
const
SSyncNode
*
pSyncNode
,
char
*
str
);
char
*
syncNode2SimpleStr
(
const
SSyncNode
*
pSyncNode
);
bool
syncNodeInConfig
(
SSyncNode
*
pSyncNode
,
const
SSyncCfg
*
config
);
void
syncNodeUpdateConfig
(
SSyncNode
*
pSyncNode
,
SSyncCfg
*
newConfig
,
SyncIndex
lastConfigChangeIndex
,
bool
*
isDrop
);
...
...
source/libs/sync/inc/syncRaftCfg.h
浏览文件 @
efc7b1b9
...
...
@@ -51,6 +51,7 @@ int32_t raftCfgAddConfigIndex(SRaftCfg *pRaftCfg, SyncIndex configIndex);
cJSON
*
syncCfg2Json
(
SSyncCfg
*
pSyncCfg
);
char
*
syncCfg2Str
(
SSyncCfg
*
pSyncCfg
);
char
*
syncCfg2SimpleStr
(
SSyncCfg
*
pSyncCfg
);
int32_t
syncCfgFromJson
(
const
cJSON
*
pRoot
,
SSyncCfg
*
pSyncCfg
);
int32_t
syncCfgFromStr
(
const
char
*
s
,
SSyncCfg
*
pSyncCfg
);
...
...
@@ -72,6 +73,7 @@ void syncCfgPrint(SSyncCfg *pCfg);
void
syncCfgPrint2
(
char
*
s
,
SSyncCfg
*
pCfg
);
void
syncCfgLog
(
SSyncCfg
*
pCfg
);
void
syncCfgLog2
(
char
*
s
,
SSyncCfg
*
pCfg
);
void
syncCfgLog3
(
char
*
s
,
SSyncCfg
*
pCfg
);
void
raftCfgPrint
(
SRaftCfg
*
pCfg
);
void
raftCfgPrint2
(
char
*
s
,
SRaftCfg
*
pCfg
);
...
...
source/libs/sync/inc/syncSnapshot.h
浏览文件 @
efc7b1b9
...
...
@@ -39,8 +39,8 @@ typedef struct SSyncSnapshotSender {
bool
start
;
int32_t
seq
;
int32_t
ack
;
void
*
pReader
;
void
*
pCurrentBlock
;
void
*
pReader
;
void
*
pCurrentBlock
;
int32_t
blockLen
;
SSnapshot
snapshot
;
SSyncCfg
lastConfig
;
...
...
@@ -59,14 +59,15 @@ void snapshotSenderStart(SSyncSnapshotSender *pSender);
void
snapshotSenderStop
(
SSyncSnapshotSender
*
pSender
);
int32_t
snapshotSend
(
SSyncSnapshotSender
*
pSender
);
int32_t
snapshotReSend
(
SSyncSnapshotSender
*
pSender
);
cJSON
*
snapshotSender2Json
(
SSyncSnapshotSender
*
pSender
);
char
*
snapshotSender2Str
(
SSyncSnapshotSender
*
pSender
);
cJSON
*
snapshotSender2Json
(
SSyncSnapshotSender
*
pSender
);
char
*
snapshotSender2Str
(
SSyncSnapshotSender
*
pSender
);
char
*
snapshotSender2SimpleStr
(
SSyncSnapshotSender
*
pSender
,
char
*
event
);
typedef
struct
SSyncSnapshotReceiver
{
bool
start
;
int32_t
ack
;
void
*
pWriter
;
void
*
pWriter
;
SyncTerm
term
;
SyncTerm
privateTerm
;
...
...
@@ -80,8 +81,9 @@ void snapshotReceiverDestroy(SSyncSnapshotReceiver *pReceiver)
void
snapshotReceiverStart
(
SSyncSnapshotReceiver
*
pReceiver
,
SyncTerm
privateTerm
,
SRaftId
fromId
);
bool
snapshotReceiverIsStart
(
SSyncSnapshotReceiver
*
pReceiver
);
void
snapshotReceiverStop
(
SSyncSnapshotReceiver
*
pReceiver
,
bool
apply
);
cJSON
*
snapshotReceiver2Json
(
SSyncSnapshotReceiver
*
pReceiver
);
char
*
snapshotReceiver2Str
(
SSyncSnapshotReceiver
*
pReceiver
);
cJSON
*
snapshotReceiver2Json
(
SSyncSnapshotReceiver
*
pReceiver
);
char
*
snapshotReceiver2Str
(
SSyncSnapshotReceiver
*
pReceiver
);
char
*
snapshotReceiver2SimpleStr
(
SSyncSnapshotReceiver
*
pReceiver
,
char
*
event
);
int32_t
syncNodeOnSnapshotSendCb
(
SSyncNode
*
ths
,
SyncSnapshotSend
*
pMsg
);
int32_t
syncNodeOnSnapshotRspCb
(
SSyncNode
*
ths
,
SyncSnapshotRsp
*
pMsg
);
...
...
source/libs/sync/src/syncAppendEntries.c
浏览文件 @
efc7b1b9
...
...
@@ -329,358 +329,6 @@ int32_t syncNodeOnAppendEntriesCb(SSyncNode* ths, SyncAppendEntries* pMsg) {
return
ret
;
}
#if 0
int32_t syncNodeOnAppendEntriesCb(SSyncNode* ths, SyncAppendEntries* pMsg) {
int32_t ret = 0;
char logBuf[128] = {0};
snprintf(logBuf, sizeof(logBuf), "==syncNodeOnAppendEntriesCb== term:%lu", ths->pRaftStore->currentTerm);
syncAppendEntriesLog2(logBuf, pMsg);
if (pMsg->term > ths->pRaftStore->currentTerm) {
syncNodeUpdateTerm(ths, pMsg->term);
}
assert(pMsg->term <= ths->pRaftStore->currentTerm);
// reset elect timer
if (pMsg->term == ths->pRaftStore->currentTerm) {
ths->leaderCache = pMsg->srcId;
syncNodeResetElectTimer(ths);
}
assert(pMsg->dataLen >= 0);
SyncTerm localPreLogTerm = 0;
if (pMsg->prevLogIndex >= SYNC_INDEX_BEGIN && pMsg->prevLogIndex <= ths->pLogStore->getLastIndex(ths->pLogStore)) {
SSyncRaftEntry* pEntry = ths->pLogStore->getEntry(ths->pLogStore, pMsg->prevLogIndex);
assert(pEntry != NULL);
localPreLogTerm = pEntry->term;
syncEntryDestory(pEntry);
}
bool logOK =
(pMsg->prevLogIndex == SYNC_INDEX_INVALID) ||
((pMsg->prevLogIndex >= SYNC_INDEX_BEGIN) &&
(pMsg->prevLogIndex <= ths->pLogStore->getLastIndex(ths->pLogStore)) && (pMsg->prevLogTerm == localPreLogTerm));
// reject request
if ((pMsg->term < ths->pRaftStore->currentTerm) ||
((pMsg->term == ths->pRaftStore->currentTerm) && (ths->state == TAOS_SYNC_STATE_FOLLOWER) && !logOK)) {
sTrace(
"syncNodeOnAppendEntriesCb --> reject, pMsg->term:%lu, ths->pRaftStore->currentTerm:%lu, ths->state:%d, "
"logOK:%d",
pMsg->term, ths->pRaftStore->currentTerm, ths->state, logOK);
SyncAppendEntriesReply* pReply = syncAppendEntriesReplyBuild(ths->vgId);
pReply->srcId = ths->myRaftId;
pReply->destId = pMsg->srcId;
pReply->term = ths->pRaftStore->currentTerm;
pReply->success = false;
pReply->matchIndex = SYNC_INDEX_INVALID;
SRpcMsg rpcMsg;
syncAppendEntriesReply2RpcMsg(pReply, &rpcMsg);
syncNodeSendMsgById(&pReply->destId, ths, &rpcMsg);
syncAppendEntriesReplyDestroy(pReply);
return ret;
}
// return to follower state
if (pMsg->term == ths->pRaftStore->currentTerm && ths->state == TAOS_SYNC_STATE_CANDIDATE) {
sTrace(
"syncNodeOnAppendEntriesCb --> return to follower, pMsg->term:%lu, ths->pRaftStore->currentTerm:%lu, "
"ths->state:%d, logOK:%d",
pMsg->term, ths->pRaftStore->currentTerm, ths->state, logOK);
syncNodeBecomeFollower(ths, "from candidate by append entries");
// ret or reply?
return ret;
}
// accept request
if (pMsg->term == ths->pRaftStore->currentTerm && ths->state == TAOS_SYNC_STATE_FOLLOWER && logOK) {
// preIndex = -1, or has preIndex entry in local log
assert(pMsg->prevLogIndex <= ths->pLogStore->getLastIndex(ths->pLogStore));
// has extra entries (> preIndex) in local log
bool hasExtraEntries = pMsg->prevLogIndex < ths->pLogStore->getLastIndex(ths->pLogStore);
// has entries in SyncAppendEntries msg
bool hasAppendEntries = pMsg->dataLen > 0;
sTrace(
"syncNodeOnAppendEntriesCb --> accept, pMsg->term:%lu, ths->pRaftStore->currentTerm:%lu, ths->state:%d, "
"logOK:%d, hasExtraEntries:%d, hasAppendEntries:%d",
pMsg->term, ths->pRaftStore->currentTerm, ths->state, logOK, hasExtraEntries, hasAppendEntries);
if (hasExtraEntries && hasAppendEntries) {
// not conflict by default
bool conflict = false;
SyncIndex extraIndex = pMsg->prevLogIndex + 1;
SSyncRaftEntry* pExtraEntry = ths->pLogStore->getEntry(ths->pLogStore, extraIndex);
assert(pExtraEntry != NULL);
SSyncRaftEntry* pAppendEntry = syncEntryDeserialize(pMsg->data, pMsg->dataLen);
assert(pAppendEntry != NULL);
// log not match, conflict
assert(extraIndex == pAppendEntry->index);
if (pExtraEntry->term != pAppendEntry->term) {
conflict = true;
}
if (conflict) {
// roll back
SyncIndex delBegin = ths->pLogStore->getLastIndex(ths->pLogStore);
SyncIndex delEnd = extraIndex;
sTrace("syncNodeOnAppendEntriesCb --> conflict:%d, delBegin:%ld, delEnd:%ld", conflict, delBegin, delEnd);
// notice! reverse roll back!
for (SyncIndex index = delEnd; index >= delBegin; --index) {
if (ths->pFsm->FpRollBackCb != NULL) {
SSyncRaftEntry* pRollBackEntry = ths->pLogStore->getEntry(ths->pLogStore, index);
assert(pRollBackEntry != NULL);
// if (pRollBackEntry->msgType != TDMT_SYNC_NOOP) {
if (syncUtilUserRollback(pRollBackEntry->msgType)) {
SRpcMsg rpcMsg;
syncEntry2OriginalRpc(pRollBackEntry, &rpcMsg);
SFsmCbMeta cbMeta;
cbMeta.index = pRollBackEntry->index;
cbMeta.isWeak = pRollBackEntry->isWeak;
cbMeta.code = 0;
cbMeta.state = ths->state;
cbMeta.seqNum = pRollBackEntry->seqNum;
ths->pFsm->FpRollBackCb(ths->pFsm, &rpcMsg, cbMeta);
rpcFreeCont(rpcMsg.pCont);
}
syncEntryDestory(pRollBackEntry);
}
}
// delete confict entries
ths->pLogStore->truncate(ths->pLogStore, extraIndex);
// append new entries
ths->pLogStore->appendEntry(ths->pLogStore, pAppendEntry);
// pre commit
SRpcMsg rpcMsg;
syncEntry2OriginalRpc(pAppendEntry, &rpcMsg);
if (ths->pFsm != NULL) {
// if (ths->pFsm->FpPreCommitCb != NULL && pAppendEntry->originalRpcType != TDMT_SYNC_NOOP) {
if (ths->pFsm->FpPreCommitCb != NULL && syncUtilUserPreCommit(pAppendEntry->originalRpcType)) {
SFsmCbMeta cbMeta;
cbMeta.index = pAppendEntry->index;
cbMeta.isWeak = pAppendEntry->isWeak;
cbMeta.code = 2;
cbMeta.state = ths->state;
cbMeta.seqNum = pAppendEntry->seqNum;
ths->pFsm->FpPreCommitCb(ths->pFsm, &rpcMsg, cbMeta);
}
}
rpcFreeCont(rpcMsg.pCont);
}
// free memory
syncEntryDestory(pExtraEntry);
syncEntryDestory(pAppendEntry);
} else if (hasExtraEntries && !hasAppendEntries) {
// do nothing
} else if (!hasExtraEntries && hasAppendEntries) {
SSyncRaftEntry* pAppendEntry = syncEntryDeserialize(pMsg->data, pMsg->dataLen);
assert(pAppendEntry != NULL);
// append new entries
ths->pLogStore->appendEntry(ths->pLogStore, pAppendEntry);
// pre commit
SRpcMsg rpcMsg;
syncEntry2OriginalRpc(pAppendEntry, &rpcMsg);
if (ths->pFsm != NULL) {
// if (ths->pFsm->FpPreCommitCb != NULL && pAppendEntry->originalRpcType != TDMT_SYNC_NOOP) {
if (ths->pFsm->FpPreCommitCb != NULL && syncUtilUserPreCommit(pAppendEntry->originalRpcType)) {
SFsmCbMeta cbMeta;
cbMeta.index = pAppendEntry->index;
cbMeta.isWeak = pAppendEntry->isWeak;
cbMeta.code = 3;
cbMeta.state = ths->state;
cbMeta.seqNum = pAppendEntry->seqNum;
ths->pFsm->FpPreCommitCb(ths->pFsm, &rpcMsg, cbMeta);
}
}
rpcFreeCont(rpcMsg.pCont);
// free memory
syncEntryDestory(pAppendEntry);
} else if (!hasExtraEntries && !hasAppendEntries) {
// do nothing
} else {
assert(0);
}
SyncAppendEntriesReply* pReply = syncAppendEntriesReplyBuild(ths->vgId);
pReply->srcId = ths->myRaftId;
pReply->destId = pMsg->srcId;
pReply->term = ths->pRaftStore->currentTerm;
pReply->success = true;
if (hasAppendEntries) {
pReply->matchIndex = pMsg->prevLogIndex + 1;
} else {
pReply->matchIndex = pMsg->prevLogIndex;
}
SRpcMsg rpcMsg;
syncAppendEntriesReply2RpcMsg(pReply, &rpcMsg);
syncNodeSendMsgById(&pReply->destId, ths, &rpcMsg);
syncAppendEntriesReplyDestroy(pReply);
// maybe update commit index from leader
if (pMsg->commitIndex > ths->commitIndex) {
// has commit entry in local
if (pMsg->commitIndex <= ths->pLogStore->getLastIndex(ths->pLogStore)) {
SyncIndex beginIndex = ths->commitIndex + 1;
SyncIndex endIndex = pMsg->commitIndex;
// update commit index
ths->commitIndex = pMsg->commitIndex;
// call back Wal
ths->pLogStore->updateCommitIndex(ths->pLogStore, ths->commitIndex);
// execute fsm
if (ths->pFsm != NULL) {
for (SyncIndex i = beginIndex; i <= endIndex; ++i) {
if (i != SYNC_INDEX_INVALID) {
SSyncRaftEntry* pEntry = ths->pLogStore->getEntry(ths->pLogStore, i);
assert(pEntry != NULL);
SRpcMsg rpcMsg;
syncEntry2OriginalRpc(pEntry, &rpcMsg);
if (ths->pFsm->FpCommitCb != NULL && syncUtilUserCommit(pEntry->originalRpcType)) {
SFsmCbMeta cbMeta;
cbMeta.index = pEntry->index;
cbMeta.isWeak = pEntry->isWeak;
cbMeta.code = 0;
cbMeta.state = ths->state;
cbMeta.seqNum = pEntry->seqNum;
cbMeta.term = pEntry->term;
cbMeta.currentTerm = ths->pRaftStore->currentTerm;
cbMeta.flag = 0x11;
SSnapshot snapshot;
ASSERT(ths->pFsm->FpGetSnapshot != NULL);
ths->pFsm->FpGetSnapshot(ths->pFsm, &snapshot);
bool needExecute = true;
if (cbMeta.index <= snapshot.lastApplyIndex) {
needExecute = false;
}
if (needExecute) {
ths->pFsm->FpCommitCb(ths->pFsm, &rpcMsg, cbMeta);
}
}
// config change
if (pEntry->originalRpcType == TDMT_SYNC_CONFIG_CHANGE) {
SSyncCfg oldSyncCfg = ths->pRaftCfg->cfg;
SSyncCfg newSyncCfg;
int32_t ret = syncCfgFromStr(rpcMsg.pCont, &newSyncCfg);
ASSERT(ret == 0);
// update new config myIndex
bool hit = false;
for (int i = 0; i < newSyncCfg.replicaNum; ++i) {
if (strcmp(ths->myNodeInfo.nodeFqdn, (newSyncCfg.nodeInfo)[i].nodeFqdn) == 0 &&
ths->myNodeInfo.nodePort == (newSyncCfg.nodeInfo)[i].nodePort) {
newSyncCfg.myIndex = i;
hit = true;
break;
}
}
SReConfigCbMeta cbMeta = {0};
bool isDrop;
// I am in newConfig
if (hit) {
syncNodeUpdateConfig(ths, &newSyncCfg, pEntry->index, &isDrop);
// change isStandBy to normal
if (!isDrop) {
if (ths->state == TAOS_SYNC_STATE_LEADER) {
syncNodeBecomeLeader(ths, "config change");
} else {
syncNodeBecomeFollower(ths, "config change");
}
}
if (gRaftDetailLog) {
char* sOld = syncCfg2Str(&oldSyncCfg);
char* sNew = syncCfg2Str(&newSyncCfg);
sInfo("==config change== 0x11 old:%s new:%s isDrop:%d \n", sOld, sNew, isDrop);
taosMemoryFree(sOld);
taosMemoryFree(sNew);
}
}
// always call FpReConfigCb
if (ths->pFsm->FpReConfigCb != NULL) {
cbMeta.code = 0;
cbMeta.currentTerm = ths->pRaftStore->currentTerm;
cbMeta.index = pEntry->index;
cbMeta.term = pEntry->term;
cbMeta.newCfg = newSyncCfg;
cbMeta.oldCfg = oldSyncCfg;
cbMeta.seqNum = pEntry->seqNum;
cbMeta.flag = 0x11;
cbMeta.isDrop = isDrop;
ths->pFsm->FpReConfigCb(ths->pFsm, &rpcMsg, cbMeta);
}
}
// restore finish
if (pEntry->index == ths->pLogStore->getLastIndex(ths->pLogStore)) {
if (ths->restoreFinish == false) {
if (ths->pFsm->FpRestoreFinishCb != NULL) {
ths->pFsm->FpRestoreFinishCb(ths->pFsm);
}
ths->restoreFinish = true;
sInfo("==syncNodeOnAppendEntriesCb== restoreFinish set true %p vgId:%d", ths, ths->vgId);
/*
tsem_post(&ths->restoreSem);
sInfo("==syncNodeOnAppendEntriesCb== RestoreFinish tsem_post %p", ths);
*/
}
}
rpcFreeCont(rpcMsg.pCont);
syncEntryDestory(pEntry);
}
}
}
}
}
}
return ret;
}
#endif
static
int32_t
syncNodeMakeLogSame
(
SSyncNode
*
ths
,
SyncAppendEntries
*
pMsg
)
{
int32_t
code
;
...
...
@@ -717,8 +365,10 @@ static int32_t syncNodeMakeLogSame(SSyncNode* ths, SyncAppendEntries* pMsg) {
// delete confict entries
code
=
ths
->
pLogStore
->
syncLogTruncate
(
ths
->
pLogStore
,
delBegin
);
ASSERT
(
code
==
0
);
sDebug
(
"vgId:%d, sync event %s commitIndex:%ld currentTerm:%lu log truncate, from %ld to %ld"
,
ths
->
vgId
,
syncUtilState2String
(
ths
->
state
),
ths
->
commitIndex
,
ths
->
pRaftStore
->
currentTerm
,
delBegin
,
delEnd
);
char
eventLog
[
128
];
snprintf
(
eventLog
,
sizeof
(
eventLog
),
"log truncate, from %ld to %ld"
,
delBegin
,
delEnd
);
syncNodeEventLog
(
ths
,
eventLog
);
logStoreSimpleLog2
(
"after syncNodeMakeLogSame"
,
ths
->
pLogStore
);
return
code
;
...
...
@@ -1066,10 +716,10 @@ int32_t syncNodeOnAppendEntriesSnapshotCb(SSyncNode* ths, SyncAppendEntries* pMs
SyncIndex
commitEnd
=
snapshot
.
lastApplyIndex
;
ths
->
commitIndex
=
snapshot
.
lastApplyIndex
;
sDebug
(
"vgId:%d, sync event %s commitIndex:%ld currentTerm:%lu commit by snapshot from index:%ld to index:%ld"
,
ths
->
vgId
,
syncUtilState2String
(
ths
->
state
),
ths
->
commitIndex
,
ths
->
pRaftStore
->
currentTerm
,
commitBegin
,
commitEnd
);
char
eventLog
[
128
];
snprintf
(
eventLog
,
sizeof
(
eventLog
),
"commit by snapshot from index:%ld to index:%ld"
,
commitBegin
,
commitEnd
);
syncNodeEventLog
(
ths
,
eventLog
);
}
SyncIndex
beginIndex
=
ths
->
commitIndex
+
1
;
...
...
source/libs/sync/src/syncAppendEntriesReply.c
浏览文件 @
efc7b1b9
...
...
@@ -183,31 +183,9 @@ int32_t syncNodeOnAppendEntriesReplySnapshotCb(SSyncNode* ths, SyncAppendEntries
pMsg
->
privateTerm
<
pSender
->
privateTerm
)
{
snapshotSenderStart
(
pSender
);
char
host
[
128
];
uint16_t
port
;
syncUtilU642Addr
(
pSender
->
pSyncNode
->
replicasId
[
pSender
->
replicaIndex
].
addr
,
host
,
sizeof
(
host
),
&
port
);
if
(
gRaftDetailLog
)
{
char
*
s
=
snapshotSender2Str
(
pSender
);
sDebug
(
"vgId:%d, sync event %s commitIndex:%ld currentTerm:%lu snapshot send to %s:%d start sender first time, "
"lastApplyIndex:%ld "
"lastApplyTerm:%lu "
"lastConfigIndex:%ld privateTerm:%lu "
"sender:%s"
,
ths
->
vgId
,
syncUtilState2String
(
ths
->
state
),
ths
->
commitIndex
,
ths
->
pRaftStore
->
currentTerm
,
host
,
port
,
pSender
->
snapshot
.
lastApplyIndex
,
pSender
->
snapshot
.
lastApplyTerm
,
pSender
->
snapshot
.
lastConfigIndex
,
pSender
->
privateTerm
,
s
);
taosMemoryFree
(
s
);
}
else
{
sDebug
(
"vgId:%d, sync event %s commitIndex:%ld currentTerm:%lu snapshot send to %s:%d start sender first time, "
"lastApplyIndex:%ld "
"lastApplyTerm:%lu lastConfigIndex:%ld privateTerm:%lu"
,
ths
->
vgId
,
syncUtilState2String
(
ths
->
state
),
ths
->
commitIndex
,
ths
->
pRaftStore
->
currentTerm
,
host
,
port
,
pSender
->
snapshot
.
lastApplyIndex
,
pSender
->
snapshot
.
lastApplyTerm
,
pSender
->
snapshot
.
lastConfigIndex
,
pSender
->
privateTerm
);
}
char
*
eventLog
=
snapshotSender2SimpleStr
(
pSender
,
"snapshot sender start"
);
syncNodeEventLog
(
ths
,
eventLog
);
taosMemoryFree
(
eventLog
);
}
SyncIndex
sentryIndex
=
pSender
->
snapshot
.
lastApplyIndex
+
1
;
...
...
source/libs/sync/src/syncCommit.c
浏览文件 @
efc7b1b9
...
...
@@ -56,9 +56,10 @@ void syncMaybeAdvanceCommitIndex(SSyncNode* pSyncNode) {
SyncIndex
commitEnd
=
snapshot
.
lastApplyIndex
;
pSyncNode
->
commitIndex
=
snapshot
.
lastApplyIndex
;
sDebug
(
"vgId:%d, sync event %s commitIndex:%ld currentTerm:%lu commit by snapshot from index:%ld to index:%ld"
,
pSyncNode
->
vgId
,
syncUtilState2String
(
pSyncNode
->
state
),
pSyncNode
->
commitIndex
,
pSyncNode
->
pRaftStore
->
currentTerm
,
pSyncNode
->
commitIndex
,
snapshot
.
lastApplyIndex
);
char
eventLog
[
128
];
snprintf
(
eventLog
,
sizeof
(
eventLog
),
"commit by snapshot from index:%ld to index:%ld"
,
pSyncNode
->
commitIndex
,
snapshot
.
lastApplyIndex
);
syncNodeEventLog
(
pSyncNode
,
eventLog
);
}
// update commit index
...
...
source/libs/sync/src/syncMain.c
浏览文件 @
efc7b1b9
...
...
@@ -189,24 +189,6 @@ int32_t syncReconfigBuild(int64_t rid, const SSyncCfg* pNewCfg, SRpcMsg* pRpcMsg
int32_t
ret
=
0
;
bool
IamInNew
=
syncNodeInConfig
(
pSyncNode
,
pNewCfg
);
#if 0
for (int i = 0; i < pNewCfg->replicaNum; ++i) {
if (strcmp((pNewCfg->nodeInfo)[i].nodeFqdn, pSyncNode->myNodeInfo.nodeFqdn) == 0 &&
(pNewCfg->nodeInfo)[i].nodePort == pSyncNode->myNodeInfo.nodePort) {
IamInNew = true;
}
/*
SRaftId newId;
newId.addr = syncUtilAddr2U64((pNewCfg->nodeInfo)[i].nodeFqdn, (pNewCfg->nodeInfo)[i].nodePort);
newId.vgId = pSyncNode->vgId;
if (syncUtilSameId(&(pSyncNode->myRaftId), &newId)) {
IamInNew = true;
}
*/
}
#endif
if
(
!
IamInNew
)
{
taosReleaseRef
(
tsNodeRefId
,
pSyncNode
->
rid
);
terrno
=
TSDB_CODE_SYN_NOT_IN_NEW_CONFIG
;
...
...
@@ -235,27 +217,6 @@ int32_t syncReconfig(int64_t rid, const SSyncCfg* pNewCfg) {
bool
IamInNew
=
syncNodeInConfig
(
pSyncNode
,
pNewCfg
);
#if 0
for (int i = 0; i < pNewCfg->replicaNum; ++i) {
if (strcmp((pNewCfg->nodeInfo)[i].nodeFqdn, pSyncNode->myNodeInfo.nodeFqdn) == 0 &&
(pNewCfg->nodeInfo)[i].nodePort == pSyncNode->myNodeInfo.nodePort) {
IamInNew = true;
}
/*
// some problem in inet_addr
SRaftId newId = EMPTY_RAFT_ID;
newId.addr = syncUtilAddr2U64((pNewCfg->nodeInfo)[i].nodeFqdn, (pNewCfg->nodeInfo)[i].nodePort);
newId.vgId = pSyncNode->vgId;
if (syncUtilSameId(&(pSyncNode->myRaftId), &newId)) {
IamInNew = true;
}
*/
}
#endif
if
(
!
IamInNew
)
{
sError
(
"sync reconfig error, not in new config"
);
taosReleaseRef
(
tsNodeRefId
,
pSyncNode
->
rid
);
...
...
@@ -614,9 +575,6 @@ int32_t syncPropose(int64_t rid, const SRpcMsg* pMsg, bool isWeak) {
return
-
1
;
}
assert
(
rid
==
pSyncNode
->
rid
);
sDebug
(
"vgId:%d, sync event %s commitIndex:%ld currentTerm:%lu propose msgType:%s,%d"
,
pSyncNode
->
vgId
,
syncUtilState2String
(
pSyncNode
->
state
),
pSyncNode
->
commitIndex
,
pSyncNode
->
pRaftStore
->
currentTerm
,
TMSG_INFO
(
pMsg
->
msgType
),
pMsg
->
msgType
);
ret
=
syncNodePropose
(
pSyncNode
,
pMsg
,
isWeak
);
taosReleaseRef
(
tsNodeRefId
,
pSyncNode
->
rid
);
...
...
@@ -625,9 +583,10 @@ int32_t syncPropose(int64_t rid, const SRpcMsg* pMsg, bool isWeak) {
int32_t
syncNodePropose
(
SSyncNode
*
pSyncNode
,
const
SRpcMsg
*
pMsg
,
bool
isWeak
)
{
int32_t
ret
=
0
;
sDebug
(
"vgId:%d, sync event %s commitIndex:%ld currentTerm:%lu propose msgType:%s,%d"
,
pSyncNode
->
vgId
,
syncUtilState2String
(
pSyncNode
->
state
),
pSyncNode
->
commitIndex
,
pSyncNode
->
pRaftStore
->
currentTerm
,
TMSG_INFO
(
pMsg
->
msgType
),
pMsg
->
msgType
);
char
eventLog
[
128
];
snprintf
(
eventLog
,
sizeof
(
eventLog
),
"propose type:%s,%d"
,
TMSG_INFO
(
pMsg
->
msgType
),
pMsg
->
msgType
);
syncNodeEventLog
(
pSyncNode
,
eventLog
);
if
(
pSyncNode
->
state
==
TAOS_SYNC_STATE_LEADER
)
{
SRespStub
stub
;
...
...
@@ -870,11 +829,7 @@ SSyncNode* syncNodeOpen(const SSyncInfo* pOldSyncInfo) {
// start raft
// syncNodeBecomeFollower(pSyncNode);
// snapshot meta
// pSyncNode->sMeta.lastConfigIndex = -1;
sDebug
(
"vgId:%d, sync event %s commitIndex:%ld currentTerm:%lu sync open"
,
pSyncNode
->
vgId
,
syncUtilState2String
(
pSyncNode
->
state
),
pSyncNode
->
commitIndex
,
pSyncNode
->
pRaftStore
->
currentTerm
);
syncNodeEventLog
(
pSyncNode
,
"sync open"
);
return
pSyncNode
;
}
...
...
@@ -921,8 +876,7 @@ void syncNodeStartStandBy(SSyncNode* pSyncNode) {
}
void
syncNodeClose
(
SSyncNode
*
pSyncNode
)
{
sDebug
(
"vgId:%d, sync event %s commitIndex:%ld currentTerm:%lu sync close"
,
pSyncNode
->
vgId
,
syncUtilState2String
(
pSyncNode
->
state
),
pSyncNode
->
commitIndex
,
pSyncNode
->
pRaftStore
->
currentTerm
);
syncNodeEventLog
(
pSyncNode
,
"sync close"
);
int32_t
ret
;
assert
(
pSyncNode
!=
NULL
);
...
...
@@ -1306,6 +1260,27 @@ char* syncNode2Str(const SSyncNode* pSyncNode) {
return
serialized
;
}
void
syncNodeEventLog
(
const
SSyncNode
*
pSyncNode
,
char
*
str
)
{
int32_t
userStrLen
=
strlen
(
str
);
if
(
userStrLen
<
256
)
{
char
logBuf
[
128
+
256
];
snprintf
(
logBuf
,
sizeof
(
logBuf
),
"vgId:%d %s term:%lu commit:%ld standby:%d replica-num:%d lconfig:%ld sync event %s"
,
pSyncNode
->
vgId
,
syncUtilState2String
(
pSyncNode
->
state
),
pSyncNode
->
pRaftStore
->
currentTerm
,
pSyncNode
->
commitIndex
,
pSyncNode
->
pRaftCfg
->
isStandBy
,
pSyncNode
->
replicaNum
,
pSyncNode
->
pRaftCfg
->
lastConfigIndex
,
str
);
sDebug
(
"%s"
,
logBuf
);
}
else
{
int
len
=
128
+
userStrLen
;
char
*
s
=
(
char
*
)
taosMemoryMalloc
(
len
);
snprintf
(
s
,
len
,
"vgId:%d %s term:%lu commit:%ld standby:%d replica-num:%d lconfig:%ld sync event %s"
,
pSyncNode
->
vgId
,
syncUtilState2String
(
pSyncNode
->
state
),
pSyncNode
->
pRaftStore
->
currentTerm
,
pSyncNode
->
commitIndex
,
pSyncNode
->
pRaftCfg
->
isStandBy
,
pSyncNode
->
replicaNum
,
pSyncNode
->
pRaftCfg
->
lastConfigIndex
,
str
);
sDebug
(
"%s"
,
s
);
taosMemoryFree
(
s
);
}
}
char
*
syncNode2SimpleStr
(
const
SSyncNode
*
pSyncNode
)
{
int
len
=
256
;
char
*
s
=
(
char
*
)
taosMemoryMalloc
(
len
);
...
...
@@ -1361,12 +1336,10 @@ void syncNodeUpdateConfig(SSyncNode* pSyncNode, SSyncCfg* pNewConfig, SyncIndex
SSyncSnapshotSender
*
oldSenders
[
TSDB_MAX_REPLICA
];
for
(
int
i
=
0
;
i
<
TSDB_MAX_REPLICA
;
++
i
)
{
oldSenders
[
i
]
=
(
pSyncNode
->
senders
)[
i
];
sDebug
(
"vgId:%d, sync event %s commitIndex:%ld currentTerm:%lu save senders %d, %p, privateTerm:%lu"
,
pSyncNode
->
vgId
,
syncUtilState2String
(
pSyncNode
->
state
),
pSyncNode
->
commitIndex
,
pSyncNode
->
pRaftStore
->
currentTerm
,
i
,
oldSenders
[
i
],
oldSenders
[
i
]
->
privateTerm
);
if
(
gRaftDetailLog
)
{
;
}
char
*
eventLog
=
snapshotSender2SimpleStr
(
oldSenders
[
i
],
"snapshot sender save old"
);
syncNodeEventLog
(
pSyncNode
,
eventLog
);
taosMemoryFree
(
eventLog
);
}
// init internal
...
...
@@ -1415,12 +1388,14 @@ void syncNodeUpdateConfig(SSyncNode* pSyncNode, SSyncCfg* pNewConfig, SyncIndex
char
host
[
128
];
uint16_t
port
;
syncUtilU642Addr
((
pSyncNode
->
replicasId
)[
i
].
addr
,
host
,
sizeof
(
host
),
&
port
);
sDebug
(
"vgId:%d, sync event %s commitIndex:%ld currentTerm:%lu reset sender for %lu, newIndex:%d, %s:%d, %p, "
"privateTerm:%lu"
,
pSyncNode
->
vgId
,
syncUtilState2String
(
pSyncNode
->
state
),
pSyncNode
->
commitIndex
,
pSyncNode
->
pRaftStore
->
currentTerm
,
(
pSyncNode
->
replicasId
)[
i
].
addr
,
i
,
host
,
port
,
oldSenders
[
j
],
oldSenders
[
j
]
->
privateTerm
);
do
{
char
eventLog
[
128
];
snprintf
(
eventLog
,
sizeof
(
eventLog
),
"snapshot sender reset for %lu, newIndex:%d, %s:%d, %p"
,
(
pSyncNode
->
replicasId
)[
i
].
addr
,
i
,
host
,
port
,
oldSenders
[
j
]);
syncNodeEventLog
(
pSyncNode
,
eventLog
);
}
while
(
0
);
(
pSyncNode
->
senders
)[
i
]
=
oldSenders
[
j
];
oldSenders
[
j
]
=
NULL
;
reset
=
true
;
...
...
@@ -1428,11 +1403,13 @@ void syncNodeUpdateConfig(SSyncNode* pSyncNode, SSyncCfg* pNewConfig, SyncIndex
// reset replicaIndex
int32_t
oldreplicaIndex
=
(
pSyncNode
->
senders
)[
i
]
->
replicaIndex
;
(
pSyncNode
->
senders
)[
i
]
->
replicaIndex
=
i
;
sDebug
(
"vgId:%d, sync event %s commitIndex:%ld currentTerm:%lu udpate replicaIndex from %d to %d, %s:%d, %p, "
"reset:%d"
,
pSyncNode
->
vgId
,
syncUtilState2String
(
pSyncNode
->
state
),
pSyncNode
->
commitIndex
,
pSyncNode
->
pRaftStore
->
currentTerm
,
oldreplicaIndex
,
i
,
host
,
port
,
(
pSyncNode
->
senders
)[
i
],
reset
);
do
{
char
eventLog
[
128
];
snprintf
(
eventLog
,
sizeof
(
eventLog
),
"snapshot sender udpate replicaIndex from %d to %d, %s:%d, %p, reset:%d"
,
oldreplicaIndex
,
i
,
host
,
port
,
(
pSyncNode
->
senders
)[
i
],
reset
);
syncNodeEventLog
(
pSyncNode
,
eventLog
);
}
while
(
0
);
}
}
}
...
...
@@ -1441,10 +1418,10 @@ void syncNodeUpdateConfig(SSyncNode* pSyncNode, SSyncCfg* pNewConfig, SyncIndex
for
(
int
i
=
0
;
i
<
TSDB_MAX_REPLICA
;
++
i
)
{
if
((
pSyncNode
->
senders
)[
i
]
==
NULL
)
{
(
pSyncNode
->
senders
)[
i
]
=
snapshotSenderCreate
(
pSyncNode
,
i
);
sDebug
(
"vgId:%d, sync event %s commitIndex:%ld currentTerm:%lu create new sender %p replicaIndex:%d, privateTerm:%lu"
,
pSyncNode
->
vgId
,
syncUtilState2String
(
pSyncNode
->
state
),
pSyncNode
->
commitIndex
,
pSyncNode
->
pRaftStore
->
currentTerm
,
(
pSyncNode
->
senders
)[
i
],
i
,
(
pSyncNode
->
senders
)[
i
]
->
privateTerm
);
char
*
eventLog
=
snapshotSender2SimpleStr
((
pSyncNode
->
senders
)[
i
],
"snapshot sender create new"
);
syncNodeEventLog
(
pSyncNode
,
eventLog
);
taosMemoryFree
(
eventLog
);
}
}
...
...
@@ -1452,9 +1429,13 @@ void syncNodeUpdateConfig(SSyncNode* pSyncNode, SSyncCfg* pNewConfig, SyncIndex
for
(
int
i
=
0
;
i
<
TSDB_MAX_REPLICA
;
++
i
)
{
if
(
oldSenders
[
i
]
!=
NULL
)
{
snapshotSenderDestroy
(
oldSenders
[
i
]);
sDebug
(
"vgId:%d, sync event %s commitIndex:%ld currentTerm:%lu delete old sender %p replicaIndex:%d"
,
pSyncNode
->
vgId
,
syncUtilState2String
(
pSyncNode
->
state
),
pSyncNode
->
commitIndex
,
pSyncNode
->
pRaftStore
->
currentTerm
,
oldSenders
[
i
],
i
);
do
{
char
eventLog
[
128
];
snprintf
(
eventLog
,
sizeof
(
eventLog
),
"snapshot sender delete old %p replica-index:%d"
,
oldSenders
[
i
],
i
);
syncNodeEventLog
(
pSyncNode
,
eventLog
);
}
while
(
0
);
oldSenders
[
i
]
=
NULL
;
}
}
...
...
@@ -1462,24 +1443,6 @@ void syncNodeUpdateConfig(SSyncNode* pSyncNode, SSyncCfg* pNewConfig, SyncIndex
bool
IamInOld
=
syncNodeInConfig
(
pSyncNode
,
&
oldConfig
);
bool
IamInNew
=
syncNodeInConfig
(
pSyncNode
,
pNewConfig
);
#if 0
for (int i = 0; i < oldConfig.replicaNum; ++i) {
if (strcmp((oldConfig.nodeInfo)[i].nodeFqdn, pSyncNode->myNodeInfo.nodeFqdn) == 0 &&
(oldConfig.nodeInfo)[i].nodePort == pSyncNode->myNodeInfo.nodePort) {
IamInOld = true;
break;
}
}
for (int i = 0; i < newConfig->replicaNum; ++i) {
if (strcmp((newConfig->nodeInfo)[i].nodeFqdn, pSyncNode->myNodeInfo.nodeFqdn) == 0 &&
(newConfig->nodeInfo)[i].nodePort == pSyncNode->myNodeInfo.nodePort) {
IamInNew = true;
break;
}
}
#endif
*
isDrop
=
true
;
if
(
IamInOld
&&
!
IamInNew
)
{
*
isDrop
=
true
;
...
...
@@ -1524,13 +1487,6 @@ void syncNodeUpdateTerm(SSyncNode* pSyncNode, SyncTerm term) {
}
void
syncNodeBecomeFollower
(
SSyncNode
*
pSyncNode
,
const
char
*
debugStr
)
{
sDebug
(
"vgId:%d, sync event %s commitIndex:%ld currentTerm:%lu become follower, isStandBy:%d, replicaNum:%d, "
"restoreFinish:%d, %s"
,
pSyncNode
->
vgId
,
syncUtilState2String
(
pSyncNode
->
state
),
pSyncNode
->
commitIndex
,
pSyncNode
->
pRaftStore
->
currentTerm
,
pSyncNode
->
pRaftCfg
->
isStandBy
,
pSyncNode
->
replicaNum
,
pSyncNode
->
restoreFinish
,
debugStr
);
// maybe clear leader cache
if
(
pSyncNode
->
state
==
TAOS_SYNC_STATE_LEADER
)
{
pSyncNode
->
leaderCache
=
EMPTY_RAFT_ID
;
...
...
@@ -1542,6 +1498,21 @@ void syncNodeBecomeFollower(SSyncNode* pSyncNode, const char* debugStr) {
// reset elect timer
syncNodeResetElectTimer
(
pSyncNode
);
// trace log
do
{
int32_t
debugStrLen
=
strlen
(
debugStr
);
if
(
debugStrLen
<
256
)
{
char
eventLog
[
256
+
64
];
snprintf
(
eventLog
,
sizeof
(
eventLog
),
"become follower %s"
,
debugStr
);
syncNodeEventLog
(
pSyncNode
,
eventLog
);
}
else
{
char
*
eventLog
=
taosMemoryMalloc
(
debugStrLen
+
64
);
snprintf
(
eventLog
,
debugStrLen
,
"become follower %s"
,
debugStr
);
syncNodeEventLog
(
pSyncNode
,
eventLog
);
taosMemoryFree
(
eventLog
);
}
}
while
(
0
);
}
// TLA+ Spec
...
...
@@ -1566,13 +1537,6 @@ void syncNodeBecomeLeader(SSyncNode* pSyncNode, const char* debugStr) {
// reset restoreFinish
pSyncNode
->
restoreFinish
=
false
;
sDebug
(
"vgId:%d, sync event %s commitIndex:%ld currentTerm:%lu become leader, isStandBy:%d, replicaNum:%d, "
"restoreFinish:%d, %s"
,
pSyncNode
->
vgId
,
syncUtilState2String
(
pSyncNode
->
state
),
pSyncNode
->
commitIndex
,
pSyncNode
->
pRaftStore
->
currentTerm
,
pSyncNode
->
pRaftCfg
->
isStandBy
,
pSyncNode
->
replicaNum
,
pSyncNode
->
restoreFinish
,
debugStr
);
// state change
pSyncNode
->
state
=
TAOS_SYNC_STATE_LEADER
;
...
...
@@ -1618,6 +1582,21 @@ void syncNodeBecomeLeader(SSyncNode* pSyncNode, const char* debugStr) {
// start heartbeat timer
syncNodeStartHeartbeatTimer
(
pSyncNode
);
// trace log
do
{
int32_t
debugStrLen
=
strlen
(
debugStr
);
if
(
debugStrLen
<
256
)
{
char
eventLog
[
256
+
64
];
snprintf
(
eventLog
,
sizeof
(
eventLog
),
"become leader %s"
,
debugStr
);
syncNodeEventLog
(
pSyncNode
,
eventLog
);
}
else
{
char
*
eventLog
=
taosMemoryMalloc
(
debugStrLen
+
64
);
snprintf
(
eventLog
,
debugStrLen
,
"become leader %s"
,
debugStr
);
syncNodeEventLog
(
pSyncNode
,
eventLog
);
taosMemoryFree
(
eventLog
);
}
}
while
(
0
);
}
void
syncNodeCandidate2Leader
(
SSyncNode
*
pSyncNode
)
{
...
...
@@ -2147,30 +2126,26 @@ const char* syncStr(ESyncState state) {
static
int32_t
syncDoLeaderTransfer
(
SSyncNode
*
ths
,
SRpcMsg
*
pRpcMsg
,
SSyncRaftEntry
*
pEntry
)
{
SyncLeaderTransfer
*
pSyncLeaderTransfer
=
syncLeaderTransferFromRpcMsg2
(
pRpcMsg
);
sDebug
(
"vgId:%d, sync event %s commitIndex:%ld currentTerm:%lu begin leader transfer"
,
ths
->
vgId
,
syncUtilState2String
(
ths
->
state
),
ths
->
commitIndex
,
ths
->
pRaftStore
->
currentTerm
);
syncNodeEventLog
(
ths
,
"begin leader transfer"
);
if
(
strcmp
(
pSyncLeaderTransfer
->
newNodeInfo
.
nodeFqdn
,
ths
->
myNodeInfo
.
nodeFqdn
)
==
0
&&
pSyncLeaderTransfer
->
newNodeInfo
.
nodePort
==
ths
->
myNodeInfo
.
nodePort
)
{
sDebug
(
"vgId:%d, sync event %s commitIndex:%ld currentTerm:%lu maybe leader transfer to %s:%d %lu"
,
ths
->
vgId
,
syncUtilState2String
(
ths
->
state
),
ths
->
commitIndex
,
ths
->
pRaftStore
->
currentTerm
,
pSyncLeaderTransfer
->
newNodeInfo
.
nodeFqdn
,
pSyncLeaderTransfer
->
newNodeInfo
.
nodePort
,
pSyncLeaderTransfer
->
newLeaderId
.
addr
);
bool
sameId
=
syncUtilSameId
(
&
(
pSyncLeaderTransfer
->
newLeaderId
),
&
(
ths
->
myRaftId
));
bool
sameNodeInfo
=
strcmp
(
pSyncLeaderTransfer
->
newNodeInfo
.
nodeFqdn
,
ths
->
myNodeInfo
.
nodeFqdn
)
==
0
&&
pSyncLeaderTransfer
->
newNodeInfo
.
nodePort
==
ths
->
myNodeInfo
.
nodePort
;
bool
same
=
sameId
||
sameNodeInfo
;
if
(
same
)
{
// reset elect timer now!
int32_t
electMS
=
1
;
int32_t
ret
=
syncNodeRestartElectTimer
(
ths
,
electMS
);
ASSERT
(
ret
==
0
);
char
eventLog
[
256
];
snprintf
(
eventLog
,
sizeof
(
eventLog
),
"maybe leader transfer to %s:%d %lu"
,
pSyncLeaderTransfer
->
newNodeInfo
.
nodeFqdn
,
pSyncLeaderTransfer
->
newNodeInfo
.
nodePort
,
pSyncLeaderTransfer
->
newLeaderId
.
addr
);
syncNodeEventLog
(
ths
,
eventLog
);
}
/*
if (syncUtilSameId(&(pSyncLeaderTransfer->newLeaderId), &(ths->myRaftId))) {
// reset elect timer now!
int32_t electMS = 1;
int32_t ret = syncNodeRestartElectTimer(ths, electMS);
ASSERT(ret == 0);
}
*/
if
(
ths
->
pFsm
->
FpLeaderTransferCb
!=
NULL
)
{
SFsmCbMeta
cbMeta
=
{
0
};
cbMeta
.
code
=
0
;
...
...
@@ -2235,12 +2210,10 @@ static int32_t syncNodeConfigChange(SSyncNode* ths, SRpcMsg* pRpcMsg, SSyncRaftE
// change isStandBy to normal
if
(
!
isDrop
)
{
char
tmpbuf
[
512
];
char
*
oldStr
=
syncCfg2Str
(
&
oldSyncCfg
);
char
*
newStr
=
syncCfg2Str
(
&
newSyncCfg
);
syncUtilJson2Line
(
oldStr
);
syncUtilJson2Line
(
newStr
);
char
*
oldStr
=
syncCfg2SimpleStr
(
&
oldSyncCfg
);
char
*
newStr
=
syncCfg2SimpleStr
(
&
newSyncCfg
);
snprintf
(
tmpbuf
,
sizeof
(
tmpbuf
),
"config change from %d to %d, index:%ld, %s --> %s"
,
oldSyncCfg
.
replicaNum
,
newSyncCfg
.
replicaNum
,
pEntry
->
index
,
oldStr
,
newStr
);
newSyncCfg
.
replicaNum
,
pEntry
->
index
,
oldStr
,
newStr
);
taosMemoryFree
(
oldStr
);
taosMemoryFree
(
newStr
);
...
...
@@ -2252,12 +2225,10 @@ static int32_t syncNodeConfigChange(SSyncNode* ths, SRpcMsg* pRpcMsg, SSyncRaftE
}
}
else
{
char
tmpbuf
[
512
];
char
*
oldStr
=
syncCfg2Str
(
&
oldSyncCfg
);
char
*
newStr
=
syncCfg2Str
(
&
newSyncCfg
);
syncUtilJson2Line
(
oldStr
);
syncUtilJson2Line
(
newStr
);
char
*
oldStr
=
syncCfg2SimpleStr
(
&
oldSyncCfg
);
char
*
newStr
=
syncCfg2SimpleStr
(
&
newSyncCfg
);
snprintf
(
tmpbuf
,
sizeof
(
tmpbuf
),
"config change2 from %d to %d, index:%ld, %s --> %s"
,
oldSyncCfg
.
replicaNum
,
newSyncCfg
.
replicaNum
,
pEntry
->
index
,
oldStr
,
newStr
);
newSyncCfg
.
replicaNum
,
pEntry
->
index
,
oldStr
,
newStr
);
taosMemoryFree
(
oldStr
);
taosMemoryFree
(
newStr
);
...
...
@@ -2295,10 +2266,10 @@ static int32_t syncNodeConfigChange(SSyncNode* ths, SRpcMsg* pRpcMsg, SSyncRaftE
int32_t
syncNodeCommit
(
SSyncNode
*
ths
,
SyncIndex
beginIndex
,
SyncIndex
endIndex
,
uint64_t
flag
)
{
int32_t
code
=
0
;
ESyncState
state
=
flag
;
sDebug
(
"vgId:%d, sync event %s commitIndex:%ld currentTerm:%lu commit by wal from index:%"
PRId64
" to index:%"
PRId64
", %s"
,
ths
->
vgId
,
syncUtilState2String
(
ths
->
state
),
ths
->
commitIndex
,
ths
->
pRaftStore
->
currentTerm
,
beginIndex
,
endIndex
,
syncUtilState2String
(
state
)
);
char
eventLog
[
128
];
snprintf
(
eventLog
,
sizeof
(
eventLog
),
"commit by wal from index:%ld to index:%ld"
,
beginIndex
,
endIndex
);
syncNodeEventLog
(
ths
,
eventLog
);
// execute fsm
if
(
ths
->
pFsm
!=
NULL
)
{
...
...
@@ -2350,9 +2321,10 @@ int32_t syncNodeCommit(SSyncNode* ths, SyncIndex beginIndex, SyncIndex endIndex,
ths
->
pFsm
->
FpRestoreFinishCb
(
ths
->
pFsm
);
}
ths
->
restoreFinish
=
true
;
sDebug
(
"vgId:%d, sync event %s commitIndex:%ld currentTerm:%lu restore finish, %s, index:%ld"
,
ths
->
vgId
,
syncUtilState2String
(
ths
->
state
),
ths
->
commitIndex
,
ths
->
pRaftStore
->
currentTerm
,
syncUtilState2String
(
ths
->
state
),
pEntry
->
index
);
char
eventLog
[
128
];
snprintf
(
eventLog
,
sizeof
(
eventLog
),
"restore finish, index:%ld"
,
pEntry
->
index
);
syncNodeEventLog
(
ths
,
eventLog
);
}
}
...
...
source/libs/sync/src/syncRaftCfg.c
浏览文件 @
efc7b1b9
...
...
@@ -101,6 +101,29 @@ char *syncCfg2Str(SSyncCfg *pSyncCfg) {
return
serialized
;
}
char
*
syncCfg2SimpleStr
(
SSyncCfg
*
pSyncCfg
)
{
int32_t
len
=
512
;
char
*
s
=
taosMemoryMalloc
(
len
);
memset
(
s
,
0
,
len
);
snprintf
(
s
,
len
,
"{replica-num:%d, my-index:%d, "
,
pSyncCfg
->
replicaNum
,
pSyncCfg
->
myIndex
);
char
*
p
=
s
+
strlen
(
s
);
for
(
int
i
=
0
;
i
<
pSyncCfg
->
replicaNum
;
++
i
)
{
/*
if (p + 128 + 32 > s + len) {
break;
}
*/
char
buf
[
128
+
32
];
snprintf
(
buf
,
sizeof
(
buf
),
"%s:%d, "
,
pSyncCfg
->
nodeInfo
[
i
].
nodeFqdn
,
pSyncCfg
->
nodeInfo
[
i
].
nodePort
);
strncpy
(
p
,
buf
,
sizeof
(
buf
));
p
=
s
+
strlen
(
s
);
}
strcpy
(
p
-
2
,
"}"
);
return
s
;
}
int32_t
syncCfgFromJson
(
const
cJSON
*
pRoot
,
SSyncCfg
*
pSyncCfg
)
{
memset
(
pSyncCfg
,
0
,
sizeof
(
SSyncCfg
));
// cJSON *pJson = cJSON_GetObjectItem(pRoot, "SSyncCfg");
...
...
@@ -284,6 +307,12 @@ void syncCfgLog2(char *s, SSyncCfg *pCfg) {
taosMemoryFree
(
serialized
);
}
void
syncCfgLog3
(
char
*
s
,
SSyncCfg
*
pCfg
)
{
char
*
serialized
=
syncCfg2SimpleStr
(
pCfg
);
sTrace
(
"syncCfgLog3 | len:%lu | %s | %s"
,
strlen
(
serialized
),
s
,
serialized
);
taosMemoryFree
(
serialized
);
}
void
raftCfgPrint
(
SRaftCfg
*
pCfg
)
{
char
*
serialized
=
raftCfg2Str
(
pCfg
);
printf
(
"raftCfgPrint | len:%lu | %s
\n
"
,
strlen
(
serialized
),
serialized
);
...
...
source/libs/sync/src/syncRaftLog.c
浏览文件 @
efc7b1b9
...
...
@@ -163,12 +163,10 @@ static int32_t raftLogAppendEntry(struct SSyncLogStore* pLogStore, SSyncRaftEntr
walFsync
(
pWal
,
true
);
sDebug
(
"vgId:%d, sync event %s commitIndex:%ld currentTerm:%lu write index:%ld, isStandBy:%d, msgType:%s,%d, "
"originalRpcType:%s,%d"
,
pData
->
pSyncNode
->
vgId
,
syncUtilState2String
(
pData
->
pSyncNode
->
state
),
pData
->
pSyncNode
->
commitIndex
,
pData
->
pSyncNode
->
pRaftStore
->
currentTerm
,
pEntry
->
index
,
pData
->
pSyncNode
->
pRaftCfg
->
isStandBy
,
TMSG_INFO
(
pEntry
->
msgType
),
pEntry
->
msgType
,
TMSG_INFO
(
pEntry
->
originalRpcType
),
pEntry
->
originalRpcType
);
char
eventLog
[
128
];
snprintf
(
eventLog
,
sizeof
(
eventLog
),
"write index:%ld, type:%s,%d, type2:%s,%d"
,
pEntry
->
index
,
TMSG_INFO
(
pEntry
->
msgType
),
pEntry
->
msgType
,
TMSG_INFO
(
pEntry
->
originalRpcType
),
pEntry
->
originalRpcType
);
syncNodeEventLog
(
pData
->
pSyncNode
,
eventLog
);
return
code
;
}
...
...
@@ -320,16 +318,13 @@ int32_t logStoreAppendEntry(SSyncLogStore* pLogStore, SSyncRaftEntry* pEntry) {
linuxErrMsg
);
ASSERT
(
0
);
}
// assert(code == 0);
walFsync
(
pWal
,
true
);
sDebug
(
"vgId:%d, sync event %s commitIndex:%ld currentTerm:%lu old write index:%ld, isStandBy:%d, msgType:%s,%d, "
"originalRpcType:%s,%d"
,
pData
->
pSyncNode
->
vgId
,
syncUtilState2String
(
pData
->
pSyncNode
->
state
),
pData
->
pSyncNode
->
commitIndex
,
pData
->
pSyncNode
->
pRaftStore
->
currentTerm
,
pEntry
->
index
,
pData
->
pSyncNode
->
pRaftCfg
->
isStandBy
,
TMSG_INFO
(
pEntry
->
msgType
),
pEntry
->
msgType
,
TMSG_INFO
(
pEntry
->
originalRpcType
),
pEntry
->
originalRpcType
);
char
eventLog
[
128
];
snprintf
(
eventLog
,
sizeof
(
eventLog
),
"old write index:%ld, type:%s,%d, type2:%s,%d"
,
pEntry
->
index
,
TMSG_INFO
(
pEntry
->
msgType
),
pEntry
->
msgType
,
TMSG_INFO
(
pEntry
->
originalRpcType
),
pEntry
->
originalRpcType
);
syncNodeEventLog
(
pData
->
pSyncNode
,
eventLog
);
return
code
;
}
...
...
source/libs/sync/src/syncRespMgr.c
浏览文件 @
efc7b1b9
...
...
@@ -46,11 +46,11 @@ int64_t syncRespMgrAdd(SSyncRespMgr *pObj, SRespStub *pStub) {
taosHashPut
(
pObj
->
pRespHash
,
&
keyCode
,
sizeof
(
keyCode
),
pStub
,
sizeof
(
SRespStub
));
SSyncNode
*
pSyncNode
=
pObj
->
data
;
sDebug
(
"vgId:%d, sync event %s commitIndex:%ld currentTerm:%lu resp mgr add, msgType:%s,%d seq:%lu handle:%p
ahandle:%p"
,
pSyncNode
->
vgId
,
syncUtilState2String
(
pSyncNode
->
state
),
pSyncNode
->
commitIndex
,
pSyncNode
->
pRaftStore
->
currentTerm
,
TMSG_INFO
(
pStub
->
rpcMsg
.
msgType
),
pStub
->
rpcMsg
.
msgType
,
keyCode
,
pStub
->
rpcMsg
.
info
.
handle
,
pStub
->
rpcMsg
.
info
.
ahandle
);
char
eventLog
[
128
];
snprintf
(
eventLog
,
sizeof
(
eventLog
),
"resp mgr add, type:%s,%d, seq:%lu, handle:%p,
ahandle:%p"
,
TMSG_INFO
(
pStub
->
rpcMsg
.
msgType
),
pStub
->
rpcMsg
.
msgType
,
keyCode
,
pStub
->
rpcMsg
.
info
.
handle
,
pStub
->
rpcMsg
.
info
.
ahandle
);
syncNodeEventLog
(
pSyncNode
,
eventLog
);
taosThreadMutexUnlock
(
&
(
pObj
->
mutex
));
return
keyCode
;
...
...
@@ -73,12 +73,11 @@ int32_t syncRespMgrGet(SSyncRespMgr *pObj, uint64_t index, SRespStub *pStub) {
memcpy
(
pStub
,
pTmp
,
sizeof
(
SRespStub
));
SSyncNode
*
pSyncNode
=
pObj
->
data
;
sDebug
(
"vgId:%d, sync event %s commitIndex:%ld currentTerm:%lu resp mgr get, msgType:%s,%d seq:%lu handle:%p "
"ahandle:%p"
,
pSyncNode
->
vgId
,
syncUtilState2String
(
pSyncNode
->
state
),
pSyncNode
->
commitIndex
,
pSyncNode
->
pRaftStore
->
currentTerm
,
TMSG_INFO
(
pStub
->
rpcMsg
.
msgType
),
pStub
->
rpcMsg
.
msgType
,
index
,
pStub
->
rpcMsg
.
info
.
handle
,
pStub
->
rpcMsg
.
info
.
ahandle
);
char
eventLog
[
128
];
snprintf
(
eventLog
,
sizeof
(
eventLog
),
"resp mgr get, type:%s,%d, seq:%lu, handle:%p, ahandle:%p"
,
TMSG_INFO
(
pStub
->
rpcMsg
.
msgType
),
pStub
->
rpcMsg
.
msgType
,
index
,
pStub
->
rpcMsg
.
info
.
handle
,
pStub
->
rpcMsg
.
info
.
ahandle
);
syncNodeEventLog
(
pSyncNode
,
eventLog
);
taosThreadMutexUnlock
(
&
(
pObj
->
mutex
));
return
1
;
// get one object
...
...
@@ -95,12 +94,11 @@ int32_t syncRespMgrGetAndDel(SSyncRespMgr *pObj, uint64_t index, SRespStub *pStu
memcpy
(
pStub
,
pTmp
,
sizeof
(
SRespStub
));
SSyncNode
*
pSyncNode
=
pObj
->
data
;
sDebug
(
"vgId:%d, sync event %s commitIndex:%ld currentTerm:%lu resp mgr get and del, msgType:%s,%d seq:%lu handle:%p "
"ahandle:%p"
,
pSyncNode
->
vgId
,
syncUtilState2String
(
pSyncNode
->
state
),
pSyncNode
->
commitIndex
,
pSyncNode
->
pRaftStore
->
currentTerm
,
TMSG_INFO
(
pStub
->
rpcMsg
.
msgType
),
pStub
->
rpcMsg
.
msgType
,
index
,
pStub
->
rpcMsg
.
info
.
handle
,
pStub
->
rpcMsg
.
info
.
ahandle
);
char
eventLog
[
128
];
snprintf
(
eventLog
,
sizeof
(
eventLog
),
"resp mgr get-and-del, type:%s,%d, seq:%lu, handle:%p, ahandle:%p"
,
TMSG_INFO
(
pStub
->
rpcMsg
.
msgType
),
pStub
->
rpcMsg
.
msgType
,
index
,
pStub
->
rpcMsg
.
info
.
handle
,
pStub
->
rpcMsg
.
info
.
ahandle
);
syncNodeEventLog
(
pSyncNode
,
eventLog
);
taosHashRemove
(
pObj
->
pRespHash
,
&
index
,
sizeof
(
index
));
taosThreadMutexUnlock
(
&
(
pObj
->
mutex
));
...
...
source/libs/sync/src/syncSnapshot.c
浏览文件 @
efc7b1b9
此差异已折叠。
点击以展开。
source/libs/sync/test/syncRaftCfgTest.cpp
浏览文件 @
efc7b1b9
...
...
@@ -55,6 +55,8 @@ SSyncCfg* createSyncCfg() {
void
test1
()
{
SSyncCfg
*
pCfg
=
createSyncCfg
();
syncCfgLog2
((
char
*
)
__FUNCTION__
,
pCfg
);
syncCfgLog3
((
char
*
)
__FUNCTION__
,
pCfg
);
taosMemoryFree
(
pCfg
);
}
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录