Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
1ee54684
T
TDengine
项目概览
taosdata
/
TDengine
1 年多 前同步成功
通知
1185
Star
22017
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
1ee54684
编写于
7月 08, 2022
作者:
M
Minghao Li
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
fix(sync): append entries batch
上级
1c3c2703
变更
7
隐藏空白更改
内联
并排
Showing
7 changed file
with
133 addition
and
288 deletion
+133
-288
include/libs/sync/sync.h
include/libs/sync/sync.h
+0
-3
source/libs/sync/src/syncAppendEntries.c
source/libs/sync/src/syncAppendEntries.c
+88
-267
source/libs/sync/src/syncMain.c
source/libs/sync/src/syncMain.c
+4
-2
source/libs/sync/src/syncRaftLog.c
source/libs/sync/src/syncRaftLog.c
+16
-0
source/libs/sync/src/syncReplication.c
source/libs/sync/src/syncReplication.c
+5
-10
source/libs/sync/test/syncLogStoreTest.cpp
source/libs/sync/test/syncLogStoreTest.cpp
+1
-0
tests/script/tsim/sync/vnodesnapshot.sim
tests/script/tsim/sync/vnodesnapshot.sim
+19
-6
未找到文件。
include/libs/sync/sync.h
浏览文件 @
1ee54684
...
...
@@ -163,9 +163,6 @@ typedef struct SSyncLogStore {
// return commit index of log
SyncIndex
(
*
getCommitIndex
)(
struct
SSyncLogStore
*
pLogStore
);
// refactor, log[0 .. n] ==> log[m .. n]
// int32_t (*syncLogSetBeginIndex)(struct SSyncLogStore* pLogStore, SyncIndex beginIndex);
SyncIndex
(
*
syncLogBeginIndex
)(
struct
SSyncLogStore
*
pLogStore
);
SyncIndex
(
*
syncLogEndIndex
)(
struct
SSyncLogStore
*
pLogStore
);
bool
(
*
syncLogIsEmpty
)(
struct
SSyncLogStore
*
pLogStore
);
...
...
source/libs/sync/src/syncAppendEntries.c
浏览文件 @
1ee54684
...
...
@@ -162,6 +162,17 @@ int32_t syncNodeOnAppendEntriesCb(SSyncNode* ths, SyncAppendEntries* pMsg) {
pReply
->
success
=
false
;
pReply
->
matchIndex
=
SYNC_INDEX_INVALID
;
// msg event log
do
{
char
host
[
128
];
uint16_t
port
;
syncUtilU642Addr
(
pReply
->
destId
.
addr
,
host
,
sizeof
(
host
),
&
port
);
sDebug
(
"vgId:%d, send sync-append-entries-reply to %s:%d, {term:%lu, pterm:%lu, success:%d, "
"match-index:%ld}"
,
ths
->
vgId
,
host
,
port
,
pReply
->
term
,
pReply
->
privateTerm
,
pReply
->
success
,
pReply
->
matchIndex
);
}
while
(
0
);
SRpcMsg
rpcMsg
;
syncAppendEntriesReply2RpcMsg
(
pReply
,
&
rpcMsg
);
syncNodeSendMsgById
(
&
pReply
->
destId
,
ths
,
&
rpcMsg
);
...
...
@@ -334,270 +345,16 @@ int32_t syncNodeOnAppendEntriesCb(SSyncNode* ths, SyncAppendEntries* pMsg) {
pReply
->
matchIndex
=
pMsg
->
prevLogIndex
;
}
SRpcMsg
rpcMsg
;
syncAppendEntriesReply2RpcMsg
(
pReply
,
&
rpcMsg
);
syncNodeSendMsgById
(
&
pReply
->
destId
,
ths
,
&
rpcMsg
);
syncAppendEntriesReplyDestroy
(
pReply
);
// maybe update commit index from leader
if
(
pMsg
->
commitIndex
>
ths
->
commitIndex
)
{
// has commit entry in local
if
(
pMsg
->
commitIndex
<=
ths
->
pLogStore
->
getLastIndex
(
ths
->
pLogStore
))
{
SyncIndex
beginIndex
=
ths
->
commitIndex
+
1
;
SyncIndex
endIndex
=
pMsg
->
commitIndex
;
// update commit index
ths
->
commitIndex
=
pMsg
->
commitIndex
;
// call back Wal
ths
->
pLogStore
->
updateCommitIndex
(
ths
->
pLogStore
,
ths
->
commitIndex
);
int32_t
code
=
syncNodeCommit
(
ths
,
beginIndex
,
endIndex
,
ths
->
state
);
ASSERT
(
code
==
0
);
}
}
}
return
ret
;
}
#if 0
int32_t syncNodeOnAppendEntriesCb(SSyncNode* ths, SyncAppendEntries* pMsg) {
int32_t ret = 0;
char logBuf[128] = {0};
snprintf(logBuf, sizeof(logBuf), "==syncNodeOnAppendEntriesCb== term:%lu", ths->pRaftStore->currentTerm);
syncAppendEntriesLog2(logBuf, pMsg);
if (pMsg->term > ths->pRaftStore->currentTerm) {
syncNodeUpdateTerm(ths, pMsg->term);
}
ASSERT(pMsg->term <= ths->pRaftStore->currentTerm);
// reset elect timer
if (pMsg->term == ths->pRaftStore->currentTerm) {
ths->leaderCache = pMsg->srcId;
syncNodeResetElectTimer(ths);
}
ASSERT(pMsg->dataLen >= 0);
SyncTerm localPreLogTerm = 0;
if (pMsg->prevLogIndex >= SYNC_INDEX_BEGIN && pMsg->prevLogIndex <= ths->pLogStore->getLastIndex(ths->pLogStore)) {
SSyncRaftEntry* pEntry = ths->pLogStore->getEntry(ths->pLogStore, pMsg->prevLogIndex);
if (pEntry == NULL) {
char logBuf[128];
snprintf(logBuf, sizeof(logBuf), "getEntry error, index:%ld, since %s", pMsg->prevLogIndex, terrstr());
syncNodeErrorLog(ths, logBuf);
return -1;
}
localPreLogTerm = pEntry->term;
syncEntryDestory(pEntry);
}
bool logOK =
(pMsg->prevLogIndex == SYNC_INDEX_INVALID) ||
((pMsg->prevLogIndex >= SYNC_INDEX_BEGIN) &&
(pMsg->prevLogIndex <= ths->pLogStore->getLastIndex(ths->pLogStore)) && (pMsg->prevLogTerm == localPreLogTerm));
// reject request
if ((pMsg->term < ths->pRaftStore->currentTerm) ||
((pMsg->term == ths->pRaftStore->currentTerm) && (ths->state == TAOS_SYNC_STATE_FOLLOWER) && !logOK)) {
sTrace(
"syncNodeOnAppendEntriesCb --> reject, pMsg->term:%lu, ths->pRaftStore->currentTerm:%lu, ths->state:%d, "
"logOK:%d",
pMsg->term, ths->pRaftStore->currentTerm, ths->state, logOK);
SyncAppendEntriesReply* pReply = syncAppendEntriesReplyBuild(ths->vgId);
pReply->srcId = ths->myRaftId;
pReply->destId = pMsg->srcId;
pReply->term = ths->pRaftStore->currentTerm;
pReply->success = false;
pReply->matchIndex = SYNC_INDEX_INVALID;
SRpcMsg rpcMsg;
syncAppendEntriesReply2RpcMsg(pReply, &rpcMsg);
syncNodeSendMsgById(&pReply->destId, ths, &rpcMsg);
syncAppendEntriesReplyDestroy(pReply);
return ret;
}
// return to follower state
if (pMsg->term == ths->pRaftStore->currentTerm && ths->state == TAOS_SYNC_STATE_CANDIDATE) {
sTrace(
"syncNodeOnAppendEntriesCb --> return to follower, pMsg->term:%lu, ths->pRaftStore->currentTerm:%lu, "
"ths->state:%d, logOK:%d",
pMsg->term, ths->pRaftStore->currentTerm, ths->state, logOK);
syncNodeBecomeFollower(ths, "from candidate by append entries");
// ret or reply?
return ret;
}
// accept request
if (pMsg->term == ths->pRaftStore->currentTerm && ths->state == TAOS_SYNC_STATE_FOLLOWER && logOK) {
// preIndex = -1, or has preIndex entry in local log
ASSERT(pMsg->prevLogIndex <= ths->pLogStore->getLastIndex(ths->pLogStore));
// has extra entries (> preIndex) in local log
bool hasExtraEntries = pMsg->prevLogIndex < ths->pLogStore->getLastIndex(ths->pLogStore);
// has entries in SyncAppendEntries msg
bool hasAppendEntries = pMsg->dataLen > 0;
sTrace(
"syncNodeOnAppendEntriesCb --> accept, pMsg->term:%lu, ths->pRaftStore->currentTerm:%lu, ths->state:%d, "
"logOK:%d, hasExtraEntries:%d, hasAppendEntries:%d",
pMsg->term, ths->pRaftStore->currentTerm, ths->state, logOK, hasExtraEntries, hasAppendEntries);
if (hasExtraEntries && hasAppendEntries) {
// not conflict by default
bool conflict = false;
SyncIndex extraIndex = pMsg->prevLogIndex + 1;
SSyncRaftEntry* pExtraEntry = ths->pLogStore->getEntry(ths->pLogStore, extraIndex);
if (pExtraEntry == NULL) {
char logBuf[128];
snprintf(logBuf, sizeof(logBuf), "getEntry error2, index:%ld, since %s", extraIndex, terrstr());
syncNodeErrorLog(ths, logBuf);
return -1;
}
SSyncRaftEntry* pAppendEntry = syncEntryDeserialize(pMsg->data, pMsg->dataLen);
if (pAppendEntry == NULL) {
syncNodeErrorLog(ths, "syncEntryDeserialize pAppendEntry error");
return -1;
}
// log not match, conflict
ASSERT(extraIndex == pAppendEntry->index);
if (pExtraEntry->term != pAppendEntry->term) {
conflict = true;
}
if (conflict) {
// roll back
SyncIndex delBegin = ths->pLogStore->getLastIndex(ths->pLogStore);
SyncIndex delEnd = extraIndex;
sTrace("syncNodeOnAppendEntriesCb --> conflict:%d, delBegin:%ld, delEnd:%ld", conflict, delBegin, delEnd);
// notice! reverse roll back!
for (SyncIndex index = delEnd; index >= delBegin; --index) {
if (ths->pFsm->FpRollBackCb != NULL) {
SSyncRaftEntry* pRollBackEntry = ths->pLogStore->getEntry(ths->pLogStore, index);
if (pRollBackEntry == NULL) {
char logBuf[128];
snprintf(logBuf, sizeof(logBuf), "getEntry error3, index:%ld, since %s", index, terrstr());
syncNodeErrorLog(ths, logBuf);
return -1;
}
// if (pRollBackEntry->msgType != TDMT_SYNC_NOOP) {
if (syncUtilUserRollback(pRollBackEntry->msgType)) {
SRpcMsg rpcMsg;
syncEntry2OriginalRpc(pRollBackEntry, &rpcMsg);
SFsmCbMeta cbMeta = {0};
cbMeta.index = pRollBackEntry->index;
cbMeta.lastConfigIndex = syncNodeGetSnapshotConfigIndex(ths, cbMeta.index);
cbMeta.isWeak = pRollBackEntry->isWeak;
cbMeta.code = 0;
cbMeta.state = ths->state;
cbMeta.seqNum = pRollBackEntry->seqNum;
ths->pFsm->FpRollBackCb(ths->pFsm, &rpcMsg, cbMeta);
rpcFreeCont(rpcMsg.pCont);
}
syncEntryDestory(pRollBackEntry);
}
}
// delete confict entries
ths->pLogStore->truncate(ths->pLogStore, extraIndex);
// append new entries
ths->pLogStore->appendEntry(ths->pLogStore, pAppendEntry);
// pre commit
SRpcMsg rpcMsg;
syncEntry2OriginalRpc(pAppendEntry, &rpcMsg);
if (ths->pFsm != NULL) {
// if (ths->pFsm->FpPreCommitCb != NULL && pAppendEntry->originalRpcType != TDMT_SYNC_NOOP) {
if (ths->pFsm->FpPreCommitCb != NULL && syncUtilUserPreCommit(pAppendEntry->originalRpcType)) {
SFsmCbMeta cbMeta = {0};
cbMeta.index = pAppendEntry->index;
cbMeta.lastConfigIndex = syncNodeGetSnapshotConfigIndex(ths, cbMeta.index);
cbMeta.isWeak = pAppendEntry->isWeak;
cbMeta.code = 2;
cbMeta.state = ths->state;
cbMeta.seqNum = pAppendEntry->seqNum;
ths->pFsm->FpPreCommitCb(ths->pFsm, &rpcMsg, cbMeta);
}
}
rpcFreeCont(rpcMsg.pCont);
}
// free memory
syncEntryDestory(pExtraEntry);
syncEntryDestory(pAppendEntry);
} else if (hasExtraEntries && !hasAppendEntries) {
// do nothing
} else if (!hasExtraEntries && hasAppendEntries) {
SSyncRaftEntry* pAppendEntry = syncEntryDeserialize(pMsg->data, pMsg->dataLen);
if (pAppendEntry == NULL) {
syncNodeErrorLog(ths, "syncEntryDeserialize pAppendEntry2 error");
return -1;
}
// append new entries
ths->pLogStore->appendEntry(ths->pLogStore, pAppendEntry);
// pre commit
SRpcMsg rpcMsg;
syncEntry2OriginalRpc(pAppendEntry, &rpcMsg);
if (ths->pFsm != NULL) {
// if (ths->pFsm->FpPreCommitCb != NULL && pAppendEntry->originalRpcType != TDMT_SYNC_NOOP) {
if (ths->pFsm->FpPreCommitCb != NULL && syncUtilUserPreCommit(pAppendEntry->originalRpcType)) {
SFsmCbMeta cbMeta = {0};
cbMeta.index = pAppendEntry->index;
cbMeta.lastConfigIndex = syncNodeGetSnapshotConfigIndex(ths, cbMeta.index);
cbMeta.isWeak = pAppendEntry->isWeak;
cbMeta.code = 3;
cbMeta.state = ths->state;
cbMeta.seqNum = pAppendEntry->seqNum;
ths->pFsm->FpPreCommitCb(ths->pFsm, &rpcMsg, cbMeta);
}
}
rpcFreeCont(rpcMsg.pCont);
// free memory
syncEntryDestory(pAppendEntry);
} else if (!hasExtraEntries && !hasAppendEntries) {
// do nothing
} else {
syncNodeLog3("", ths);
ASSERT(0);
}
SyncAppendEntriesReply* pReply = syncAppendEntriesReplyBuild(ths->vgId);
pReply->srcId = ths->myRaftId;
pReply->destId = pMsg->srcId;
pReply->term = ths->pRaftStore->currentTerm;
pReply->success = true;
if (hasAppendEntries) {
pReply->matchIndex = pMsg->prevLogIndex + 1;
} else {
pReply->matchIndex = pMsg->prevLogIndex;
}
// msg event log
do
{
char
host
[
128
];
uint16_t
port
;
syncUtilU642Addr
(
pReply
->
destId
.
addr
,
host
,
sizeof
(
host
),
&
port
);
sDebug
(
"vgId:%d, send sync-append-entries-reply to %s:%d, {term:%lu, pterm:%lu, success:%d, "
"match-index:%ld}"
,
ths
->
vgId
,
host
,
port
,
pReply
->
term
,
pReply
->
privateTerm
,
pReply
->
success
,
pReply
->
matchIndex
);
}
while
(
0
);
SRpcMsg
rpcMsg
;
syncAppendEntriesReply2RpcMsg
(
pReply
,
&
rpcMsg
);
...
...
@@ -626,8 +383,6 @@ int32_t syncNodeOnAppendEntriesCb(SSyncNode* ths, SyncAppendEntries* pMsg) {
return
ret
;
}
#endif
static
int32_t
syncNodeMakeLogSame
(
SSyncNode
*
ths
,
SyncAppendEntries
*
pMsg
)
{
int32_t
code
;
...
...
@@ -897,6 +652,17 @@ int32_t syncNodeOnAppendEntriesSnapshot2Cb(SSyncNode* ths, SyncAppendEntriesBatc
pReply
->
success
=
true
;
pReply
->
matchIndex
=
matchIndex
;
// msg event log
do
{
char
host
[
128
];
uint16_t
port
;
syncUtilU642Addr
(
pReply
->
destId
.
addr
,
host
,
sizeof
(
host
),
&
port
);
sDebug
(
"vgId:%d, send sync-append-entries-reply to %s:%d, {term:%lu, pterm:%lu, success:%d, "
"match-index:%ld}"
,
ths
->
vgId
,
host
,
port
,
pReply
->
term
,
pReply
->
privateTerm
,
pReply
->
success
,
pReply
->
matchIndex
);
}
while
(
0
);
// send response
SRpcMsg
rpcMsg
;
syncAppendEntriesReply2RpcMsg
(
pReply
,
&
rpcMsg
);
...
...
@@ -945,6 +711,17 @@ int32_t syncNodeOnAppendEntriesSnapshot2Cb(SSyncNode* ths, SyncAppendEntriesBatc
pReply
->
success
=
false
;
pReply
->
matchIndex
=
SYNC_INDEX_INVALID
;
// msg event log
do
{
char
host
[
128
];
uint16_t
port
;
syncUtilU642Addr
(
pReply
->
destId
.
addr
,
host
,
sizeof
(
host
),
&
port
);
sDebug
(
"vgId:%d, send sync-append-entries-reply to %s:%d, {term:%lu, pterm:%lu, success:%d, "
"match-index:%ld}"
,
ths
->
vgId
,
host
,
port
,
pReply
->
term
,
pReply
->
privateTerm
,
pReply
->
success
,
pReply
->
matchIndex
);
}
while
(
0
);
// send response
SRpcMsg
rpcMsg
;
syncAppendEntriesReply2RpcMsg
(
pReply
,
&
rpcMsg
);
...
...
@@ -977,7 +754,7 @@ int32_t syncNodeOnAppendEntriesSnapshot2Cb(SSyncNode* ths, SyncAppendEntriesBatc
do
{
char
logBuf
[
128
];
snprintf
(
logBuf
,
sizeof
(
logBuf
),
"recv sync-append-entries, match, {pre-index:%ld, pre-term:%lu, datalen:%d, datacount:%d}"
,
"recv sync-append-entries
-batch
, match, {pre-index:%ld, pre-term:%lu, datalen:%d, datacount:%d}"
,
pMsg
->
prevLogIndex
,
pMsg
->
prevLogTerm
,
pMsg
->
dataLen
,
pMsg
->
dataCount
);
syncNodeEventLog
(
ths
,
logBuf
);
}
while
(
0
);
...
...
@@ -1018,6 +795,17 @@ int32_t syncNodeOnAppendEntriesSnapshot2Cb(SSyncNode* ths, SyncAppendEntriesBatc
pReply
->
success
=
true
;
pReply
->
matchIndex
=
hasAppendEntries
?
pMsg
->
prevLogIndex
+
pMsg
->
dataCount
:
pMsg
->
prevLogIndex
;
// msg event log
do
{
char
host
[
128
];
uint16_t
port
;
syncUtilU642Addr
(
pReply
->
destId
.
addr
,
host
,
sizeof
(
host
),
&
port
);
sDebug
(
"vgId:%d, send sync-append-entries-reply to %s:%d, {term:%lu, pterm:%lu, success:%d, "
"match-index:%ld}"
,
ths
->
vgId
,
host
,
port
,
pReply
->
term
,
pReply
->
privateTerm
,
pReply
->
success
,
pReply
->
matchIndex
);
}
while
(
0
);
// send response
SRpcMsg
rpcMsg
;
syncAppendEntriesReply2RpcMsg
(
pReply
,
&
rpcMsg
);
...
...
@@ -1227,6 +1015,17 @@ int32_t syncNodeOnAppendEntriesSnapshotCb(SSyncNode* ths, SyncAppendEntries* pMs
pReply
->
success
=
true
;
pReply
->
matchIndex
=
matchIndex
;
// msg event log
do
{
char
host
[
128
];
uint16_t
port
;
syncUtilU642Addr
(
pReply
->
destId
.
addr
,
host
,
sizeof
(
host
),
&
port
);
sDebug
(
"vgId:%d, send sync-append-entries-reply to %s:%d, {term:%lu, pterm:%lu, success:%d, "
"match-index:%ld}"
,
ths
->
vgId
,
host
,
port
,
pReply
->
term
,
pReply
->
privateTerm
,
pReply
->
success
,
pReply
->
matchIndex
);
}
while
(
0
);
// send response
SRpcMsg
rpcMsg
;
syncAppendEntriesReply2RpcMsg
(
pReply
,
&
rpcMsg
);
...
...
@@ -1272,6 +1071,17 @@ int32_t syncNodeOnAppendEntriesSnapshotCb(SSyncNode* ths, SyncAppendEntries* pMs
pReply
->
success
=
false
;
pReply
->
matchIndex
=
SYNC_INDEX_INVALID
;
// msg event log
do
{
char
host
[
128
];
uint16_t
port
;
syncUtilU642Addr
(
pReply
->
destId
.
addr
,
host
,
sizeof
(
host
),
&
port
);
sDebug
(
"vgId:%d, send sync-append-entries-reply to %s:%d, {term:%lu, pterm:%lu, success:%d, "
"match-index:%ld}"
,
ths
->
vgId
,
host
,
port
,
pReply
->
term
,
pReply
->
privateTerm
,
pReply
->
success
,
pReply
->
matchIndex
);
}
while
(
0
);
// send response
SRpcMsg
rpcMsg
;
syncAppendEntriesReply2RpcMsg
(
pReply
,
&
rpcMsg
);
...
...
@@ -1337,6 +1147,17 @@ int32_t syncNodeOnAppendEntriesSnapshotCb(SSyncNode* ths, SyncAppendEntries* pMs
pReply
->
success
=
true
;
pReply
->
matchIndex
=
hasAppendEntries
?
pMsg
->
prevLogIndex
+
1
:
pMsg
->
prevLogIndex
;
// msg event log
do
{
char
host
[
128
];
uint16_t
port
;
syncUtilU642Addr
(
pReply
->
destId
.
addr
,
host
,
sizeof
(
host
),
&
port
);
sDebug
(
"vgId:%d, send sync-append-entries-reply to %s:%d, {term:%lu, pterm:%lu, success:%d, "
"match-index:%ld}"
,
ths
->
vgId
,
host
,
port
,
pReply
->
term
,
pReply
->
privateTerm
,
pReply
->
success
,
pReply
->
matchIndex
);
}
while
(
0
);
// send response
SRpcMsg
rpcMsg
;
syncAppendEntriesReply2RpcMsg
(
pReply
,
&
rpcMsg
);
...
...
source/libs/sync/src/syncMain.c
浏览文件 @
1ee54684
...
...
@@ -1311,8 +1311,10 @@ int32_t syncNodeSendMsgById(const SRaftId* destRaftId, SSyncNode* pSyncNode, SRp
pMsg
->
info
.
noResp
=
1
;
pSyncNode
->
FpSendMsg
(
&
epSet
,
pMsg
);
}
else
{
sTrace
(
"syncNodeSendMsgById pSyncNode->FpSendMsg is NULL"
);
sError
(
"vgId:%d, sync send msg by id error, fp-send-msg is null"
,
pSyncNode
->
vgId
);
return
-
1
;
}
return
0
;
}
...
...
@@ -1326,7 +1328,7 @@ int32_t syncNodeSendMsgByInfo(const SNodeInfo* nodeInfo, SSyncNode* pSyncNode, S
pMsg
->
info
.
noResp
=
1
;
pSyncNode
->
FpSendMsg
(
&
epSet
,
pMsg
);
}
else
{
s
Trace
(
"syncNodeSendMsgByInfo pSyncNode->FpSendMsg is NULL"
);
s
Error
(
"vgId:%d, sync send msg by info error, fp-send-msg is null"
,
pSyncNode
->
vgId
);
}
return
0
;
}
...
...
source/libs/sync/src/syncRaftLog.c
浏览文件 @
1ee54684
...
...
@@ -326,6 +326,14 @@ static int32_t raftLogTruncate(struct SSyncLogStore* pLogStore, SyncIndex fromIn
ASSERT
(
0
);
}
// event log
do
{
char
logBuf
[
128
];
snprintf
(
logBuf
,
sizeof
(
logBuf
),
"wal truncate, from-index:%ld"
,
fromIndex
);
syncNodeEventLog
(
pData
->
pSyncNode
,
logBuf
);
}
while
(
0
);
return
code
;
}
...
...
@@ -463,6 +471,14 @@ int32_t logStoreTruncate(SSyncLogStore* pLogStore, SyncIndex fromIndex) {
ASSERT
(
0
);
}
// event log
do
{
char
logBuf
[
128
];
snprintf
(
logBuf
,
sizeof
(
logBuf
),
"wal truncate, from-index:%ld"
,
fromIndex
);
syncNodeEventLog
(
pData
->
pSyncNode
,
logBuf
);
}
while
(
0
);
return
0
;
}
...
...
source/libs/sync/src/syncReplication.c
浏览文件 @
1ee54684
...
...
@@ -151,14 +151,6 @@ int32_t syncNodeAppendEntriesPeersSnapshot2(SSyncNode* pSyncNode) {
for
(
int32_t
i
=
0
;
i
<
pSyncNode
->
pRaftCfg
->
batchSize
;
++
i
)
{
SSyncRaftEntry
*
pEntry
=
NULL
;
int32_t
code
=
pSyncNode
->
pLogStore
->
syncLogGetEntry
(
pSyncNode
->
pLogStore
,
getEntryIndex
,
&
pEntry
);
// event log
do
{
char
logBuf
[
128
];
snprintf
(
logBuf
,
sizeof
(
logBuf
),
"get index:%d, code:%d, %s"
,
getEntryIndex
,
code
,
tstrerror
(
terrno
));
syncNodeEventLog
(
pSyncNode
,
logBuf
);
}
while
(
0
);
if
(
code
==
0
)
{
ASSERT
(
pEntry
!=
NULL
);
entryPArr
[
i
]
=
pEntry
;
...
...
@@ -172,8 +164,11 @@ int32_t syncNodeAppendEntriesPeersSnapshot2(SSyncNode* pSyncNode) {
// event log
do
{
char
logBuf
[
128
];
snprintf
(
logBuf
,
sizeof
(
logBuf
),
"build batch:%d"
,
getCount
);
char
logBuf
[
128
];
char
host
[
64
];
uint16_t
port
;
syncUtilU642Addr
(
pDestId
->
addr
,
host
,
sizeof
(
host
),
&
port
);
snprintf
(
logBuf
,
sizeof
(
logBuf
),
"build batch:%d for %s:%d"
,
getCount
,
host
,
port
);
syncNodeEventLog
(
pSyncNode
,
logBuf
);
}
while
(
0
);
...
...
source/libs/sync/test/syncLogStoreTest.cpp
浏览文件 @
1ee54684
...
...
@@ -85,6 +85,7 @@ void logStoreTest() {
}
int
main
(
int
argc
,
char
**
argv
)
{
gRaftDetailLog
=
true
;
tsAsyncLog
=
0
;
sDebugFlag
=
DEBUG_TRACE
+
DEBUG_SCREEN
+
DEBUG_FILE
;
...
...
tests/script/tsim/sync/vnodesnapshot.sim
浏览文件 @
1ee54684
...
...
@@ -142,20 +142,33 @@ sql create table ct1 using stb tags(1000)
system sh/exec.sh -n dnode4 -s stop -x SIGINT
sql insert into ct1 values(now+0s, 10, 2.0, 3.0)
sql insert into ct1 values(now+1s, 11, 2.1, 3.1)(now+2s, -12, -2.2, -3.2)(now+3s, -13, -2.3, -3.3)
$N = 100
$count = 0
while $count < $N
$ms = 1591200000000 + $count
sql insert into ct1 values( $ms , $count , 2.1, 3.1)
$count = $count + 1
endw
#sql insert into ct1 values(now+0s, 10, 2.0, 3.0)
#sql insert into ct1 values(now+1s, 11, 2.1, 3.1)(now+2s, -12, -2.2, -3.2)(now+3s, -13, -2.3, -3.3)
#sql flush database db;
#system sh/exec.sh -n dnode4 -s start
sql insert into ct1 values(now+1s, 81, 8.1, 8.1)(now+2s, -92, -9.2, -9.2)(now+3s, -73, -7.3, -7.3)
#sql insert into ct1 values(now+1s, 81, 8.1, 8.1)(now+2s, -92, -9.2, -9.2)(now+3s, -73, -7.3, -7.3)
sleep 5000
#
system sh/exec.sh -n dnode1 -s stop -x SIGINT
#
system sh/exec.sh -n dnode2 -s stop -x SIGINT
#
system sh/exec.sh -n dnode3 -s stop -x SIGINT
system sh/exec.sh -n dnode1 -s stop -x SIGINT
system sh/exec.sh -n dnode2 -s stop -x SIGINT
system sh/exec.sh -n dnode3 -s stop -x SIGINT
#system sh/exec.sh -n dnode4 -s stop -x SIGINT
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录