Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
5d49e4f5
T
TDengine
项目概览
taosdata
/
TDengine
大约 1 年 前同步成功
通知
1184
Star
22015
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
体验新版 GitCode,发现更多精彩内容 >>
未验证
提交
5d49e4f5
编写于
12月 20, 2022
作者:
S
Shengliang Guan
提交者:
GitHub
12月 20, 2022
浏览文件
操作
浏览文件
下载
差异文件
Merge pull request #19038 from taosdata/fix/TD-21175
refact: adjust some sync log and func return type
上级
a366cb4b
9c8b194c
变更
15
显示空白变更内容
内联
并排
Showing
15 changed file
with
39 addition
and
60 deletion
+39
-60
include/libs/sync/sync.h
include/libs/sync/sync.h
+2
-2
source/dnode/mnode/impl/src/mndSync.c
source/dnode/mnode/impl/src/mndSync.c
+3
-4
source/dnode/mnode/sdb/inc/sdb.h
source/dnode/mnode/sdb/inc/sdb.h
+1
-1
source/dnode/mnode/sdb/src/sdbFile.c
source/dnode/mnode/sdb/src/sdbFile.c
+1
-4
source/dnode/vnode/inc/vnode.h
source/dnode/vnode/inc/vnode.h
+1
-1
source/dnode/vnode/src/tsdb/tsdbSnapshot.c
source/dnode/vnode/src/tsdb/tsdbSnapshot.c
+3
-3
source/dnode/vnode/src/vnd/vnodeCommit.c
source/dnode/vnode/src/vnd/vnodeCommit.c
+2
-2
source/dnode/vnode/src/vnd/vnodeSnapshot.c
source/dnode/vnode/src/vnd/vnodeSnapshot.c
+2
-5
source/dnode/vnode/src/vnd/vnodeSvr.c
source/dnode/vnode/src/vnd/vnodeSvr.c
+1
-1
source/dnode/vnode/src/vnd/vnodeSync.c
source/dnode/vnode/src/vnd/vnodeSync.c
+4
-6
source/libs/sync/src/syncAppendEntriesReply.c
source/libs/sync/src/syncAppendEntriesReply.c
+1
-1
source/libs/sync/src/syncMain.c
source/libs/sync/src/syncMain.c
+6
-13
source/libs/sync/src/syncPipeline.c
source/libs/sync/src/syncPipeline.c
+6
-7
source/libs/sync/src/syncSnapshot.c
source/libs/sync/src/syncSnapshot.c
+3
-7
source/libs/wal/src/walRead.c
source/libs/wal/src/walRead.c
+3
-3
未找到文件。
include/libs/sync/sync.h
浏览文件 @
5d49e4f5
...
...
@@ -153,10 +153,10 @@ typedef struct SSyncFSM {
void
(
*
FpBecomeFollowerCb
)(
const
struct
SSyncFSM
*
pFsm
);
int32_t
(
*
FpGetSnapshot
)(
const
struct
SSyncFSM
*
pFsm
,
SSnapshot
*
pSnapshot
,
void
*
pReaderParam
,
void
**
ppReader
);
int32_t
(
*
FpGetSnapshotInfo
)(
const
struct
SSyncFSM
*
pFsm
,
SSnapshot
*
pSnapshot
);
void
(
*
FpGetSnapshotInfo
)(
const
struct
SSyncFSM
*
pFsm
,
SSnapshot
*
pSnapshot
);
int32_t
(
*
FpSnapshotStartRead
)(
const
struct
SSyncFSM
*
pFsm
,
void
*
pReaderParam
,
void
**
ppReader
);
int32_t
(
*
FpSnapshotStopRead
)(
const
struct
SSyncFSM
*
pFsm
,
void
*
pReader
);
void
(
*
FpSnapshotStopRead
)(
const
struct
SSyncFSM
*
pFsm
,
void
*
pReader
);
int32_t
(
*
FpSnapshotDoRead
)(
const
struct
SSyncFSM
*
pFsm
,
void
*
pReader
,
void
**
ppBuf
,
int32_t
*
len
);
int32_t
(
*
FpSnapshotStartWrite
)(
const
struct
SSyncFSM
*
pFsm
,
void
*
pWriterParam
,
void
**
ppWriter
);
...
...
source/dnode/mnode/impl/src/mndSync.c
浏览文件 @
5d49e4f5
...
...
@@ -142,10 +142,9 @@ int32_t mndSyncGetSnapshot(const SSyncFSM *pFsm, SSnapshot *pSnapshot, void *pRe
return
0
;
}
int32_t
mndSyncGetSnapshotInfo
(
const
SSyncFSM
*
pFsm
,
SSnapshot
*
pSnapshot
)
{
static
void
mndSyncGetSnapshotInfo
(
const
SSyncFSM
*
pFsm
,
SSnapshot
*
pSnapshot
)
{
SMnode
*
pMnode
=
pFsm
->
data
;
sdbGetCommitInfo
(
pMnode
->
pSdb
,
&
pSnapshot
->
lastApplyIndex
,
&
pSnapshot
->
lastApplyTerm
,
&
pSnapshot
->
lastConfigIndex
);
return
0
;
}
void
mndRestoreFinish
(
const
SSyncFSM
*
pFsm
)
{
...
...
@@ -170,10 +169,10 @@ int32_t mndSnapshotStartRead(const SSyncFSM *pFsm, void *pParam, void **ppReader
return
sdbStartRead
(
pMnode
->
pSdb
,
(
SSdbIter
**
)
ppReader
,
NULL
,
NULL
,
NULL
);
}
int32_t
mndSnapshotStopRead
(
const
SSyncFSM
*
pFsm
,
void
*
pReader
)
{
static
void
mndSnapshotStopRead
(
const
SSyncFSM
*
pFsm
,
void
*
pReader
)
{
mInfo
(
"stop to read snapshot from sdb"
);
SMnode
*
pMnode
=
pFsm
->
data
;
return
sdbStopRead
(
pMnode
->
pSdb
,
pReader
);
sdbStopRead
(
pMnode
->
pSdb
,
pReader
);
}
int32_t
mndSnapshotDoRead
(
const
SSyncFSM
*
pFsm
,
void
*
pReader
,
void
**
ppBuf
,
int32_t
*
len
)
{
...
...
source/dnode/mnode/sdb/inc/sdb.h
浏览文件 @
5d49e4f5
...
...
@@ -392,7 +392,7 @@ void *sdbGetRowObj(SSdbRow *pRow);
void
sdbFreeRow
(
SSdb
*
pSdb
,
SSdbRow
*
pRow
,
bool
callFunc
);
int32_t
sdbStartRead
(
SSdb
*
pSdb
,
SSdbIter
**
ppIter
,
int64_t
*
index
,
int64_t
*
term
,
int64_t
*
config
);
int32_t
sdbStopRead
(
SSdb
*
pSdb
,
SSdbIter
*
pIter
);
void
sdbStopRead
(
SSdb
*
pSdb
,
SSdbIter
*
pIter
);
int32_t
sdbDoRead
(
SSdb
*
pSdb
,
SSdbIter
*
pIter
,
void
**
ppBuf
,
int32_t
*
len
);
int32_t
sdbStartWrite
(
SSdb
*
pSdb
,
SSdbIter
**
ppIter
);
...
...
source/dnode/mnode/sdb/src/sdbFile.c
浏览文件 @
5d49e4f5
...
...
@@ -585,10 +585,7 @@ int32_t sdbStartRead(SSdb *pSdb, SSdbIter **ppIter, int64_t *index, int64_t *ter
return
0
;
}
int32_t
sdbStopRead
(
SSdb
*
pSdb
,
SSdbIter
*
pIter
)
{
sdbCloseIter
(
pIter
);
return
0
;
}
void
sdbStopRead
(
SSdb
*
pSdb
,
SSdbIter
*
pIter
)
{
sdbCloseIter
(
pIter
);
}
int32_t
sdbDoRead
(
SSdb
*
pSdb
,
SSdbIter
*
pIter
,
void
**
ppBuf
,
int32_t
*
len
)
{
int32_t
maxlen
=
4096
;
...
...
source/dnode/vnode/inc/vnode.h
浏览文件 @
5d49e4f5
...
...
@@ -264,7 +264,7 @@ int32_t smaGetTSmaDays(SVnodeCfg *pCfg, void *pCont, uint32_t contLen, int32_t *
// SVSnapReader
int32_t
vnodeSnapReaderOpen
(
SVnode
*
pVnode
,
int64_t
sver
,
int64_t
ever
,
SVSnapReader
**
ppReader
);
int32_t
vnodeSnapReaderClose
(
SVSnapReader
*
pReader
);
void
vnodeSnapReaderClose
(
SVSnapReader
*
pReader
);
int32_t
vnodeSnapRead
(
SVSnapReader
*
pReader
,
uint8_t
**
ppData
,
uint32_t
*
nData
);
// SVSnapWriter
int32_t
vnodeSnapWriterOpen
(
SVnode
*
pVnode
,
int64_t
sver
,
int64_t
ever
,
SVSnapWriter
**
ppWriter
);
...
...
source/dnode/vnode/src/tsdb/tsdbSnapshot.c
浏览文件 @
5d49e4f5
...
...
@@ -1370,7 +1370,7 @@ _exit:
taosMemoryFree
(
pWriter
);
}
}
else
{
tsdbInfo
(
"vgId:%d %s done"
,
TD_VID
(
pTsdb
->
pVnode
),
__func__
);
tsdbInfo
(
"vgId:%d
,
%s done"
,
TD_VID
(
pTsdb
->
pVnode
),
__func__
);
*
ppWriter
=
pWriter
;
}
return
code
;
...
...
@@ -1391,7 +1391,7 @@ int32_t tsdbSnapWriterPrepareClose(STsdbSnapWriter* pWriter) {
_exit:
if
(
code
)
{
tsdbError
(
"vgId:%d %s failed since %s"
,
TD_VID
(
pWriter
->
pTsdb
->
pVnode
),
__func__
,
tstrerror
(
code
));
tsdbError
(
"vgId:%d
,
%s failed since %s"
,
TD_VID
(
pWriter
->
pTsdb
->
pVnode
),
__func__
,
tstrerror
(
code
));
}
return
code
;
}
...
...
@@ -1442,7 +1442,7 @@ int32_t tsdbSnapWriterClose(STsdbSnapWriter** ppWriter, int8_t rollback) {
for
(
int32_t
iBuf
=
0
;
iBuf
<
sizeof
(
pWriter
->
aBuf
)
/
sizeof
(
uint8_t
*
);
iBuf
++
)
{
tFree
(
pWriter
->
aBuf
[
iBuf
]);
}
tsdbInfo
(
"vgId:%d %s done"
,
TD_VID
(
pWriter
->
pTsdb
->
pVnode
),
__func__
);
tsdbInfo
(
"vgId:%d
,
%s done"
,
TD_VID
(
pWriter
->
pTsdb
->
pVnode
),
__func__
);
taosMemoryFree
(
pWriter
);
*
ppWriter
=
NULL
;
return
code
;
...
...
source/dnode/vnode/src/vnd/vnodeCommit.c
浏览文件 @
5d49e4f5
...
...
@@ -234,10 +234,10 @@ int vnodeAsyncCommit(SVnode *pVnode) {
_exit:
if
(
code
)
{
vError
(
"vgId:%d %s failed since %s, commit id:%"
PRId64
,
TD_VID
(
pVnode
),
__func__
,
tstrerror
(
code
),
vError
(
"vgId:%d
,
%s failed since %s, commit id:%"
PRId64
,
TD_VID
(
pVnode
),
__func__
,
tstrerror
(
code
),
pVnode
->
state
.
commitID
);
}
else
{
vDebug
(
"vgId:%d %s done"
,
TD_VID
(
pVnode
),
__func__
);
vDebug
(
"vgId:%d
,
%s done"
,
TD_VID
(
pVnode
),
__func__
);
}
return
code
;
}
...
...
source/dnode/vnode/src/vnd/vnodeSnapshot.c
浏览文件 @
5d49e4f5
...
...
@@ -67,9 +67,8 @@ _err:
return
code
;
}
int32_t
vnodeSnapReaderClose
(
SVSnapReader
*
pReader
)
{
int32_t
code
=
0
;
void
vnodeSnapReaderClose
(
SVSnapReader
*
pReader
)
{
vInfo
(
"vgId:%d, close vnode snapshot reader"
,
TD_VID
(
pReader
->
pVnode
));
if
(
pReader
->
pRsmaReader
)
{
rsmaSnapReaderClose
(
&
pReader
->
pRsmaReader
);
}
...
...
@@ -82,9 +81,7 @@ int32_t vnodeSnapReaderClose(SVSnapReader *pReader) {
metaSnapReaderClose
(
&
pReader
->
pMetaReader
);
}
vInfo
(
"vgId:%d, vnode snapshot reader closed"
,
TD_VID
(
pReader
->
pVnode
));
taosMemoryFree
(
pReader
);
return
code
;
}
int32_t
vnodeSnapRead
(
SVSnapReader
*
pReader
,
uint8_t
**
ppData
,
uint32_t
*
nData
)
{
...
...
source/dnode/vnode/src/vnd/vnodeSvr.c
浏览文件 @
5d49e4f5
...
...
@@ -1189,7 +1189,7 @@ static int32_t vnodeProcessBatchDeleteReq(SVnode *pVnode, int64_t version, void
SSingleDeleteReq
*
pOneReq
=
taosArrayGet
(
deleteReq
.
deleteReqs
,
i
);
char
*
name
=
pOneReq
->
tbname
;
if
(
metaGetTableEntryByName
(
&
mr
,
name
)
<
0
)
{
vDebug
(
"
stream delete msg, skip vgId:%d
since no table: %s"
,
pVnode
->
config
.
vgId
,
name
);
vDebug
(
"
vgId:%d, stream delete msg, skip
since no table: %s"
,
pVnode
->
config
.
vgId
,
name
);
continue
;
}
...
...
source/dnode/vnode/src/vnd/vnodeSync.c
浏览文件 @
5d49e4f5
...
...
@@ -380,9 +380,8 @@ static int32_t vnodeSyncSendMsg(const SEpSet *pEpSet, SRpcMsg *pMsg) {
return
code
;
}
static
int32_t
vnodeSyncGetSnapshot
(
const
SSyncFSM
*
pFsm
,
SSnapshot
*
pSnapshot
)
{
static
void
vnodeSyncGetSnapshotInfo
(
const
SSyncFSM
*
pFsm
,
SSnapshot
*
pSnapshot
)
{
vnodeGetSnapshot
(
pFsm
->
data
,
pSnapshot
);
return
0
;
}
static
int32_t
vnodeSyncApplyMsg
(
const
SSyncFSM
*
pFsm
,
SRpcMsg
*
pMsg
,
const
SFsmCbMeta
*
pMeta
)
{
...
...
@@ -424,10 +423,9 @@ static int32_t vnodeSnapshotStartRead(const SSyncFSM *pFsm, void *pParam, void *
return
code
;
}
static
int32_t
vnodeSnapshotStopRead
(
const
SSyncFSM
*
pFsm
,
void
*
pReader
)
{
static
void
vnodeSnapshotStopRead
(
const
SSyncFSM
*
pFsm
,
void
*
pReader
)
{
SVnode
*
pVnode
=
pFsm
->
data
;
int32_t
code
=
vnodeSnapReaderClose
(
pReader
);
return
code
;
vnodeSnapReaderClose
(
pReader
);
}
static
int32_t
vnodeSnapshotDoRead
(
const
SSyncFSM
*
pFsm
,
void
*
pReader
,
void
**
ppBuf
,
int32_t
*
len
)
{
...
...
@@ -539,7 +537,7 @@ static SSyncFSM *vnodeSyncMakeFsm(SVnode *pVnode) {
pFsm
->
FpCommitCb
=
vnodeSyncCommitMsg
;
pFsm
->
FpPreCommitCb
=
vnodeSyncPreCommitMsg
;
pFsm
->
FpRollBackCb
=
vnodeSyncRollBackMsg
;
pFsm
->
FpGetSnapshotInfo
=
vnodeSyncGetSnapshot
;
pFsm
->
FpGetSnapshotInfo
=
vnodeSyncGetSnapshot
Info
;
pFsm
->
FpRestoreFinishCb
=
vnodeRestoreFinish
;
pFsm
->
FpLeaderTransferCb
=
NULL
;
pFsm
->
FpApplyQueueEmptyCb
=
vnodeApplyQueueEmpty
;
...
...
source/libs/sync/src/syncAppendEntriesReply.c
浏览文件 @
5d49e4f5
...
...
@@ -64,7 +64,7 @@ int32_t syncNodeOnAppendEntriesReply(SSyncNode* ths, const SRpcMsg* pRpcMsg) {
ASSERT
(
pMsg
->
term
==
ths
->
pRaftStore
->
currentTerm
);
sTrace
(
"vgId:%d received append entries reply. srcId:0x%016"
PRIx64
", term:%"
PRId64
", matchIndex:%"
PRId64
""
,
sTrace
(
"vgId:%d
,
received append entries reply. srcId:0x%016"
PRIx64
", term:%"
PRId64
", matchIndex:%"
PRId64
""
,
pMsg
->
vgId
,
pMsg
->
srcId
.
addr
,
pMsg
->
term
,
pMsg
->
matchIndex
);
if
(
pMsg
->
success
)
{
...
...
source/libs/sync/src/syncMain.c
浏览文件 @
5d49e4f5
...
...
@@ -815,11 +815,9 @@ int32_t syncNodeLogStoreRestoreOnNeed(SSyncNode* pNode) {
ASSERTS
(
pNode
->
pLogStore
!=
NULL
,
"log store not created"
);
ASSERTS
(
pNode
->
pFsm
!=
NULL
,
"pFsm not registered"
);
ASSERTS
(
pNode
->
pFsm
->
FpGetSnapshotInfo
!=
NULL
,
"FpGetSnapshotInfo not registered"
);
SSnapshot
snapshot
;
if
(
pNode
->
pFsm
->
FpGetSnapshotInfo
(
pNode
->
pFsm
,
&
snapshot
)
<
0
)
{
sError
(
"vgId:%d, failed to get snapshot info since %s"
,
pNode
->
vgId
,
terrstr
());
return
-
1
;
}
SSnapshot
snapshot
=
{
0
};
pNode
->
pFsm
->
FpGetSnapshotInfo
(
pNode
->
pFsm
,
&
snapshot
);
SyncIndex
commitIndex
=
snapshot
.
lastApplyIndex
;
SyncIndex
firstVer
=
pNode
->
pLogStore
->
syncLogBeginIndex
(
pNode
->
pLogStore
);
SyncIndex
lastVer
=
pNode
->
pLogStore
->
syncLogLastIndex
(
pNode
->
pLogStore
);
...
...
@@ -1029,11 +1027,7 @@ SSyncNode* syncNodeOpen(SSyncInfo* pSyncInfo) {
SyncIndex
commitIndex
=
SYNC_INDEX_INVALID
;
if
(
pSyncNode
->
pFsm
!=
NULL
&&
pSyncNode
->
pFsm
->
FpGetSnapshotInfo
!=
NULL
)
{
SSnapshot
snapshot
=
{
0
};
int32_t
code
=
pSyncNode
->
pFsm
->
FpGetSnapshotInfo
(
pSyncNode
->
pFsm
,
&
snapshot
);
if
(
code
!=
0
)
{
sError
(
"vgId:%d, failed to get snapshot info, code:%d"
,
pSyncNode
->
vgId
,
code
);
goto
_error
;
}
pSyncNode
->
pFsm
->
FpGetSnapshotInfo
(
pSyncNode
->
pFsm
,
&
snapshot
);
if
(
snapshot
.
lastApplyIndex
>
commitIndex
)
{
commitIndex
=
snapshot
.
lastApplyIndex
;
sNTrace
(
pSyncNode
,
"reset commit index by snapshot"
);
...
...
@@ -1155,9 +1149,8 @@ _error:
void
syncNodeMaybeUpdateCommitBySnapshot
(
SSyncNode
*
pSyncNode
)
{
if
(
pSyncNode
->
pFsm
!=
NULL
&&
pSyncNode
->
pFsm
->
FpGetSnapshotInfo
!=
NULL
)
{
SSnapshot
snapshot
;
int32_t
code
=
pSyncNode
->
pFsm
->
FpGetSnapshotInfo
(
pSyncNode
->
pFsm
,
&
snapshot
);
ASSERT
(
code
==
0
);
SSnapshot
snapshot
=
{
0
};
pSyncNode
->
pFsm
->
FpGetSnapshotInfo
(
pSyncNode
->
pFsm
,
&
snapshot
);
if
(
snapshot
.
lastApplyIndex
>
pSyncNode
->
commitIndex
)
{
pSyncNode
->
commitIndex
=
snapshot
.
lastApplyIndex
;
}
...
...
source/libs/sync/src/syncPipeline.c
浏览文件 @
5d49e4f5
...
...
@@ -99,8 +99,9 @@ SyncTerm syncLogReplMgrGetPrevLogTerm(SSyncLogReplMgr* pMgr, SSyncNode* pNode, S
return
prevLogTerm
;
}
SSnapshot
snapshot
;
if
(
pNode
->
pFsm
->
FpGetSnapshotInfo
(
pNode
->
pFsm
,
&
snapshot
)
==
0
&&
prevIndex
==
snapshot
.
lastApplyIndex
)
{
SSnapshot
snapshot
=
{
0
};
pNode
->
pFsm
->
FpGetSnapshotInfo
(
pNode
->
pFsm
,
&
snapshot
);
if
(
prevIndex
==
snapshot
.
lastApplyIndex
)
{
return
snapshot
.
lastApplyTerm
;
}
...
...
@@ -145,11 +146,9 @@ int32_t syncLogBufferInitWithoutLock(SSyncLogBuffer* pBuf, SSyncNode* pNode) {
ASSERTS
(
pNode
->
pFsm
!=
NULL
,
"pFsm not registered"
);
ASSERTS
(
pNode
->
pFsm
->
FpGetSnapshotInfo
!=
NULL
,
"FpGetSnapshotInfo not registered"
);
SSnapshot
snapshot
;
if
(
pNode
->
pFsm
->
FpGetSnapshotInfo
(
pNode
->
pFsm
,
&
snapshot
)
<
0
)
{
sError
(
"vgId:%d, failed to get snapshot info since %s"
,
pNode
->
vgId
,
terrstr
());
goto
_err
;
}
SSnapshot
snapshot
=
{
0
};
pNode
->
pFsm
->
FpGetSnapshotInfo
(
pNode
->
pFsm
,
&
snapshot
);
SyncIndex
commitIndex
=
snapshot
.
lastApplyIndex
;
SyncTerm
commitTerm
=
TMAX
(
snapshot
.
lastApplyTerm
,
0
);
if
(
syncLogValidateAlignmentOfCommit
(
pNode
,
commitIndex
))
{
...
...
source/libs/sync/src/syncSnapshot.c
浏览文件 @
5d49e4f5
...
...
@@ -47,7 +47,7 @@ SSyncSnapshotSender *snapshotSenderCreate(SSyncNode *pSyncNode, int32_t replicaI
pSender
->
term
=
pSyncNode
->
pRaftStore
->
currentTerm
;
pSender
->
startTime
=
0
;
pSender
->
endTime
=
0
;
pSender
->
pSyncNode
->
pFsm
->
FpGetSnapshotInfo
(
pSender
->
pSyncNode
->
pFsm
,
&
(
pSender
->
snapshot
)
);
pSender
->
pSyncNode
->
pFsm
->
FpGetSnapshotInfo
(
pSender
->
pSyncNode
->
pFsm
,
&
pSender
->
snapshot
);
pSender
->
finish
=
false
;
}
else
{
sError
(
"vgId:%d, cannot create snapshot sender"
,
pSyncNode
->
vgId
);
...
...
@@ -66,10 +66,7 @@ void snapshotSenderDestroy(SSyncSnapshotSender *pSender) {
// close reader
if
(
pSender
->
pReader
!=
NULL
)
{
int32_t
ret
=
pSender
->
pSyncNode
->
pFsm
->
FpSnapshotStopRead
(
pSender
->
pSyncNode
->
pFsm
,
pSender
->
pReader
);
if
(
ret
!=
0
)
{
sNError
(
pSender
->
pSyncNode
,
"stop reader error"
);
}
pSender
->
pSyncNode
->
pFsm
->
FpSnapshotStopRead
(
pSender
->
pSyncNode
->
pFsm
,
pSender
->
pReader
);
pSender
->
pReader
=
NULL
;
}
...
...
@@ -139,8 +136,7 @@ int32_t snapshotSenderStop(SSyncSnapshotSender *pSender, bool finish) {
// close reader
if
(
pSender
->
pReader
!=
NULL
)
{
int32_t
ret
=
pSender
->
pSyncNode
->
pFsm
->
FpSnapshotStopRead
(
pSender
->
pSyncNode
->
pFsm
,
pSender
->
pReader
);
ASSERT
(
ret
==
0
);
pSender
->
pSyncNode
->
pFsm
->
FpSnapshotStopRead
(
pSender
->
pSyncNode
->
pFsm
,
pSender
->
pReader
);
pSender
->
pReader
=
NULL
;
}
...
...
source/libs/wal/src/walRead.c
浏览文件 @
5d49e4f5
...
...
@@ -350,7 +350,7 @@ int32_t walFetchHead(SWalReader *pRead, int64_t ver, SWalCkHead *pHead) {
int64_t
contLen
;
bool
seeked
=
false
;
wDebug
(
"vgId:%d try to fetch ver %"
PRId64
", first ver:%"
PRId64
", commit ver:%"
PRId64
", last ver:%"
PRId64
wDebug
(
"vgId:%d
,
try to fetch ver %"
PRId64
", first ver:%"
PRId64
", commit ver:%"
PRId64
", last ver:%"
PRId64
", applied ver:%"
PRId64
,
pRead
->
pWal
->
cfg
.
vgId
,
ver
,
pRead
->
pWal
->
vers
.
firstVer
,
pRead
->
pWal
->
vers
.
commitVer
,
pRead
->
pWal
->
vers
.
lastVer
,
pRead
->
pWal
->
vers
.
appliedVer
);
...
...
@@ -405,7 +405,7 @@ int32_t walFetchHead(SWalReader *pRead, int64_t ver, SWalCkHead *pHead) {
int32_t
walSkipFetchBody
(
SWalReader
*
pRead
,
const
SWalCkHead
*
pHead
)
{
int64_t
code
;
wDebug
(
"vgId:%d skip fetch body %"
PRId64
", first ver:%"
PRId64
", commit ver:%"
PRId64
", last ver:%"
PRId64
wDebug
(
"vgId:%d
,
skip fetch body %"
PRId64
", first ver:%"
PRId64
", commit ver:%"
PRId64
", last ver:%"
PRId64
", applied ver:%"
PRId64
,
pRead
->
pWal
->
cfg
.
vgId
,
pHead
->
head
.
version
,
pRead
->
pWal
->
vers
.
firstVer
,
pRead
->
pWal
->
vers
.
commitVer
,
pRead
->
pWal
->
vers
.
lastVer
,
pRead
->
pWal
->
vers
.
appliedVer
);
...
...
@@ -429,7 +429,7 @@ int32_t walFetchBody(SWalReader *pRead, SWalCkHead **ppHead) {
SWalCont
*
pReadHead
=
&
((
*
ppHead
)
->
head
);
int64_t
ver
=
pReadHead
->
version
;
wDebug
(
"vgId:%d fetch body %"
PRId64
", first ver:%"
PRId64
", commit ver:%"
PRId64
", last ver:%"
PRId64
wDebug
(
"vgId:%d
,
fetch body %"
PRId64
", first ver:%"
PRId64
", commit ver:%"
PRId64
", last ver:%"
PRId64
", applied ver:%"
PRId64
,
pRead
->
pWal
->
cfg
.
vgId
,
ver
,
pRead
->
pWal
->
vers
.
firstVer
,
pRead
->
pWal
->
vers
.
commitVer
,
pRead
->
pWal
->
vers
.
lastVer
,
pRead
->
pWal
->
vers
.
appliedVer
);
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录