Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
2d3f5274
T
TDengine
项目概览
taosdata
/
TDengine
1 年多 前同步成功
通知
1185
Star
22016
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
2d3f5274
编写于
10月 22, 2022
作者:
S
Shengliang Guan
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
fix: compile error in mac
上级
f5d1b3f8
变更
6
隐藏空白更改
内联
并排
Showing
6 changed file
with
50 addition
and
43 deletion
+50
-43
source/libs/sync/src/syncAppendEntries.c
source/libs/sync/src/syncAppendEntries.c
+9
-8
source/libs/sync/src/syncCommit.c
source/libs/sync/src/syncCommit.c
+3
-3
source/libs/sync/src/syncMain.c
source/libs/sync/src/syncMain.c
+26
-20
source/libs/sync/src/syncMessage.c
source/libs/sync/src/syncMessage.c
+4
-4
source/libs/sync/src/syncReplication.c
source/libs/sync/src/syncReplication.c
+4
-4
source/libs/sync/src/syncTimeout.c
source/libs/sync/src/syncTimeout.c
+4
-4
未找到文件。
source/libs/sync/src/syncAppendEntries.c
浏览文件 @
2d3f5274
...
@@ -174,7 +174,7 @@ static int32_t syncNodeDoMakeLogSame(SSyncNode* ths, SyncIndex FromIndex) {
...
@@ -174,7 +174,7 @@ static int32_t syncNodeDoMakeLogSame(SSyncNode* ths, SyncIndex FromIndex) {
do
{
do
{
char
logBuf
[
128
];
char
logBuf
[
128
];
snprintf
(
logBuf
,
sizeof
(
logBuf
),
"update delete begin to %
ld"
,
delBegin
);
snprintf
(
logBuf
,
sizeof
(
logBuf
),
"update delete begin to %
"
PRId64
,
delBegin
);
syncNodeEventLog
(
ths
,
logBuf
);
syncNodeEventLog
(
ths
,
logBuf
);
}
while
(
0
);
}
while
(
0
);
}
}
...
@@ -185,7 +185,8 @@ static int32_t syncNodeDoMakeLogSame(SSyncNode* ths, SyncIndex FromIndex) {
...
@@ -185,7 +185,8 @@ static int32_t syncNodeDoMakeLogSame(SSyncNode* ths, SyncIndex FromIndex) {
do
{
do
{
char
logBuf
[
128
];
char
logBuf
[
128
];
snprintf
(
logBuf
,
sizeof
(
logBuf
),
"make log same from:%ld, delbegin:%ld, pass:%d"
,
FromIndex
,
delBegin
,
pass
);
snprintf
(
logBuf
,
sizeof
(
logBuf
),
"make log same from:%l"
PRId64
", delbegin:%"
PRId64
", pass:%d"
,
FromIndex
,
delBegin
,
pass
);
syncNodeEventLog
(
ths
,
logBuf
);
syncNodeEventLog
(
ths
,
logBuf
);
}
while
(
0
);
}
while
(
0
);
...
@@ -371,7 +372,7 @@ int32_t syncNodeOnAppendEntries(SSyncNode* ths, SyncAppendEntries* pMsg) {
...
@@ -371,7 +372,7 @@ int32_t syncNodeOnAppendEntries(SSyncNode* ths, SyncAppendEntries* pMsg) {
// do nothing
// do nothing
char
logBuf
[
128
];
char
logBuf
[
128
];
snprintf
(
logBuf
,
sizeof
(
logBuf
),
"log match, do nothing, index:%
ld"
,
appendIndex
);
snprintf
(
logBuf
,
sizeof
(
logBuf
),
"log match, do nothing, index:%
"
PRId64
,
appendIndex
);
syncNodeEventLog
(
ths
,
logBuf
);
syncNodeEventLog
(
ths
,
logBuf
);
}
else
{
}
else
{
...
@@ -379,7 +380,7 @@ int32_t syncNodeOnAppendEntries(SSyncNode* ths, SyncAppendEntries* pMsg) {
...
@@ -379,7 +380,7 @@ int32_t syncNodeOnAppendEntries(SSyncNode* ths, SyncAppendEntries* pMsg) {
code
=
ths
->
pLogStore
->
syncLogTruncate
(
ths
->
pLogStore
,
appendIndex
);
code
=
ths
->
pLogStore
->
syncLogTruncate
(
ths
->
pLogStore
,
appendIndex
);
if
(
code
!=
0
)
{
if
(
code
!=
0
)
{
char
logBuf
[
128
];
char
logBuf
[
128
];
snprintf
(
logBuf
,
sizeof
(
logBuf
),
"ignore, truncate error, append-index:%
ld"
,
appendIndex
);
snprintf
(
logBuf
,
sizeof
(
logBuf
),
"ignore, truncate error, append-index:%
"
PRId64
,
appendIndex
);
syncLogRecvAppendEntries
(
ths
,
pMsg
,
logBuf
);
syncLogRecvAppendEntries
(
ths
,
pMsg
,
logBuf
);
goto
_IGNORE
;
goto
_IGNORE
;
...
@@ -389,7 +390,7 @@ int32_t syncNodeOnAppendEntries(SSyncNode* ths, SyncAppendEntries* pMsg) {
...
@@ -389,7 +390,7 @@ int32_t syncNodeOnAppendEntries(SSyncNode* ths, SyncAppendEntries* pMsg) {
code
=
ths
->
pLogStore
->
syncLogAppendEntry
(
ths
->
pLogStore
,
pAppendEntry
);
code
=
ths
->
pLogStore
->
syncLogAppendEntry
(
ths
->
pLogStore
,
pAppendEntry
);
if
(
code
!=
0
)
{
if
(
code
!=
0
)
{
char
logBuf
[
128
];
char
logBuf
[
128
];
snprintf
(
logBuf
,
sizeof
(
logBuf
),
"ignore, append error, append-index:%
ld"
,
appendIndex
);
snprintf
(
logBuf
,
sizeof
(
logBuf
),
"ignore, append error, append-index:%
"
PRId64
,
appendIndex
);
syncLogRecvAppendEntries
(
ths
,
pMsg
,
logBuf
);
syncLogRecvAppendEntries
(
ths
,
pMsg
,
logBuf
);
goto
_IGNORE
;
goto
_IGNORE
;
...
@@ -404,7 +405,7 @@ int32_t syncNodeOnAppendEntries(SSyncNode* ths, SyncAppendEntries* pMsg) {
...
@@ -404,7 +405,7 @@ int32_t syncNodeOnAppendEntries(SSyncNode* ths, SyncAppendEntries* pMsg) {
code
=
ths
->
pLogStore
->
syncLogTruncate
(
ths
->
pLogStore
,
appendIndex
);
code
=
ths
->
pLogStore
->
syncLogTruncate
(
ths
->
pLogStore
,
appendIndex
);
if
(
code
!=
0
)
{
if
(
code
!=
0
)
{
char
logBuf
[
128
];
char
logBuf
[
128
];
snprintf
(
logBuf
,
sizeof
(
logBuf
),
"ignore, log not exist, truncate error, append-index:%
ld"
,
appendIndex
);
snprintf
(
logBuf
,
sizeof
(
logBuf
),
"ignore, log not exist, truncate error, append-index:%
"
PRId64
,
appendIndex
);
syncLogRecvAppendEntries
(
ths
,
pMsg
,
logBuf
);
syncLogRecvAppendEntries
(
ths
,
pMsg
,
logBuf
);
goto
_IGNORE
;
goto
_IGNORE
;
...
@@ -414,7 +415,7 @@ int32_t syncNodeOnAppendEntries(SSyncNode* ths, SyncAppendEntries* pMsg) {
...
@@ -414,7 +415,7 @@ int32_t syncNodeOnAppendEntries(SSyncNode* ths, SyncAppendEntries* pMsg) {
code
=
ths
->
pLogStore
->
syncLogAppendEntry
(
ths
->
pLogStore
,
pAppendEntry
);
code
=
ths
->
pLogStore
->
syncLogAppendEntry
(
ths
->
pLogStore
,
pAppendEntry
);
if
(
code
!=
0
)
{
if
(
code
!=
0
)
{
char
logBuf
[
128
];
char
logBuf
[
128
];
snprintf
(
logBuf
,
sizeof
(
logBuf
),
"ignore, log not exist, append error, append-index:%
ld"
,
appendIndex
);
snprintf
(
logBuf
,
sizeof
(
logBuf
),
"ignore, log not exist, append error, append-index:%
"
PRId64
,
appendIndex
);
syncLogRecvAppendEntries
(
ths
,
pMsg
,
logBuf
);
syncLogRecvAppendEntries
(
ths
,
pMsg
,
logBuf
);
goto
_IGNORE
;
goto
_IGNORE
;
...
@@ -423,7 +424,7 @@ int32_t syncNodeOnAppendEntries(SSyncNode* ths, SyncAppendEntries* pMsg) {
...
@@ -423,7 +424,7 @@ int32_t syncNodeOnAppendEntries(SSyncNode* ths, SyncAppendEntries* pMsg) {
}
else
{
}
else
{
// error
// error
char
logBuf
[
128
];
char
logBuf
[
128
];
snprintf
(
logBuf
,
sizeof
(
logBuf
),
"ignore, get local entry error, append-index:%
ld"
,
appendIndex
);
snprintf
(
logBuf
,
sizeof
(
logBuf
),
"ignore, get local entry error, append-index:%
"
PRId64
,
appendIndex
);
syncLogRecvAppendEntries
(
ths
,
pMsg
,
logBuf
);
syncLogRecvAppendEntries
(
ths
,
pMsg
,
logBuf
);
goto
_IGNORE
;
goto
_IGNORE
;
...
...
source/libs/sync/src/syncCommit.c
浏览文件 @
2d3f5274
...
@@ -80,7 +80,7 @@ void syncMaybeAdvanceCommitIndex(SSyncNode* pSyncNode) {
...
@@ -80,7 +80,7 @@ void syncMaybeAdvanceCommitIndex(SSyncNode* pSyncNode) {
int32_t
code
=
pSyncNode
->
pLogStore
->
syncLogGetEntry
(
pSyncNode
->
pLogStore
,
index
,
&
pEntry
);
int32_t
code
=
pSyncNode
->
pLogStore
->
syncLogGetEntry
(
pSyncNode
->
pLogStore
,
index
,
&
pEntry
);
if
(
code
!=
0
)
{
if
(
code
!=
0
)
{
char
logBuf
[
128
];
char
logBuf
[
128
];
snprintf
(
logBuf
,
sizeof
(
logBuf
),
"advance commit index error, read wal index:%
ld"
,
index
);
snprintf
(
logBuf
,
sizeof
(
logBuf
),
"advance commit index error, read wal index:%
"
PRId64
,
index
);
syncNodeErrorLog
(
pSyncNode
,
logBuf
);
syncNodeErrorLog
(
pSyncNode
,
logBuf
);
return
;
return
;
}
}
...
@@ -136,8 +136,8 @@ void syncMaybeAdvanceCommitIndex(SSyncNode* pSyncNode) {
...
@@ -136,8 +136,8 @@ void syncMaybeAdvanceCommitIndex(SSyncNode* pSyncNode) {
int32_t
code
=
syncNodeDoCommit
(
pSyncNode
,
beginIndex
,
endIndex
,
pSyncNode
->
state
);
int32_t
code
=
syncNodeDoCommit
(
pSyncNode
,
beginIndex
,
endIndex
,
pSyncNode
->
state
);
if
(
code
!=
0
)
{
if
(
code
!=
0
)
{
char
logBuf
[
128
];
char
logBuf
[
128
];
snprintf
(
logBuf
,
sizeof
(
logBuf
),
"advance commit index error, do commit begin:%
ld, end:%ld"
,
beginIndex
,
snprintf
(
logBuf
,
sizeof
(
logBuf
),
"advance commit index error, do commit begin:%
"
PRId64
", end:%"
PRId64
,
endIndex
);
beginIndex
,
endIndex
);
syncNodeErrorLog
(
pSyncNode
,
logBuf
);
syncNodeErrorLog
(
pSyncNode
,
logBuf
);
return
;
return
;
}
}
...
...
source/libs/sync/src/syncMain.c
浏览文件 @
2d3f5274
...
@@ -339,7 +339,7 @@ char* syncNodePeerState2Str(const SSyncNode* pSyncNode) {
...
@@ -339,7 +339,7 @@ char* syncNodePeerState2Str(const SSyncNode* pSyncNode) {
ASSERT
(
pState
!=
NULL
);
ASSERT
(
pState
!=
NULL
);
p
=
pStr
+
useLen
;
p
=
pStr
+
useLen
;
use
=
snprintf
(
p
,
leftLen
,
"%d:%
ld,%ld, "
,
i
,
pState
->
lastSendIndex
,
pState
->
lastSendTime
);
use
=
snprintf
(
p
,
leftLen
,
"%d:%
"
PRId64
" ,%"
PRId64
,
i
,
pState
->
lastSendIndex
,
pState
->
lastSendTime
);
useLen
+=
use
;
useLen
+=
use
;
leftLen
-=
use
;
leftLen
-=
use
;
}
}
...
@@ -374,8 +374,9 @@ int32_t syncBeginSnapshot(int64_t rid, int64_t lastApplyIndex) {
...
@@ -374,8 +374,9 @@ int32_t syncBeginSnapshot(int64_t rid, int64_t lastApplyIndex) {
if
(
isEmpty
||
(
!
isEmpty
&&
logNum
<
logRetention
))
{
if
(
isEmpty
||
(
!
isEmpty
&&
logNum
<
logRetention
))
{
char
logBuf
[
256
];
char
logBuf
[
256
];
snprintf
(
logBuf
,
sizeof
(
logBuf
),
"new-snapshot-index:%ld, log-num:%ld, empty:%d, do not delete wal"
,
snprintf
(
logBuf
,
sizeof
(
logBuf
),
lastApplyIndex
,
logNum
,
isEmpty
);
"new-snapshot-index:%"
PRId64
", log-num:%"
PRId64
", empty:%d, do not delete wal"
,
lastApplyIndex
,
logNum
,
isEmpty
);
syncNodeEventLog
(
pSyncNode
,
logBuf
);
syncNodeEventLog
(
pSyncNode
,
logBuf
);
taosReleaseRef
(
tsNodeRefId
,
pSyncNode
->
rid
);
taosReleaseRef
(
tsNodeRefId
,
pSyncNode
->
rid
);
...
@@ -401,7 +402,8 @@ int32_t syncBeginSnapshot(int64_t rid, int64_t lastApplyIndex) {
...
@@ -401,7 +402,8 @@ int32_t syncBeginSnapshot(int64_t rid, int64_t lastApplyIndex) {
syncUtilU642Addr
(
pSyncNode
->
peersId
[
i
].
addr
,
host
,
sizeof
(
host
),
&
port
);
syncUtilU642Addr
(
pSyncNode
->
peersId
[
i
].
addr
,
host
,
sizeof
(
host
),
&
port
);
char
logBuf
[
256
];
char
logBuf
[
256
];
snprintf
(
logBuf
,
sizeof
(
logBuf
),
snprintf
(
logBuf
,
sizeof
(
logBuf
),
"new-snapshot-index:%ld is greater than match-index:%ld of %s:%d, do not delete wal"
,
"new-snapshot-index:%"
PRId64
" is greater than match-index:%"
PRId64
" of %s:%d, do not delete wal"
,
lastApplyIndex
,
matchIndex
,
host
,
port
);
lastApplyIndex
,
matchIndex
,
host
,
port
);
syncNodeEventLog
(
pSyncNode
,
logBuf
);
syncNodeEventLog
(
pSyncNode
,
logBuf
);
}
while
(
0
);
}
while
(
0
);
...
@@ -415,8 +417,8 @@ int32_t syncBeginSnapshot(int64_t rid, int64_t lastApplyIndex) {
...
@@ -415,8 +417,8 @@ int32_t syncBeginSnapshot(int64_t rid, int64_t lastApplyIndex) {
if
(
lastApplyIndex
>
pSyncNode
->
minMatchIndex
)
{
if
(
lastApplyIndex
>
pSyncNode
->
minMatchIndex
)
{
char
logBuf
[
256
];
char
logBuf
[
256
];
snprintf
(
logBuf
,
sizeof
(
logBuf
),
snprintf
(
logBuf
,
sizeof
(
logBuf
),
"new-snapshot-index:%
ld is greater than min-match-index:%ld, do not delete wal"
,
lastApplyIndex
,
"new-snapshot-index:%
"
PRId64
" is greater than min-match-index:%"
PRId64
", do not delete wal"
,
pSyncNode
->
minMatchIndex
);
lastApplyIndex
,
pSyncNode
->
minMatchIndex
);
syncNodeEventLog
(
pSyncNode
,
logBuf
);
syncNodeEventLog
(
pSyncNode
,
logBuf
);
taosReleaseRef
(
tsNodeRefId
,
pSyncNode
->
rid
);
taosReleaseRef
(
tsNodeRefId
,
pSyncNode
->
rid
);
...
@@ -425,7 +427,7 @@ int32_t syncBeginSnapshot(int64_t rid, int64_t lastApplyIndex) {
...
@@ -425,7 +427,7 @@ int32_t syncBeginSnapshot(int64_t rid, int64_t lastApplyIndex) {
}
else
if
(
pSyncNode
->
state
==
TAOS_SYNC_STATE_CANDIDATE
)
{
}
else
if
(
pSyncNode
->
state
==
TAOS_SYNC_STATE_CANDIDATE
)
{
char
logBuf
[
256
];
char
logBuf
[
256
];
snprintf
(
logBuf
,
sizeof
(
logBuf
),
"new-snapshot-index:%
ld
candidate, do not delete wal"
,
lastApplyIndex
);
snprintf
(
logBuf
,
sizeof
(
logBuf
),
"new-snapshot-index:%
"
PRId64
"
candidate, do not delete wal"
,
lastApplyIndex
);
syncNodeEventLog
(
pSyncNode
,
logBuf
);
syncNodeEventLog
(
pSyncNode
,
logBuf
);
taosReleaseRef
(
tsNodeRefId
,
pSyncNode
->
rid
);
taosReleaseRef
(
tsNodeRefId
,
pSyncNode
->
rid
);
...
@@ -433,7 +435,8 @@ int32_t syncBeginSnapshot(int64_t rid, int64_t lastApplyIndex) {
...
@@ -433,7 +435,8 @@ int32_t syncBeginSnapshot(int64_t rid, int64_t lastApplyIndex) {
}
else
{
}
else
{
char
logBuf
[
256
];
char
logBuf
[
256
];
snprintf
(
logBuf
,
sizeof
(
logBuf
),
"new-snapshot-index:%ld unknown state, do not delete wal"
,
lastApplyIndex
);
snprintf
(
logBuf
,
sizeof
(
logBuf
),
"new-snapshot-index:%"
PRId64
" unknown state, do not delete wal"
,
lastApplyIndex
);
syncNodeEventLog
(
pSyncNode
,
logBuf
);
syncNodeEventLog
(
pSyncNode
,
logBuf
);
taosReleaseRef
(
tsNodeRefId
,
pSyncNode
->
rid
);
taosReleaseRef
(
tsNodeRefId
,
pSyncNode
->
rid
);
...
@@ -462,14 +465,15 @@ _DEL_WAL:
...
@@ -462,14 +465,15 @@ _DEL_WAL:
code
=
walBeginSnapshot
(
pData
->
pWal
,
lastApplyIndex
);
code
=
walBeginSnapshot
(
pData
->
pWal
,
lastApplyIndex
);
if
(
code
==
0
)
{
if
(
code
==
0
)
{
char
logBuf
[
256
];
char
logBuf
[
256
];
snprintf
(
logBuf
,
sizeof
(
logBuf
),
"wal snapshot begin, index:%
ld, last apply index:%ld"
,
snprintf
(
logBuf
,
sizeof
(
logBuf
),
"wal snapshot begin, index:%
"
PRId64
", last apply index:%"
PRId64
,
pSyncNode
->
snapshottingIndex
,
lastApplyIndex
);
pSyncNode
->
snapshottingIndex
,
lastApplyIndex
);
syncNodeEventLog
(
pSyncNode
,
logBuf
);
syncNodeEventLog
(
pSyncNode
,
logBuf
);
}
else
{
}
else
{
char
logBuf
[
256
];
char
logBuf
[
256
];
snprintf
(
logBuf
,
sizeof
(
logBuf
),
"wal snapshot begin error since:%s, index:%ld, last apply index:%ld"
,
snprintf
(
logBuf
,
sizeof
(
logBuf
),
terrstr
(
terrno
),
pSyncNode
->
snapshottingIndex
,
lastApplyIndex
);
"wal snapshot begin error since:%s, index:%"
PRId64
", last apply index:%"
PRId64
,
terrstr
(
terrno
),
pSyncNode
->
snapshottingIndex
,
lastApplyIndex
);
syncNodeErrorLog
(
pSyncNode
,
logBuf
);
syncNodeErrorLog
(
pSyncNode
,
logBuf
);
atomic_store_64
(
&
pSyncNode
->
snapshottingIndex
,
SYNC_INDEX_INVALID
);
atomic_store_64
(
&
pSyncNode
->
snapshottingIndex
,
SYNC_INDEX_INVALID
);
...
@@ -477,8 +481,9 @@ _DEL_WAL:
...
@@ -477,8 +481,9 @@ _DEL_WAL:
}
else
{
}
else
{
char
logBuf
[
256
];
char
logBuf
[
256
];
snprintf
(
logBuf
,
sizeof
(
logBuf
),
"snapshotting for %ld, do not delete wal for new-snapshot-index:%ld"
,
snprintf
(
logBuf
,
sizeof
(
logBuf
),
snapshottingIndex
,
lastApplyIndex
);
"snapshotting for %"
PRId64
", do not delete wal for new-snapshot-index:%"
PRId64
,
snapshottingIndex
,
lastApplyIndex
);
syncNodeEventLog
(
pSyncNode
,
logBuf
);
syncNodeEventLog
(
pSyncNode
,
logBuf
);
}
}
}
while
(
0
);
}
while
(
0
);
...
@@ -507,7 +512,8 @@ int32_t syncEndSnapshot(int64_t rid) {
...
@@ -507,7 +512,8 @@ int32_t syncEndSnapshot(int64_t rid) {
}
else
{
}
else
{
do
{
do
{
char
logBuf
[
256
];
char
logBuf
[
256
];
snprintf
(
logBuf
,
sizeof
(
logBuf
),
"wal snapshot end, index:%ld"
,
atomic_load_64
(
&
pSyncNode
->
snapshottingIndex
));
snprintf
(
logBuf
,
sizeof
(
logBuf
),
"wal snapshot end, index:%"
PRId64
,
atomic_load_64
(
&
pSyncNode
->
snapshottingIndex
));
syncNodeEventLog
(
pSyncNode
,
logBuf
);
syncNodeEventLog
(
pSyncNode
,
logBuf
);
}
while
(
0
);
}
while
(
0
);
...
@@ -989,8 +995,8 @@ int32_t syncNodePropose(SSyncNode* pSyncNode, SRpcMsg* pMsg, bool isWeak) {
...
@@ -989,8 +995,8 @@ int32_t syncNodePropose(SSyncNode* pSyncNode, SRpcMsg* pMsg, bool isWeak) {
if
(
!
pSyncNode
->
restoreFinish
&&
pSyncNode
->
vgId
!=
1
)
{
if
(
!
pSyncNode
->
restoreFinish
&&
pSyncNode
->
vgId
!=
1
)
{
ret
=
-
1
;
ret
=
-
1
;
terrno
=
TSDB_CODE_SYN_PROPOSE_NOT_READY
;
terrno
=
TSDB_CODE_SYN_PROPOSE_NOT_READY
;
sError
(
"vgId:%d, failed to sync propose since not ready, type:%s, last:%
ld, cmt:%ld"
,
pSyncNode
->
vgId
,
sError
(
"vgId:%d, failed to sync propose since not ready, type:%s, last:%
"
PRId64
", cmt:%"
PRId64
,
TMSG_INFO
(
pMsg
->
msgType
),
syncNodeGetLastIndex
(
pSyncNode
),
pSyncNode
->
commitIndex
);
pSyncNode
->
vgId
,
TMSG_INFO
(
pMsg
->
msgType
),
syncNodeGetLastIndex
(
pSyncNode
),
pSyncNode
->
commitIndex
);
goto
_END
;
goto
_END
;
}
}
...
@@ -3144,7 +3150,7 @@ int32_t syncDoLeaderTransfer(SSyncNode* ths, SRpcMsg* pRpcMsg, SSyncRaftEntry* p
...
@@ -3144,7 +3150,7 @@ int32_t syncDoLeaderTransfer(SSyncNode* ths, SRpcMsg* pRpcMsg, SSyncRaftEntry* p
if
(
pEntry
->
index
<
syncNodeGetLastIndex
(
ths
))
{
if
(
pEntry
->
index
<
syncNodeGetLastIndex
(
ths
))
{
char
logBuf
[
128
];
char
logBuf
[
128
];
snprintf
(
logBuf
,
sizeof
(
logBuf
),
"little index:%
ld
, can not do leader transfer"
,
pEntry
->
index
);
snprintf
(
logBuf
,
sizeof
(
logBuf
),
"little index:%
"
PRId64
,
can
not
do
leader
transfer
", pEntry->index);
syncNodeEventLog(ths, logBuf);
syncNodeEventLog(ths, logBuf);
return 0;
return 0;
}
}
...
@@ -3160,7 +3166,7 @@ int32_t syncDoLeaderTransfer(SSyncNode* ths, SRpcMsg* pRpcMsg, SSyncRaftEntry* p
...
@@ -3160,7 +3166,7 @@ int32_t syncDoLeaderTransfer(SSyncNode* ths, SRpcMsg* pRpcMsg, SSyncRaftEntry* p
do {
do {
char logBuf[128];
char logBuf[128];
snprintf
(
logBuf
,
sizeof
(
logBuf
),
"do leader transfer, index:%
ld"
,
pEntry
->
index
);
snprintf(logBuf, sizeof(logBuf), "
do
leader
transfer
,
index
:%
" PRId64
, pEntry->index);
syncNodeEventLog(ths, logBuf);
syncNodeEventLog(ths, logBuf);
} while (0);
} while (0);
...
@@ -3416,8 +3422,8 @@ int32_t syncNodeDoCommit(SSyncNode* ths, SyncIndex beginIndex, SyncIndex endInde
...
@@ -3416,8 +3422,8 @@ int32_t syncNodeDoCommit(SSyncNode* ths, SyncIndex beginIndex, SyncIndex endInde
int64_t restoreDelay = taosGetTimestampMs() - ths->leaderTime;
int64_t restoreDelay = taosGetTimestampMs() - ths->leaderTime;
char eventLog[128];
char eventLog[128];
snprintf
(
eventLog
,
sizeof
(
eventLog
),
"restore finish, index:%
ld, elapsed:%ld ms, "
,
pEntry
->
index
,
snprintf(eventLog, sizeof(eventLog), "
restore
finish
,
index
:%
" PRId64 "
,
elapsed
:%
" PRId64 "
ms
,
"
,
restoreDelay
);
pEntry->index,
restoreDelay);
syncNodeEventLog(ths, eventLog);
syncNodeEventLog(ths, eventLog);
}
}
}
}
...
...
source/libs/sync/src/syncMessage.c
浏览文件 @
2d3f5274
...
@@ -1400,28 +1400,28 @@ char* syncRequestVoteReply2Str(const SyncRequestVoteReply* pMsg) {
...
@@ -1400,28 +1400,28 @@ char* syncRequestVoteReply2Str(const SyncRequestVoteReply* pMsg) {
// for debug ----------------------
// for debug ----------------------
void
syncRequestVoteReplyPrint
(
const
SyncRequestVoteReply
*
pMsg
)
{
void
syncRequestVoteReplyPrint
(
const
SyncRequestVoteReply
*
pMsg
)
{
char
*
serialized
=
syncRequestVoteReply2Str
(
pMsg
);
char
*
serialized
=
syncRequestVoteReply2Str
(
pMsg
);
printf
(
"syncRequestVoteReplyPrint | len:%
ld
| %s
\n
"
,
strlen
(
serialized
),
serialized
);
printf
(
"syncRequestVoteReplyPrint | len:%
"
PRId64
"
| %s
\n
"
,
strlen
(
serialized
),
serialized
);
fflush
(
NULL
);
fflush
(
NULL
);
taosMemoryFree
(
serialized
);
taosMemoryFree
(
serialized
);
}
}
void
syncRequestVoteReplyPrint2
(
char
*
s
,
const
SyncRequestVoteReply
*
pMsg
)
{
void
syncRequestVoteReplyPrint2
(
char
*
s
,
const
SyncRequestVoteReply
*
pMsg
)
{
char
*
serialized
=
syncRequestVoteReply2Str
(
pMsg
);
char
*
serialized
=
syncRequestVoteReply2Str
(
pMsg
);
printf
(
"syncRequestVoteReplyPrint2 | len:%
ld
| %s | %s
\n
"
,
strlen
(
serialized
),
s
,
serialized
);
printf
(
"syncRequestVoteReplyPrint2 | len:%
"
PRId64
"
| %s | %s
\n
"
,
strlen
(
serialized
),
s
,
serialized
);
fflush
(
NULL
);
fflush
(
NULL
);
taosMemoryFree
(
serialized
);
taosMemoryFree
(
serialized
);
}
}
void
syncRequestVoteReplyLog
(
const
SyncRequestVoteReply
*
pMsg
)
{
void
syncRequestVoteReplyLog
(
const
SyncRequestVoteReply
*
pMsg
)
{
char
*
serialized
=
syncRequestVoteReply2Str
(
pMsg
);
char
*
serialized
=
syncRequestVoteReply2Str
(
pMsg
);
sTrace
(
"syncRequestVoteReplyLog | len:%
ld
| %s"
,
strlen
(
serialized
),
serialized
);
sTrace
(
"syncRequestVoteReplyLog | len:%
"
PRId64
"
| %s"
,
strlen
(
serialized
),
serialized
);
taosMemoryFree
(
serialized
);
taosMemoryFree
(
serialized
);
}
}
void
syncRequestVoteReplyLog2
(
char
*
s
,
const
SyncRequestVoteReply
*
pMsg
)
{
void
syncRequestVoteReplyLog2
(
char
*
s
,
const
SyncRequestVoteReply
*
pMsg
)
{
if
(
gRaftDetailLog
)
{
if
(
gRaftDetailLog
)
{
char
*
serialized
=
syncRequestVoteReply2Str
(
pMsg
);
char
*
serialized
=
syncRequestVoteReply2Str
(
pMsg
);
sTrace
(
"syncRequestVoteReplyLog2 | len:%
ld
| %s | %s"
,
strlen
(
serialized
),
s
,
serialized
);
sTrace
(
"syncRequestVoteReplyLog2 | len:%
"
PRId64
"
| %s | %s"
,
strlen
(
serialized
),
s
,
serialized
);
taosMemoryFree
(
serialized
);
taosMemoryFree
(
serialized
);
}
}
}
}
...
...
source/libs/sync/src/syncReplication.c
浏览文件 @
2d3f5274
...
@@ -57,8 +57,8 @@ int32_t syncNodeReplicateOne(SSyncNode* pSyncNode, SRaftId* pDestId) {
...
@@ -57,8 +57,8 @@ int32_t syncNodeReplicateOne(SSyncNode* pSyncNode, SRaftId* pDestId) {
SyncIndex
logEndIndex
=
pSyncNode
->
pLogStore
->
syncLogEndIndex
(
pSyncNode
->
pLogStore
);
SyncIndex
logEndIndex
=
pSyncNode
->
pLogStore
->
syncLogEndIndex
(
pSyncNode
->
pLogStore
);
if
(
nextIndex
<
logStartIndex
||
nextIndex
-
1
>
logEndIndex
)
{
if
(
nextIndex
<
logStartIndex
||
nextIndex
-
1
>
logEndIndex
)
{
char
logBuf
[
128
];
char
logBuf
[
128
];
snprintf
(
logBuf
,
sizeof
(
logBuf
),
"start snapshot for next-index:%
ld, start:%ld, end:%ld"
,
nextIndex
,
logStartIndex
,
snprintf
(
logBuf
,
sizeof
(
logBuf
),
"start snapshot for next-index:%
"
PRId64
", start:%"
PRId64
", end:%"
PRId64
,
logEndIndex
);
nextIndex
,
logStartIndex
,
logEndIndex
);
syncNodeEventLog
(
pSyncNode
,
logBuf
);
syncNodeEventLog
(
pSyncNode
,
logBuf
);
// start snapshot
// start snapshot
...
@@ -105,7 +105,7 @@ int32_t syncNodeReplicateOne(SSyncNode* pSyncNode, SRaftId* pDestId) {
...
@@ -105,7 +105,7 @@ int32_t syncNodeReplicateOne(SSyncNode* pSyncNode, SRaftId* pDestId) {
syncUtilU642Addr
(
pDestId
->
addr
,
host
,
sizeof
(
host
),
&
port
);
syncUtilU642Addr
(
pDestId
->
addr
,
host
,
sizeof
(
host
),
&
port
);
char
logBuf
[
128
];
char
logBuf
[
128
];
snprintf
(
logBuf
,
sizeof
(
logBuf
),
"replicate to %s:%d error, next-index:%
ld"
,
host
,
port
,
nextIndex
);
snprintf
(
logBuf
,
sizeof
(
logBuf
),
"replicate to %s:%d error, next-index:%
"
PRId64
,
host
,
port
,
nextIndex
);
syncNodeErrorLog
(
pSyncNode
,
logBuf
);
syncNodeErrorLog
(
pSyncNode
,
logBuf
);
}
while
(
0
);
}
while
(
0
);
...
@@ -184,7 +184,7 @@ int32_t syncNodeMaybeSendAppendEntries(SSyncNode* pSyncNode, const SRaftId* dest
...
@@ -184,7 +184,7 @@ int32_t syncNodeMaybeSendAppendEntries(SSyncNode* pSyncNode, const SRaftId* dest
int16_t
port
;
int16_t
port
;
syncUtilU642Addr
(
destRaftId
->
addr
,
host
,
sizeof
(
host
),
&
port
);
syncUtilU642Addr
(
destRaftId
->
addr
,
host
,
sizeof
(
host
),
&
port
);
snprintf
(
logBuf
,
sizeof
(
logBuf
),
"do not repcate to %s:%d for index:%
ld"
,
host
,
port
,
pMsg
->
prevLogIndex
+
1
);
snprintf
(
logBuf
,
sizeof
(
logBuf
),
"do not repcate to %s:%d for index:%
"
PRId64
,
host
,
port
,
pMsg
->
prevLogIndex
+
1
);
syncNodeEventLog
(
pSyncNode
,
logBuf
);
syncNodeEventLog
(
pSyncNode
,
logBuf
);
}
}
...
...
source/libs/sync/src/syncTimeout.c
浏览文件 @
2d3f5274
...
@@ -81,7 +81,7 @@ int32_t syncNodeTimerRoutine(SSyncNode* ths) {
...
@@ -81,7 +81,7 @@ int32_t syncNodeTimerRoutine(SSyncNode* ths) {
}
else
{
}
else
{
do
{
do
{
char
logBuf
[
256
];
char
logBuf
[
256
];
snprintf
(
logBuf
,
sizeof
(
logBuf
),
"wal snapshot end, index:%
ld"
,
atomic_load_64
(
&
ths
->
snapshottingIndex
));
snprintf
(
logBuf
,
sizeof
(
logBuf
),
"wal snapshot end, index:%
"
PRId64
,
atomic_load_64
(
&
ths
->
snapshottingIndex
));
syncNodeEventLog
(
ths
,
logBuf
);
syncNodeEventLog
(
ths
,
logBuf
);
}
while
(
0
);
}
while
(
0
);
...
@@ -115,7 +115,7 @@ int32_t syncNodeOnTimer(SSyncNode* ths, SyncTimeout* pMsg) {
...
@@ -115,7 +115,7 @@ int32_t syncNodeOnTimer(SSyncNode* ths, SyncTimeout* pMsg) {
}
else
if
(
pMsg
->
timeoutType
==
SYNC_TIMEOUT_ELECTION
)
{
}
else
if
(
pMsg
->
timeoutType
==
SYNC_TIMEOUT_ELECTION
)
{
if
(
atomic_load_64
(
&
ths
->
electTimerLogicClockUser
)
<=
pMsg
->
logicClock
)
{
if
(
atomic_load_64
(
&
ths
->
electTimerLogicClockUser
)
<=
pMsg
->
logicClock
)
{
++
(
ths
->
electTimerCounter
);
++
(
ths
->
electTimerCounter
);
sTrace
(
"vgId:%d, sync timer, type:election count:%lu, lc-user:%
ld"
,
ths
->
vgId
,
ths
->
electTimerCounter
,
sTrace
(
"vgId:%d, sync timer, type:election count:%lu, lc-user:%
"
PRId64
,
ths
->
vgId
,
ths
->
electTimerCounter
,
ths
->
electTimerLogicClockUser
);
ths
->
electTimerLogicClockUser
);
syncNodeElect
(
ths
);
syncNodeElect
(
ths
);
...
@@ -124,8 +124,8 @@ int32_t syncNodeOnTimer(SSyncNode* ths, SyncTimeout* pMsg) {
...
@@ -124,8 +124,8 @@ int32_t syncNodeOnTimer(SSyncNode* ths, SyncTimeout* pMsg) {
}
else
if
(
pMsg
->
timeoutType
==
SYNC_TIMEOUT_HEARTBEAT
)
{
}
else
if
(
pMsg
->
timeoutType
==
SYNC_TIMEOUT_HEARTBEAT
)
{
if
(
atomic_load_64
(
&
ths
->
heartbeatTimerLogicClockUser
)
<=
pMsg
->
logicClock
)
{
if
(
atomic_load_64
(
&
ths
->
heartbeatTimerLogicClockUser
)
<=
pMsg
->
logicClock
)
{
++
(
ths
->
heartbeatTimerCounter
);
++
(
ths
->
heartbeatTimerCounter
);
sTrace
(
"vgId:%d, sync timer, type:replicate count:%
lu, lc-user:%ld"
,
ths
->
vgId
,
ths
->
heartbeatTimerCounter
,
sTrace
(
"vgId:%d, sync timer, type:replicate count:%
"
PRIu64
", lc-user:%"
PRId64
,
ths
->
vgId
,
ths
->
heartbeatTimerLogicClockUser
);
ths
->
heartbeatTimer
Counter
,
ths
->
heartbeatTimer
LogicClockUser
);
// syncNodeReplicate(ths, true);
// syncNodeReplicate(ths, true);
}
}
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录