Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
e45367f9
T
TDengine
项目概览
taosdata
/
TDengine
1 年多 前同步成功
通知
1185
Star
22016
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
e45367f9
编写于
2月 22, 2023
作者:
B
Benguang Zhao
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
fix: resolve coverity scan issues in sync and wal
上级
77240500
变更
11
隐藏空白更改
内联
并排
Showing
11 changed file
with
36 addition
and
419 deletion
+36
-419
source/libs/sync/inc/syncPipeline.h
source/libs/sync/inc/syncPipeline.h
+1
-1
source/libs/sync/inc/syncRaftEntry.h
source/libs/sync/inc/syncRaftEntry.h
+0
-33
source/libs/sync/src/syncAppendEntries.c
source/libs/sync/src/syncAppendEntries.c
+5
-3
source/libs/sync/src/syncAppendEntriesReply.c
source/libs/sync/src/syncAppendEntriesReply.c
+1
-1
source/libs/sync/src/syncMain.c
source/libs/sync/src/syncMain.c
+12
-24
source/libs/sync/src/syncPipeline.c
source/libs/sync/src/syncPipeline.c
+1
-5
source/libs/sync/src/syncRaftCfg.c
source/libs/sync/src/syncRaftCfg.c
+1
-1
source/libs/sync/src/syncRaftEntry.c
source/libs/sync/src/syncRaftEntry.c
+0
-341
source/libs/sync/src/syncSnapshot.c
source/libs/sync/src/syncSnapshot.c
+13
-8
source/libs/wal/src/walMeta.c
source/libs/wal/src/walMeta.c
+1
-1
source/libs/wal/src/walWrite.c
source/libs/wal/src/walWrite.c
+1
-1
未找到文件。
source/libs/sync/inc/syncPipeline.h
浏览文件 @
e45367f9
...
@@ -68,7 +68,7 @@ void syncNodeLogReplMgrDestroy(SSyncNode* pNode);
...
@@ -68,7 +68,7 @@ void syncNodeLogReplMgrDestroy(SSyncNode* pNode);
// access
// access
static
FORCE_INLINE
int64_t
syncLogGetRetryBackoffTimeMs
(
SSyncLogReplMgr
*
pMgr
)
{
static
FORCE_INLINE
int64_t
syncLogGetRetryBackoffTimeMs
(
SSyncLogReplMgr
*
pMgr
)
{
return
(
1
<<
pMgr
->
retryBackoff
)
*
SYNC_LOG_REPL_RETRY_WAIT_MS
;
return
(
(
int64_t
)
1
<<
pMgr
->
retryBackoff
)
*
SYNC_LOG_REPL_RETRY_WAIT_MS
;
}
}
static
FORCE_INLINE
int32_t
syncLogGetNextRetryBackoff
(
SSyncLogReplMgr
*
pMgr
)
{
static
FORCE_INLINE
int32_t
syncLogGetNextRetryBackoff
(
SSyncLogReplMgr
*
pMgr
)
{
...
...
source/libs/sync/inc/syncRaftEntry.h
浏览文件 @
e45367f9
...
@@ -49,39 +49,6 @@ static FORCE_INLINE bool syncLogIsReplicationBarrier(SSyncRaftEntry* pEntry) {
...
@@ -49,39 +49,6 @@ static FORCE_INLINE bool syncLogIsReplicationBarrier(SSyncRaftEntry* pEntry) {
return
pEntry
->
originalRpcType
==
TDMT_SYNC_NOOP
;
return
pEntry
->
originalRpcType
==
TDMT_SYNC_NOOP
;
}
}
typedef
struct
SRaftEntryHashCache
{
SHashObj
*
pEntryHash
;
int32_t
maxCount
;
int32_t
currentCount
;
TdThreadMutex
mutex
;
SSyncNode
*
pSyncNode
;
}
SRaftEntryHashCache
;
SRaftEntryHashCache
*
raftCacheCreate
(
SSyncNode
*
pSyncNode
,
int32_t
maxCount
);
void
raftCacheDestroy
(
SRaftEntryHashCache
*
pCache
);
int32_t
raftCachePutEntry
(
struct
SRaftEntryHashCache
*
pCache
,
SSyncRaftEntry
*
pEntry
);
int32_t
raftCacheGetEntry
(
struct
SRaftEntryHashCache
*
pCache
,
SyncIndex
index
,
SSyncRaftEntry
**
ppEntry
);
int32_t
raftCacheGetEntryP
(
struct
SRaftEntryHashCache
*
pCache
,
SyncIndex
index
,
SSyncRaftEntry
**
ppEntry
);
int32_t
raftCacheDelEntry
(
struct
SRaftEntryHashCache
*
pCache
,
SyncIndex
index
);
int32_t
raftCacheGetAndDel
(
struct
SRaftEntryHashCache
*
pCache
,
SyncIndex
index
,
SSyncRaftEntry
**
ppEntry
);
int32_t
raftCacheClear
(
struct
SRaftEntryHashCache
*
pCache
);
typedef
struct
SRaftEntryCache
{
SSkipList
*
pSkipList
;
int32_t
maxCount
;
int32_t
currentCount
;
int32_t
refMgr
;
TdThreadMutex
mutex
;
SSyncNode
*
pSyncNode
;
}
SRaftEntryCache
;
SRaftEntryCache
*
raftEntryCacheCreate
(
SSyncNode
*
pSyncNode
,
int32_t
maxCount
);
void
raftEntryCacheDestroy
(
SRaftEntryCache
*
pCache
);
int32_t
raftEntryCachePutEntry
(
struct
SRaftEntryCache
*
pCache
,
SSyncRaftEntry
*
pEntry
);
int32_t
raftEntryCacheGetEntry
(
struct
SRaftEntryCache
*
pCache
,
SyncIndex
index
,
SSyncRaftEntry
**
ppEntry
);
int32_t
raftEntryCacheGetEntryP
(
struct
SRaftEntryCache
*
pCache
,
SyncIndex
index
,
SSyncRaftEntry
**
ppEntry
);
int32_t
raftEntryCacheClear
(
struct
SRaftEntryCache
*
pCache
,
int32_t
count
);
#ifdef __cplusplus
#ifdef __cplusplus
}
}
#endif
#endif
...
...
source/libs/sync/src/syncAppendEntries.c
浏览文件 @
e45367f9
...
@@ -104,6 +104,8 @@ int32_t syncNodeOnAppendEntries(SSyncNode* ths, const SRpcMsg* pRpcMsg) {
...
@@ -104,6 +104,8 @@ int32_t syncNodeOnAppendEntries(SSyncNode* ths, const SRpcMsg* pRpcMsg) {
SyncAppendEntries
*
pMsg
=
pRpcMsg
->
pCont
;
SyncAppendEntries
*
pMsg
=
pRpcMsg
->
pCont
;
SRpcMsg
rpcRsp
=
{
0
};
SRpcMsg
rpcRsp
=
{
0
};
bool
accepted
=
false
;
bool
accepted
=
false
;
SSyncRaftEntry
*
pEntry
=
NULL
;
// if already drop replica, do not process
// if already drop replica, do not process
if
(
!
syncNodeInRaftGroup
(
ths
,
&
(
pMsg
->
srcId
)))
{
if
(
!
syncNodeInRaftGroup
(
ths
,
&
(
pMsg
->
srcId
)))
{
syncLogRecvAppendEntries
(
ths
,
pMsg
,
"not in my config"
);
syncLogRecvAppendEntries
(
ths
,
pMsg
,
"not in my config"
);
...
@@ -137,14 +139,13 @@ int32_t syncNodeOnAppendEntries(SSyncNode* ths, const SRpcMsg* pRpcMsg) {
...
@@ -137,14 +139,13 @@ int32_t syncNodeOnAppendEntries(SSyncNode* ths, const SRpcMsg* pRpcMsg) {
syncNodeStepDown
(
ths
,
pMsg
->
term
);
syncNodeStepDown
(
ths
,
pMsg
->
term
);
syncNodeResetElectTimer
(
ths
);
syncNodeResetElectTimer
(
ths
);
if
(
pMsg
->
dataLen
<
(
int32_t
)
sizeof
(
SSyncRaftEntry
))
{
if
(
pMsg
->
dataLen
<
sizeof
(
SSyncRaftEntry
))
{
sError
(
"vgId:%d, incomplete append entries received. prev index:%"
PRId64
", term:%"
PRId64
", datalen:%d"
,
sError
(
"vgId:%d, incomplete append entries received. prev index:%"
PRId64
", term:%"
PRId64
", datalen:%d"
,
ths
->
vgId
,
pMsg
->
prevLogIndex
,
pMsg
->
prevLogTerm
,
pMsg
->
dataLen
);
ths
->
vgId
,
pMsg
->
prevLogIndex
,
pMsg
->
prevLogTerm
,
pMsg
->
dataLen
);
goto
_IGNORE
;
goto
_IGNORE
;
}
}
SSyncRaftEntry
*
pEntry
=
syncBuildRaftEntryFromAppendEntries
(
pMsg
);
pEntry
=
syncBuildRaftEntryFromAppendEntries
(
pMsg
);
if
(
pEntry
==
NULL
)
{
if
(
pEntry
==
NULL
)
{
sError
(
"vgId:%d, failed to get raft entry from append entries since %s"
,
ths
->
vgId
,
terrstr
());
sError
(
"vgId:%d, failed to get raft entry from append entries since %s"
,
ths
->
vgId
,
terrstr
());
goto
_IGNORE
;
goto
_IGNORE
;
...
@@ -191,5 +192,6 @@ _out:
...
@@ -191,5 +192,6 @@ _out:
_IGNORE:
_IGNORE:
rpcFreeCont
(
rpcRsp
.
pCont
);
rpcFreeCont
(
rpcRsp
.
pCont
);
syncEntryDestroy
(
pEntry
);
return
0
;
return
0
;
}
}
source/libs/sync/src/syncAppendEntriesReply.c
浏览文件 @
e45367f9
...
@@ -40,7 +40,7 @@
...
@@ -40,7 +40,7 @@
//
//
int32_t
syncNodeOnAppendEntriesReply
(
SSyncNode
*
ths
,
const
SRpcMsg
*
pRpcMsg
)
{
int32_t
syncNodeOnAppendEntriesReply
(
SSyncNode
*
ths
,
const
SRpcMsg
*
pRpcMsg
)
{
SyncAppendEntriesReply
*
pMsg
=
pRpcMsg
->
pCont
;
SyncAppendEntriesReply
*
pMsg
=
(
SyncAppendEntriesReply
*
)
pRpcMsg
->
pCont
;
int32_t
ret
=
0
;
int32_t
ret
=
0
;
// if already drop replica, do not process
// if already drop replica, do not process
...
...
source/libs/sync/src/syncMain.c
浏览文件 @
e45367f9
...
@@ -1126,29 +1126,18 @@ int32_t syncNodeStartStandBy(SSyncNode* pSyncNode) {
...
@@ -1126,29 +1126,18 @@ int32_t syncNodeStartStandBy(SSyncNode* pSyncNode) {
}
}
void
syncNodePreClose
(
SSyncNode
*
pSyncNode
)
{
void
syncNodePreClose
(
SSyncNode
*
pSyncNode
)
{
if
(
pSyncNode
!=
NULL
&&
pSyncNode
->
pFsm
!=
NULL
&&
pSyncNode
->
pFsm
->
FpApplyQueueItems
!=
NULL
)
{
ASSERT
(
pSyncNode
!=
NULL
);
while
(
1
)
{
ASSERT
(
pSyncNode
->
pFsm
!=
NULL
);
int32_t
aqItems
=
pSyncNode
->
pFsm
->
FpApplyQueueItems
(
pSyncNode
->
pFsm
);
ASSERT
(
pSyncNode
->
pFsm
->
FpApplyQueueItems
!=
NULL
);
sTrace
(
"vgId:%d, pre close, %d items in apply queue"
,
pSyncNode
->
vgId
,
aqItems
);
if
(
aqItems
==
0
||
aqItems
==
-
1
)
{
while
(
1
)
{
break
;
int32_t
aqItems
=
pSyncNode
->
pFsm
->
FpApplyQueueItems
(
pSyncNode
->
pFsm
);
}
sTrace
(
"vgId:%d, pre close, %d items in apply queue"
,
pSyncNode
->
vgId
,
aqItems
);
taosMsleep
(
20
);
if
(
aqItems
==
0
||
aqItems
==
-
1
)
{
}
break
;
}
#if 0
if (pSyncNode->pNewNodeReceiver != NULL) {
if (snapshotReceiverIsStart(pSyncNode->pNewNodeReceiver)) {
snapshotReceiverStop(pSyncNode->pNewNodeReceiver);
}
}
taosMsleep
(
20
);
sDebug("vgId:%d, snapshot receiver destroy while preclose sync node, data:%p", pSyncNode->vgId,
pSyncNode->pNewNodeReceiver);
snapshotReceiverDestroy(pSyncNode->pNewNodeReceiver);
pSyncNode->pNewNodeReceiver = NULL;
}
}
#endif
// stop elect timer
// stop elect timer
syncNodeStopElectTimer
(
pSyncNode
);
syncNodeStopElectTimer
(
pSyncNode
);
...
@@ -1461,7 +1450,7 @@ void syncNodeDoConfigChange(SSyncNode* pSyncNode, SSyncCfg* pNewConfig, SyncInde
...
@@ -1461,7 +1450,7 @@ void syncNodeDoConfigChange(SSyncNode* pSyncNode, SSyncCfg* pNewConfig, SyncInde
}
}
// log begin config change
// log begin config change
sNInfo
(
pSyncNode
,
"begin do config change, from %d to %d"
,
pSyncNode
->
vgId
,
oldConfig
.
replicaNum
,
sNInfo
(
pSyncNode
,
"begin do config change, from %d to %d
, replicas:%d
"
,
pSyncNode
->
vgId
,
oldConfig
.
replicaNum
,
pNewConfig
->
replicaNum
);
pNewConfig
->
replicaNum
);
if
(
IamInNew
)
{
if
(
IamInNew
)
{
...
@@ -1742,8 +1731,7 @@ void syncNodeBecomeLeader(SSyncNode* pSyncNode, const char* debugStr) {
...
@@ -1742,8 +1731,7 @@ void syncNodeBecomeLeader(SSyncNode* pSyncNode, const char* debugStr) {
#endif
#endif
// close receiver
// close receiver
if
(
pSyncNode
!=
NULL
&&
pSyncNode
->
pNewNodeReceiver
!=
NULL
&&
if
(
snapshotReceiverIsStart
(
pSyncNode
->
pNewNodeReceiver
))
{
snapshotReceiverIsStart
(
pSyncNode
->
pNewNodeReceiver
))
{
snapshotReceiverStop
(
pSyncNode
->
pNewNodeReceiver
);
snapshotReceiverStop
(
pSyncNode
->
pNewNodeReceiver
);
}
}
...
...
source/libs/sync/src/syncPipeline.c
浏览文件 @
e45367f9
...
@@ -901,7 +901,7 @@ int32_t syncLogReplMgrProcessReplyAsNormal(SSyncLogReplMgr* pMgr, SSyncNode* pNo
...
@@ -901,7 +901,7 @@ int32_t syncLogReplMgrProcessReplyAsNormal(SSyncLogReplMgr* pMgr, SSyncNode* pNo
int64_t
firstSentMs
=
pMgr
->
states
[
pMgr
->
startIndex
%
pMgr
->
size
].
timeMs
;
int64_t
firstSentMs
=
pMgr
->
states
[
pMgr
->
startIndex
%
pMgr
->
size
].
timeMs
;
int64_t
lastSentMs
=
pMgr
->
states
[(
pMgr
->
endIndex
-
1
)
%
pMgr
->
size
].
timeMs
;
int64_t
lastSentMs
=
pMgr
->
states
[(
pMgr
->
endIndex
-
1
)
%
pMgr
->
size
].
timeMs
;
int64_t
timeDiffMs
=
lastSentMs
-
firstSentMs
;
int64_t
timeDiffMs
=
lastSentMs
-
firstSentMs
;
if
(
timeDiffMs
>
0
&&
timeDiffMs
<
(
SYNC_LOG_REPL_RETRY_WAIT_MS
<<
(
pMgr
->
retryBackoff
-
1
)))
{
if
(
timeDiffMs
>
0
&&
timeDiffMs
<
(
(
int64_t
)
SYNC_LOG_REPL_RETRY_WAIT_MS
<<
(
pMgr
->
retryBackoff
-
1
)))
{
pMgr
->
retryBackoff
-=
1
;
pMgr
->
retryBackoff
-=
1
;
}
}
}
}
...
@@ -928,10 +928,6 @@ SSyncLogReplMgr* syncLogReplMgrCreate() {
...
@@ -928,10 +928,6 @@ SSyncLogReplMgr* syncLogReplMgrCreate() {
ASSERT
(
pMgr
->
size
==
TSDB_SYNC_LOG_BUFFER_SIZE
);
ASSERT
(
pMgr
->
size
==
TSDB_SYNC_LOG_BUFFER_SIZE
);
return
pMgr
;
return
pMgr
;
_err:
taosMemoryFree
(
pMgr
);
return
NULL
;
}
}
void
syncLogReplMgrDestroy
(
SSyncLogReplMgr
*
pMgr
)
{
void
syncLogReplMgrDestroy
(
SSyncLogReplMgr
*
pMgr
)
{
...
...
source/libs/sync/src/syncRaftCfg.c
浏览文件 @
e45367f9
...
@@ -224,7 +224,7 @@ _OVER:
...
@@ -224,7 +224,7 @@ _OVER:
int32_t
syncAddCfgIndex
(
SSyncNode
*
pNode
,
SyncIndex
cfgIndex
)
{
int32_t
syncAddCfgIndex
(
SSyncNode
*
pNode
,
SyncIndex
cfgIndex
)
{
SRaftCfg
*
pCfg
=
&
pNode
->
raftCfg
;
SRaftCfg
*
pCfg
=
&
pNode
->
raftCfg
;
if
(
pCfg
->
configIndexCount
<
=
MAX_CONFIG_INDEX_COUNT
)
{
if
(
pCfg
->
configIndexCount
<
MAX_CONFIG_INDEX_COUNT
)
{
return
-
1
;
return
-
1
;
}
}
...
...
source/libs/sync/src/syncRaftEntry.c
浏览文件 @
e45367f9
...
@@ -102,344 +102,3 @@ void syncEntry2OriginalRpc(const SSyncRaftEntry* pEntry, SRpcMsg* pRpcMsg) {
...
@@ -102,344 +102,3 @@ void syncEntry2OriginalRpc(const SSyncRaftEntry* pEntry, SRpcMsg* pRpcMsg) {
pRpcMsg
->
pCont
=
rpcMallocCont
(
pRpcMsg
->
contLen
);
pRpcMsg
->
pCont
=
rpcMallocCont
(
pRpcMsg
->
contLen
);
memcpy
(
pRpcMsg
->
pCont
,
pEntry
->
data
,
pRpcMsg
->
contLen
);
memcpy
(
pRpcMsg
->
pCont
,
pEntry
->
data
,
pRpcMsg
->
contLen
);
}
}
SRaftEntryHashCache
*
raftCacheCreate
(
SSyncNode
*
pSyncNode
,
int32_t
maxCount
)
{
SRaftEntryHashCache
*
pCache
=
taosMemoryMalloc
(
sizeof
(
SRaftEntryHashCache
));
if
(
pCache
==
NULL
)
{
sError
(
"vgId:%d, raft cache create error"
,
pSyncNode
->
vgId
);
return
NULL
;
}
pCache
->
pEntryHash
=
taosHashInit
(
sizeof
(
SyncIndex
),
taosGetDefaultHashFunction
(
TSDB_DATA_TYPE_BINARY
),
true
,
HASH_NO_LOCK
);
if
(
pCache
->
pEntryHash
==
NULL
)
{
sError
(
"vgId:%d, raft cache create hash error"
,
pSyncNode
->
vgId
);
return
NULL
;
}
taosThreadMutexInit
(
&
(
pCache
->
mutex
),
NULL
);
pCache
->
maxCount
=
maxCount
;
pCache
->
currentCount
=
0
;
pCache
->
pSyncNode
=
pSyncNode
;
return
pCache
;
}
void
raftCacheDestroy
(
SRaftEntryHashCache
*
pCache
)
{
if
(
pCache
!=
NULL
)
{
taosThreadMutexLock
(
&
pCache
->
mutex
);
taosHashCleanup
(
pCache
->
pEntryHash
);
taosThreadMutexUnlock
(
&
pCache
->
mutex
);
taosThreadMutexDestroy
(
&
(
pCache
->
mutex
));
taosMemoryFree
(
pCache
);
}
}
// success, return 1
// max count, return 0
// error, return -1
int32_t
raftCachePutEntry
(
struct
SRaftEntryHashCache
*
pCache
,
SSyncRaftEntry
*
pEntry
)
{
taosThreadMutexLock
(
&
pCache
->
mutex
);
if
(
pCache
->
currentCount
>=
pCache
->
maxCount
)
{
taosThreadMutexUnlock
(
&
pCache
->
mutex
);
return
0
;
}
taosHashPut
(
pCache
->
pEntryHash
,
&
(
pEntry
->
index
),
sizeof
(
pEntry
->
index
),
pEntry
,
pEntry
->
bytes
);
++
(
pCache
->
currentCount
);
sNTrace
(
pCache
->
pSyncNode
,
"raft cache add, type:%s,%d, type2:%s,%d, index:%"
PRId64
", bytes:%d"
,
TMSG_INFO
(
pEntry
->
msgType
),
pEntry
->
msgType
,
TMSG_INFO
(
pEntry
->
originalRpcType
),
pEntry
->
originalRpcType
,
pEntry
->
index
,
pEntry
->
bytes
);
taosThreadMutexUnlock
(
&
pCache
->
mutex
);
return
1
;
}
// success, return 0
// error, return -1
// not exist, return -1, terrno = TSDB_CODE_WAL_LOG_NOT_EXIST
int32_t
raftCacheGetEntry
(
struct
SRaftEntryHashCache
*
pCache
,
SyncIndex
index
,
SSyncRaftEntry
**
ppEntry
)
{
if
(
ppEntry
==
NULL
)
{
return
-
1
;
}
*
ppEntry
=
NULL
;
taosThreadMutexLock
(
&
pCache
->
mutex
);
void
*
pTmp
=
taosHashGet
(
pCache
->
pEntryHash
,
&
index
,
sizeof
(
index
));
if
(
pTmp
!=
NULL
)
{
SSyncRaftEntry
*
pEntry
=
pTmp
;
*
ppEntry
=
taosMemoryMalloc
(
pEntry
->
bytes
);
memcpy
(
*
ppEntry
,
pTmp
,
pEntry
->
bytes
);
sNTrace
(
pCache
->
pSyncNode
,
"raft cache get, type:%s,%d, type2:%s,%d, index:%"
PRId64
,
TMSG_INFO
((
*
ppEntry
)
->
msgType
),
(
*
ppEntry
)
->
msgType
,
TMSG_INFO
((
*
ppEntry
)
->
originalRpcType
),
(
*
ppEntry
)
->
originalRpcType
,
(
*
ppEntry
)
->
index
);
taosThreadMutexUnlock
(
&
pCache
->
mutex
);
return
0
;
}
taosThreadMutexUnlock
(
&
pCache
->
mutex
);
terrno
=
TSDB_CODE_WAL_LOG_NOT_EXIST
;
return
-
1
;
}
// success, return 0
// error, return -1
// not exist, return -1, terrno = TSDB_CODE_WAL_LOG_NOT_EXIST
int32_t
raftCacheGetEntryP
(
struct
SRaftEntryHashCache
*
pCache
,
SyncIndex
index
,
SSyncRaftEntry
**
ppEntry
)
{
if
(
ppEntry
==
NULL
)
{
return
-
1
;
}
*
ppEntry
=
NULL
;
taosThreadMutexLock
(
&
pCache
->
mutex
);
void
*
pTmp
=
taosHashGet
(
pCache
->
pEntryHash
,
&
index
,
sizeof
(
index
));
if
(
pTmp
!=
NULL
)
{
SSyncRaftEntry
*
pEntry
=
pTmp
;
*
ppEntry
=
pEntry
;
sNTrace
(
pCache
->
pSyncNode
,
"raft cache get, type:%s,%d, type2:%s,%d, index:%"
PRId64
,
TMSG_INFO
((
*
ppEntry
)
->
msgType
),
(
*
ppEntry
)
->
msgType
,
TMSG_INFO
((
*
ppEntry
)
->
originalRpcType
),
(
*
ppEntry
)
->
originalRpcType
,
(
*
ppEntry
)
->
index
);
taosThreadMutexUnlock
(
&
pCache
->
mutex
);
return
0
;
}
taosThreadMutexUnlock
(
&
pCache
->
mutex
);
terrno
=
TSDB_CODE_WAL_LOG_NOT_EXIST
;
return
-
1
;
}
int32_t
raftCacheDelEntry
(
struct
SRaftEntryHashCache
*
pCache
,
SyncIndex
index
)
{
taosThreadMutexLock
(
&
pCache
->
mutex
);
taosHashRemove
(
pCache
->
pEntryHash
,
&
index
,
sizeof
(
index
));
--
(
pCache
->
currentCount
);
taosThreadMutexUnlock
(
&
pCache
->
mutex
);
return
0
;
}
int32_t
raftCacheGetAndDel
(
struct
SRaftEntryHashCache
*
pCache
,
SyncIndex
index
,
SSyncRaftEntry
**
ppEntry
)
{
if
(
ppEntry
==
NULL
)
{
return
-
1
;
}
*
ppEntry
=
NULL
;
taosThreadMutexLock
(
&
pCache
->
mutex
);
void
*
pTmp
=
taosHashGet
(
pCache
->
pEntryHash
,
&
index
,
sizeof
(
index
));
if
(
pTmp
!=
NULL
)
{
SSyncRaftEntry
*
pEntry
=
pTmp
;
*
ppEntry
=
taosMemoryMalloc
(
pEntry
->
bytes
);
memcpy
(
*
ppEntry
,
pTmp
,
pEntry
->
bytes
);
sNTrace
(
pCache
->
pSyncNode
,
"raft cache get-and-del, type:%s,%d, type2:%s,%d, index:%"
PRId64
,
TMSG_INFO
((
*
ppEntry
)
->
msgType
),
(
*
ppEntry
)
->
msgType
,
TMSG_INFO
((
*
ppEntry
)
->
originalRpcType
),
(
*
ppEntry
)
->
originalRpcType
,
(
*
ppEntry
)
->
index
);
taosHashRemove
(
pCache
->
pEntryHash
,
&
index
,
sizeof
(
index
));
--
(
pCache
->
currentCount
);
taosThreadMutexUnlock
(
&
pCache
->
mutex
);
return
0
;
}
taosThreadMutexUnlock
(
&
pCache
->
mutex
);
terrno
=
TSDB_CODE_WAL_LOG_NOT_EXIST
;
return
-
1
;
}
int32_t
raftCacheClear
(
struct
SRaftEntryHashCache
*
pCache
)
{
taosThreadMutexLock
(
&
pCache
->
mutex
);
taosHashClear
(
pCache
->
pEntryHash
);
pCache
->
currentCount
=
0
;
taosThreadMutexUnlock
(
&
pCache
->
mutex
);
return
0
;
}
static
char
*
keyFn
(
const
void
*
pData
)
{
SSyncRaftEntry
*
pEntry
=
(
SSyncRaftEntry
*
)
pData
;
return
(
char
*
)(
&
(
pEntry
->
index
));
}
static
int
cmpFn
(
const
void
*
p1
,
const
void
*
p2
)
{
return
memcmp
(
p1
,
p2
,
sizeof
(
SyncIndex
));
}
static
void
freeRaftEntry
(
void
*
param
)
{
SSyncRaftEntry
*
pEntry
=
(
SSyncRaftEntry
*
)
param
;
syncEntryDestroy
(
pEntry
);
}
SRaftEntryCache
*
raftEntryCacheCreate
(
SSyncNode
*
pSyncNode
,
int32_t
maxCount
)
{
SRaftEntryCache
*
pCache
=
taosMemoryMalloc
(
sizeof
(
SRaftEntryCache
));
if
(
pCache
==
NULL
)
{
sError
(
"vgId:%d, raft cache create error"
,
pSyncNode
->
vgId
);
return
NULL
;
}
pCache
->
pSkipList
=
tSkipListCreate
(
MAX_SKIP_LIST_LEVEL
,
TSDB_DATA_TYPE_BINARY
,
sizeof
(
SyncIndex
),
cmpFn
,
SL_ALLOW_DUP_KEY
,
keyFn
);
if
(
pCache
->
pSkipList
==
NULL
)
{
sError
(
"vgId:%d, raft cache create hash error"
,
pSyncNode
->
vgId
);
return
NULL
;
}
taosThreadMutexInit
(
&
(
pCache
->
mutex
),
NULL
);
pCache
->
refMgr
=
taosOpenRef
(
10
,
freeRaftEntry
);
pCache
->
maxCount
=
maxCount
;
pCache
->
currentCount
=
0
;
pCache
->
pSyncNode
=
pSyncNode
;
return
pCache
;
}
void
raftEntryCacheDestroy
(
SRaftEntryCache
*
pCache
)
{
if
(
pCache
!=
NULL
)
{
taosThreadMutexLock
(
&
pCache
->
mutex
);
tSkipListDestroy
(
pCache
->
pSkipList
);
if
(
pCache
->
refMgr
!=
-
1
)
{
taosCloseRef
(
pCache
->
refMgr
);
pCache
->
refMgr
=
-
1
;
}
taosThreadMutexUnlock
(
&
pCache
->
mutex
);
taosThreadMutexDestroy
(
&
(
pCache
->
mutex
));
taosMemoryFree
(
pCache
);
}
}
// success, return 1
// max count, return 0
// error, return -1
int32_t
raftEntryCachePutEntry
(
struct
SRaftEntryCache
*
pCache
,
SSyncRaftEntry
*
pEntry
)
{
taosThreadMutexLock
(
&
pCache
->
mutex
);
if
(
pCache
->
currentCount
>=
pCache
->
maxCount
)
{
taosThreadMutexUnlock
(
&
pCache
->
mutex
);
return
0
;
}
SSkipListNode
*
pSkipListNode
=
tSkipListPut
(
pCache
->
pSkipList
,
pEntry
);
ASSERT
(
pSkipListNode
!=
NULL
);
++
(
pCache
->
currentCount
);
pEntry
->
rid
=
taosAddRef
(
pCache
->
refMgr
,
pEntry
);
ASSERT
(
pEntry
->
rid
>=
0
);
sNTrace
(
pCache
->
pSyncNode
,
"raft cache add, type:%s,%d, type2:%s,%d, index:%"
PRId64
", bytes:%d"
,
TMSG_INFO
(
pEntry
->
msgType
),
pEntry
->
msgType
,
TMSG_INFO
(
pEntry
->
originalRpcType
),
pEntry
->
originalRpcType
,
pEntry
->
index
,
pEntry
->
bytes
);
taosThreadMutexUnlock
(
&
pCache
->
mutex
);
return
1
;
}
// find one, return 1
// not found, return 0
// error, return -1
int32_t
raftEntryCacheGetEntry
(
struct
SRaftEntryCache
*
pCache
,
SyncIndex
index
,
SSyncRaftEntry
**
ppEntry
)
{
ASSERT
(
ppEntry
!=
NULL
);
SSyncRaftEntry
*
pEntry
=
NULL
;
int32_t
code
=
raftEntryCacheGetEntryP
(
pCache
,
index
,
&
pEntry
);
if
(
code
==
1
)
{
int32_t
bytes
=
(
int32_t
)
pEntry
->
bytes
;
*
ppEntry
=
taosMemoryMalloc
((
int64_t
)
bytes
);
memcpy
(
*
ppEntry
,
pEntry
,
pEntry
->
bytes
);
(
*
ppEntry
)
->
rid
=
-
1
;
}
else
{
*
ppEntry
=
NULL
;
}
return
code
;
}
// find one, return 1
// not found, return 0
// error, return -1
int32_t
raftEntryCacheGetEntryP
(
struct
SRaftEntryCache
*
pCache
,
SyncIndex
index
,
SSyncRaftEntry
**
ppEntry
)
{
taosThreadMutexLock
(
&
pCache
->
mutex
);
SyncIndex
index2
=
index
;
int32_t
code
=
0
;
SArray
*
entryPArray
=
tSkipListGet
(
pCache
->
pSkipList
,
(
char
*
)(
&
index2
));
int32_t
arraySize
=
taosArrayGetSize
(
entryPArray
);
if
(
arraySize
==
1
)
{
SSkipListNode
**
ppNode
=
(
SSkipListNode
**
)
taosArrayGet
(
entryPArray
,
0
);
ASSERT
(
*
ppNode
!=
NULL
);
*
ppEntry
=
(
SSyncRaftEntry
*
)
SL_GET_NODE_DATA
(
*
ppNode
);
taosAcquireRef
(
pCache
->
refMgr
,
(
*
ppEntry
)
->
rid
);
code
=
1
;
}
else
if
(
arraySize
==
0
)
{
code
=
0
;
}
else
{
ASSERT
(
0
);
code
=
-
1
;
}
taosArrayDestroy
(
entryPArray
);
taosThreadMutexUnlock
(
&
pCache
->
mutex
);
return
code
;
}
// count = -1, clear all
// count >= 0, clear count
// return -1, error
// return delete count
int32_t
raftEntryCacheClear
(
struct
SRaftEntryCache
*
pCache
,
int32_t
count
)
{
taosThreadMutexLock
(
&
pCache
->
mutex
);
int32_t
returnCnt
=
0
;
if
(
count
==
-
1
)
{
// clear all
SSkipListIterator
*
pIter
=
tSkipListCreateIter
(
pCache
->
pSkipList
);
while
(
tSkipListIterNext
(
pIter
))
{
SSkipListNode
*
pNode
=
tSkipListIterGet
(
pIter
);
ASSERT
(
pNode
!=
NULL
);
SSyncRaftEntry
*
pEntry
=
(
SSyncRaftEntry
*
)
SL_GET_NODE_DATA
(
pNode
);
syncEntryDestroy
(
pEntry
);
++
returnCnt
;
}
tSkipListDestroyIter
(
pIter
);
tSkipListDestroy
(
pCache
->
pSkipList
);
pCache
->
pSkipList
=
tSkipListCreate
(
MAX_SKIP_LIST_LEVEL
,
TSDB_DATA_TYPE_BINARY
,
sizeof
(
SyncIndex
),
cmpFn
,
SL_ALLOW_DUP_KEY
,
keyFn
);
ASSERT
(
pCache
->
pSkipList
!=
NULL
);
}
else
{
// clear count
int
i
=
0
;
SSkipListIterator
*
pIter
=
tSkipListCreateIter
(
pCache
->
pSkipList
);
SArray
*
delNodeArray
=
taosArrayInit
(
0
,
sizeof
(
SSkipListNode
*
));
// free entry
while
(
tSkipListIterNext
(
pIter
))
{
SSkipListNode
*
pNode
=
tSkipListIterGet
(
pIter
);
ASSERT
(
pNode
!=
NULL
);
if
(
i
++
>=
count
)
{
break
;
}
// sDebug("push pNode:%p", pNode);
taosArrayPush
(
delNodeArray
,
&
pNode
);
++
returnCnt
;
SSyncRaftEntry
*
pEntry
=
(
SSyncRaftEntry
*
)
SL_GET_NODE_DATA
(
pNode
);
// syncEntryDestroy(pEntry);
taosRemoveRef
(
pCache
->
refMgr
,
pEntry
->
rid
);
}
tSkipListDestroyIter
(
pIter
);
// delete skiplist node
int32_t
arraySize
=
taosArrayGetSize
(
delNodeArray
);
for
(
int32_t
i
=
0
;
i
<
arraySize
;
++
i
)
{
SSkipListNode
**
ppNode
=
taosArrayGet
(
delNodeArray
,
i
);
// sDebug("get pNode:%p", *ppNode);
tSkipListRemoveNode
(
pCache
->
pSkipList
,
*
ppNode
);
}
taosArrayDestroy
(
delNodeArray
);
}
pCache
->
currentCount
-=
returnCnt
;
taosThreadMutexUnlock
(
&
pCache
->
mutex
);
return
returnCnt
;
}
source/libs/sync/src/syncSnapshot.c
浏览文件 @
e45367f9
...
@@ -168,17 +168,19 @@ static int32_t snapshotSend(SSyncSnapshotSender *pSender) {
...
@@ -168,17 +168,19 @@ static int32_t snapshotSend(SSyncSnapshotSender *pSender) {
if
(
pSender
->
blockLen
>
0
)
{
if
(
pSender
->
blockLen
>
0
)
{
// has read data
// has read data
sSDebug
(
pSender
,
"snapshot sender continue to read, blockLen:%d seq:%d"
,
pSender
->
blockLen
,
pSender
->
seq
);
sSDebug
(
pSender
,
"vgId:%d, snapshot sender continue to read, blockLen:%d seq:%d"
,
pSender
->
pSyncNode
->
vgId
,
pSender
->
blockLen
,
pSender
->
seq
);
}
else
{
}
else
{
// read finish, update seq to end
// read finish, update seq to end
pSender
->
seq
=
SYNC_SNAPSHOT_SEQ_END
;
pSender
->
seq
=
SYNC_SNAPSHOT_SEQ_END
;
sSInfo
(
pSender
,
"snapshot sender read to the end, blockLen:%d seq:%d"
,
pSender
->
blockLen
,
pSender
->
seq
);
sSInfo
(
pSender
,
"vgId:%d, snapshot sender read to the end, blockLen:%d seq:%d"
,
pSender
->
pSyncNode
->
vgId
,
pSender
->
blockLen
,
pSender
->
seq
);
}
}
// build msg
// build msg
SRpcMsg
rpcMsg
=
{
0
};
SRpcMsg
rpcMsg
=
{
0
};
if
(
syncBuildSnapshotSend
(
&
rpcMsg
,
pSender
->
blockLen
,
pSender
->
pSyncNode
->
vgId
)
!=
0
)
{
if
(
syncBuildSnapshotSend
(
&
rpcMsg
,
pSender
->
blockLen
,
pSender
->
pSyncNode
->
vgId
)
!=
0
)
{
sSError
(
pSender
,
"snapshot sender build msg failed since %s"
,
pSender
->
pSyncNode
->
vgId
,
terrstr
());
sSError
(
pSender
,
"
vgId:%d,
snapshot sender build msg failed since %s"
,
pSender
->
pSyncNode
->
vgId
,
terrstr
());
return
-
1
;
return
-
1
;
}
}
...
@@ -340,11 +342,13 @@ void snapshotReceiverDestroy(SSyncSnapshotReceiver *pReceiver) {
...
@@ -340,11 +342,13 @@ void snapshotReceiverDestroy(SSyncSnapshotReceiver *pReceiver) {
taosMemoryFree
(
pReceiver
);
taosMemoryFree
(
pReceiver
);
}
}
bool
snapshotReceiverIsStart
(
SSyncSnapshotReceiver
*
pReceiver
)
{
return
pReceiver
->
start
;
}
bool
snapshotReceiverIsStart
(
SSyncSnapshotReceiver
*
pReceiver
)
{
return
(
pReceiver
!=
NULL
?
pReceiver
->
start
:
false
);
}
static
int32_t
snapshotReceiverStartWriter
(
SSyncSnapshotReceiver
*
pReceiver
,
SyncSnapshotSend
*
pBeginMsg
)
{
static
int32_t
snapshotReceiverStartWriter
(
SSyncSnapshotReceiver
*
pReceiver
,
SyncSnapshotSend
*
pBeginMsg
)
{
if
(
pReceiver
->
pWriter
!=
NULL
)
{
if
(
pReceiver
->
pWriter
!=
NULL
)
{
sRError
(
pReceiver
,
"vgId:%d, snapshot receiver writer is not null"
);
sRError
(
pReceiver
,
"vgId:%d, snapshot receiver writer is not null"
,
pReceiver
->
pSyncNode
->
vgId
);
terrno
=
TSDB_CODE_SYN_INTERNAL_ERROR
;
terrno
=
TSDB_CODE_SYN_INTERNAL_ERROR
;
return
-
1
;
return
-
1
;
}
}
...
@@ -851,8 +855,8 @@ static int32_t syncNodeOnSnapshotPrepRsp(SSyncNode *pSyncNode, SSyncSnapshotSend
...
@@ -851,8 +855,8 @@ static int32_t syncNodeOnSnapshotPrepRsp(SSyncNode *pSyncNode, SSyncSnapshotSend
pMsg
->
snapBeginIndex
,
snapshot
.
lastApplyIndex
,
snapshot
.
lastApplyTerm
);
pMsg
->
snapBeginIndex
,
snapshot
.
lastApplyIndex
,
snapshot
.
lastApplyTerm
);
if
(
pMsg
->
snapBeginIndex
>
snapshot
.
lastApplyIndex
)
{
if
(
pMsg
->
snapBeginIndex
>
snapshot
.
lastApplyIndex
)
{
sSError
(
pSender
,
"prepare snapshot failed since beginIndex:%
d larger than applyIndex:%d"
,
pMsg
->
snapBeginIndex
,
sSError
(
pSender
,
"prepare snapshot failed since beginIndex:%
"
PRId64
" larger than applyIndex:%"
PRId64
,
snapshot
.
lastApplyIndex
);
pMsg
->
snapBeginIndex
,
snapshot
.
lastApplyIndex
);
terrno
=
TSDB_CODE_SYN_INTERNAL_ERROR
;
terrno
=
TSDB_CODE_SYN_INTERNAL_ERROR
;
return
-
1
;
return
-
1
;
}
}
...
@@ -966,7 +970,8 @@ int32_t syncNodeOnSnapshotRsp(SSyncNode *pSyncNode, const SRpcMsg *pRpcMsg) {
...
@@ -966,7 +970,8 @@ int32_t syncNodeOnSnapshotRsp(SSyncNode *pSyncNode, const SRpcMsg *pRpcMsg) {
if
(
pSender
->
pReader
==
NULL
||
pSender
->
finish
)
{
if
(
pSender
->
pReader
==
NULL
||
pSender
->
finish
)
{
syncLogRecvSyncSnapshotRsp
(
pSyncNode
,
pMsg
,
"snapshot sender invalid"
);
syncLogRecvSyncSnapshotRsp
(
pSyncNode
,
pMsg
,
"snapshot sender invalid"
);
sSError
(
pSender
,
"snapshot sender invalid, pReader:%p finish:%d"
,
pMsg
->
code
,
pSender
->
pReader
,
pSender
->
finish
);
sSError
(
pSender
,
"snapshot sender invalid error:%s 0x%x, pReader:%p finish:%d"
,
tstrerror
(
pMsg
->
code
),
pMsg
->
code
,
pSender
->
pReader
,
pSender
->
finish
);
terrno
=
pMsg
->
code
;
terrno
=
pMsg
->
code
;
goto
_ERROR
;
goto
_ERROR
;
}
}
...
...
source/libs/wal/src/walMeta.c
浏览文件 @
e45367f9
...
@@ -913,7 +913,7 @@ int walLoadMeta(SWal* pWal) {
...
@@ -913,7 +913,7 @@ int walLoadMeta(SWal* pWal) {
int64_t
fileSize
=
0
;
int64_t
fileSize
=
0
;
taosStatFile
(
fnameStr
,
&
fileSize
,
NULL
);
taosStatFile
(
fnameStr
,
&
fileSize
,
NULL
);
if
(
fileSize
==
0
)
{
if
(
fileSize
==
0
)
{
taosRemoveFile
(
fnameStr
);
(
void
)
taosRemoveFile
(
fnameStr
);
wDebug
(
"vgId:%d, wal find empty meta ver %d"
,
pWal
->
cfg
.
vgId
,
metaVer
);
wDebug
(
"vgId:%d, wal find empty meta ver %d"
,
pWal
->
cfg
.
vgId
,
metaVer
);
return
-
1
;
return
-
1
;
}
}
...
...
source/libs/wal/src/walWrite.c
浏览文件 @
e45367f9
...
@@ -63,7 +63,7 @@ int32_t walRestoreFromSnapshot(SWal *pWal, int64_t ver) {
...
@@ -63,7 +63,7 @@ int32_t walRestoreFromSnapshot(SWal *pWal, int64_t ver) {
wInfo
(
"vgId:%d, restore from snapshot, remove file %s"
,
pWal
->
cfg
.
vgId
,
fnameStr
);
wInfo
(
"vgId:%d, restore from snapshot, remove file %s"
,
pWal
->
cfg
.
vgId
,
fnameStr
);
}
}
}
}
walRemoveMeta
(
pWal
);
(
void
)
walRemoveMeta
(
pWal
);
pWal
->
writeCur
=
-
1
;
pWal
->
writeCur
=
-
1
;
pWal
->
totSize
=
0
;
pWal
->
totSize
=
0
;
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录