Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
011843d8
TDengine
项目概览
taosdata
/
TDengine
1 年多 前同步成功
通知
1185
Star
22016
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
未验证
提交
011843d8
编写于
10月 25, 2022
作者:
S
Shengliang Guan
提交者:
GitHub
10月 25, 2022
浏览文件
操作
浏览文件
下载
差异文件
Merge pull request #17626 from taosdata/feature/sync2-merge
fix(sync): fix coverity scan issues
上级
c3640404
eb7b9d38
变更
5
隐藏空白更改
内联
并排
Showing
5 changed file
with
70 addition
and
24 deletion
+70
-24
source/libs/sync/src/syncCommit.c
source/libs/sync/src/syncCommit.c
+7
-0
source/libs/sync/src/syncMain.c
source/libs/sync/src/syncMain.c
+42
-20
source/libs/sync/src/syncMessage.c
source/libs/sync/src/syncMessage.c
+18
-2
source/libs/sync/src/syncRaftEntry.c
source/libs/sync/src/syncRaftEntry.c
+1
-1
source/libs/sync/src/syncUtil.c
source/libs/sync/src/syncUtil.c
+2
-1
未找到文件。
source/libs/sync/src/syncCommit.c
浏览文件 @
011843d8
...
...
@@ -45,6 +45,11 @@
// /\ UNCHANGED <<messages, serverVars, candidateVars, leaderVars, log>>
//
void
syncMaybeAdvanceCommitIndex
(
SSyncNode
*
pSyncNode
)
{
if
(
pSyncNode
==
NULL
)
{
sError
(
"pSyncNode is NULL"
);
return
;
}
if
(
pSyncNode
->
state
!=
TAOS_SYNC_STATE_LEADER
)
{
syncNodeErrorLog
(
pSyncNode
,
"not leader, can not advance commit index"
);
return
;
...
...
@@ -172,6 +177,7 @@ static inline int64_t syncNodeAbs64(int64_t a, int64_t b) {
int32_t
syncNodeDynamicQuorum
(
const
SSyncNode
*
pSyncNode
)
{
return
pSyncNode
->
quorum
;
#if 0
int32_t quorum = 1; // self
int64_t timeNow = taosGetTimestampMs();
...
...
@@ -228,6 +234,7 @@ int32_t syncNodeDynamicQuorum(const SSyncNode* pSyncNode) {
}
return quorum;
#endif
}
/*
...
...
source/libs/sync/src/syncMain.c
浏览文件 @
011843d8
...
...
@@ -835,7 +835,9 @@ void syncGetRetryEpSet(int64_t rid, SEpSet* pEpSet) {
sInfo
(
"vgId:%d, sync get retry epset: index:%d %s:%d"
,
pSyncNode
->
vgId
,
i
,
pEpSet
->
eps
[
i
].
fqdn
,
pEpSet
->
eps
[
i
].
port
);
}
pEpSet
->
inUse
=
(
pSyncNode
->
pRaftCfg
->
cfg
.
myIndex
+
1
)
%
pEpSet
->
numOfEps
;
if
(
pEpSet
->
numOfEps
>
0
)
{
pEpSet
->
inUse
=
(
pSyncNode
->
pRaftCfg
->
cfg
.
myIndex
+
1
)
%
pEpSet
->
numOfEps
;
}
sInfo
(
"vgId:%d, sync get retry epset in-use:%d"
,
pSyncNode
->
vgId
,
pEpSet
->
inUse
);
taosReleaseRef
(
tsNodeRefId
,
pSyncNode
->
rid
);
...
...
@@ -1438,12 +1440,13 @@ void syncNodeStartStandBy(SSyncNode* pSyncNode) {
}
void
syncNodeClose
(
SSyncNode
*
pSyncNode
)
{
syncNodeEventLog
(
pSyncNode
,
"sync close"
);
if
(
pSyncNode
==
NULL
)
{
return
;
}
int32_t
ret
;
syncNodeEventLog
(
pSyncNode
,
"sync close"
);
ret
=
raftStoreClose
(
pSyncNode
->
pRaftStore
);
ASSERT
(
ret
==
0
);
...
...
@@ -1879,6 +1882,10 @@ char* syncNode2Str(const SSyncNode* pSyncNode) {
}
inline
void
syncNodeEventLog
(
const
SSyncNode
*
pSyncNode
,
char
*
str
)
{
if
(
pSyncNode
==
NULL
)
{
return
;
}
SSnapshot
snapshot
=
{.
data
=
NULL
,
.
lastApplyIndex
=
-
1
,
.
lastApplyTerm
=
0
};
if
(
pSyncNode
->
pFsm
!=
NULL
&&
pSyncNode
->
pFsm
->
FpGetSnapshotInfo
!=
NULL
)
{
pSyncNode
->
pFsm
->
FpGetSnapshotInfo
(
pSyncNode
->
pFsm
,
&
snapshot
);
...
...
@@ -1954,6 +1961,10 @@ inline void syncNodeEventLog(const SSyncNode* pSyncNode, char* str) {
}
inline
void
syncNodeErrorLog
(
const
SSyncNode
*
pSyncNode
,
char
*
str
)
{
if
(
pSyncNode
==
NULL
)
{
return
;
}
int32_t
userStrLen
=
strlen
(
str
);
SSnapshot
snapshot
=
{.
data
=
NULL
,
.
lastApplyIndex
=
-
1
,
.
lastApplyTerm
=
0
};
...
...
@@ -2937,6 +2948,7 @@ static int32_t syncNodeEqNoop(SSyncNode* ths) {
sTrace
(
"syncNodeEqNoop pSyncNode->FpEqMsg is NULL"
);
}
syncEntryDestory
(
pEntry
);
taosMemoryFree
(
serialized
);
syncClientRequestDestroy
(
pSyncMsg
);
...
...
@@ -3003,13 +3015,14 @@ int32_t syncNodeOnPingCb(SSyncNode* ths, SyncPing* pMsg) {
syncPingReply2RpcMsg
(
pMsgReply
,
&
rpcMsg
);
/*
// htonl
SMsgHead* pHead = rpcMsg.pCont;
pHead->contLen = htonl(pHead->contLen);
pHead->vgId = htonl(pHead->vgId);
*/
// htonl
SMsgHead* pHead = rpcMsg.pCont;
pHead->contLen = htonl(pHead->contLen);
pHead->vgId = htonl(pHead->vgId);
*/
syncNodeSendMsgById
(
&
pMsgReply
->
destId
,
ths
,
&
rpcMsg
);
syncPingReplyDestroy
(
pMsgReply
);
return
ret
;
}
...
...
@@ -3058,6 +3071,7 @@ int32_t syncNodeOnHeartbeat(SSyncNode* ths, SyncHeartbeat* pMsg) {
// reply
syncNodeSendMsgById
(
&
pMsgReply
->
destId
,
ths
,
&
rpcMsg
);
syncHeartbeatReplyDestroy
(
pMsgReply
);
return
0
;
}
...
...
@@ -3329,17 +3343,23 @@ int32_t syncNodeDoCommit(SSyncNode* ths, SyncIndex beginIndex, SyncIndex endInde
return
0
;
}
// advance commit index to sanpshot first
SSnapshot
snapshot
=
{
0
};
ths
->
pFsm
->
FpGetSnapshotInfo
(
ths
->
pFsm
,
&
snapshot
);
if
(
snapshot
.
lastApplyIndex
>=
0
&&
snapshot
.
lastApplyIndex
>=
beginIndex
)
{
char
eventLog
[
128
];
snprintf
(
eventLog
,
sizeof
(
eventLog
),
"commit by snapshot from index:%"
PRId64
" to index:%"
PRId64
,
beginIndex
,
snapshot
.
lastApplyIndex
);
syncNodeEventLog
(
ths
,
eventLog
);
if
(
ths
==
NULL
)
{
return
-
1
;
}
if
(
ths
->
pFsm
!=
NULL
&&
ths
->
pFsm
->
FpGetSnapshotInfo
!=
NULL
)
{
// advance commit index to sanpshot first
SSnapshot
snapshot
=
{
0
};
ths
->
pFsm
->
FpGetSnapshotInfo
(
ths
->
pFsm
,
&
snapshot
);
if
(
snapshot
.
lastApplyIndex
>=
0
&&
snapshot
.
lastApplyIndex
>=
beginIndex
)
{
char
eventLog
[
128
];
snprintf
(
eventLog
,
sizeof
(
eventLog
),
"commit by snapshot from index:%"
PRId64
" to index:%"
PRId64
,
beginIndex
,
snapshot
.
lastApplyIndex
);
syncNodeEventLog
(
ths
,
eventLog
);
// update begin index
beginIndex
=
snapshot
.
lastApplyIndex
+
1
;
// update begin index
beginIndex
=
snapshot
.
lastApplyIndex
+
1
;
}
}
int32_t
code
=
0
;
...
...
@@ -3413,8 +3433,10 @@ int32_t syncNodeDoCommit(SSyncNode* ths, SyncIndex beginIndex, SyncIndex endInde
// config change finish
if
(
pEntry
->
originalRpcType
==
TDMT_SYNC_CONFIG_CHANGE_FINISH
)
{
code
=
syncNodeConfigChangeFinish
(
ths
,
&
rpcMsg
,
pEntry
);
ASSERT
(
code
==
0
);
if
(
rpcMsg
.
pCont
!=
NULL
)
{
code
=
syncNodeConfigChangeFinish
(
ths
,
&
rpcMsg
,
pEntry
);
ASSERT
(
code
==
0
);
}
}
#if 0
...
...
@@ -3528,7 +3550,7 @@ bool syncNodeCanChange(SSyncNode* pSyncNode) {
for
(
int
i
=
0
;
i
<
pSyncNode
->
peersNum
;
++
i
)
{
SSyncSnapshotSender
*
pSender
=
syncNodeGetSnapshotSender
(
pSyncNode
,
&
(
pSyncNode
->
peersId
)[
i
]);
if
(
pSender
->
start
)
{
if
(
pSender
!=
NULL
&&
pSender
->
start
)
{
sError
(
"sync cannot change3"
);
return
false
;
}
...
...
source/libs/sync/src/syncMessage.c
浏览文件 @
011843d8
...
...
@@ -411,32 +411,40 @@ SyncPing* syncPingDeserialize3(void* buf, int32_t bufLen) {
pMsg
->
bytes
=
bytes
;
if
(
tDecodeI32
(
&
decoder
,
&
pMsg
->
vgId
)
<
0
)
{
taosMemoryFree
(
pMsg
);
return
NULL
;
}
if
(
tDecodeU32
(
&
decoder
,
&
pMsg
->
msgType
)
<
0
)
{
taosMemoryFree
(
pMsg
);
return
NULL
;
}
if
(
tDecodeU64
(
&
decoder
,
&
pMsg
->
srcId
.
addr
)
<
0
)
{
taosMemoryFree
(
pMsg
);
return
NULL
;
}
if
(
tDecodeI32
(
&
decoder
,
&
pMsg
->
srcId
.
vgId
)
<
0
)
{
taosMemoryFree
(
pMsg
);
return
NULL
;
}
if
(
tDecodeU64
(
&
decoder
,
&
pMsg
->
destId
.
addr
)
<
0
)
{
taosMemoryFree
(
pMsg
);
return
NULL
;
}
if
(
tDecodeI32
(
&
decoder
,
&
pMsg
->
destId
.
vgId
)
<
0
)
{
taosMemoryFree
(
pMsg
);
return
NULL
;
}
if
(
tDecodeU32
(
&
decoder
,
&
pMsg
->
dataLen
)
<
0
)
{
taosMemoryFree
(
pMsg
);
return
NULL
;
}
uint32_t
len
;
char
*
data
=
NULL
;
if
(
tDecodeBinary
(
&
decoder
,
(
uint8_t
**
)(
&
data
),
&
len
)
<
0
)
{
taosMemoryFree
(
pMsg
);
return
NULL
;
}
ASSERT
(
len
=
pMsg
->
dataLen
);
ASSERT
(
len
=
=
pMsg
->
dataLen
);
memcpy
(
pMsg
->
data
,
data
,
len
);
tEndDecode
(
&
decoder
);
...
...
@@ -673,32 +681,40 @@ SyncPingReply* syncPingReplyDeserialize3(void* buf, int32_t bufLen) {
pMsg
->
bytes
=
bytes
;
if
(
tDecodeI32
(
&
decoder
,
&
pMsg
->
vgId
)
<
0
)
{
taosMemoryFree
(
pMsg
);
return
NULL
;
}
if
(
tDecodeU32
(
&
decoder
,
&
pMsg
->
msgType
)
<
0
)
{
taosMemoryFree
(
pMsg
);
return
NULL
;
}
if
(
tDecodeU64
(
&
decoder
,
&
pMsg
->
srcId
.
addr
)
<
0
)
{
taosMemoryFree
(
pMsg
);
return
NULL
;
}
if
(
tDecodeI32
(
&
decoder
,
&
pMsg
->
srcId
.
vgId
)
<
0
)
{
taosMemoryFree
(
pMsg
);
return
NULL
;
}
if
(
tDecodeU64
(
&
decoder
,
&
pMsg
->
destId
.
addr
)
<
0
)
{
taosMemoryFree
(
pMsg
);
return
NULL
;
}
if
(
tDecodeI32
(
&
decoder
,
&
pMsg
->
destId
.
vgId
)
<
0
)
{
taosMemoryFree
(
pMsg
);
return
NULL
;
}
if
(
tDecodeU32
(
&
decoder
,
&
pMsg
->
dataLen
)
<
0
)
{
taosMemoryFree
(
pMsg
);
return
NULL
;
}
uint32_t
len
;
char
*
data
=
NULL
;
if
(
tDecodeBinary
(
&
decoder
,
(
uint8_t
**
)(
&
data
),
&
len
)
<
0
)
{
taosMemoryFree
(
pMsg
);
return
NULL
;
}
ASSERT
(
len
=
pMsg
->
dataLen
);
ASSERT
(
len
=
=
pMsg
->
dataLen
);
memcpy
(
pMsg
->
data
,
data
,
len
);
tEndDecode
(
&
decoder
);
...
...
source/libs/sync/src/syncRaftEntry.c
浏览文件 @
011843d8
...
...
@@ -532,7 +532,7 @@ int32_t raftEntryCacheGetEntry(struct SRaftEntryCache* pCache, SyncIndex index,
SSyncRaftEntry
*
pEntry
=
NULL
;
int32_t
code
=
raftEntryCacheGetEntryP
(
pCache
,
index
,
&
pEntry
);
if
(
code
==
1
)
{
*
ppEntry
=
taosMemoryMalloc
(
pEntry
->
bytes
);
*
ppEntry
=
taosMemoryMalloc
(
(
int64_t
)(
pEntry
->
bytes
)
);
memcpy
(
*
ppEntry
,
pEntry
,
pEntry
->
bytes
);
(
*
ppEntry
)
->
rid
=
-
1
;
}
else
{
...
...
source/libs/sync/src/syncUtil.c
浏览文件 @
011843d8
...
...
@@ -209,7 +209,8 @@ bool syncUtilCanPrint(char c) {
}
char
*
syncUtilprintBin
(
char
*
ptr
,
uint32_t
len
)
{
char
*
s
=
taosMemoryMalloc
(
len
+
1
);
int64_t
memLen
=
(
int64_t
)(
len
+
1
);
char
*
s
=
taosMemoryMalloc
(
memLen
);
ASSERT
(
s
!=
NULL
);
memset
(
s
,
0
,
len
+
1
);
memcpy
(
s
,
ptr
,
len
);
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录