Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
4c554437
TDengine
项目概览
taosdata
/
TDengine
1 年多 前同步成功
通知
1185
Star
22016
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
未验证
提交
4c554437
编写于
6月 17, 2022
作者:
S
Shengliang Guan
提交者:
GitHub
6月 17, 2022
浏览文件
操作
浏览文件
下载
差异文件
Merge pull request #13931 from taosdata/fix/tsim
refactor: adjust logs
上级
f9fe106a
43f8f34b
变更
12
隐藏空白更改
内联
并排
Showing
12 changed file
with
64 addition
and
61 deletion
+64
-61
source/dnode/mnode/impl/src/mndMain.c
source/dnode/mnode/impl/src/mndMain.c
+4
-4
source/dnode/mnode/impl/src/mndTrans.c
source/dnode/mnode/impl/src/mndTrans.c
+1
-1
source/dnode/mnode/impl/src/mndVgroup.c
source/dnode/mnode/impl/src/mndVgroup.c
+5
-4
source/dnode/vnode/src/meta/metaQuery.c
source/dnode/vnode/src/meta/metaQuery.c
+1
-1
source/dnode/vnode/src/vnd/vnodeSync.c
source/dnode/vnode/src/vnd/vnodeSync.c
+2
-1
source/libs/sync/src/syncAppendEntries.c
source/libs/sync/src/syncAppendEntries.c
+2
-2
source/libs/sync/src/syncAppendEntriesReply.c
source/libs/sync/src/syncAppendEntriesReply.c
+2
-2
source/libs/sync/src/syncCommit.c
source/libs/sync/src/syncCommit.c
+1
-1
source/libs/sync/src/syncMain.c
source/libs/sync/src/syncMain.c
+22
-21
source/libs/sync/src/syncRaftLog.c
source/libs/sync/src/syncRaftLog.c
+2
-2
source/libs/sync/src/syncRespMgr.c
source/libs/sync/src/syncRespMgr.c
+3
-3
source/libs/sync/src/syncSnapshot.c
source/libs/sync/src/syncSnapshot.c
+19
-19
未找到文件。
source/dnode/mnode/impl/src/mndMain.c
浏览文件 @
4c554437
...
@@ -510,8 +510,7 @@ static int32_t mndCheckMnodeState(SRpcMsg *pMsg) {
...
@@ -510,8 +510,7 @@ static int32_t mndCheckMnodeState(SRpcMsg *pMsg) {
if
(
IsReq
(
pMsg
)
&&
pMsg
->
msgType
!=
TDMT_MND_MQ_TIMER
&&
pMsg
->
msgType
!=
TDMT_MND_TELEM_TIMER
&&
if
(
IsReq
(
pMsg
)
&&
pMsg
->
msgType
!=
TDMT_MND_MQ_TIMER
&&
pMsg
->
msgType
!=
TDMT_MND_TELEM_TIMER
&&
pMsg
->
msgType
!=
TDMT_MND_TRANS_TIMER
)
{
pMsg
->
msgType
!=
TDMT_MND_TRANS_TIMER
)
{
mError
(
"msg:%p, failed to check mnode state since %s, app:%p type:%s"
,
pMsg
,
terrstr
(),
pMsg
->
info
.
ahandle
,
mError
(
"msg:%p, failed to check mnode state since %s, type:%s"
,
pMsg
,
terrstr
(),
TMSG_INFO
(
pMsg
->
msgType
));
TMSG_INFO
(
pMsg
->
msgType
));
SEpSet
epSet
=
{
0
};
SEpSet
epSet
=
{
0
};
mndGetMnodeEpSet
(
pMsg
->
info
.
node
,
&
epSet
);
mndGetMnodeEpSet
(
pMsg
->
info
.
node
,
&
epSet
);
...
@@ -534,7 +533,8 @@ static int32_t mndCheckMsgContent(SRpcMsg *pMsg) {
...
@@ -534,7 +533,8 @@ static int32_t mndCheckMsgContent(SRpcMsg *pMsg) {
if
(
!
IsReq
(
pMsg
))
return
0
;
if
(
!
IsReq
(
pMsg
))
return
0
;
if
(
pMsg
->
contLen
!=
0
&&
pMsg
->
pCont
!=
NULL
)
return
0
;
if
(
pMsg
->
contLen
!=
0
&&
pMsg
->
pCont
!=
NULL
)
return
0
;
mError
(
"msg:%p, failed to check msg content, app:%p type:%s"
,
pMsg
,
pMsg
->
info
.
ahandle
,
TMSG_INFO
(
pMsg
->
msgType
));
mError
(
"msg:%p, failed to check msg, cont:%p contLen:%d, app:%p type:%s"
,
pMsg
,
pMsg
->
pCont
,
pMsg
->
contLen
,
pMsg
->
info
.
ahandle
,
TMSG_INFO
(
pMsg
->
msgType
));
terrno
=
TSDB_CODE_INVALID_MSG_LEN
;
terrno
=
TSDB_CODE_INVALID_MSG_LEN
;
return
-
1
;
return
-
1
;
}
}
...
@@ -558,7 +558,7 @@ int32_t mndProcessRpcMsg(SRpcMsg *pMsg) {
...
@@ -558,7 +558,7 @@ int32_t mndProcessRpcMsg(SRpcMsg *pMsg) {
if
(
code
==
TSDB_CODE_ACTION_IN_PROGRESS
)
{
if
(
code
==
TSDB_CODE_ACTION_IN_PROGRESS
)
{
mTrace
(
"msg:%p, won't response immediately since in progress"
,
pMsg
);
mTrace
(
"msg:%p, won't response immediately since in progress"
,
pMsg
);
}
else
if
(
code
==
0
)
{
}
else
if
(
code
==
0
)
{
mTrace
(
"msg:%p, successfully processed
and response
"
,
pMsg
);
mTrace
(
"msg:%p, successfully processed"
,
pMsg
);
}
else
{
}
else
{
mError
(
"msg:%p, failed to process since %s, app:%p type:%s"
,
pMsg
,
terrstr
(),
pMsg
->
info
.
ahandle
,
mError
(
"msg:%p, failed to process since %s, app:%p type:%s"
,
pMsg
,
terrstr
(),
pMsg
->
info
.
ahandle
,
TMSG_INFO
(
pMsg
->
msgType
));
TMSG_INFO
(
pMsg
->
msgType
));
...
...
source/dnode/mnode/impl/src/mndTrans.c
浏览文件 @
4c554437
...
@@ -661,7 +661,7 @@ static int32_t mndTransSync(SMnode *pMnode, STrans *pTrans) {
...
@@ -661,7 +661,7 @@ static int32_t mndTransSync(SMnode *pMnode, STrans *pTrans) {
}
}
sdbSetRawStatus
(
pRaw
,
SDB_STATUS_READY
);
sdbSetRawStatus
(
pRaw
,
SDB_STATUS_READY
);
mDebug
(
"trans:%d, sync to other mnodes
"
,
pTrans
->
id
);
mDebug
(
"trans:%d, sync to other mnodes
, stage:%s"
,
pTrans
->
id
,
mndTransStr
(
pTrans
->
stage
)
);
int32_t
code
=
mndSyncPropose
(
pMnode
,
pRaw
,
pTrans
->
id
);
int32_t
code
=
mndSyncPropose
(
pMnode
,
pRaw
,
pTrans
->
id
);
if
(
code
!=
0
)
{
if
(
code
!=
0
)
{
mError
(
"trans:%d, failed to sync since %s"
,
pTrans
->
id
,
terrstr
());
mError
(
"trans:%d, failed to sync since %s"
,
pTrans
->
id
,
terrstr
());
...
...
source/dnode/mnode/impl/src/mndVgroup.c
浏览文件 @
4c554437
...
@@ -1058,7 +1058,7 @@ int32_t mndSetMoveVgroupsInfoToTrans(SMnode *pMnode, STrans *pTrans, int32_t del
...
@@ -1058,7 +1058,7 @@ int32_t mndSetMoveVgroupsInfoToTrans(SMnode *pMnode, STrans *pTrans, int32_t del
static
int32_t
mndAddIncVgroupReplicaToTrans
(
SMnode
*
pMnode
,
STrans
*
pTrans
,
SDbObj
*
pDb
,
SVgObj
*
pVgroup
,
static
int32_t
mndAddIncVgroupReplicaToTrans
(
SMnode
*
pMnode
,
STrans
*
pTrans
,
SDbObj
*
pDb
,
SVgObj
*
pVgroup
,
int32_t
newDnodeId
)
{
int32_t
newDnodeId
)
{
mDebug
(
"vgId:%d, will add 1 vnode, replica:%d
,
dnode:%d"
,
pVgroup
->
vgId
,
pVgroup
->
replica
,
newDnodeId
);
mDebug
(
"vgId:%d, will add 1 vnode, replica:%d dnode:%d"
,
pVgroup
->
vgId
,
pVgroup
->
replica
,
newDnodeId
);
SVnodeGid
*
pGid
=
&
pVgroup
->
vnodeGid
[
pVgroup
->
replica
];
SVnodeGid
*
pGid
=
&
pVgroup
->
vnodeGid
[
pVgroup
->
replica
];
pVgroup
->
replica
++
;
pVgroup
->
replica
++
;
...
@@ -1074,7 +1074,7 @@ static int32_t mndAddIncVgroupReplicaToTrans(SMnode *pMnode, STrans *pTrans, SDb
...
@@ -1074,7 +1074,7 @@ static int32_t mndAddIncVgroupReplicaToTrans(SMnode *pMnode, STrans *pTrans, SDb
static
int32_t
mndAddDecVgroupReplicaFromTrans
(
SMnode
*
pMnode
,
STrans
*
pTrans
,
SDbObj
*
pDb
,
SVgObj
*
pVgroup
,
static
int32_t
mndAddDecVgroupReplicaFromTrans
(
SMnode
*
pMnode
,
STrans
*
pTrans
,
SDbObj
*
pDb
,
SVgObj
*
pVgroup
,
int32_t
delDnodeId
)
{
int32_t
delDnodeId
)
{
mDebug
(
"vgId:%d, will remove 1 vnode, replica:%d
,
dnode:%d"
,
pVgroup
->
vgId
,
pVgroup
->
replica
,
delDnodeId
);
mDebug
(
"vgId:%d, will remove 1 vnode, replica:%d dnode:%d"
,
pVgroup
->
vgId
,
pVgroup
->
replica
,
delDnodeId
);
SVnodeGid
*
pGid
=
NULL
;
SVnodeGid
*
pGid
=
NULL
;
SVnodeGid
delGid
=
{
0
};
SVnodeGid
delGid
=
{
0
};
...
@@ -1116,7 +1116,8 @@ static int32_t mndRedistributeVgroup(SMnode *pMnode, SRpcMsg *pReq, SDbObj *pDb,
...
@@ -1116,7 +1116,8 @@ static int32_t mndRedistributeVgroup(SMnode *pMnode, SRpcMsg *pReq, SDbObj *pDb,
memcpy
(
&
newVg
,
pVgroup
,
sizeof
(
SVgObj
));
memcpy
(
&
newVg
,
pVgroup
,
sizeof
(
SVgObj
));
mInfo
(
"vgId:%d, vgroup info before redistribute, replica:%d"
,
newVg
.
vgId
,
newVg
.
replica
);
mInfo
(
"vgId:%d, vgroup info before redistribute, replica:%d"
,
newVg
.
vgId
,
newVg
.
replica
);
for
(
int32_t
i
=
0
;
i
<
newVg
.
replica
;
++
i
)
{
for
(
int32_t
i
=
0
;
i
<
newVg
.
replica
;
++
i
)
{
mInfo
(
"vgId:%d, vnode:%d dnode:%d"
,
newVg
.
vgId
,
i
,
newVg
.
vnodeGid
[
i
].
dnodeId
);
mInfo
(
"vgId:%d, vnode:%d dnode:%d role:%s"
,
newVg
.
vgId
,
i
,
newVg
.
vnodeGid
[
i
].
dnodeId
,
syncStr
(
newVg
.
vnodeGid
[
i
].
role
));
}
}
if
(
pNew1
!=
NULL
&&
pOld1
!=
NULL
)
{
if
(
pNew1
!=
NULL
&&
pOld1
!=
NULL
)
{
...
@@ -1198,7 +1199,7 @@ static int32_t mndProcessRedistributeVgroupMsg(SRpcMsg *pReq) {
...
@@ -1198,7 +1199,7 @@ static int32_t mndProcessRedistributeVgroupMsg(SRpcMsg *pReq) {
goto
_OVER
;
goto
_OVER
;
}
}
mInfo
(
"vgId:%d, start to redistribute to dnode %d:%d:%d"
,
req
.
vgId
,
req
.
dnodeId1
,
req
.
dnodeId2
,
req
.
dnodeId3
);
mInfo
(
"vgId:%d, start to redistribute
vgroup
to dnode %d:%d:%d"
,
req
.
vgId
,
req
.
dnodeId1
,
req
.
dnodeId2
,
req
.
dnodeId3
);
if
(
mndCheckOperAuth
(
pMnode
,
pReq
->
info
.
conn
.
user
,
MND_OPER_REDISTRIBUTE_VGROUP
)
!=
0
)
goto
_OVER
;
if
(
mndCheckOperAuth
(
pMnode
,
pReq
->
info
.
conn
.
user
,
MND_OPER_REDISTRIBUTE_VGROUP
)
!=
0
)
goto
_OVER
;
...
...
source/dnode/vnode/src/meta/metaQuery.c
浏览文件 @
4c554437
...
@@ -628,7 +628,7 @@ int32_t metaFilteTableIds(SMeta *pMeta, SMetaFltParam *param, SArray *pUids) {
...
@@ -628,7 +628,7 @@ int32_t metaFilteTableIds(SMeta *pMeta, SMetaFltParam *param, SArray *pUids) {
void
*
tagData
=
NULL
;
void
*
tagData
=
NULL
;
if
(
param
->
val
==
NULL
)
{
if
(
param
->
val
==
NULL
)
{
metaError
(
"vgId:%d failed to filter NULL data"
,
TD_VID
(
pMeta
->
pVnode
));
metaError
(
"vgId:%d
,
failed to filter NULL data"
,
TD_VID
(
pMeta
->
pVnode
));
return
-
1
;
return
-
1
;
}
else
{
}
else
{
if
(
IS_VAR_DATA_TYPE
(
param
->
type
))
{
if
(
IS_VAR_DATA_TYPE
(
param
->
type
))
{
...
...
source/dnode/vnode/src/vnd/vnodeSync.c
浏览文件 @
4c554437
...
@@ -61,6 +61,7 @@ static int32_t vnodeSetStandBy(SVnode *pVnode) {
...
@@ -61,6 +61,7 @@ static int32_t vnodeSetStandBy(SVnode *pVnode) {
return
-
1
;
return
-
1
;
}
}
vInfo
(
"vgId:%d, start to transfer leader"
,
TD_VID
(
pVnode
));
if
(
syncLeaderTransfer
(
pVnode
->
sync
)
!=
0
)
{
if
(
syncLeaderTransfer
(
pVnode
->
sync
)
!=
0
)
{
vError
(
"vgId:%d, failed to transfer leader since:%s"
,
TD_VID
(
pVnode
),
terrstr
());
vError
(
"vgId:%d, failed to transfer leader since:%s"
,
TD_VID
(
pVnode
),
terrstr
());
return
-
1
;
return
-
1
;
...
@@ -297,7 +298,7 @@ int32_t vnodeProcessSyncReq(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) {
...
@@ -297,7 +298,7 @@ int32_t vnodeProcessSyncReq(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) {
ret
=
-
1
;
ret
=
-
1
;
}
}
if
(
ret
!=
0
)
{
if
(
ret
!=
0
&&
terrno
==
0
)
{
terrno
=
TSDB_CODE_SYN_INTERNAL_ERROR
;
terrno
=
TSDB_CODE_SYN_INTERNAL_ERROR
;
}
}
return
ret
;
return
ret
;
...
...
source/libs/sync/src/syncAppendEntries.c
浏览文件 @
4c554437
...
@@ -713,7 +713,7 @@ static int32_t syncNodeMakeLogSame(SSyncNode* ths, SyncAppendEntries* pMsg) {
...
@@ -713,7 +713,7 @@ static int32_t syncNodeMakeLogSame(SSyncNode* ths, SyncAppendEntries* pMsg) {
// delete confict entries
// delete confict entries
code
=
ths
->
pLogStore
->
syncLogTruncate
(
ths
->
pLogStore
,
delBegin
);
code
=
ths
->
pLogStore
->
syncLogTruncate
(
ths
->
pLogStore
,
delBegin
);
ASSERT
(
code
==
0
);
ASSERT
(
code
==
0
);
sDebug
(
"vgId:%d sync event %s commitIndex:%ld currentTerm:%lu log truncate, from %ld to %ld"
,
ths
->
vgId
,
sDebug
(
"vgId:%d
,
sync event %s commitIndex:%ld currentTerm:%lu log truncate, from %ld to %ld"
,
ths
->
vgId
,
syncUtilState2String
(
ths
->
state
),
ths
->
commitIndex
,
ths
->
pRaftStore
->
currentTerm
,
delBegin
,
delEnd
);
syncUtilState2String
(
ths
->
state
),
ths
->
commitIndex
,
ths
->
pRaftStore
->
currentTerm
,
delBegin
,
delEnd
);
logStoreSimpleLog2
(
"after syncNodeMakeLogSame"
,
ths
->
pLogStore
);
logStoreSimpleLog2
(
"after syncNodeMakeLogSame"
,
ths
->
pLogStore
);
...
@@ -1062,7 +1062,7 @@ int32_t syncNodeOnAppendEntriesSnapshotCb(SSyncNode* ths, SyncAppendEntries* pMs
...
@@ -1062,7 +1062,7 @@ int32_t syncNodeOnAppendEntriesSnapshotCb(SSyncNode* ths, SyncAppendEntries* pMs
ths
->
commitIndex
=
snapshot
.
lastApplyIndex
;
ths
->
commitIndex
=
snapshot
.
lastApplyIndex
;
sDebug
(
sDebug
(
"vgId:%d sync event %s commitIndex:%ld currentTerm:%lu commit by snapshot from index:%ld to index:%ld"
,
"vgId:%d
,
sync event %s commitIndex:%ld currentTerm:%lu commit by snapshot from index:%ld to index:%ld"
,
ths
->
vgId
,
syncUtilState2String
(
ths
->
state
),
ths
->
commitIndex
,
ths
->
pRaftStore
->
currentTerm
,
ths
->
vgId
,
syncUtilState2String
(
ths
->
state
),
ths
->
commitIndex
,
ths
->
pRaftStore
->
currentTerm
,
commitBegin
,
commitEnd
);
commitBegin
,
commitEnd
);
}
}
...
...
source/libs/sync/src/syncAppendEntriesReply.c
浏览文件 @
4c554437
...
@@ -190,7 +190,7 @@ int32_t syncNodeOnAppendEntriesReplySnapshotCb(SSyncNode* ths, SyncAppendEntries
...
@@ -190,7 +190,7 @@ int32_t syncNodeOnAppendEntriesReplySnapshotCb(SSyncNode* ths, SyncAppendEntries
if
(
gRaftDetailLog
)
{
if
(
gRaftDetailLog
)
{
char
*
s
=
snapshotSender2Str
(
pSender
);
char
*
s
=
snapshotSender2Str
(
pSender
);
sDebug
(
sDebug
(
"vgId:%d sync event %s commitIndex:%ld currentTerm:%lu snapshot send to %s:%d start sender first time, "
"vgId:%d
,
sync event %s commitIndex:%ld currentTerm:%lu snapshot send to %s:%d start sender first time, "
"lastApplyIndex:%ld "
"lastApplyIndex:%ld "
"lastApplyTerm:%lu "
"lastApplyTerm:%lu "
"lastConfigIndex:%ld privateTerm:%lu "
"lastConfigIndex:%ld privateTerm:%lu "
...
@@ -201,7 +201,7 @@ int32_t syncNodeOnAppendEntriesReplySnapshotCb(SSyncNode* ths, SyncAppendEntries
...
@@ -201,7 +201,7 @@ int32_t syncNodeOnAppendEntriesReplySnapshotCb(SSyncNode* ths, SyncAppendEntries
taosMemoryFree
(
s
);
taosMemoryFree
(
s
);
}
else
{
}
else
{
sDebug
(
sDebug
(
"vgId:%d sync event %s commitIndex:%ld currentTerm:%lu snapshot send to %s:%d start sender first time, "
"vgId:%d
,
sync event %s commitIndex:%ld currentTerm:%lu snapshot send to %s:%d start sender first time, "
"lastApplyIndex:%ld "
"lastApplyIndex:%ld "
"lastApplyTerm:%lu lastConfigIndex:%ld privateTerm:%lu"
,
"lastApplyTerm:%lu lastConfigIndex:%ld privateTerm:%lu"
,
ths
->
vgId
,
syncUtilState2String
(
ths
->
state
),
ths
->
commitIndex
,
ths
->
pRaftStore
->
currentTerm
,
host
,
port
,
ths
->
vgId
,
syncUtilState2String
(
ths
->
state
),
ths
->
commitIndex
,
ths
->
pRaftStore
->
currentTerm
,
host
,
port
,
...
...
source/libs/sync/src/syncCommit.c
浏览文件 @
4c554437
...
@@ -56,7 +56,7 @@ void syncMaybeAdvanceCommitIndex(SSyncNode* pSyncNode) {
...
@@ -56,7 +56,7 @@ void syncMaybeAdvanceCommitIndex(SSyncNode* pSyncNode) {
SyncIndex
commitEnd
=
snapshot
.
lastApplyIndex
;
SyncIndex
commitEnd
=
snapshot
.
lastApplyIndex
;
pSyncNode
->
commitIndex
=
snapshot
.
lastApplyIndex
;
pSyncNode
->
commitIndex
=
snapshot
.
lastApplyIndex
;
sDebug
(
"vgId:%d sync event %s commitIndex:%ld currentTerm:%lu commit by snapshot from index:%ld to index:%ld"
,
sDebug
(
"vgId:%d
,
sync event %s commitIndex:%ld currentTerm:%lu commit by snapshot from index:%ld to index:%ld"
,
pSyncNode
->
vgId
,
syncUtilState2String
(
pSyncNode
->
state
),
pSyncNode
->
commitIndex
,
pSyncNode
->
vgId
,
syncUtilState2String
(
pSyncNode
->
state
),
pSyncNode
->
commitIndex
,
pSyncNode
->
pRaftStore
->
currentTerm
,
pSyncNode
->
commitIndex
,
snapshot
.
lastApplyIndex
);
pSyncNode
->
pRaftStore
->
currentTerm
,
pSyncNode
->
commitIndex
,
snapshot
.
lastApplyIndex
);
}
}
...
...
source/libs/sync/src/syncMain.c
浏览文件 @
4c554437
...
@@ -414,7 +414,7 @@ int32_t syncGetSnapshotMeta(int64_t rid, struct SSnapshotMeta* sMeta) {
...
@@ -414,7 +414,7 @@ int32_t syncGetSnapshotMeta(int64_t rid, struct SSnapshotMeta* sMeta) {
assert
(
rid
==
pSyncNode
->
rid
);
assert
(
rid
==
pSyncNode
->
rid
);
sMeta
->
lastConfigIndex
=
pSyncNode
->
pRaftCfg
->
lastConfigIndex
;
sMeta
->
lastConfigIndex
=
pSyncNode
->
pRaftCfg
->
lastConfigIndex
;
sTrace
(
"
sync get snapshot meta: lastConfigIndex:%ld"
,
pSyncNode
->
pRaftCfg
->
lastConfigIndex
);
sTrace
(
"
vgId:%d, get snapshot meta, lastConfigIndex:%"
PRId64
,
pSyncNode
->
vgId
,
pSyncNode
->
pRaftCfg
->
lastConfigIndex
);
taosReleaseRef
(
tsNodeRefId
,
pSyncNode
->
rid
);
taosReleaseRef
(
tsNodeRefId
,
pSyncNode
->
rid
);
return
0
;
return
0
;
...
@@ -437,7 +437,8 @@ int32_t syncGetSnapshotMetaByIndex(int64_t rid, SyncIndex snapshotIndex, struct
...
@@ -437,7 +437,8 @@ int32_t syncGetSnapshotMetaByIndex(int64_t rid, SyncIndex snapshotIndex, struct
}
}
}
}
sMeta
->
lastConfigIndex
=
lastIndex
;
sMeta
->
lastConfigIndex
=
lastIndex
;
sTrace
(
"sync get snapshot meta by index:%ld lastConfigIndex:%ld"
,
snapshotIndex
,
sMeta
->
lastConfigIndex
);
sTrace
(
"vgId:%d, get snapshot meta by index:%"
PRId64
" lastConfigIndex:%"
PRId64
,
pSyncNode
->
vgId
,
snapshotIndex
,
sMeta
->
lastConfigIndex
);
taosReleaseRef
(
tsNodeRefId
,
pSyncNode
->
rid
);
taosReleaseRef
(
tsNodeRefId
,
pSyncNode
->
rid
);
return
0
;
return
0
;
...
@@ -598,7 +599,7 @@ int32_t syncPropose(int64_t rid, const SRpcMsg* pMsg, bool isWeak) {
...
@@ -598,7 +599,7 @@ int32_t syncPropose(int64_t rid, const SRpcMsg* pMsg, bool isWeak) {
return
-
1
;
return
-
1
;
}
}
assert
(
rid
==
pSyncNode
->
rid
);
assert
(
rid
==
pSyncNode
->
rid
);
sDebug
(
"vgId:%d sync event %s commitIndex:%ld currentTerm:%lu propose msgType:%s,%d"
,
pSyncNode
->
vgId
,
sDebug
(
"vgId:%d
,
sync event %s commitIndex:%ld currentTerm:%lu propose msgType:%s,%d"
,
pSyncNode
->
vgId
,
syncUtilState2String
(
pSyncNode
->
state
),
pSyncNode
->
commitIndex
,
pSyncNode
->
pRaftStore
->
currentTerm
,
syncUtilState2String
(
pSyncNode
->
state
),
pSyncNode
->
commitIndex
,
pSyncNode
->
pRaftStore
->
currentTerm
,
TMSG_INFO
(
pMsg
->
msgType
),
pMsg
->
msgType
);
TMSG_INFO
(
pMsg
->
msgType
),
pMsg
->
msgType
);
ret
=
syncNodePropose
(
pSyncNode
,
pMsg
,
isWeak
);
ret
=
syncNodePropose
(
pSyncNode
,
pMsg
,
isWeak
);
...
@@ -609,7 +610,7 @@ int32_t syncPropose(int64_t rid, const SRpcMsg* pMsg, bool isWeak) {
...
@@ -609,7 +610,7 @@ int32_t syncPropose(int64_t rid, const SRpcMsg* pMsg, bool isWeak) {
int32_t
syncNodePropose
(
SSyncNode
*
pSyncNode
,
const
SRpcMsg
*
pMsg
,
bool
isWeak
)
{
int32_t
syncNodePropose
(
SSyncNode
*
pSyncNode
,
const
SRpcMsg
*
pMsg
,
bool
isWeak
)
{
int32_t
ret
=
0
;
int32_t
ret
=
0
;
sDebug
(
"vgId:%d sync event %s commitIndex:%ld currentTerm:%lu propose msgType:%s,%d"
,
pSyncNode
->
vgId
,
sDebug
(
"vgId:%d
,
sync event %s commitIndex:%ld currentTerm:%lu propose msgType:%s,%d"
,
pSyncNode
->
vgId
,
syncUtilState2String
(
pSyncNode
->
state
),
pSyncNode
->
commitIndex
,
pSyncNode
->
pRaftStore
->
currentTerm
,
syncUtilState2String
(
pSyncNode
->
state
),
pSyncNode
->
commitIndex
,
pSyncNode
->
pRaftStore
->
currentTerm
,
TMSG_INFO
(
pMsg
->
msgType
),
pMsg
->
msgType
);
TMSG_INFO
(
pMsg
->
msgType
),
pMsg
->
msgType
);
...
@@ -857,7 +858,7 @@ SSyncNode* syncNodeOpen(const SSyncInfo* pOldSyncInfo) {
...
@@ -857,7 +858,7 @@ SSyncNode* syncNodeOpen(const SSyncInfo* pOldSyncInfo) {
// snapshot meta
// snapshot meta
// pSyncNode->sMeta.lastConfigIndex = -1;
// pSyncNode->sMeta.lastConfigIndex = -1;
sDebug
(
"vgId:%d sync event %s commitIndex:%ld currentTerm:%lu sync open"
,
pSyncNode
->
vgId
,
sDebug
(
"vgId:%d
,
sync event %s commitIndex:%ld currentTerm:%lu sync open"
,
pSyncNode
->
vgId
,
syncUtilState2String
(
pSyncNode
->
state
),
pSyncNode
->
commitIndex
,
pSyncNode
->
pRaftStore
->
currentTerm
);
syncUtilState2String
(
pSyncNode
->
state
),
pSyncNode
->
commitIndex
,
pSyncNode
->
pRaftStore
->
currentTerm
);
return
pSyncNode
;
return
pSyncNode
;
...
@@ -905,7 +906,7 @@ void syncNodeStartStandBy(SSyncNode* pSyncNode) {
...
@@ -905,7 +906,7 @@ void syncNodeStartStandBy(SSyncNode* pSyncNode) {
}
}
void
syncNodeClose
(
SSyncNode
*
pSyncNode
)
{
void
syncNodeClose
(
SSyncNode
*
pSyncNode
)
{
sDebug
(
"vgId:%d sync event %s commitIndex:%ld currentTerm:%lu sync close"
,
pSyncNode
->
vgId
,
sDebug
(
"vgId:%d
,
sync event %s commitIndex:%ld currentTerm:%lu sync close"
,
pSyncNode
->
vgId
,
syncUtilState2String
(
pSyncNode
->
state
),
pSyncNode
->
commitIndex
,
pSyncNode
->
pRaftStore
->
currentTerm
);
syncUtilState2String
(
pSyncNode
->
state
),
pSyncNode
->
commitIndex
,
pSyncNode
->
pRaftStore
->
currentTerm
);
int32_t
ret
;
int32_t
ret
;
...
@@ -1294,7 +1295,7 @@ char* syncNode2SimpleStr(const SSyncNode* pSyncNode) {
...
@@ -1294,7 +1295,7 @@ char* syncNode2SimpleStr(const SSyncNode* pSyncNode) {
int
len
=
256
;
int
len
=
256
;
char
*
s
=
(
char
*
)
taosMemoryMalloc
(
len
);
char
*
s
=
(
char
*
)
taosMemoryMalloc
(
len
);
snprintf
(
s
,
len
,
snprintf
(
s
,
len
,
"syncNode: vgId:%d currentTerm:%lu, commitIndex:%ld, state:%d %s, isStandBy:%d, "
"syncNode: vgId:%d
,
currentTerm:%lu, commitIndex:%ld, state:%d %s, isStandBy:%d, "
"electTimerLogicClock:%lu, "
"electTimerLogicClock:%lu, "
"electTimerLogicClockUser:%lu, "
"electTimerLogicClockUser:%lu, "
"electTimerMS:%d, replicaNum:%d"
,
"electTimerMS:%d, replicaNum:%d"
,
...
@@ -1345,7 +1346,7 @@ void syncNodeUpdateConfig(SSyncNode* pSyncNode, SSyncCfg* pNewConfig, SyncIndex
...
@@ -1345,7 +1346,7 @@ void syncNodeUpdateConfig(SSyncNode* pSyncNode, SSyncCfg* pNewConfig, SyncIndex
SSyncSnapshotSender
*
oldSenders
[
TSDB_MAX_REPLICA
];
SSyncSnapshotSender
*
oldSenders
[
TSDB_MAX_REPLICA
];
for
(
int
i
=
0
;
i
<
TSDB_MAX_REPLICA
;
++
i
)
{
for
(
int
i
=
0
;
i
<
TSDB_MAX_REPLICA
;
++
i
)
{
oldSenders
[
i
]
=
(
pSyncNode
->
senders
)[
i
];
oldSenders
[
i
]
=
(
pSyncNode
->
senders
)[
i
];
sDebug
(
"vgId:%d sync event %s commitIndex:%ld currentTerm:%lu save senders %d, %p, privateTerm:%lu"
,
sDebug
(
"vgId:%d
,
sync event %s commitIndex:%ld currentTerm:%lu save senders %d, %p, privateTerm:%lu"
,
pSyncNode
->
vgId
,
syncUtilState2String
(
pSyncNode
->
state
),
pSyncNode
->
commitIndex
,
pSyncNode
->
vgId
,
syncUtilState2String
(
pSyncNode
->
state
),
pSyncNode
->
commitIndex
,
pSyncNode
->
pRaftStore
->
currentTerm
,
i
,
oldSenders
[
i
],
oldSenders
[
i
]
->
privateTerm
);
pSyncNode
->
pRaftStore
->
currentTerm
,
i
,
oldSenders
[
i
],
oldSenders
[
i
]
->
privateTerm
);
if
(
gRaftDetailLog
)
{
if
(
gRaftDetailLog
)
{
...
@@ -1400,7 +1401,7 @@ void syncNodeUpdateConfig(SSyncNode* pSyncNode, SSyncCfg* pNewConfig, SyncIndex
...
@@ -1400,7 +1401,7 @@ void syncNodeUpdateConfig(SSyncNode* pSyncNode, SSyncCfg* pNewConfig, SyncIndex
uint16_t
port
;
uint16_t
port
;
syncUtilU642Addr
((
pSyncNode
->
replicasId
)[
i
].
addr
,
host
,
sizeof
(
host
),
&
port
);
syncUtilU642Addr
((
pSyncNode
->
replicasId
)[
i
].
addr
,
host
,
sizeof
(
host
),
&
port
);
sDebug
(
sDebug
(
"vgId:%d sync event %s commitIndex:%ld currentTerm:%lu reset sender for %lu, newIndex:%d, %s:%d, %p, "
"vgId:%d
,
sync event %s commitIndex:%ld currentTerm:%lu reset sender for %lu, newIndex:%d, %s:%d, %p, "
"privateTerm:%lu"
,
"privateTerm:%lu"
,
pSyncNode
->
vgId
,
syncUtilState2String
(
pSyncNode
->
state
),
pSyncNode
->
commitIndex
,
pSyncNode
->
vgId
,
syncUtilState2String
(
pSyncNode
->
state
),
pSyncNode
->
commitIndex
,
pSyncNode
->
pRaftStore
->
currentTerm
,
(
pSyncNode
->
replicasId
)[
i
].
addr
,
i
,
host
,
port
,
oldSenders
[
j
],
pSyncNode
->
pRaftStore
->
currentTerm
,
(
pSyncNode
->
replicasId
)[
i
].
addr
,
i
,
host
,
port
,
oldSenders
[
j
],
...
@@ -1413,7 +1414,7 @@ void syncNodeUpdateConfig(SSyncNode* pSyncNode, SSyncCfg* pNewConfig, SyncIndex
...
@@ -1413,7 +1414,7 @@ void syncNodeUpdateConfig(SSyncNode* pSyncNode, SSyncCfg* pNewConfig, SyncIndex
int32_t
oldreplicaIndex
=
(
pSyncNode
->
senders
)[
i
]
->
replicaIndex
;
int32_t
oldreplicaIndex
=
(
pSyncNode
->
senders
)[
i
]
->
replicaIndex
;
(
pSyncNode
->
senders
)[
i
]
->
replicaIndex
=
i
;
(
pSyncNode
->
senders
)[
i
]
->
replicaIndex
=
i
;
sDebug
(
sDebug
(
"vgId:%d sync event %s commitIndex:%ld currentTerm:%lu udpate replicaIndex from %d to %d, %s:%d, %p, "
"vgId:%d
,
sync event %s commitIndex:%ld currentTerm:%lu udpate replicaIndex from %d to %d, %s:%d, %p, "
"reset:%d"
,
"reset:%d"
,
pSyncNode
->
vgId
,
syncUtilState2String
(
pSyncNode
->
state
),
pSyncNode
->
commitIndex
,
pSyncNode
->
vgId
,
syncUtilState2String
(
pSyncNode
->
state
),
pSyncNode
->
commitIndex
,
pSyncNode
->
pRaftStore
->
currentTerm
,
oldreplicaIndex
,
i
,
host
,
port
,
(
pSyncNode
->
senders
)[
i
],
reset
);
pSyncNode
->
pRaftStore
->
currentTerm
,
oldreplicaIndex
,
i
,
host
,
port
,
(
pSyncNode
->
senders
)[
i
],
reset
);
...
@@ -1426,7 +1427,7 @@ void syncNodeUpdateConfig(SSyncNode* pSyncNode, SSyncCfg* pNewConfig, SyncIndex
...
@@ -1426,7 +1427,7 @@ void syncNodeUpdateConfig(SSyncNode* pSyncNode, SSyncCfg* pNewConfig, SyncIndex
if
((
pSyncNode
->
senders
)[
i
]
==
NULL
)
{
if
((
pSyncNode
->
senders
)[
i
]
==
NULL
)
{
(
pSyncNode
->
senders
)[
i
]
=
snapshotSenderCreate
(
pSyncNode
,
i
);
(
pSyncNode
->
senders
)[
i
]
=
snapshotSenderCreate
(
pSyncNode
,
i
);
sDebug
(
sDebug
(
"vgId:%d sync event %s commitIndex:%ld currentTerm:%lu create new sender %p replicaIndex:%d, privateTerm:%lu"
,
"vgId:%d
,
sync event %s commitIndex:%ld currentTerm:%lu create new sender %p replicaIndex:%d, privateTerm:%lu"
,
pSyncNode
->
vgId
,
syncUtilState2String
(
pSyncNode
->
state
),
pSyncNode
->
commitIndex
,
pSyncNode
->
vgId
,
syncUtilState2String
(
pSyncNode
->
state
),
pSyncNode
->
commitIndex
,
pSyncNode
->
pRaftStore
->
currentTerm
,
(
pSyncNode
->
senders
)[
i
],
i
,
(
pSyncNode
->
senders
)[
i
]
->
privateTerm
);
pSyncNode
->
pRaftStore
->
currentTerm
,
(
pSyncNode
->
senders
)[
i
],
i
,
(
pSyncNode
->
senders
)[
i
]
->
privateTerm
);
}
}
...
@@ -1436,7 +1437,7 @@ void syncNodeUpdateConfig(SSyncNode* pSyncNode, SSyncCfg* pNewConfig, SyncIndex
...
@@ -1436,7 +1437,7 @@ void syncNodeUpdateConfig(SSyncNode* pSyncNode, SSyncCfg* pNewConfig, SyncIndex
for
(
int
i
=
0
;
i
<
TSDB_MAX_REPLICA
;
++
i
)
{
for
(
int
i
=
0
;
i
<
TSDB_MAX_REPLICA
;
++
i
)
{
if
(
oldSenders
[
i
]
!=
NULL
)
{
if
(
oldSenders
[
i
]
!=
NULL
)
{
snapshotSenderDestroy
(
oldSenders
[
i
]);
snapshotSenderDestroy
(
oldSenders
[
i
]);
sDebug
(
"vgId:%d sync event %s commitIndex:%ld currentTerm:%lu delete old sender %p replicaIndex:%d"
,
sDebug
(
"vgId:%d
,
sync event %s commitIndex:%ld currentTerm:%lu delete old sender %p replicaIndex:%d"
,
pSyncNode
->
vgId
,
syncUtilState2String
(
pSyncNode
->
state
),
pSyncNode
->
commitIndex
,
pSyncNode
->
vgId
,
syncUtilState2String
(
pSyncNode
->
state
),
pSyncNode
->
commitIndex
,
pSyncNode
->
pRaftStore
->
currentTerm
,
oldSenders
[
i
],
i
);
pSyncNode
->
pRaftStore
->
currentTerm
,
oldSenders
[
i
],
i
);
oldSenders
[
i
]
=
NULL
;
oldSenders
[
i
]
=
NULL
;
...
@@ -1509,7 +1510,7 @@ void syncNodeUpdateTerm(SSyncNode* pSyncNode, SyncTerm term) {
...
@@ -1509,7 +1510,7 @@ void syncNodeUpdateTerm(SSyncNode* pSyncNode, SyncTerm term) {
void
syncNodeBecomeFollower
(
SSyncNode
*
pSyncNode
,
const
char
*
debugStr
)
{
void
syncNodeBecomeFollower
(
SSyncNode
*
pSyncNode
,
const
char
*
debugStr
)
{
sDebug
(
sDebug
(
"vgId:%d sync event %s commitIndex:%ld currentTerm:%lu become follower, isStandBy:%d, replicaNum:%d, "
"vgId:%d
,
sync event %s commitIndex:%ld currentTerm:%lu become follower, isStandBy:%d, replicaNum:%d, "
"restoreFinish:%d, %s"
,
"restoreFinish:%d, %s"
,
pSyncNode
->
vgId
,
syncUtilState2String
(
pSyncNode
->
state
),
pSyncNode
->
commitIndex
,
pSyncNode
->
vgId
,
syncUtilState2String
(
pSyncNode
->
state
),
pSyncNode
->
commitIndex
,
pSyncNode
->
pRaftStore
->
currentTerm
,
pSyncNode
->
pRaftCfg
->
isStandBy
,
pSyncNode
->
replicaNum
,
pSyncNode
->
pRaftStore
->
currentTerm
,
pSyncNode
->
pRaftCfg
->
isStandBy
,
pSyncNode
->
replicaNum
,
...
@@ -1551,7 +1552,7 @@ void syncNodeBecomeLeader(SSyncNode* pSyncNode, const char* debugStr) {
...
@@ -1551,7 +1552,7 @@ void syncNodeBecomeLeader(SSyncNode* pSyncNode, const char* debugStr) {
pSyncNode
->
restoreFinish
=
false
;
pSyncNode
->
restoreFinish
=
false
;
sDebug
(
sDebug
(
"vgId:%d sync event %s commitIndex:%ld currentTerm:%lu become leader, isStandBy:%d, replicaNum:%d, "
"vgId:%d
,
sync event %s commitIndex:%ld currentTerm:%lu become leader, isStandBy:%d, replicaNum:%d, "
"restoreFinish:%d, %s"
,
"restoreFinish:%d, %s"
,
pSyncNode
->
vgId
,
syncUtilState2String
(
pSyncNode
->
state
),
pSyncNode
->
commitIndex
,
pSyncNode
->
vgId
,
syncUtilState2String
(
pSyncNode
->
state
),
pSyncNode
->
commitIndex
,
pSyncNode
->
pRaftStore
->
currentTerm
,
pSyncNode
->
pRaftCfg
->
isStandBy
,
pSyncNode
->
replicaNum
,
pSyncNode
->
pRaftStore
->
currentTerm
,
pSyncNode
->
pRaftCfg
->
isStandBy
,
pSyncNode
->
replicaNum
,
...
@@ -1857,7 +1858,7 @@ static void syncNodeEqPingTimer(void* param, void* tmrId) {
...
@@ -1857,7 +1858,7 @@ static void syncNodeEqPingTimer(void* param, void* tmrId) {
if
(
pSyncNode
->
FpEqMsg
!=
NULL
)
{
if
(
pSyncNode
->
FpEqMsg
!=
NULL
)
{
int32_t
code
=
pSyncNode
->
FpEqMsg
(
pSyncNode
->
msgcb
,
&
rpcMsg
);
int32_t
code
=
pSyncNode
->
FpEqMsg
(
pSyncNode
->
msgcb
,
&
rpcMsg
);
if
(
code
!=
0
)
{
if
(
code
!=
0
)
{
sError
(
"vgId:%d sync enqueue ping msg error, code:%d"
,
pSyncNode
->
vgId
,
code
);
sError
(
"vgId:%d
,
sync enqueue ping msg error, code:%d"
,
pSyncNode
->
vgId
,
code
);
rpcFreeCont
(
rpcMsg
.
pCont
);
rpcFreeCont
(
rpcMsg
.
pCont
);
syncTimeoutDestroy
(
pSyncMsg
);
syncTimeoutDestroy
(
pSyncMsg
);
return
;
return
;
...
@@ -1891,7 +1892,7 @@ static void syncNodeEqElectTimer(void* param, void* tmrId) {
...
@@ -1891,7 +1892,7 @@ static void syncNodeEqElectTimer(void* param, void* tmrId) {
if
(
pSyncNode
->
FpEqMsg
!=
NULL
)
{
if
(
pSyncNode
->
FpEqMsg
!=
NULL
)
{
int32_t
code
=
pSyncNode
->
FpEqMsg
(
pSyncNode
->
msgcb
,
&
rpcMsg
);
int32_t
code
=
pSyncNode
->
FpEqMsg
(
pSyncNode
->
msgcb
,
&
rpcMsg
);
if
(
code
!=
0
)
{
if
(
code
!=
0
)
{
sError
(
"vgId:%d sync enqueue elect msg error, code:%d"
,
pSyncNode
->
vgId
,
code
);
sError
(
"vgId:%d
,
sync enqueue elect msg error, code:%d"
,
pSyncNode
->
vgId
,
code
);
rpcFreeCont
(
rpcMsg
.
pCont
);
rpcFreeCont
(
rpcMsg
.
pCont
);
syncTimeoutDestroy
(
pSyncMsg
);
syncTimeoutDestroy
(
pSyncMsg
);
return
;
return
;
...
@@ -1929,7 +1930,7 @@ static void syncNodeEqHeartbeatTimer(void* param, void* tmrId) {
...
@@ -1929,7 +1930,7 @@ static void syncNodeEqHeartbeatTimer(void* param, void* tmrId) {
if
(
pSyncNode
->
FpEqMsg
!=
NULL
)
{
if
(
pSyncNode
->
FpEqMsg
!=
NULL
)
{
int32_t
code
=
pSyncNode
->
FpEqMsg
(
pSyncNode
->
msgcb
,
&
rpcMsg
);
int32_t
code
=
pSyncNode
->
FpEqMsg
(
pSyncNode
->
msgcb
,
&
rpcMsg
);
if
(
code
!=
0
)
{
if
(
code
!=
0
)
{
sError
(
"vgId:%d sync enqueue timer msg error, code:%d"
,
pSyncNode
->
vgId
,
code
);
sError
(
"vgId:%d
,
sync enqueue timer msg error, code:%d"
,
pSyncNode
->
vgId
,
code
);
rpcFreeCont
(
rpcMsg
.
pCont
);
rpcFreeCont
(
rpcMsg
.
pCont
);
syncTimeoutDestroy
(
pSyncMsg
);
syncTimeoutDestroy
(
pSyncMsg
);
return
;
return
;
...
@@ -2129,12 +2130,12 @@ const char* syncStr(ESyncState state) {
...
@@ -2129,12 +2130,12 @@ const char* syncStr(ESyncState state) {
static
int32_t
syncDoLeaderTransfer
(
SSyncNode
*
ths
,
SRpcMsg
*
pRpcMsg
,
SSyncRaftEntry
*
pEntry
)
{
static
int32_t
syncDoLeaderTransfer
(
SSyncNode
*
ths
,
SRpcMsg
*
pRpcMsg
,
SSyncRaftEntry
*
pEntry
)
{
SyncLeaderTransfer
*
pSyncLeaderTransfer
=
syncLeaderTransferFromRpcMsg2
(
pRpcMsg
);
SyncLeaderTransfer
*
pSyncLeaderTransfer
=
syncLeaderTransferFromRpcMsg2
(
pRpcMsg
);
sDebug
(
"vgId:%d sync event %s commitIndex:%ld currentTerm:%lu begin leader transfer"
,
ths
->
vgId
,
sDebug
(
"vgId:%d
,
sync event %s commitIndex:%ld currentTerm:%lu begin leader transfer"
,
ths
->
vgId
,
syncUtilState2String
(
ths
->
state
),
ths
->
commitIndex
,
ths
->
pRaftStore
->
currentTerm
);
syncUtilState2String
(
ths
->
state
),
ths
->
commitIndex
,
ths
->
pRaftStore
->
currentTerm
);
if
(
strcmp
(
pSyncLeaderTransfer
->
newNodeInfo
.
nodeFqdn
,
ths
->
myNodeInfo
.
nodeFqdn
)
==
0
&&
if
(
strcmp
(
pSyncLeaderTransfer
->
newNodeInfo
.
nodeFqdn
,
ths
->
myNodeInfo
.
nodeFqdn
)
==
0
&&
pSyncLeaderTransfer
->
newNodeInfo
.
nodePort
==
ths
->
myNodeInfo
.
nodePort
)
{
pSyncLeaderTransfer
->
newNodeInfo
.
nodePort
==
ths
->
myNodeInfo
.
nodePort
)
{
sDebug
(
"vgId:%d sync event %s commitIndex:%ld currentTerm:%lu maybe leader transfer to %s:%d %lu"
,
ths
->
vgId
,
sDebug
(
"vgId:%d
,
sync event %s commitIndex:%ld currentTerm:%lu maybe leader transfer to %s:%d %lu"
,
ths
->
vgId
,
syncUtilState2String
(
ths
->
state
),
ths
->
commitIndex
,
ths
->
pRaftStore
->
currentTerm
,
syncUtilState2String
(
ths
->
state
),
ths
->
commitIndex
,
ths
->
pRaftStore
->
currentTerm
,
pSyncLeaderTransfer
->
newNodeInfo
.
nodeFqdn
,
pSyncLeaderTransfer
->
newNodeInfo
.
nodePort
,
pSyncLeaderTransfer
->
newNodeInfo
.
nodeFqdn
,
pSyncLeaderTransfer
->
newNodeInfo
.
nodePort
,
pSyncLeaderTransfer
->
newLeaderId
.
addr
);
pSyncLeaderTransfer
->
newLeaderId
.
addr
);
...
@@ -2275,7 +2276,7 @@ static int32_t syncNodeConfigChange(SSyncNode* ths, SRpcMsg* pRpcMsg, SSyncRaftE
...
@@ -2275,7 +2276,7 @@ static int32_t syncNodeConfigChange(SSyncNode* ths, SRpcMsg* pRpcMsg, SSyncRaftE
int32_t
syncNodeCommit
(
SSyncNode
*
ths
,
SyncIndex
beginIndex
,
SyncIndex
endIndex
,
uint64_t
flag
)
{
int32_t
syncNodeCommit
(
SSyncNode
*
ths
,
SyncIndex
beginIndex
,
SyncIndex
endIndex
,
uint64_t
flag
)
{
int32_t
code
=
0
;
int32_t
code
=
0
;
ESyncState
state
=
flag
;
ESyncState
state
=
flag
;
sDebug
(
"vgId:%d sync event %s commitIndex:%ld currentTerm:%lu commit by wal from index:%"
PRId64
" to index:%"
PRId64
sDebug
(
"vgId:%d
,
sync event %s commitIndex:%ld currentTerm:%lu commit by wal from index:%"
PRId64
" to index:%"
PRId64
", %s"
,
", %s"
,
ths
->
vgId
,
syncUtilState2String
(
ths
->
state
),
ths
->
commitIndex
,
ths
->
pRaftStore
->
currentTerm
,
beginIndex
,
ths
->
vgId
,
syncUtilState2String
(
ths
->
state
),
ths
->
commitIndex
,
ths
->
pRaftStore
->
currentTerm
,
beginIndex
,
endIndex
,
syncUtilState2String
(
state
));
endIndex
,
syncUtilState2String
(
state
));
...
@@ -2327,7 +2328,7 @@ int32_t syncNodeCommit(SSyncNode* ths, SyncIndex beginIndex, SyncIndex endIndex,
...
@@ -2327,7 +2328,7 @@ int32_t syncNodeCommit(SSyncNode* ths, SyncIndex beginIndex, SyncIndex endIndex,
ths
->
pFsm
->
FpRestoreFinishCb
(
ths
->
pFsm
);
ths
->
pFsm
->
FpRestoreFinishCb
(
ths
->
pFsm
);
}
}
ths
->
restoreFinish
=
true
;
ths
->
restoreFinish
=
true
;
sDebug
(
"vgId:%d sync event %s commitIndex:%ld currentTerm:%lu restore finish, %s, index:%ld"
,
ths
->
vgId
,
sDebug
(
"vgId:%d
,
sync event %s commitIndex:%ld currentTerm:%lu restore finish, %s, index:%ld"
,
ths
->
vgId
,
syncUtilState2String
(
ths
->
state
),
ths
->
commitIndex
,
ths
->
pRaftStore
->
currentTerm
,
syncUtilState2String
(
ths
->
state
),
ths
->
commitIndex
,
ths
->
pRaftStore
->
currentTerm
,
syncUtilState2String
(
ths
->
state
),
pEntry
->
index
);
syncUtilState2String
(
ths
->
state
),
pEntry
->
index
);
}
}
...
...
source/libs/sync/src/syncRaftLog.c
浏览文件 @
4c554437
...
@@ -164,7 +164,7 @@ static int32_t raftLogAppendEntry(struct SSyncLogStore* pLogStore, SSyncRaftEntr
...
@@ -164,7 +164,7 @@ static int32_t raftLogAppendEntry(struct SSyncLogStore* pLogStore, SSyncRaftEntr
walFsync
(
pWal
,
true
);
walFsync
(
pWal
,
true
);
sDebug
(
sDebug
(
"vgId:%d sync event %s commitIndex:%ld currentTerm:%lu write index:%ld, isStandBy:%d, msgType:%s,%d, "
"vgId:%d
,
sync event %s commitIndex:%ld currentTerm:%lu write index:%ld, isStandBy:%d, msgType:%s,%d, "
"originalRpcType:%s,%d"
,
"originalRpcType:%s,%d"
,
pData
->
pSyncNode
->
vgId
,
syncUtilState2String
(
pData
->
pSyncNode
->
state
),
pData
->
pSyncNode
->
commitIndex
,
pData
->
pSyncNode
->
vgId
,
syncUtilState2String
(
pData
->
pSyncNode
->
state
),
pData
->
pSyncNode
->
commitIndex
,
pData
->
pSyncNode
->
pRaftStore
->
currentTerm
,
pEntry
->
index
,
pData
->
pSyncNode
->
pRaftCfg
->
isStandBy
,
pData
->
pSyncNode
->
pRaftStore
->
currentTerm
,
pEntry
->
index
,
pData
->
pSyncNode
->
pRaftCfg
->
isStandBy
,
...
@@ -325,7 +325,7 @@ int32_t logStoreAppendEntry(SSyncLogStore* pLogStore, SSyncRaftEntry* pEntry) {
...
@@ -325,7 +325,7 @@ int32_t logStoreAppendEntry(SSyncLogStore* pLogStore, SSyncRaftEntry* pEntry) {
walFsync
(
pWal
,
true
);
walFsync
(
pWal
,
true
);
sDebug
(
sDebug
(
"vgId:%d sync event %s commitIndex:%ld currentTerm:%lu old write index:%ld, isStandBy:%d, msgType:%s,%d, "
"vgId:%d
,
sync event %s commitIndex:%ld currentTerm:%lu old write index:%ld, isStandBy:%d, msgType:%s,%d, "
"originalRpcType:%s,%d"
,
"originalRpcType:%s,%d"
,
pData
->
pSyncNode
->
vgId
,
syncUtilState2String
(
pData
->
pSyncNode
->
state
),
pData
->
pSyncNode
->
commitIndex
,
pData
->
pSyncNode
->
vgId
,
syncUtilState2String
(
pData
->
pSyncNode
->
state
),
pData
->
pSyncNode
->
commitIndex
,
pData
->
pSyncNode
->
pRaftStore
->
currentTerm
,
pEntry
->
index
,
pData
->
pSyncNode
->
pRaftCfg
->
isStandBy
,
pData
->
pSyncNode
->
pRaftStore
->
currentTerm
,
pEntry
->
index
,
pData
->
pSyncNode
->
pRaftCfg
->
isStandBy
,
...
...
source/libs/sync/src/syncRespMgr.c
浏览文件 @
4c554437
...
@@ -47,7 +47,7 @@ int64_t syncRespMgrAdd(SSyncRespMgr *pObj, SRespStub *pStub) {
...
@@ -47,7 +47,7 @@ int64_t syncRespMgrAdd(SSyncRespMgr *pObj, SRespStub *pStub) {
SSyncNode
*
pSyncNode
=
pObj
->
data
;
SSyncNode
*
pSyncNode
=
pObj
->
data
;
sDebug
(
sDebug
(
"vgId:%d sync event %s commitIndex:%ld currentTerm:%lu resp mgr add, msgType:%s,%d seq:%lu handle:%p ahandle:%p"
,
"vgId:%d
,
sync event %s commitIndex:%ld currentTerm:%lu resp mgr add, msgType:%s,%d seq:%lu handle:%p ahandle:%p"
,
pSyncNode
->
vgId
,
syncUtilState2String
(
pSyncNode
->
state
),
pSyncNode
->
commitIndex
,
pSyncNode
->
vgId
,
syncUtilState2String
(
pSyncNode
->
state
),
pSyncNode
->
commitIndex
,
pSyncNode
->
pRaftStore
->
currentTerm
,
TMSG_INFO
(
pStub
->
rpcMsg
.
msgType
),
pStub
->
rpcMsg
.
msgType
,
keyCode
,
pSyncNode
->
pRaftStore
->
currentTerm
,
TMSG_INFO
(
pStub
->
rpcMsg
.
msgType
),
pStub
->
rpcMsg
.
msgType
,
keyCode
,
pStub
->
rpcMsg
.
info
.
handle
,
pStub
->
rpcMsg
.
info
.
ahandle
);
pStub
->
rpcMsg
.
info
.
handle
,
pStub
->
rpcMsg
.
info
.
ahandle
);
...
@@ -74,7 +74,7 @@ int32_t syncRespMgrGet(SSyncRespMgr *pObj, uint64_t index, SRespStub *pStub) {
...
@@ -74,7 +74,7 @@ int32_t syncRespMgrGet(SSyncRespMgr *pObj, uint64_t index, SRespStub *pStub) {
SSyncNode
*
pSyncNode
=
pObj
->
data
;
SSyncNode
*
pSyncNode
=
pObj
->
data
;
sDebug
(
sDebug
(
"vgId:%d sync event %s commitIndex:%ld currentTerm:%lu resp mgr get, msgType:%s,%d seq:%lu handle:%p "
"vgId:%d
,
sync event %s commitIndex:%ld currentTerm:%lu resp mgr get, msgType:%s,%d seq:%lu handle:%p "
"ahandle:%p"
,
"ahandle:%p"
,
pSyncNode
->
vgId
,
syncUtilState2String
(
pSyncNode
->
state
),
pSyncNode
->
commitIndex
,
pSyncNode
->
vgId
,
syncUtilState2String
(
pSyncNode
->
state
),
pSyncNode
->
commitIndex
,
pSyncNode
->
pRaftStore
->
currentTerm
,
TMSG_INFO
(
pStub
->
rpcMsg
.
msgType
),
pStub
->
rpcMsg
.
msgType
,
index
,
pSyncNode
->
pRaftStore
->
currentTerm
,
TMSG_INFO
(
pStub
->
rpcMsg
.
msgType
),
pStub
->
rpcMsg
.
msgType
,
index
,
...
@@ -96,7 +96,7 @@ int32_t syncRespMgrGetAndDel(SSyncRespMgr *pObj, uint64_t index, SRespStub *pStu
...
@@ -96,7 +96,7 @@ int32_t syncRespMgrGetAndDel(SSyncRespMgr *pObj, uint64_t index, SRespStub *pStu
SSyncNode
*
pSyncNode
=
pObj
->
data
;
SSyncNode
*
pSyncNode
=
pObj
->
data
;
sDebug
(
sDebug
(
"vgId:%d sync event %s commitIndex:%ld currentTerm:%lu resp mgr get and del, msgType:%s,%d seq:%lu handle:%p "
"vgId:%d
,
sync event %s commitIndex:%ld currentTerm:%lu resp mgr get and del, msgType:%s,%d seq:%lu handle:%p "
"ahandle:%p"
,
"ahandle:%p"
,
pSyncNode
->
vgId
,
syncUtilState2String
(
pSyncNode
->
state
),
pSyncNode
->
commitIndex
,
pSyncNode
->
vgId
,
syncUtilState2String
(
pSyncNode
->
state
),
pSyncNode
->
commitIndex
,
pSyncNode
->
pRaftStore
->
currentTerm
,
TMSG_INFO
(
pStub
->
rpcMsg
.
msgType
),
pStub
->
rpcMsg
.
msgType
,
index
,
pSyncNode
->
pRaftStore
->
currentTerm
,
TMSG_INFO
(
pStub
->
rpcMsg
.
msgType
),
pStub
->
rpcMsg
.
msgType
,
index
,
...
...
source/libs/sync/src/syncSnapshot.c
浏览文件 @
4c554437
...
@@ -141,7 +141,7 @@ void snapshotSenderStart(SSyncSnapshotSender *pSender) {
...
@@ -141,7 +141,7 @@ void snapshotSenderStart(SSyncSnapshotSender *pSender) {
if
(
gRaftDetailLog
)
{
if
(
gRaftDetailLog
)
{
char
*
msgStr
=
syncSnapshotSend2Str
(
pMsg
);
char
*
msgStr
=
syncSnapshotSend2Str
(
pMsg
);
sDebug
(
sDebug
(
"vgId:%d sync event %s commitIndex:%ld currentTerm:%lu snapshot send to %s:%d begin seq:%d ack:%d "
"vgId:%d
,
sync event %s commitIndex:%ld currentTerm:%lu snapshot send to %s:%d begin seq:%d ack:%d "
"lastApplyIndex:%ld "
"lastApplyIndex:%ld "
"lastApplyTerm:%lu "
"lastApplyTerm:%lu "
"lastConfigIndex:%ld privateTerm:%lu send "
"lastConfigIndex:%ld privateTerm:%lu send "
...
@@ -153,7 +153,7 @@ void snapshotSenderStart(SSyncSnapshotSender *pSender) {
...
@@ -153,7 +153,7 @@ void snapshotSenderStart(SSyncSnapshotSender *pSender) {
taosMemoryFree
(
msgStr
);
taosMemoryFree
(
msgStr
);
}
else
{
}
else
{
sDebug
(
sDebug
(
"vgId:%d sync event %s commitIndex:%ld currentTerm:%lu snapshot send to %s:%d begin seq:%d ack:%d "
"vgId:%d
,
sync event %s commitIndex:%ld currentTerm:%lu snapshot send to %s:%d begin seq:%d ack:%d "
"lastApplyIndex:%ld "
"lastApplyIndex:%ld "
"lastApplyTerm:%lu "
"lastApplyTerm:%lu "
"lastConfigIndex:%ld privateTerm:%lu"
,
"lastConfigIndex:%ld privateTerm:%lu"
,
...
@@ -287,7 +287,7 @@ int32_t snapshotSend(SSyncSnapshotSender *pSender) {
...
@@ -287,7 +287,7 @@ int32_t snapshotSend(SSyncSnapshotSender *pSender) {
if
(
gRaftDetailLog
)
{
if
(
gRaftDetailLog
)
{
char
*
msgStr
=
syncSnapshotSend2Str
(
pMsg
);
char
*
msgStr
=
syncSnapshotSend2Str
(
pMsg
);
sDebug
(
sDebug
(
"vgId:%d sync event %s commitIndex:%ld currentTerm:%lu snapshot send to %s:%d finish seq:%d ack:%d "
"vgId:%d
,
sync event %s commitIndex:%ld currentTerm:%lu snapshot send to %s:%d finish seq:%d ack:%d "
"lastApplyIndex:%ld "
"lastApplyIndex:%ld "
"lastApplyTerm:%lu "
"lastApplyTerm:%lu "
"lastConfigIndex:%ld privateTerm:%lu send "
"lastConfigIndex:%ld privateTerm:%lu send "
...
@@ -299,7 +299,7 @@ int32_t snapshotSend(SSyncSnapshotSender *pSender) {
...
@@ -299,7 +299,7 @@ int32_t snapshotSend(SSyncSnapshotSender *pSender) {
taosMemoryFree
(
msgStr
);
taosMemoryFree
(
msgStr
);
}
else
{
}
else
{
sDebug
(
sDebug
(
"vgId:%d sync event %s commitIndex:%ld currentTerm:%lu snapshot send to %s:%d finish seq:%d ack:%d "
"vgId:%d
,
sync event %s commitIndex:%ld currentTerm:%lu snapshot send to %s:%d finish seq:%d ack:%d "
"lastApplyIndex:%ld "
"lastApplyIndex:%ld "
"lastApplyTerm:%lu "
"lastApplyTerm:%lu "
"lastConfigIndex:%ld privateTerm:%lu"
,
"lastConfigIndex:%ld privateTerm:%lu"
,
...
@@ -310,7 +310,7 @@ int32_t snapshotSend(SSyncSnapshotSender *pSender) {
...
@@ -310,7 +310,7 @@ int32_t snapshotSend(SSyncSnapshotSender *pSender) {
}
}
}
else
{
}
else
{
sDebug
(
sDebug
(
"vgId:%d sync event %s commitIndex:%ld currentTerm:%lu snapshot send to %s:%d sending seq:%d ack:%d "
"vgId:%d
,
sync event %s commitIndex:%ld currentTerm:%lu snapshot send to %s:%d sending seq:%d ack:%d "
"lastApplyIndex:%ld "
"lastApplyIndex:%ld "
"lastApplyTerm:%lu "
"lastApplyTerm:%lu "
"lastConfigIndex:%ld privateTerm:%lu"
,
"lastConfigIndex:%ld privateTerm:%lu"
,
...
@@ -349,7 +349,7 @@ int32_t snapshotReSend(SSyncSnapshotSender *pSender) {
...
@@ -349,7 +349,7 @@ int32_t snapshotReSend(SSyncSnapshotSender *pSender) {
if
(
gRaftDetailLog
)
{
if
(
gRaftDetailLog
)
{
char
*
msgStr
=
syncSnapshotSend2Str
(
pMsg
);
char
*
msgStr
=
syncSnapshotSend2Str
(
pMsg
);
sDebug
(
sDebug
(
"vgId:%d sync event %s commitIndex:%ld currentTerm:%lu snapshot send to %s:%d resend seq:%d ack:%d "
"vgId:%d
,
sync event %s commitIndex:%ld currentTerm:%lu snapshot send to %s:%d resend seq:%d ack:%d "
"privateTerm:%lu send "
"privateTerm:%lu send "
"msg:%s"
,
"msg:%s"
,
pSender
->
pSyncNode
->
vgId
,
syncUtilState2String
(
pSender
->
pSyncNode
->
state
),
pSender
->
pSyncNode
->
commitIndex
,
pSender
->
pSyncNode
->
vgId
,
syncUtilState2String
(
pSender
->
pSyncNode
->
state
),
pSender
->
pSyncNode
->
commitIndex
,
...
@@ -358,7 +358,7 @@ int32_t snapshotReSend(SSyncSnapshotSender *pSender) {
...
@@ -358,7 +358,7 @@ int32_t snapshotReSend(SSyncSnapshotSender *pSender) {
taosMemoryFree
(
msgStr
);
taosMemoryFree
(
msgStr
);
}
else
{
}
else
{
sDebug
(
sDebug
(
"vgId:%d sync event %s commitIndex:%ld currentTerm:%lu snapshot send to %s:%d resend seq:%d ack:%d "
"vgId:%d
,
sync event %s commitIndex:%ld currentTerm:%lu snapshot send to %s:%d resend seq:%d ack:%d "
"privateTerm:%lu"
,
"privateTerm:%lu"
,
pSender
->
pSyncNode
->
vgId
,
syncUtilState2String
(
pSender
->
pSyncNode
->
state
),
pSender
->
pSyncNode
->
commitIndex
,
pSender
->
pSyncNode
->
vgId
,
syncUtilState2String
(
pSender
->
pSyncNode
->
state
),
pSender
->
pSyncNode
->
commitIndex
,
pSender
->
pSyncNode
->
pRaftStore
->
currentTerm
,
host
,
port
,
pSender
->
seq
,
pSender
->
ack
,
pSender
->
privateTerm
);
pSender
->
pSyncNode
->
pRaftStore
->
currentTerm
,
host
,
port
,
pSender
->
seq
,
pSender
->
ack
,
pSender
->
privateTerm
);
...
@@ -594,7 +594,7 @@ int32_t syncNodeOnSnapshotSendCb(SSyncNode *pSyncNode, SyncSnapshotSend *pMsg) {
...
@@ -594,7 +594,7 @@ int32_t syncNodeOnSnapshotSendCb(SSyncNode *pSyncNode, SyncSnapshotSend *pMsg) {
if
(
gRaftDetailLog
)
{
if
(
gRaftDetailLog
)
{
char
*
msgStr
=
syncSnapshotSend2Str
(
pMsg
);
char
*
msgStr
=
syncSnapshotSend2Str
(
pMsg
);
sDebug
(
sDebug
(
"vgId:%d sync event %s commitIndex:%ld currentTerm:%lu snapshot recv from %s:%d begin ack:%d, "
"vgId:%d
,
sync event %s commitIndex:%ld currentTerm:%lu snapshot recv from %s:%d begin ack:%d, "
"lastIndex:%ld, "
"lastIndex:%ld, "
"lastTerm:%lu, "
"lastTerm:%lu, "
"lastConfigIndex:%ld, privateTerm:%lu, recv msg:%s"
,
"lastConfigIndex:%ld, privateTerm:%lu, recv msg:%s"
,
...
@@ -604,7 +604,7 @@ int32_t syncNodeOnSnapshotSendCb(SSyncNode *pSyncNode, SyncSnapshotSend *pMsg) {
...
@@ -604,7 +604,7 @@ int32_t syncNodeOnSnapshotSendCb(SSyncNode *pSyncNode, SyncSnapshotSend *pMsg) {
taosMemoryFree
(
msgStr
);
taosMemoryFree
(
msgStr
);
}
else
{
}
else
{
sDebug
(
sDebug
(
"vgId:%d sync event %s commitIndex:%ld currentTerm:%lu snapshot recv from %s:%d begin ack:%d, "
"vgId:%d
,
sync event %s commitIndex:%ld currentTerm:%lu snapshot recv from %s:%d begin ack:%d, "
"lastIndex:%ld, "
"lastIndex:%ld, "
"lastTerm:%lu, "
"lastTerm:%lu, "
"lastConfigIndex:%ld privateTerm:%lu"
,
"lastConfigIndex:%ld privateTerm:%lu"
,
...
@@ -648,7 +648,7 @@ int32_t syncNodeOnSnapshotSendCb(SSyncNode *pSyncNode, SyncSnapshotSend *pMsg) {
...
@@ -648,7 +648,7 @@ int32_t syncNodeOnSnapshotSendCb(SSyncNode *pSyncNode, SyncSnapshotSend *pMsg) {
bool
isDrop
;
bool
isDrop
;
if
(
IamInNew
)
{
if
(
IamInNew
)
{
sDebug
(
sDebug
(
"vgId:%d sync event %s commitIndex:%ld currentTerm:%lu update config by snapshot, lastIndex:%ld, "
"vgId:%d
,
sync event %s commitIndex:%ld currentTerm:%lu update config by snapshot, lastIndex:%ld, "
"lastTerm:%lu, "
"lastTerm:%lu, "
"lastConfigIndex:%ld "
,
"lastConfigIndex:%ld "
,
pSyncNode
->
vgId
,
syncUtilState2String
(
pSyncNode
->
state
),
pSyncNode
->
commitIndex
,
pSyncNode
->
vgId
,
syncUtilState2String
(
pSyncNode
->
state
),
pSyncNode
->
commitIndex
,
...
@@ -656,7 +656,7 @@ int32_t syncNodeOnSnapshotSendCb(SSyncNode *pSyncNode, SyncSnapshotSend *pMsg) {
...
@@ -656,7 +656,7 @@ int32_t syncNodeOnSnapshotSendCb(SSyncNode *pSyncNode, SyncSnapshotSend *pMsg) {
syncNodeUpdateConfig
(
pSyncNode
,
&
newSyncCfg
,
pMsg
->
lastConfigIndex
,
&
isDrop
);
syncNodeUpdateConfig
(
pSyncNode
,
&
newSyncCfg
,
pMsg
->
lastConfigIndex
,
&
isDrop
);
}
else
{
}
else
{
sDebug
(
sDebug
(
"vgId:%d sync event %s commitIndex:%ld currentTerm:%lu do not update config by snapshot, I am not in "
"vgId:%d
,
sync event %s commitIndex:%ld currentTerm:%lu do not update config by snapshot, I am not in "
"newCfg, "
"newCfg, "
"lastIndex:%ld, lastTerm:%lu, "
"lastIndex:%ld, lastTerm:%lu, "
"lastConfigIndex:%ld "
,
"lastConfigIndex:%ld "
,
...
@@ -694,7 +694,7 @@ int32_t syncNodeOnSnapshotSendCb(SSyncNode *pSyncNode, SyncSnapshotSend *pMsg) {
...
@@ -694,7 +694,7 @@ int32_t syncNodeOnSnapshotSendCb(SSyncNode *pSyncNode, SyncSnapshotSend *pMsg) {
if
(
gRaftDetailLog
)
{
if
(
gRaftDetailLog
)
{
char
*
logSimpleStr
=
logStoreSimple2Str
(
pSyncNode
->
pLogStore
);
char
*
logSimpleStr
=
logStoreSimple2Str
(
pSyncNode
->
pLogStore
);
sDebug
(
sDebug
(
"vgId:%d sync event %s commitIndex:%ld currentTerm:%lu snapshot recv from %s:%d finish, update log begin "
"vgId:%d
,
sync event %s commitIndex:%ld currentTerm:%lu snapshot recv from %s:%d finish, update log begin "
"index:%ld, "
"index:%ld, "
"snapshot.lastApplyIndex:%ld, "
"snapshot.lastApplyIndex:%ld, "
"snapshot.lastApplyTerm:%lu, snapshot.lastConfigIndex:%ld, privateTerm:%lu, raft log:%s"
,
"snapshot.lastApplyTerm:%lu, snapshot.lastConfigIndex:%ld, privateTerm:%lu, raft log:%s"
,
...
@@ -704,7 +704,7 @@ int32_t syncNodeOnSnapshotSendCb(SSyncNode *pSyncNode, SyncSnapshotSend *pMsg) {
...
@@ -704,7 +704,7 @@ int32_t syncNodeOnSnapshotSendCb(SSyncNode *pSyncNode, SyncSnapshotSend *pMsg) {
taosMemoryFree
(
logSimpleStr
);
taosMemoryFree
(
logSimpleStr
);
}
else
{
}
else
{
sDebug
(
sDebug
(
"vgId:%d sync event %s commitIndex:%ld currentTerm:%lu snapshot recv from %s:%d finish, update log begin "
"vgId:%d
,
sync event %s commitIndex:%ld currentTerm:%lu snapshot recv from %s:%d finish, update log begin "
"index:%ld, "
"index:%ld, "
"snapshot.lastApplyIndex:%ld, "
"snapshot.lastApplyIndex:%ld, "
"snapshot.lastApplyTerm:%lu, snapshot.lastConfigIndex:%ld, privateTerm:%lu"
,
"snapshot.lastApplyTerm:%lu, snapshot.lastConfigIndex:%ld, privateTerm:%lu"
,
...
@@ -721,7 +721,7 @@ int32_t syncNodeOnSnapshotSendCb(SSyncNode *pSyncNode, SyncSnapshotSend *pMsg) {
...
@@ -721,7 +721,7 @@ int32_t syncNodeOnSnapshotSendCb(SSyncNode *pSyncNode, SyncSnapshotSend *pMsg) {
if
(
gRaftDetailLog
)
{
if
(
gRaftDetailLog
)
{
char
*
msgStr
=
syncSnapshotSend2Str
(
pMsg
);
char
*
msgStr
=
syncSnapshotSend2Str
(
pMsg
);
sDebug
(
sDebug
(
"vgId:%d sync event %s commitIndex:%ld currentTerm:%lu snapshot recv from %s:%d end ack:%d, "
"vgId:%d
,
sync event %s commitIndex:%ld currentTerm:%lu snapshot recv from %s:%d end ack:%d, "
"lastIndex:%ld, lastTerm:%lu, "
"lastIndex:%ld, lastTerm:%lu, "
"lastConfigIndex:%ld, privateTerm:%lu, recv msg:%s"
,
"lastConfigIndex:%ld, privateTerm:%lu, recv msg:%s"
,
pReceiver
->
pSyncNode
->
vgId
,
syncUtilState2String
(
pSyncNode
->
state
),
pSyncNode
->
commitIndex
,
pReceiver
->
pSyncNode
->
vgId
,
syncUtilState2String
(
pSyncNode
->
state
),
pSyncNode
->
commitIndex
,
...
@@ -730,7 +730,7 @@ int32_t syncNodeOnSnapshotSendCb(SSyncNode *pSyncNode, SyncSnapshotSend *pMsg) {
...
@@ -730,7 +730,7 @@ int32_t syncNodeOnSnapshotSendCb(SSyncNode *pSyncNode, SyncSnapshotSend *pMsg) {
taosMemoryFree
(
msgStr
);
taosMemoryFree
(
msgStr
);
}
else
{
}
else
{
sDebug
(
sDebug
(
"vgId:%d sync event %s commitIndex:%ld currentTerm:%lu snapshot recv from %s:%d end ack:%d, "
"vgId:%d
,
sync event %s commitIndex:%ld currentTerm:%lu snapshot recv from %s:%d end ack:%d, "
"lastIndex:%ld, lastTerm:%lu, "
"lastIndex:%ld, lastTerm:%lu, "
"lastConfigIndex:%ld, privateTerm:%lu"
,
"lastConfigIndex:%ld, privateTerm:%lu"
,
pReceiver
->
pSyncNode
->
vgId
,
syncUtilState2String
(
pSyncNode
->
state
),
pSyncNode
->
commitIndex
,
pReceiver
->
pSyncNode
->
vgId
,
syncUtilState2String
(
pSyncNode
->
state
),
pSyncNode
->
commitIndex
,
...
@@ -750,7 +750,7 @@ int32_t syncNodeOnSnapshotSendCb(SSyncNode *pSyncNode, SyncSnapshotSend *pMsg) {
...
@@ -750,7 +750,7 @@ int32_t syncNodeOnSnapshotSendCb(SSyncNode *pSyncNode, SyncSnapshotSend *pMsg) {
if
(
gRaftDetailLog
)
{
if
(
gRaftDetailLog
)
{
char
*
msgStr
=
syncSnapshotSend2Str
(
pMsg
);
char
*
msgStr
=
syncSnapshotSend2Str
(
pMsg
);
sDebug
(
sDebug
(
"vgId:%d sync event %s commitIndex:%ld currentTerm:%lu snapshot recv from %s:%d force close ack:%d, "
"vgId:%d
,
sync event %s commitIndex:%ld currentTerm:%lu snapshot recv from %s:%d force close ack:%d, "
"lastIndex:%ld, "
"lastIndex:%ld, "
"lastTerm:%lu, "
"lastTerm:%lu, "
"lastConfigIndex:%ld, privateTerm:%lu, recv "
"lastConfigIndex:%ld, privateTerm:%lu, recv "
...
@@ -761,7 +761,7 @@ int32_t syncNodeOnSnapshotSendCb(SSyncNode *pSyncNode, SyncSnapshotSend *pMsg) {
...
@@ -761,7 +761,7 @@ int32_t syncNodeOnSnapshotSendCb(SSyncNode *pSyncNode, SyncSnapshotSend *pMsg) {
taosMemoryFree
(
msgStr
);
taosMemoryFree
(
msgStr
);
}
else
{
}
else
{
sDebug
(
sDebug
(
"vgId:%d sync event %s commitIndex:%ld currentTerm:%lu snapshot recv from %s:%d force close ack:%d, "
"vgId:%d
,
sync event %s commitIndex:%ld currentTerm:%lu snapshot recv from %s:%d force close ack:%d, "
"lastIndex:%ld, "
"lastIndex:%ld, "
"lastTerm:%lu, "
"lastTerm:%lu, "
"lastConfigIndex:%ld, privateTerm:%lu"
,
"lastConfigIndex:%ld, privateTerm:%lu"
,
...
@@ -787,7 +787,7 @@ int32_t syncNodeOnSnapshotSendCb(SSyncNode *pSyncNode, SyncSnapshotSend *pMsg) {
...
@@ -787,7 +787,7 @@ int32_t syncNodeOnSnapshotSendCb(SSyncNode *pSyncNode, SyncSnapshotSend *pMsg) {
if
(
gRaftDetailLog
)
{
if
(
gRaftDetailLog
)
{
char
*
msgStr
=
syncSnapshotSend2Str
(
pMsg
);
char
*
msgStr
=
syncSnapshotSend2Str
(
pMsg
);
sDebug
(
sDebug
(
"vgId:%d sync event %s commitIndex:%ld currentTerm:%lu snapshot recv from %s:%d receiving ack:%d, "
"vgId:%d
,
sync event %s commitIndex:%ld currentTerm:%lu snapshot recv from %s:%d receiving ack:%d, "
"lastIndex:%ld, "
"lastIndex:%ld, "
"lastTerm:%lu, "
"lastTerm:%lu, "
"lastConfigIndex:%ld, privateTerm:%lu, recv msg:%s"
,
"lastConfigIndex:%ld, privateTerm:%lu, recv msg:%s"
,
...
@@ -797,7 +797,7 @@ int32_t syncNodeOnSnapshotSendCb(SSyncNode *pSyncNode, SyncSnapshotSend *pMsg) {
...
@@ -797,7 +797,7 @@ int32_t syncNodeOnSnapshotSendCb(SSyncNode *pSyncNode, SyncSnapshotSend *pMsg) {
taosMemoryFree
(
msgStr
);
taosMemoryFree
(
msgStr
);
}
else
{
}
else
{
sDebug
(
sDebug
(
"vgId:%d sync event %s commitIndex:%ld currentTerm:%lu snapshot recv from %s:%d receiving ack:%d, "
"vgId:%d
,
sync event %s commitIndex:%ld currentTerm:%lu snapshot recv from %s:%d receiving ack:%d, "
"lastIndex:%ld, "
"lastIndex:%ld, "
"lastTerm:%lu, "
"lastTerm:%lu, "
"lastConfigIndex:%ld, privateTerm:%lu"
,
"lastConfigIndex:%ld, privateTerm:%lu"
,
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录