Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
3b4ccdd4
T
TDengine
项目概览
taosdata
/
TDengine
接近 2 年 前同步成功
通知
1192
Star
22018
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
未验证
提交
3b4ccdd4
编写于
6月 23, 2022
作者:
L
Li Minghao
提交者:
GitHub
6月 23, 2022
浏览文件
操作
浏览文件
下载
差异文件
Merge pull request #14145 from taosdata/feature/3.0_mhli
fix(sync): update nextindex, matchindex when backto nolog
上级
8ca6b250
81eae2a9
变更
7
隐藏空白更改
内联
并排
Showing
7 changed file
with
159 addition
and
54 deletion
+159
-54
include/libs/sync/sync.h
include/libs/sync/sync.h
+3
-2
include/libs/sync/syncTools.h
include/libs/sync/syncTools.h
+9
-12
source/libs/sync/src/syncAppendEntries.c
source/libs/sync/src/syncAppendEntries.c
+5
-0
source/libs/sync/src/syncMain.c
source/libs/sync/src/syncMain.c
+80
-8
source/libs/sync/src/syncRaftLog.c
source/libs/sync/src/syncRaftLog.c
+43
-30
source/libs/sync/src/syncReplication.c
source/libs/sync/src/syncReplication.c
+9
-0
source/libs/sync/src/syncSnapshot.c
source/libs/sync/src/syncSnapshot.c
+10
-2
未找到文件。
include/libs/sync/sync.h
浏览文件 @
3b4ccdd4
...
@@ -26,8 +26,9 @@ extern "C" {
...
@@ -26,8 +26,9 @@ extern "C" {
extern
bool
gRaftDetailLog
;
extern
bool
gRaftDetailLog
;
#define SYNC_INDEX_BEGIN 0
#define SYNC_INDEX_BEGIN
0
#define SYNC_INDEX_INVALID -1
#define SYNC_INDEX_INVALID -1
#define SYNC_TERM_INVALID 0xFFFFFFFFFFFFFFFF
typedef
uint64_t
SyncNodeId
;
typedef
uint64_t
SyncNodeId
;
typedef
int32_t
SyncGroupId
;
typedef
int32_t
SyncGroupId
;
...
@@ -199,7 +200,7 @@ const char* syncGetMyRoleStr(int64_t rid);
...
@@ -199,7 +200,7 @@ const char* syncGetMyRoleStr(int64_t rid);
SyncTerm
syncGetMyTerm
(
int64_t
rid
);
SyncTerm
syncGetMyTerm
(
int64_t
rid
);
SyncGroupId
syncGetVgId
(
int64_t
rid
);
SyncGroupId
syncGetVgId
(
int64_t
rid
);
void
syncGetEpSet
(
int64_t
rid
,
SEpSet
*
pEpSet
);
void
syncGetEpSet
(
int64_t
rid
,
SEpSet
*
pEpSet
);
int32_t
syncPropose
(
int64_t
rid
,
const
SRpcMsg
*
pMsg
,
bool
isWeak
);
int32_t
syncPropose
(
int64_t
rid
,
SRpcMsg
*
pMsg
,
bool
isWeak
);
bool
syncEnvIsStart
();
bool
syncEnvIsStart
();
const
char
*
syncStr
(
ESyncState
state
);
const
char
*
syncStr
(
ESyncState
state
);
bool
syncIsRestoreFinish
(
int64_t
rid
);
bool
syncIsRestoreFinish
(
int64_t
rid
);
...
...
include/libs/sync/syncTools.h
浏览文件 @
3b4ccdd4
...
@@ -43,7 +43,7 @@ void setElectTimerMS(int64_t rid, int32_t electTimerMS);
...
@@ -43,7 +43,7 @@ void setElectTimerMS(int64_t rid, int32_t electTimerMS);
void
setHeartbeatTimerMS
(
int64_t
rid
,
int32_t
hbTimerMS
);
void
setHeartbeatTimerMS
(
int64_t
rid
,
int32_t
hbTimerMS
);
// for compatibility, the same as syncPropose
// for compatibility, the same as syncPropose
int32_t
syncForwardToPeer
(
int64_t
rid
,
const
SRpcMsg
*
pMsg
,
bool
isWeak
);
int32_t
syncForwardToPeer
(
int64_t
rid
,
SRpcMsg
*
pMsg
,
bool
isWeak
);
// utils
// utils
const
char
*
syncUtilState2String
(
ESyncState
state
);
const
char
*
syncUtilState2String
(
ESyncState
state
);
...
@@ -468,7 +468,7 @@ typedef struct SyncLeaderTransfer {
...
@@ -468,7 +468,7 @@ typedef struct SyncLeaderTransfer {
SRaftId destId;
SRaftId destId;
*/
*/
SNodeInfo
newNodeInfo
;
SNodeInfo
newNodeInfo
;
SRaftId
newLeaderId
;
SRaftId
newLeaderId
;
}
SyncLeaderTransfer
;
}
SyncLeaderTransfer
;
SyncLeaderTransfer
*
syncLeaderTransferBuild
(
int32_t
vgId
);
SyncLeaderTransfer
*
syncLeaderTransferBuild
(
int32_t
vgId
);
...
@@ -489,17 +489,16 @@ void syncLeaderTransferPrint2(char* s, const SyncLeaderTransfer* pMsg);
...
@@ -489,17 +489,16 @@ void syncLeaderTransferPrint2(char* s, const SyncLeaderTransfer* pMsg);
void
syncLeaderTransferLog
(
const
SyncLeaderTransfer
*
pMsg
);
void
syncLeaderTransferLog
(
const
SyncLeaderTransfer
*
pMsg
);
void
syncLeaderTransferLog2
(
char
*
s
,
const
SyncLeaderTransfer
*
pMsg
);
void
syncLeaderTransferLog2
(
char
*
s
,
const
SyncLeaderTransfer
*
pMsg
);
// ---------------------------------------------
// ---------------------------------------------
typedef
struct
SyncReconfigFinish
{
typedef
struct
SyncReconfigFinish
{
uint32_t
bytes
;
uint32_t
bytes
;
int32_t
vgId
;
int32_t
vgId
;
uint32_t
msgType
;
uint32_t
msgType
;
SSyncCfg
oldCfg
;
SSyncCfg
oldCfg
;
SSyncCfg
newCfg
;
SSyncCfg
newCfg
;
SyncIndex
newCfgIndex
;
SyncIndex
newCfgIndex
;
SyncTerm
newCfgTerm
;
SyncTerm
newCfgTerm
;
uint64_t
newCfgSeqNum
;
uint64_t
newCfgSeqNum
;
}
SyncReconfigFinish
;
}
SyncReconfigFinish
;
...
@@ -521,8 +520,6 @@ void syncReconfigFinishPrint2(char* s, const SyncReconfigFinish* pMsg);
...
@@ -521,8 +520,6 @@ void syncReconfigFinishPrint2(char* s, const SyncReconfigFinish* pMsg);
void
syncReconfigFinishLog
(
const
SyncReconfigFinish
*
pMsg
);
void
syncReconfigFinishLog
(
const
SyncReconfigFinish
*
pMsg
);
void
syncReconfigFinishLog2
(
char
*
s
,
const
SyncReconfigFinish
*
pMsg
);
void
syncReconfigFinishLog2
(
char
*
s
,
const
SyncReconfigFinish
*
pMsg
);
// on message ----------------------
// on message ----------------------
int32_t
syncNodeOnPingCb
(
SSyncNode
*
ths
,
SyncPing
*
pMsg
);
int32_t
syncNodeOnPingCb
(
SSyncNode
*
ths
,
SyncPing
*
pMsg
);
int32_t
syncNodeOnPingReplyCb
(
SSyncNode
*
ths
,
SyncPingReply
*
pMsg
);
int32_t
syncNodeOnPingReplyCb
(
SSyncNode
*
ths
,
SyncPingReply
*
pMsg
);
...
...
source/libs/sync/src/syncAppendEntries.c
浏览文件 @
3b4ccdd4
...
@@ -436,6 +436,11 @@ static bool syncNodeOnAppendEntriesLogOK(SSyncNode* pSyncNode, SyncAppendEntries
...
@@ -436,6 +436,11 @@ static bool syncNodeOnAppendEntriesLogOK(SSyncNode* pSyncNode, SyncAppendEntries
}
}
SyncTerm
myPreLogTerm
=
syncNodeGetPreTerm
(
pSyncNode
,
pMsg
->
prevLogIndex
+
1
);
SyncTerm
myPreLogTerm
=
syncNodeGetPreTerm
(
pSyncNode
,
pMsg
->
prevLogIndex
+
1
);
if
(
myPreLogTerm
==
SYNC_TERM_INVALID
)
{
sError
(
"vgId:%d sync get pre term error, preindex:%ld"
,
pSyncNode
->
vgId
,
pMsg
->
prevLogIndex
);
return
false
;
}
if
(
pMsg
->
prevLogIndex
<=
myLastIndex
&&
pMsg
->
prevLogTerm
==
myPreLogTerm
)
{
if
(
pMsg
->
prevLogIndex
<=
myLastIndex
&&
pMsg
->
prevLogTerm
==
myPreLogTerm
)
{
if
(
gRaftDetailLog
)
{
if
(
gRaftDetailLog
)
{
sTrace
(
sTrace
(
...
...
source/libs/sync/src/syncMain.c
浏览文件 @
3b4ccdd4
...
@@ -345,7 +345,7 @@ bool syncCanLeaderTransfer(int64_t rid) {
...
@@ -345,7 +345,7 @@ bool syncCanLeaderTransfer(int64_t rid) {
return
matchOK
;
return
matchOK
;
}
}
int32_t
syncForwardToPeer
(
int64_t
rid
,
const
SRpcMsg
*
pMsg
,
bool
isWeak
)
{
int32_t
syncForwardToPeer
(
int64_t
rid
,
SRpcMsg
*
pMsg
,
bool
isWeak
)
{
int32_t
ret
=
syncPropose
(
rid
,
pMsg
,
isWeak
);
int32_t
ret
=
syncPropose
(
rid
,
pMsg
,
isWeak
);
return
ret
;
return
ret
;
}
}
...
@@ -584,7 +584,7 @@ void setHeartbeatTimerMS(int64_t rid, int32_t hbTimerMS) {
...
@@ -584,7 +584,7 @@ void setHeartbeatTimerMS(int64_t rid, int32_t hbTimerMS) {
taosReleaseRef
(
tsNodeRefId
,
pSyncNode
->
rid
);
taosReleaseRef
(
tsNodeRefId
,
pSyncNode
->
rid
);
}
}
int32_t
syncPropose
(
int64_t
rid
,
const
SRpcMsg
*
pMsg
,
bool
isWeak
)
{
int32_t
syncPropose
(
int64_t
rid
,
SRpcMsg
*
pMsg
,
bool
isWeak
)
{
int32_t
ret
=
0
;
int32_t
ret
=
0
;
SSyncNode
*
pSyncNode
=
taosAcquireRef
(
tsNodeRefId
,
rid
);
SSyncNode
*
pSyncNode
=
taosAcquireRef
(
tsNodeRefId
,
rid
);
...
@@ -1309,40 +1309,44 @@ void syncNodeEventLog(const SSyncNode* pSyncNode, char* str) {
...
@@ -1309,40 +1309,44 @@ void syncNodeEventLog(const SSyncNode* pSyncNode, char* str) {
SyncIndex
logLastIndex
=
pSyncNode
->
pLogStore
->
syncLogLastIndex
(
pSyncNode
->
pLogStore
);
SyncIndex
logLastIndex
=
pSyncNode
->
pLogStore
->
syncLogLastIndex
(
pSyncNode
->
pLogStore
);
SyncIndex
logBeginIndex
=
pSyncNode
->
pLogStore
->
syncLogBeginIndex
(
pSyncNode
->
pLogStore
);
SyncIndex
logBeginIndex
=
pSyncNode
->
pLogStore
->
syncLogBeginIndex
(
pSyncNode
->
pLogStore
);
char
*
pCfgStr
=
syncCfg2SimpleStr
(
&
(
pSyncNode
->
pRaftCfg
->
cfg
));
if
(
userStrLen
<
256
)
{
if
(
userStrLen
<
256
)
{
char
logBuf
[
128
+
256
];
char
logBuf
[
256
+
256
];
if
(
pSyncNode
!=
NULL
&&
pSyncNode
->
pRaftCfg
!=
NULL
&&
pSyncNode
->
pRaftStore
!=
NULL
)
{
if
(
pSyncNode
!=
NULL
&&
pSyncNode
->
pRaftCfg
!=
NULL
&&
pSyncNode
->
pRaftStore
!=
NULL
)
{
snprintf
(
logBuf
,
sizeof
(
logBuf
),
snprintf
(
logBuf
,
sizeof
(
logBuf
),
"vgId:%d, sync %s %s, term:%lu, commit:%ld, beginlog:%ld, lastlog:%ld, lastsnapshot:%ld, standby:%d, "
"vgId:%d, sync %s %s, term:%lu, commit:%ld, beginlog:%ld, lastlog:%ld, lastsnapshot:%ld, standby:%d, "
"replica-num:%d, "
"replica-num:%d, "
"lconfig:%ld, changing:%d"
,
"lconfig:%ld, changing:%d
, %s
"
,
pSyncNode
->
vgId
,
syncUtilState2String
(
pSyncNode
->
state
),
str
,
pSyncNode
->
pRaftStore
->
currentTerm
,
pSyncNode
->
vgId
,
syncUtilState2String
(
pSyncNode
->
state
),
str
,
pSyncNode
->
pRaftStore
->
currentTerm
,
pSyncNode
->
commitIndex
,
logBeginIndex
,
logLastIndex
,
snapshot
.
lastApplyIndex
,
pSyncNode
->
commitIndex
,
logBeginIndex
,
logLastIndex
,
snapshot
.
lastApplyIndex
,
pSyncNode
->
pRaftCfg
->
isStandBy
,
pSyncNode
->
replicaNum
,
pSyncNode
->
pRaftCfg
->
lastConfigIndex
,
pSyncNode
->
pRaftCfg
->
isStandBy
,
pSyncNode
->
replicaNum
,
pSyncNode
->
pRaftCfg
->
lastConfigIndex
,
pSyncNode
->
changing
);
pSyncNode
->
changing
,
pCfgStr
);
}
else
{
}
else
{
snprintf
(
logBuf
,
sizeof
(
logBuf
),
"%s"
,
str
);
snprintf
(
logBuf
,
sizeof
(
logBuf
),
"%s"
,
str
);
}
}
sDebug
(
"%s"
,
logBuf
);
sDebug
(
"%s"
,
logBuf
);
}
else
{
}
else
{
int
len
=
128
+
userStrLen
;
int
len
=
256
+
userStrLen
;
char
*
s
=
(
char
*
)
taosMemoryMalloc
(
len
);
char
*
s
=
(
char
*
)
taosMemoryMalloc
(
len
);
if
(
pSyncNode
!=
NULL
&&
pSyncNode
->
pRaftCfg
!=
NULL
&&
pSyncNode
->
pRaftStore
!=
NULL
)
{
if
(
pSyncNode
!=
NULL
&&
pSyncNode
->
pRaftCfg
!=
NULL
&&
pSyncNode
->
pRaftStore
!=
NULL
)
{
snprintf
(
s
,
len
,
snprintf
(
s
,
len
,
"vgId:%d, sync %s %s, term:%lu, commit:%ld, beginlog:%ld, lastlog:%ld, lastsnapshot:%ld, standby:%d, "
"vgId:%d, sync %s %s, term:%lu, commit:%ld, beginlog:%ld, lastlog:%ld, lastsnapshot:%ld, standby:%d, "
"replica-num:%d, "
"replica-num:%d, "
"lconfig:%ld, changing:%d"
,
"lconfig:%ld, changing:%d
, %s
"
,
pSyncNode
->
vgId
,
syncUtilState2String
(
pSyncNode
->
state
),
str
,
pSyncNode
->
pRaftStore
->
currentTerm
,
pSyncNode
->
vgId
,
syncUtilState2String
(
pSyncNode
->
state
),
str
,
pSyncNode
->
pRaftStore
->
currentTerm
,
pSyncNode
->
commitIndex
,
logBeginIndex
,
logLastIndex
,
snapshot
.
lastApplyIndex
,
pSyncNode
->
commitIndex
,
logBeginIndex
,
logLastIndex
,
snapshot
.
lastApplyIndex
,
pSyncNode
->
pRaftCfg
->
isStandBy
,
pSyncNode
->
replicaNum
,
pSyncNode
->
pRaftCfg
->
lastConfigIndex
,
pSyncNode
->
pRaftCfg
->
isStandBy
,
pSyncNode
->
replicaNum
,
pSyncNode
->
pRaftCfg
->
lastConfigIndex
,
pSyncNode
->
changing
);
pSyncNode
->
changing
,
pCfgStr
);
}
else
{
}
else
{
snprintf
(
s
,
len
,
"%s"
,
str
);
snprintf
(
s
,
len
,
"%s"
,
str
);
}
}
sDebug
(
"%s"
,
s
);
sDebug
(
"%s"
,
s
);
taosMemoryFree
(
s
);
taosMemoryFree
(
s
);
}
}
taosMemoryFree
(
pCfgStr
);
}
}
void
syncNodeErrorLog
(
const
SSyncNode
*
pSyncNode
,
char
*
str
)
{
void
syncNodeErrorLog
(
const
SSyncNode
*
pSyncNode
,
char
*
str
)
{
...
@@ -1455,6 +1459,17 @@ void syncNodeDoConfigChange(SSyncNode* pSyncNode, SSyncCfg* pNewConfig, SyncInde
...
@@ -1455,6 +1459,17 @@ void syncNodeDoConfigChange(SSyncNode* pSyncNode, SSyncCfg* pNewConfig, SyncInde
isAdd
=
false
;
isAdd
=
false
;
}
}
// log begin config change
do
{
char
eventLog
[
256
];
char
*
pOldCfgStr
=
syncCfg2SimpleStr
(
&
oldConfig
);
char
*
pNewCfgStr
=
syncCfg2SimpleStr
(
pNewConfig
);
snprintf
(
eventLog
,
sizeof
(
eventLog
),
"begin do config change, from %s to %s"
,
pOldCfgStr
,
pNewCfgStr
);
syncNodeEventLog
(
pSyncNode
,
eventLog
);
taosMemoryFree
(
pOldCfgStr
);
taosMemoryFree
(
pNewCfgStr
);
}
while
(
0
);
if
(
IamInNew
)
{
if
(
IamInNew
)
{
pSyncNode
->
pRaftCfg
->
isStandBy
=
0
;
// change isStandBy to normal
pSyncNode
->
pRaftCfg
->
isStandBy
=
0
;
// change isStandBy to normal
}
}
...
@@ -1613,6 +1628,17 @@ void syncNodeDoConfigChange(SSyncNode* pSyncNode, SSyncCfg* pNewConfig, SyncInde
...
@@ -1613,6 +1628,17 @@ void syncNodeDoConfigChange(SSyncNode* pSyncNode, SSyncCfg* pNewConfig, SyncInde
}
}
_END:
_END:
// log end config change
do
{
char
eventLog
[
256
];
char
*
pOldCfgStr
=
syncCfg2SimpleStr
(
&
oldConfig
);
char
*
pNewCfgStr
=
syncCfg2SimpleStr
(
pNewConfig
);
snprintf
(
eventLog
,
sizeof
(
eventLog
),
"end do config change, from %s to %s"
,
pOldCfgStr
,
pNewCfgStr
);
syncNodeEventLog
(
pSyncNode
,
eventLog
);
taosMemoryFree
(
pOldCfgStr
);
taosMemoryFree
(
pNewCfgStr
);
}
while
(
0
);
return
;
return
;
}
}
...
@@ -1888,6 +1914,16 @@ SyncIndex syncNodeSyncStartIndex(SSyncNode* pSyncNode) {
...
@@ -1888,6 +1914,16 @@ SyncIndex syncNodeSyncStartIndex(SSyncNode* pSyncNode) {
return
syncStartIndex
;
return
syncStartIndex
;
}
}
SyncIndex
syncNodeGetPreIndex
(
SSyncNode
*
pSyncNode
,
SyncIndex
index
)
{
SyncIndex
preIndex
=
index
-
1
;
if
(
preIndex
<
SYNC_INDEX_INVALID
)
{
preIndex
=
SYNC_INDEX_INVALID
;
}
return
preIndex
;
}
/*
SyncIndex syncNodeGetPreIndex(SSyncNode* pSyncNode, SyncIndex index) {
SyncIndex syncNodeGetPreIndex(SSyncNode* pSyncNode, SyncIndex index) {
ASSERT(index >= SYNC_INDEX_BEGIN);
ASSERT(index >= SYNC_INDEX_BEGIN);
...
@@ -1900,7 +1936,42 @@ SyncIndex syncNodeGetPreIndex(SSyncNode* pSyncNode, SyncIndex index) {
...
@@ -1900,7 +1936,42 @@ SyncIndex syncNodeGetPreIndex(SSyncNode* pSyncNode, SyncIndex index) {
SyncIndex preIndex = index - 1;
SyncIndex preIndex = index - 1;
return preIndex;
return preIndex;
}
}
*/
SyncTerm
syncNodeGetPreTerm
(
SSyncNode
*
pSyncNode
,
SyncIndex
index
)
{
if
(
index
<
SYNC_INDEX_BEGIN
)
{
return
SYNC_TERM_INVALID
;
}
if
(
index
==
SYNC_INDEX_BEGIN
)
{
return
0
;
}
SyncTerm
preTerm
=
0
;
SyncIndex
preIndex
=
index
-
1
;
SSyncRaftEntry
*
pPreEntry
=
NULL
;
int32_t
code
=
pSyncNode
->
pLogStore
->
syncLogGetEntry
(
pSyncNode
->
pLogStore
,
preIndex
,
&
pPreEntry
);
if
(
code
==
0
)
{
ASSERT
(
pPreEntry
!=
NULL
);
preTerm
=
pPreEntry
->
term
;
taosMemoryFree
(
pPreEntry
);
return
preTerm
;
}
else
{
if
(
terrno
==
TSDB_CODE_WAL_LOG_NOT_EXIST
)
{
SSnapshot
snapshot
=
{.
data
=
NULL
,
.
lastApplyIndex
=
-
1
,
.
lastApplyTerm
=
0
,
.
lastConfigIndex
=
-
1
};
if
(
pSyncNode
->
pFsm
->
FpGetSnapshotInfo
!=
NULL
)
{
pSyncNode
->
pFsm
->
FpGetSnapshotInfo
(
pSyncNode
->
pFsm
,
&
snapshot
);
if
(
snapshot
.
lastApplyIndex
==
preIndex
)
{
return
snapshot
.
lastApplyTerm
;
}
}
}
}
return
SYNC_TERM_INVALID
;
}
#if 0
SyncTerm syncNodeGetPreTerm(SSyncNode* pSyncNode, SyncIndex index) {
SyncTerm syncNodeGetPreTerm(SSyncNode* pSyncNode, SyncIndex index) {
ASSERT(index >= SYNC_INDEX_BEGIN);
ASSERT(index >= SYNC_INDEX_BEGIN);
...
@@ -1938,6 +2009,7 @@ SyncTerm syncNodeGetPreTerm(SSyncNode* pSyncNode, SyncIndex index) {
...
@@ -1938,6 +2009,7 @@ SyncTerm syncNodeGetPreTerm(SSyncNode* pSyncNode, SyncIndex index) {
ASSERT(0);
ASSERT(0);
return -1;
return -1;
}
}
#endif
#if 0
#if 0
SyncTerm syncNodeGetPreTerm(SSyncNode* pSyncNode, SyncIndex index) {
SyncTerm syncNodeGetPreTerm(SSyncNode* pSyncNode, SyncIndex index) {
...
...
source/libs/sync/src/syncRaftLog.c
浏览文件 @
3b4ccdd4
...
@@ -144,7 +144,8 @@ static int32_t raftLogAppendEntry(struct SSyncLogStore* pLogStore, SSyncRaftEntr
...
@@ -144,7 +144,8 @@ static int32_t raftLogAppendEntry(struct SSyncLogStore* pLogStore, SSyncRaftEntr
SyncIndex
writeIndex
=
raftLogWriteIndex
(
pLogStore
);
SyncIndex
writeIndex
=
raftLogWriteIndex
(
pLogStore
);
if
(
pEntry
->
index
!=
writeIndex
)
{
if
(
pEntry
->
index
!=
writeIndex
)
{
sError
(
"raftLogAppendEntry error, pEntry->index:%ld update to writeIndex:%ld"
,
pEntry
->
index
,
writeIndex
);
sError
(
"vgId:%d wal write index error, entry-index:%ld update to %ld"
,
pData
->
pSyncNode
->
vgId
,
pEntry
->
index
,
writeIndex
);
pEntry
->
index
=
writeIndex
;
pEntry
->
index
=
writeIndex
;
}
}
...
@@ -157,10 +158,10 @@ static int32_t raftLogAppendEntry(struct SSyncLogStore* pLogStore, SSyncRaftEntr
...
@@ -157,10 +158,10 @@ static int32_t raftLogAppendEntry(struct SSyncLogStore* pLogStore, SSyncRaftEntr
if
(
code
!=
0
)
{
if
(
code
!=
0
)
{
int32_t
err
=
terrno
;
int32_t
err
=
terrno
;
const
char
*
errStr
=
tstrerror
(
err
);
const
char
*
errStr
=
tstrerror
(
err
);
int32_t
linux
Err
=
errno
;
int32_t
sys
Err
=
errno
;
const
char
*
linuxErrMsg
=
strerror
(
errno
);
const
char
*
sysErrStr
=
strerror
(
errno
);
sError
(
"
raftLogAppendEntry error, err:%d %X, msg:%s, linuxErr:%d, linuxErrMsg:%s"
,
err
,
err
,
errStr
,
linuxErr
,
sError
(
"
vgId:%d wal write error, index:%ld, err:%d %X, msg:%s, syserr:%d, sysmsg:%s"
,
pData
->
pSyncNode
->
vgId
,
linuxErrMsg
);
pEntry
->
index
,
err
,
err
,
errStr
,
sysErr
,
sysErrStr
);
ASSERT
(
0
);
ASSERT
(
0
);
}
}
...
@@ -237,12 +238,15 @@ static int32_t raftLogGetEntry(struct SSyncLogStore* pLogStore, SyncIndex index,
...
@@ -237,12 +238,15 @@ static int32_t raftLogGetEntry(struct SSyncLogStore* pLogStore, SyncIndex index,
if
(
code
!=
0
)
{
if
(
code
!=
0
)
{
int32_t
err
=
terrno
;
int32_t
err
=
terrno
;
const
char
*
errStr
=
tstrerror
(
err
);
const
char
*
errStr
=
tstrerror
(
err
);
int32_t
linux
Err
=
errno
;
int32_t
sys
Err
=
errno
;
const
char
*
linuxErrMsg
=
strerror
(
errno
);
const
char
*
sysErrStr
=
strerror
(
errno
);
sError
(
"
raftLogGetEntry error, err:%d %X, msg:%s, linuxErr:%d, linuxErrMsg:%s"
,
err
,
err
,
errStr
,
linuxErr
,
sError
(
"
vgId:%d wal read error, index:%ld, err:%d %X, msg:%s, syserr:%d, sysmsg:%s"
,
pData
->
pSyncNode
->
vgId
,
index
,
linuxErrMsg
);
err
,
err
,
errStr
,
sysErr
,
sysErrStr
);
int32_t
saveErr
=
terrno
;
walCloseReadHandle
(
pWalHandle
);
walCloseReadHandle
(
pWalHandle
);
terrno
=
saveErr
;
return
code
;
return
code
;
}
}
...
@@ -257,8 +261,9 @@ static int32_t raftLogGetEntry(struct SSyncLogStore* pLogStore, SyncIndex index,
...
@@ -257,8 +261,9 @@ static int32_t raftLogGetEntry(struct SSyncLogStore* pLogStore, SyncIndex index,
ASSERT
((
*
ppEntry
)
->
dataLen
==
pWalHandle
->
pHead
->
head
.
bodyLen
);
ASSERT
((
*
ppEntry
)
->
dataLen
==
pWalHandle
->
pHead
->
head
.
bodyLen
);
memcpy
((
*
ppEntry
)
->
data
,
pWalHandle
->
pHead
->
head
.
body
,
pWalHandle
->
pHead
->
head
.
bodyLen
);
memcpy
((
*
ppEntry
)
->
data
,
pWalHandle
->
pHead
->
head
.
body
,
pWalHandle
->
pHead
->
head
.
bodyLen
);
// need to hold, do not new every time!!
int32_t
saveErr
=
terrno
;
walCloseReadHandle
(
pWalHandle
);
walCloseReadHandle
(
pWalHandle
);
terrno
=
saveErr
;
return
code
;
return
code
;
}
}
...
@@ -270,10 +275,11 @@ static int32_t raftLogTruncate(struct SSyncLogStore* pLogStore, SyncIndex fromIn
...
@@ -270,10 +275,11 @@ static int32_t raftLogTruncate(struct SSyncLogStore* pLogStore, SyncIndex fromIn
if
(
code
!=
0
)
{
if
(
code
!=
0
)
{
int32_t
err
=
terrno
;
int32_t
err
=
terrno
;
const
char
*
errStr
=
tstrerror
(
err
);
const
char
*
errStr
=
tstrerror
(
err
);
int32_t
linuxErr
=
errno
;
int32_t
sysErr
=
errno
;
const
char
*
linuxErrMsg
=
strerror
(
errno
);
const
char
*
sysErrStr
=
strerror
(
errno
);
sError
(
"raftLogTruncate error, err:%d %X, msg:%s, linuxErr:%d, linuxErrMsg:%s"
,
err
,
err
,
errStr
,
linuxErr
,
sError
(
"vgId:%d wal truncate error, from-index:%ld, err:%d %X, msg:%s, syserr:%d, sysmsg:%s"
,
linuxErrMsg
);
pData
->
pSyncNode
->
vgId
,
fromIndex
,
err
,
err
,
errStr
,
sysErr
,
sysErrStr
);
ASSERT
(
0
);
ASSERT
(
0
);
}
}
return
code
;
return
code
;
...
@@ -360,10 +366,11 @@ int32_t logStoreAppendEntry(SSyncLogStore* pLogStore, SSyncRaftEntry* pEntry) {
...
@@ -360,10 +366,11 @@ int32_t logStoreAppendEntry(SSyncLogStore* pLogStore, SSyncRaftEntry* pEntry) {
if
(
code
!=
0
)
{
if
(
code
!=
0
)
{
int32_t
err
=
terrno
;
int32_t
err
=
terrno
;
const
char
*
errStr
=
tstrerror
(
err
);
const
char
*
errStr
=
tstrerror
(
err
);
int32_t
linuxErr
=
errno
;
int32_t
sysErr
=
errno
;
const
char
*
linuxErrMsg
=
strerror
(
errno
);
const
char
*
sysErrStr
=
strerror
(
errno
);
sError
(
"walWriteWithSyncInfo error, err:%d %X, msg:%s, linuxErr:%d, linuxErrMsg:%s"
,
err
,
err
,
errStr
,
linuxErr
,
sError
(
"vgId:%d wal write error, index:%ld, err:%d %X, msg:%s, syserr:%d, sysmsg:%s"
,
pData
->
pSyncNode
->
vgId
,
linuxErrMsg
);
pEntry
->
index
,
err
,
err
,
errStr
,
sysErr
,
sysErrStr
);
ASSERT
(
0
);
ASSERT
(
0
);
}
}
...
@@ -389,10 +396,11 @@ SSyncRaftEntry* logStoreGetEntry(SSyncLogStore* pLogStore, SyncIndex index) {
...
@@ -389,10 +396,11 @@ SSyncRaftEntry* logStoreGetEntry(SSyncLogStore* pLogStore, SyncIndex index) {
if
(
code
!=
0
)
{
if
(
code
!=
0
)
{
int32_t
err
=
terrno
;
int32_t
err
=
terrno
;
const
char
*
errStr
=
tstrerror
(
err
);
const
char
*
errStr
=
tstrerror
(
err
);
int32_t
linuxErr
=
errno
;
int32_t
sysErr
=
errno
;
const
char
*
linuxErrMsg
=
strerror
(
errno
);
const
char
*
sysErrStr
=
strerror
(
errno
);
sError
(
"walReadWithHandle error, err:%d %X, msg:%s, linuxErr:%d, linuxErrMsg:%s"
,
err
,
err
,
errStr
,
linuxErr
,
sError
(
"vgId:%d wal read error, index:%ld, err:%d %X, msg:%s, syserr:%d, sysmsg:%s"
,
pData
->
pSyncNode
->
vgId
,
linuxErrMsg
);
index
,
err
,
err
,
errStr
,
sysErr
,
sysErrStr
);
ASSERT
(
0
);
ASSERT
(
0
);
}
}
// ASSERT(walReadWithHandle(pWalHandle, index) == 0);
// ASSERT(walReadWithHandle(pWalHandle, index) == 0);
...
@@ -409,8 +417,10 @@ SSyncRaftEntry* logStoreGetEntry(SSyncLogStore* pLogStore, SyncIndex index) {
...
@@ -409,8 +417,10 @@ SSyncRaftEntry* logStoreGetEntry(SSyncLogStore* pLogStore, SyncIndex index) {
ASSERT
(
pEntry
->
dataLen
==
pWalHandle
->
pHead
->
head
.
bodyLen
);
ASSERT
(
pEntry
->
dataLen
==
pWalHandle
->
pHead
->
head
.
bodyLen
);
memcpy
(
pEntry
->
data
,
pWalHandle
->
pHead
->
head
.
body
,
pWalHandle
->
pHead
->
head
.
bodyLen
);
memcpy
(
pEntry
->
data
,
pWalHandle
->
pHead
->
head
.
body
,
pWalHandle
->
pHead
->
head
.
bodyLen
);
// need to hold, do not new every time!!
int32_t
saveErr
=
terrno
;
walCloseReadHandle
(
pWalHandle
);
walCloseReadHandle
(
pWalHandle
);
terrno
=
saveErr
;
return
pEntry
;
return
pEntry
;
}
else
{
}
else
{
...
@@ -426,10 +436,11 @@ int32_t logStoreTruncate(SSyncLogStore* pLogStore, SyncIndex fromIndex) {
...
@@ -426,10 +436,11 @@ int32_t logStoreTruncate(SSyncLogStore* pLogStore, SyncIndex fromIndex) {
if
(
code
!=
0
)
{
if
(
code
!=
0
)
{
int32_t
err
=
terrno
;
int32_t
err
=
terrno
;
const
char
*
errStr
=
tstrerror
(
err
);
const
char
*
errStr
=
tstrerror
(
err
);
int32_t
linuxErr
=
errno
;
int32_t
sysErr
=
errno
;
const
char
*
linuxErrMsg
=
strerror
(
errno
);
const
char
*
sysErrStr
=
strerror
(
errno
);
sError
(
"walRollback error, err:%d %X, msg:%s, linuxErr:%d, linuxErrMsg:%s"
,
err
,
err
,
errStr
,
linuxErr
,
sError
(
"vgId:%d wal truncate error, from-index:%ld, err:%d %X, msg:%s, syserr:%d, sysmsg:%s"
,
linuxErrMsg
);
pData
->
pSyncNode
->
vgId
,
fromIndex
,
err
,
err
,
errStr
,
sysErr
,
sysErrStr
);
ASSERT
(
0
);
ASSERT
(
0
);
}
}
return
0
;
return
0
;
...
@@ -460,9 +471,11 @@ int32_t logStoreUpdateCommitIndex(SSyncLogStore* pLogStore, SyncIndex index) {
...
@@ -460,9 +471,11 @@ int32_t logStoreUpdateCommitIndex(SSyncLogStore* pLogStore, SyncIndex index) {
if
(
code
!=
0
)
{
if
(
code
!=
0
)
{
int32_t
err
=
terrno
;
int32_t
err
=
terrno
;
const
char
*
errStr
=
tstrerror
(
err
);
const
char
*
errStr
=
tstrerror
(
err
);
int32_t
linuxErr
=
errno
;
int32_t
sysErr
=
errno
;
const
char
*
linuxErrMsg
=
strerror
(
errno
);
const
char
*
sysErrStr
=
strerror
(
errno
);
sError
(
"walCommit error, err:%d %X, msg:%s, linuxErr:%d, linuxErrMsg:%s"
,
err
,
err
,
errStr
,
linuxErr
,
linuxErrMsg
);
sError
(
"vgId:%d wal update commit index error, index:%ld, err:%d %X, msg:%s, syserr:%d, sysmsg:%s"
,
pData
->
pSyncNode
->
vgId
,
index
,
err
,
err
,
errStr
,
sysErr
,
sysErrStr
);
ASSERT
(
0
);
ASSERT
(
0
);
}
}
return
0
;
return
0
;
...
...
source/libs/sync/src/syncReplication.c
浏览文件 @
3b4ccdd4
...
@@ -139,6 +139,15 @@ int32_t syncNodeAppendEntriesPeersSnapshot(SSyncNode* pSyncNode) {
...
@@ -139,6 +139,15 @@ int32_t syncNodeAppendEntriesPeersSnapshot(SSyncNode* pSyncNode) {
// pre index, pre term
// pre index, pre term
SyncIndex
preLogIndex
=
syncNodeGetPreIndex
(
pSyncNode
,
nextIndex
);
SyncIndex
preLogIndex
=
syncNodeGetPreIndex
(
pSyncNode
,
nextIndex
);
SyncTerm
preLogTerm
=
syncNodeGetPreTerm
(
pSyncNode
,
nextIndex
);
SyncTerm
preLogTerm
=
syncNodeGetPreTerm
(
pSyncNode
,
nextIndex
);
if
(
preLogTerm
==
SYNC_TERM_INVALID
)
{
SyncIndex
newNextIndex
=
syncNodeGetLastIndex
(
pSyncNode
)
+
1
;
syncIndexMgrSetIndex
(
pSyncNode
->
pNextIndex
,
pDestId
,
newNextIndex
);
syncIndexMgrSetIndex
(
pSyncNode
->
pMatchIndex
,
pDestId
,
SYNC_INDEX_INVALID
);
sError
(
"vgId:%d sync get pre term error, nextIndex:%ld, update next-index:%ld, match-index:%d, raftid:%ld"
,
pSyncNode
->
vgId
,
nextIndex
,
newNextIndex
,
SYNC_INDEX_INVALID
,
pDestId
->
addr
);
return
-
1
;
}
// batch optimized
// batch optimized
// SyncIndex lastIndex = syncUtilMinIndex(pSyncNode->pLogStore->getLastIndex(pSyncNode->pLogStore), nextIndex);
// SyncIndex lastIndex = syncUtilMinIndex(pSyncNode->pLogStore->getLastIndex(pSyncNode->pLogStore), nextIndex);
...
...
source/libs/sync/src/syncSnapshot.c
浏览文件 @
3b4ccdd4
...
@@ -113,8 +113,16 @@ void snapshotSenderStart(SSyncSnapshotSender *pSender, SSnapshot snapshot, void
...
@@ -113,8 +113,16 @@ void snapshotSenderStart(SSyncSnapshotSender *pSender, SSnapshot snapshot, void
}
}
if
(
!
getLastConfig
)
{
if
(
!
getLastConfig
)
{
syncNodeLog3
(
""
,
pSender
->
pSyncNode
);
char
logBuf
[
128
];
ASSERT
(
0
);
snprintf
(
logBuf
,
sizeof
(
logBuf
),
"snapshot sender update lcindex from %ld to -1"
,
pSender
->
snapshot
.
lastConfigIndex
);
pSender
->
snapshot
.
lastConfigIndex
=
-
1
;
char
*
eventLog
=
snapshotSender2SimpleStr
(
pSender
,
logBuf
);
syncNodeEventLog
(
pSender
->
pSyncNode
,
eventLog
);
taosMemoryFree
(
eventLog
);
memset
(
&
(
pSender
->
lastConfig
),
0
,
sizeof
(
SSyncCfg
));
}
}
}
else
{
}
else
{
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录