Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
b71a9e34
TDengine
项目概览
taosdata
/
TDengine
1 年多 前同步成功
通知
1185
Star
22016
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
b71a9e34
编写于
9月 27, 2022
作者:
B
Benguang Zhao
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
enh: improve error handling in syncNodeOpen
上级
3db1920a
变更
10
隐藏空白更改
内联
并排
Showing
10 changed file
with
131 addition
and
56 deletion
+131
-56
source/dnode/vnode/src/vnd/vnodeOpen.c
source/dnode/vnode/src/vnd/vnodeOpen.c
+0
-1
source/libs/sync/inc/syncUtil.h
source/libs/sync/inc/syncUtil.h
+1
-1
source/libs/sync/src/syncIndexMgr.c
source/libs/sync/src/syncIndexMgr.c
+5
-2
source/libs/sync/src/syncMain.c
source/libs/sync/src/syncMain.c
+99
-42
source/libs/sync/src/syncRaftCfg.c
source/libs/sync/src/syncRaftCfg.c
+0
-2
source/libs/sync/src/syncRaftStore.c
source/libs/sync/src/syncRaftStore.c
+4
-2
source/libs/sync/src/syncRespMgr.c
source/libs/sync/src/syncRespMgr.c
+4
-0
source/libs/sync/src/syncSnapshot.c
source/libs/sync/src/syncSnapshot.c
+4
-1
source/libs/sync/src/syncUtil.c
source/libs/sync/src/syncUtil.c
+10
-4
source/libs/sync/src/syncVoteMgr.c
source/libs/sync/src/syncVoteMgr.c
+4
-1
未找到文件。
source/dnode/vnode/src/vnd/vnodeOpen.c
浏览文件 @
b71a9e34
...
...
@@ -161,7 +161,6 @@ SVnode *vnodeOpen(const char *path, STfs *pTfs, SMsgCb msgCb) {
// open sync
if
(
vnodeSyncOpen
(
pVnode
,
dir
))
{
vError
(
"vgId:%d, failed to open sync since %s"
,
TD_VID
(
pVnode
),
tstrerror
(
terrno
));
terrno
=
TSDB_CODE_OUT_OF_MEMORY
;
goto
_err
;
}
...
...
source/libs/sync/inc/syncUtil.h
浏览文件 @
b71a9e34
...
...
@@ -32,7 +32,7 @@ uint64_t syncUtilAddr2U64(const char* host, uint16_t port);
void
syncUtilU642Addr
(
uint64_t
u64
,
char
*
host
,
size_t
len
,
uint16_t
*
port
);
void
syncUtilnodeInfo2EpSet
(
const
SNodeInfo
*
pNodeInfo
,
SEpSet
*
pEpSet
);
void
syncUtilraftId2EpSet
(
const
SRaftId
*
raftId
,
SEpSet
*
pEpSet
);
void
syncUtilnodeInfo2raftId
(
const
SNodeInfo
*
pNodeInfo
,
SyncGroupId
vgId
,
SRaftId
*
raftId
);
bool
syncUtilnodeInfo2raftId
(
const
SNodeInfo
*
pNodeInfo
,
SyncGroupId
vgId
,
SRaftId
*
raftId
);
bool
syncUtilSameId
(
const
SRaftId
*
pId1
,
const
SRaftId
*
pId2
);
bool
syncUtilEmptyId
(
const
SRaftId
*
pId
);
...
...
source/libs/sync/src/syncIndexMgr.c
浏览文件 @
b71a9e34
...
...
@@ -20,7 +20,10 @@
SSyncIndexMgr
*
syncIndexMgrCreate
(
SSyncNode
*
pSyncNode
)
{
SSyncIndexMgr
*
pSyncIndexMgr
=
taosMemoryMalloc
(
sizeof
(
SSyncIndexMgr
));
ASSERT
(
pSyncIndexMgr
!=
NULL
);
if
(
pSyncIndexMgr
==
NULL
)
{
terrno
=
TSDB_CODE_OUT_OF_MEMORY
;
return
NULL
;
}
memset
(
pSyncIndexMgr
,
0
,
sizeof
(
SSyncIndexMgr
));
pSyncIndexMgr
->
replicas
=
&
(
pSyncNode
->
replicasId
);
...
...
@@ -248,4 +251,4 @@ SyncTerm syncIndexMgrGetTerm(SSyncIndexMgr *pSyncIndexMgr, const SRaftId *pRaftI
}
ASSERT
(
0
);
return
-
1
;
}
\ No newline at end of file
}
source/libs/sync/src/syncMain.c
浏览文件 @
b71a9e34
...
...
@@ -51,15 +51,17 @@ static int32_t syncNodeAppendNoop(SSyncNode* ths);
int32_t
syncNodeOnPingCb
(
SSyncNode
*
ths
,
SyncPing
*
pMsg
);
int32_t
syncNodeOnPingReplyCb
(
SSyncNode
*
ths
,
SyncPingReply
*
pMsg
);
// life cycle
static
void
syncFreeNode
(
void
*
param
);
// ---------------------------------
static
void
syncNodeFreeCb
(
void
*
param
)
{
syncNodeClose
(
param
);
param
=
NULL
;
}
int32_t
syncInit
()
{
int32_t
ret
=
0
;
if
(
!
syncEnvIsStart
())
{
tsNodeRefId
=
taosOpenRef
(
200
,
sync
FreeNode
);
tsNodeRefId
=
taosOpenRef
(
200
,
sync
NodeFreeCb
);
if
(
tsNodeRefId
<
0
)
{
sError
(
"failed to init node ref"
);
syncCleanUp
();
...
...
@@ -86,11 +88,15 @@ void syncCleanUp() {
int64_t
syncOpen
(
const
SSyncInfo
*
pSyncInfo
)
{
SSyncNode
*
pSyncNode
=
syncNodeOpen
(
pSyncInfo
);
ASSERT
(
pSyncNode
!=
NULL
);
if
(
pSyncNode
==
NULL
)
{
sError
(
"failed to open sync node. vgId:%d"
,
pSyncInfo
->
vgId
);
return
-
1
;
}
pSyncNode
->
rid
=
taosAddRef
(
tsNodeRefId
,
pSyncNode
);
if
(
pSyncNode
->
rid
<
0
)
{
syncFreeNode
(
pSyncNode
);
syncNodeClose
(
pSyncNode
);
pSyncNode
=
NULL
;
return
-
1
;
}
...
...
@@ -136,11 +142,9 @@ void syncStartStandBy(int64_t rid) {
void
syncStop
(
int64_t
rid
)
{
SSyncNode
*
pSyncNode
=
(
SSyncNode
*
)
taosAcquireRef
(
tsNodeRefId
,
rid
);
if
(
pSyncNode
==
NULL
)
return
;
int32_t
vgId
=
pSyncNode
->
vgId
;
syncNodeClose
(
pSyncNode
);
taosReleaseRef
(
tsNodeRefId
,
pSyncNode
->
rid
);
taosRemoveRef
(
tsNodeRefId
,
rid
);
sDebug
(
"vgId:%d, sync rid:%"
PRId64
" is removed from rsetId:%"
PRId64
,
vgId
,
rid
,
tsNodeRefId
);
}
...
...
@@ -210,7 +214,7 @@ int32_t syncReconfigBuild(int64_t rid, const SSyncCfg* pNewCfg, SRpcMsg* pRpcMsg
if
(
!
syncNodeCheckNewConfig
(
pSyncNode
,
pNewCfg
))
{
taosReleaseRef
(
tsNodeRefId
,
pSyncNode
->
rid
);
terrno
=
TSDB_CODE_SYN_NEW_CONFIG_ERROR
;
sError
(
"
syncNodeCheckNewConfig error"
);
sError
(
"
invalid new config. vgId:%d"
,
pSyncNode
->
vgId
);
return
-
1
;
}
...
...
@@ -237,7 +241,7 @@ int32_t syncReconfig(int64_t rid, const SSyncCfg* pNewCfg) {
if
(
!
syncNodeCheckNewConfig
(
pSyncNode
,
pNewCfg
))
{
taosReleaseRef
(
tsNodeRefId
,
pSyncNode
->
rid
);
terrno
=
TSDB_CODE_SYN_NEW_CONFIG_ERROR
;
sError
(
"
syncNodeCheckNewConfig error"
);
sError
(
"
invalid new config. vgId:%d"
,
pSyncNode
->
vgId
);
return
-
1
;
}
...
...
@@ -941,16 +945,18 @@ _END:
SSyncNode
*
syncNodeOpen
(
const
SSyncInfo
*
pOldSyncInfo
)
{
SSyncInfo
*
pSyncInfo
=
(
SSyncInfo
*
)
pOldSyncInfo
;
SSyncNode
*
pSyncNode
=
(
SSyncNode
*
)
taosMemoryMalloc
(
sizeof
(
SSyncNode
));
ASSERT
(
pSyncNode
!=
NULL
);
memset
(
pSyncNode
,
0
,
sizeof
(
SSyncNode
));
SSyncNode
*
pSyncNode
=
(
SSyncNode
*
)
taosMemoryCalloc
(
1
,
sizeof
(
SSyncNode
));
if
(
pSyncNode
==
NULL
)
{
terrno
=
TSDB_CODE_OUT_OF_MEMORY
;
goto
_error
;
}
int32_t
ret
=
0
;
if
(
!
taosDirExist
((
char
*
)(
pSyncInfo
->
path
)))
{
if
(
taosMkDir
(
pSyncInfo
->
path
)
!=
0
)
{
terrno
=
TAOS_SYSTEM_ERROR
(
errno
);
sError
(
"failed to create dir:%s since %s"
,
pSyncInfo
->
path
,
terrstr
());
return
NULL
;
goto
_error
;
}
}
...
...
@@ -963,15 +969,21 @@ SSyncNode* syncNodeOpen(const SSyncInfo* pOldSyncInfo) {
meta
.
lastConfigIndex
=
SYNC_INDEX_INVALID
;
meta
.
batchSize
=
pSyncInfo
->
batchSize
;
ret
=
raftCfgCreateFile
((
SSyncCfg
*
)
&
(
pSyncInfo
->
syncCfg
),
meta
,
pSyncNode
->
configPath
);
ASSERT
(
ret
==
0
);
if
(
ret
!=
0
)
{
sError
(
"failed to create raft cfg file. configPath: %s"
,
pSyncNode
->
configPath
);
goto
_error
;
}
}
else
{
// update syncCfg by raft_config.json
pSyncNode
->
pRaftCfg
=
raftCfgOpen
(
pSyncNode
->
configPath
);
ASSERT
(
pSyncNode
->
pRaftCfg
!=
NULL
);
if
(
pSyncNode
->
pRaftCfg
==
NULL
)
{
sError
(
"failed to open raft cfg file. path:%s"
,
pSyncNode
->
configPath
);
goto
_error
;
}
pSyncInfo
->
syncCfg
=
pSyncNode
->
pRaftCfg
->
cfg
;
raftCfgClose
(
pSyncNode
->
pRaftCfg
);
pSyncNode
->
pRaftCfg
=
NULL
;
}
// init by SSyncInfo
...
...
@@ -988,11 +1000,17 @@ SSyncNode* syncNodeOpen(const SSyncInfo* pOldSyncInfo) {
// init raft config
pSyncNode
->
pRaftCfg
=
raftCfgOpen
(
pSyncNode
->
configPath
);
ASSERT
(
pSyncNode
->
pRaftCfg
!=
NULL
);
if
(
pSyncNode
->
pRaftCfg
==
NULL
)
{
sError
(
"failed to open raft cfg file. path:%s"
,
pSyncNode
->
configPath
);
goto
_error
;
}
// init internal
pSyncNode
->
myNodeInfo
=
pSyncNode
->
pRaftCfg
->
cfg
.
nodeInfo
[
pSyncNode
->
pRaftCfg
->
cfg
.
myIndex
];
syncUtilnodeInfo2raftId
(
&
pSyncNode
->
myNodeInfo
,
pSyncNode
->
vgId
,
&
pSyncNode
->
myRaftId
);
if
(
!
syncUtilnodeInfo2raftId
(
&
pSyncNode
->
myNodeInfo
,
pSyncNode
->
vgId
,
&
pSyncNode
->
myRaftId
))
{
sError
(
"failed to determine my raft member id. vgId:%d"
,
pSyncNode
->
vgId
);
goto
_error
;
}
// init peersNum, peers, peersId
pSyncNode
->
peersNum
=
pSyncNode
->
pRaftCfg
->
cfg
.
replicaNum
-
1
;
...
...
@@ -1004,17 +1022,24 @@ SSyncNode* syncNodeOpen(const SSyncInfo* pOldSyncInfo) {
}
}
for
(
int
i
=
0
;
i
<
pSyncNode
->
peersNum
;
++
i
)
{
syncUtilnodeInfo2raftId
(
&
pSyncNode
->
peersNodeInfo
[
i
],
pSyncNode
->
vgId
,
&
pSyncNode
->
peersId
[
i
]);
if
(
!
syncUtilnodeInfo2raftId
(
&
pSyncNode
->
peersNodeInfo
[
i
],
pSyncNode
->
vgId
,
&
pSyncNode
->
peersId
[
i
]))
{
sError
(
"failed to determine raft member id. vgId:%d, peer:%d"
,
pSyncNode
->
vgId
,
i
);
goto
_error
;
}
}
// init replicaNum, replicasId
pSyncNode
->
replicaNum
=
pSyncNode
->
pRaftCfg
->
cfg
.
replicaNum
;
for
(
int
i
=
0
;
i
<
pSyncNode
->
pRaftCfg
->
cfg
.
replicaNum
;
++
i
)
{
syncUtilnodeInfo2raftId
(
&
pSyncNode
->
pRaftCfg
->
cfg
.
nodeInfo
[
i
],
pSyncNode
->
vgId
,
&
pSyncNode
->
replicasId
[
i
]);
if
(
!
syncUtilnodeInfo2raftId
(
&
pSyncNode
->
pRaftCfg
->
cfg
.
nodeInfo
[
i
],
pSyncNode
->
vgId
,
&
pSyncNode
->
replicasId
[
i
]))
{
sError
(
"failed to determine raft member id. vgId:%d, replica:%d"
,
pSyncNode
->
vgId
,
i
);
goto
_error
;
}
}
// init raft algorithm
pSyncNode
->
pFsm
=
pSyncInfo
->
pFsm
;
pSyncInfo
->
pFsm
=
NULL
;
pSyncNode
->
quorum
=
syncUtilQuorum
(
pSyncNode
->
pRaftCfg
->
cfg
.
replicaNum
);
pSyncNode
->
leaderCache
=
EMPTY_RAFT_ID
;
...
...
@@ -1047,29 +1072,50 @@ SSyncNode* syncNodeOpen(const SSyncInfo* pOldSyncInfo) {
// init TLA+ server vars
pSyncNode
->
state
=
TAOS_SYNC_STATE_FOLLOWER
;
pSyncNode
->
pRaftStore
=
raftStoreOpen
(
pSyncNode
->
raftStorePath
);
ASSERT
(
pSyncNode
->
pRaftStore
!=
NULL
);
if
(
pSyncNode
->
pRaftStore
==
NULL
)
{
sError
(
"failed to open raft store. path: %s"
,
pSyncNode
->
raftStorePath
);
goto
_error
;
}
// init TLA+ candidate vars
pSyncNode
->
pVotesGranted
=
voteGrantedCreate
(
pSyncNode
);
ASSERT
(
pSyncNode
->
pVotesGranted
!=
NULL
);
if
(
pSyncNode
->
pVotesGranted
==
NULL
)
{
sError
(
"failed to create VotesGranted. vgId:%d"
,
pSyncNode
->
vgId
);
goto
_error
;
}
pSyncNode
->
pVotesRespond
=
votesRespondCreate
(
pSyncNode
);
ASSERT
(
pSyncNode
->
pVotesRespond
!=
NULL
);
if
(
pSyncNode
->
pVotesRespond
==
NULL
)
{
sError
(
"failed to create VotesRespond. vgId:%d"
,
pSyncNode
->
vgId
);
goto
_error
;
}
// init TLA+ leader vars
pSyncNode
->
pNextIndex
=
syncIndexMgrCreate
(
pSyncNode
);
ASSERT
(
pSyncNode
->
pNextIndex
!=
NULL
);
if
(
pSyncNode
->
pNextIndex
==
NULL
)
{
sError
(
"failed to create SyncIndexMgr. vgId:%d"
,
pSyncNode
->
vgId
);
goto
_error
;
}
pSyncNode
->
pMatchIndex
=
syncIndexMgrCreate
(
pSyncNode
);
ASSERT
(
pSyncNode
->
pMatchIndex
!=
NULL
);
if
(
pSyncNode
->
pMatchIndex
==
NULL
)
{
sError
(
"failed to create SyncIndexMgr. vgId:%d"
,
pSyncNode
->
vgId
);
goto
_error
;
}
// init TLA+ log vars
pSyncNode
->
pLogStore
=
logStoreCreate
(
pSyncNode
);
ASSERT
(
pSyncNode
->
pLogStore
!=
NULL
);
if
(
pSyncNode
->
pLogStore
==
NULL
)
{
sError
(
"failed to create SyncLogStore. vgId:%d"
,
pSyncNode
->
vgId
);
goto
_error
;
}
SyncIndex
commitIndex
=
SYNC_INDEX_INVALID
;
if
(
pSyncNode
->
pFsm
!=
NULL
&&
pSyncNode
->
pFsm
->
FpGetSnapshotInfo
!=
NULL
)
{
SSnapshot
snapshot
=
{
0
};
int32_t
code
=
pSyncNode
->
pFsm
->
FpGetSnapshotInfo
(
pSyncNode
->
pFsm
,
&
snapshot
);
ASSERT
(
code
==
0
);
if
(
code
!=
0
)
{
sError
(
"failed to get snapshot info. vgId:%d, code:%d"
,
pSyncNode
->
vgId
,
code
);
goto
_error
;
}
if
(
snapshot
.
lastApplyIndex
>
commitIndex
)
{
commitIndex
=
snapshot
.
lastApplyIndex
;
syncNodeEventLog
(
pSyncNode
,
"reset commit index by snapshot"
);
...
...
@@ -1132,7 +1178,10 @@ SSyncNode* syncNodeOpen(const SSyncInfo* pOldSyncInfo) {
// tools
pSyncNode
->
pSyncRespMgr
=
syncRespMgrCreate
(
pSyncNode
,
SYNC_RESP_TTL_MS
);
ASSERT
(
pSyncNode
->
pSyncRespMgr
!=
NULL
);
if
(
pSyncNode
->
pSyncRespMgr
==
NULL
)
{
sError
(
"failed to create SyncRespMgr. vgId:%d"
,
pSyncNode
->
vgId
);
goto
_error
;
}
// restore state
pSyncNode
->
restoreFinish
=
false
;
...
...
@@ -1162,6 +1211,15 @@ SSyncNode* syncNodeOpen(const SSyncInfo* pOldSyncInfo) {
syncNodeEventLog
(
pSyncNode
,
"sync open"
);
return
pSyncNode
;
_error:
if
(
pSyncInfo
->
pFsm
)
{
taosMemoryFree
(
pSyncInfo
->
pFsm
);
pSyncInfo
->
pFsm
=
NULL
;
}
syncNodeClose
(
pSyncNode
);
pSyncNode
=
NULL
;
return
NULL
;
}
void
syncNodeMaybeUpdateCommitBySnapshot
(
SSyncNode
*
pSyncNode
)
{
...
...
@@ -1214,20 +1272,28 @@ void syncNodeStartStandBy(SSyncNode* pSyncNode) {
void
syncNodeClose
(
SSyncNode
*
pSyncNode
)
{
syncNodeEventLog
(
pSyncNode
,
"sync close"
);
if
(
pSyncNode
==
NULL
)
{
return
;
}
int32_t
ret
;
ASSERT
(
pSyncNode
!=
NULL
);
ret
=
raftStoreClose
(
pSyncNode
->
pRaftStore
);
ASSERT
(
ret
==
0
);
syncRespMgrDestroy
(
pSyncNode
->
pSyncRespMgr
);
pSyncNode
->
pSyncRespMgr
=
NULL
;
voteGrantedDestroy
(
pSyncNode
->
pVotesGranted
);
pSyncNode
->
pVotesGranted
=
NULL
;
votesRespondDestory
(
pSyncNode
->
pVotesRespond
);
pSyncNode
->
pVotesRespond
=
NULL
;
syncIndexMgrDestroy
(
pSyncNode
->
pNextIndex
);
pSyncNode
->
pNextIndex
=
NULL
;
syncIndexMgrDestroy
(
pSyncNode
->
pMatchIndex
);
pSyncNode
->
pMatchIndex
=
NULL
;
logStoreDestory
(
pSyncNode
->
pLogStore
);
pSyncNode
->
pLogStore
=
NULL
;
raftCfgClose
(
pSyncNode
->
pRaftCfg
);
pSyncNode
->
pRaftCfg
=
NULL
;
syncNodeStopPingTimer
(
pSyncNode
);
syncNodeStopElectTimer
(
pSyncNode
);
...
...
@@ -1249,8 +1315,7 @@ void syncNodeClose(SSyncNode* pSyncNode) {
pSyncNode
->
pNewNodeReceiver
=
NULL
;
}
// free memory in syncFreeNode
// taosMemoryFree(pSyncNode);
taosMemoryFree
(
pSyncNode
);
}
// option
...
...
@@ -2534,7 +2599,7 @@ static void syncNodeEqHeartbeatTimer(void* param, void* tmrId) {
return
;
}
}
else
{
sError
(
"
syncNodeEqHeartbeatTimer FpEqMsg is NULL"
);
sError
(
"
vgId:%d, enqueue msg cb ptr (i.e. FpEqMsg) not set."
,
pSyncNode
->
vgId
);
}
syncTimeoutDestroy
(
pSyncMsg
);
...
...
@@ -2774,14 +2839,6 @@ int32_t syncNodeOnClientRequestBatchCb(SSyncNode* ths, SyncClientRequestBatch* p
return
0
;
}
static
void
syncFreeNode
(
void
*
param
)
{
SSyncNode
*
pNode
=
param
;
// inner object already free
// syncNodePrint2((char*)"==syncFreeNode==", pNode);
taosMemoryFree
(
pNode
);
}
const
char
*
syncStr
(
ESyncState
state
)
{
switch
(
state
)
{
case
TAOS_SYNC_STATE_FOLLOWER
:
...
...
source/libs/sync/src/syncRaftCfg.c
浏览文件 @
b71a9e34
...
...
@@ -364,8 +364,6 @@ int32_t raftCfgCreateFile(SSyncCfg *pCfg, SRaftCfgMeta meta, const char *path) {
int32_t
sysErr
=
errno
;
const
char
*
sysErrStr
=
strerror
(
errno
);
sError
(
"create raft cfg file error, err:%d %X, msg:%s, syserr:%d, sysmsg:%s"
,
err
,
err
,
errStr
,
sysErr
,
sysErrStr
);
ASSERT
(
0
);
return
-
1
;
}
...
...
source/libs/sync/src/syncRaftStore.c
浏览文件 @
b71a9e34
...
...
@@ -28,7 +28,7 @@ SRaftStore *raftStoreOpen(const char *path) {
SRaftStore
*
pRaftStore
=
taosMemoryMalloc
(
sizeof
(
SRaftStore
));
if
(
pRaftStore
==
NULL
)
{
sError
(
"raftStoreOpen malloc error"
)
;
terrno
=
TSDB_CODE_OUT_OF_MEMORY
;
return
NULL
;
}
memset
(
pRaftStore
,
0
,
sizeof
(
*
pRaftStore
));
...
...
@@ -72,7 +72,9 @@ static int32_t raftStoreInit(SRaftStore *pRaftStore) {
}
int32_t
raftStoreClose
(
SRaftStore
*
pRaftStore
)
{
ASSERT
(
pRaftStore
!=
NULL
);
if
(
pRaftStore
==
NULL
)
{
return
0
;
}
taosCloseFile
(
&
pRaftStore
->
pFile
);
taosMemoryFree
(
pRaftStore
);
...
...
source/libs/sync/src/syncRespMgr.c
浏览文件 @
b71a9e34
...
...
@@ -19,6 +19,10 @@
SSyncRespMgr
*
syncRespMgrCreate
(
void
*
data
,
int64_t
ttl
)
{
SSyncRespMgr
*
pObj
=
(
SSyncRespMgr
*
)
taosMemoryMalloc
(
sizeof
(
SSyncRespMgr
));
if
(
pObj
==
NULL
)
{
terrno
=
TSDB_CODE_OUT_OF_MEMORY
;
return
NULL
;
}
memset
(
pObj
,
0
,
sizeof
(
SSyncRespMgr
));
pObj
->
pRespHash
=
...
...
source/libs/sync/src/syncSnapshot.c
浏览文件 @
b71a9e34
...
...
@@ -35,7 +35,10 @@ SSyncSnapshotSender *snapshotSenderCreate(SSyncNode *pSyncNode, int32_t replicaI
SSyncSnapshotSender
*
pSender
=
NULL
;
if
(
condition
)
{
pSender
=
taosMemoryMalloc
(
sizeof
(
SSyncSnapshotSender
));
ASSERT
(
pSender
!=
NULL
);
if
(
pSender
==
NULL
)
{
terrno
=
TSDB_CODE_OUT_OF_MEMORY
;
return
NULL
;
}
memset
(
pSender
,
0
,
sizeof
(
*
pSender
));
pSender
->
start
=
false
;
...
...
source/libs/sync/src/syncUtil.c
浏览文件 @
b71a9e34
...
...
@@ -26,7 +26,8 @@ uint64_t syncUtilAddr2U64(const char* host, uint16_t port) {
uint32_t
hostU32
=
taosGetIpv4FromFqdn
(
host
);
if
(
hostU32
==
(
uint32_t
)
-
1
)
{
sError
(
"Get IP address error"
);
sError
(
"failed to resolve ipv4 addr. host:%s"
,
host
);
terrno
=
TSDB_CODE_TSC_INVALID_FQDN
;
return
-
1
;
}
...
...
@@ -84,13 +85,18 @@ void syncUtilraftId2EpSet(const SRaftId* raftId, SEpSet* pEpSet) {
addEpIntoEpSet
(
pEpSet
,
host
,
port
);
}
void
syncUtilnodeInfo2raftId
(
const
SNodeInfo
*
pNodeInfo
,
SyncGroupId
vgId
,
SRaftId
*
raftId
)
{
bool
syncUtilnodeInfo2raftId
(
const
SNodeInfo
*
pNodeInfo
,
SyncGroupId
vgId
,
SRaftId
*
raftId
)
{
uint32_t
ipv4
=
taosGetIpv4FromFqdn
(
pNodeInfo
->
nodeFqdn
);
ASSERT
(
ipv4
!=
0xFFFFFFFF
);
if
(
ipv4
==
0xFFFFFFFF
||
ipv4
==
1
)
{
sError
(
"failed to resolve ipv4 addr. fqdn: %s"
,
pNodeInfo
->
nodeFqdn
);
terrno
=
TSDB_CODE_TSC_INVALID_FQDN
;
return
false
;
}
char
ipbuf
[
128
]
=
{
0
};
tinet_ntoa
(
ipbuf
,
ipv4
);
raftId
->
addr
=
syncUtilAddr2U64
(
ipbuf
,
pNodeInfo
->
nodePort
);
raftId
->
vgId
=
vgId
;
return
true
;
}
bool
syncUtilSameId
(
const
SRaftId
*
pId1
,
const
SRaftId
*
pId2
)
{
...
...
@@ -310,4 +316,4 @@ void syncUtilJson2Line(char* jsonStr) {
q
++
;
}
}
}
\ No newline at end of file
}
source/libs/sync/src/syncVoteMgr.c
浏览文件 @
b71a9e34
...
...
@@ -24,7 +24,10 @@ static void voteGrantedClearVotes(SVotesGranted *pVotesGranted) {
SVotesGranted
*
voteGrantedCreate
(
SSyncNode
*
pSyncNode
)
{
SVotesGranted
*
pVotesGranted
=
taosMemoryMalloc
(
sizeof
(
SVotesGranted
));
ASSERT
(
pVotesGranted
!=
NULL
);
if
(
pVotesGranted
==
NULL
)
{
terrno
=
TSDB_CODE_OUT_OF_MEMORY
;
return
NULL
;
}
memset
(
pVotesGranted
,
0
,
sizeof
(
SVotesGranted
));
pVotesGranted
->
replicas
=
&
(
pSyncNode
->
replicasId
);
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录