Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
ff17763e
TDengine
项目概览
taosdata
/
TDengine
1 年多 前同步成功
通知
1185
Star
22016
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
未验证
提交
ff17763e
编写于
8月 15, 2023
作者:
H
Haojun Liao
提交者:
GitHub
8月 15, 2023
浏览文件
操作
浏览文件
下载
差异文件
Merge pull request #22417 from taosdata/mark/tmq
opti:wal logic
上级
776b32ae
dd0bc0e0
变更
5
显示空白变更内容
内联
并排
Showing
5 changed file
with
42 addition
and
171 deletion
+42
-171
include/libs/wal/wal.h
include/libs/wal/wal.h
+3
-5
source/dnode/vnode/src/inc/tq.h
source/dnode/vnode/src/inc/tq.h
+1
-1
source/dnode/vnode/src/tq/tqRead.c
source/dnode/vnode/src/tq/tqRead.c
+20
-27
source/dnode/vnode/src/tq/tqUtil.c
source/dnode/vnode/src/tq/tqUtil.c
+2
-11
source/libs/wal/src/walRead.c
source/libs/wal/src/walRead.c
+16
-127
未找到文件。
include/libs/wal/wal.h
浏览文件 @
ff17763e
...
...
@@ -153,7 +153,6 @@ struct SWalReader {
int64_t
capacity
;
TdThreadMutex
mutex
;
SWalFilterCond
cond
;
// TODO remove it
SWalCkHead
*
pHead
;
};
...
...
@@ -207,10 +206,9 @@ void walReaderValidVersionRange(SWalReader *pReader, int64_t *sver, int64
void
walReaderVerifyOffset
(
SWalReader
*
pWalReader
,
STqOffsetVal
*
pOffset
);
// only for tq usage
void
walSetReaderCapacity
(
SWalReader
*
pRead
,
int32_t
capacity
);
int32_t
walFetchHead
(
SWalReader
*
pRead
,
int64_t
ver
,
SWalCkHead
*
pHead
);
int32_t
walFetchBody
(
SWalReader
*
pRead
,
SWalCkHead
**
ppHead
);
int32_t
walSkipFetchBody
(
SWalReader
*
pRead
,
const
SWalCkHead
*
pHead
);
int32_t
walFetchHead
(
SWalReader
*
pRead
,
int64_t
ver
);
int32_t
walFetchBody
(
SWalReader
*
pRead
);
int32_t
walSkipFetchBody
(
SWalReader
*
pRead
);
void
walRefFirstVer
(
SWal
*
,
SWalRef
*
);
void
walRefLastVer
(
SWal
*
,
SWalRef
*
);
...
...
source/dnode/vnode/src/inc/tq.h
浏览文件 @
ff17763e
...
...
@@ -127,7 +127,7 @@ void tqDestroyTqHandle(void* data);
// tqRead
int32_t
tqScanTaosx
(
STQ
*
pTq
,
const
STqHandle
*
pHandle
,
STaosxRsp
*
pRsp
,
SMqMetaRsp
*
pMetaRsp
,
STqOffsetVal
*
offset
);
int32_t
tqScanData
(
STQ
*
pTq
,
const
STqHandle
*
pHandle
,
SMqDataRsp
*
pRsp
,
STqOffsetVal
*
pOffset
);
int32_t
tqFetchLog
(
STQ
*
pTq
,
STqHandle
*
pHandle
,
int64_t
*
fetchOffset
,
SWalCkHead
**
pHeadWithCkSum
,
uint64_t
reqId
);
int32_t
tqFetchLog
(
STQ
*
pTq
,
STqHandle
*
pHandle
,
int64_t
*
fetchOffset
,
uint64_t
reqId
);
// tqExec
int32_t
tqTaosxScanLog
(
STQ
*
pTq
,
STqHandle
*
pHandle
,
SPackedData
submit
,
STaosxRsp
*
pRsp
,
int32_t
*
totalRows
);
...
...
source/dnode/vnode/src/tq/tqRead.c
浏览文件 @
ff17763e
...
...
@@ -184,70 +184,63 @@ end:
return
tbSuid
==
realTbSuid
;
}
int32_t
tqFetchLog
(
STQ
*
pTq
,
STqHandle
*
pHandle
,
int64_t
*
fetchOffset
,
SWalCkHead
**
ppCkHead
,
uint64_t
reqId
)
{
int32_t
code
=
0
;
int32_t
tqFetchLog
(
STQ
*
pTq
,
STqHandle
*
pHandle
,
int64_t
*
fetchOffset
,
uint64_t
reqId
)
{
int32_t
code
=
-
1
;
int32_t
vgId
=
TD_VID
(
pTq
->
pVnode
);
taosThreadMutexLock
(
&
pHandle
->
pWalReader
->
mutex
);
int64_t
offset
=
*
fetchOffset
;
int64_t
lastVer
=
walGetLastVer
(
pHandle
->
pWalReader
->
pWal
);
int64_t
committedVer
=
walGetCommittedVer
(
pHandle
->
pWalReader
->
pWal
);
int64_t
appliedVer
=
walGetAppliedVer
(
pHandle
->
pWalReader
->
pWal
);
while
(
1
)
{
if
(
walFetchHead
(
pHandle
->
pWalReader
,
offset
,
*
ppCkHead
)
<
0
)
{
wDebug
(
"vgId:%d, wal start to fetch, index:%"
PRId64
", last index:%"
PRId64
" commit index:%"
PRId64
", applied index:%"
PRId64
,
vgId
,
offset
,
lastVer
,
committedVer
,
appliedVer
);
while
(
offset
<=
appliedVer
)
{
if
(
walFetchHead
(
pHandle
->
pWalReader
,
offset
)
<
0
)
{
tqDebug
(
"tmq poll: consumer:0x%"
PRIx64
", (epoch %d) vgId:%d offset %"
PRId64
", no more log to return, reqId:0x%"
PRIx64
,
pHandle
->
consumerId
,
pHandle
->
epoch
,
vgId
,
offset
,
reqId
);
*
fetchOffset
=
offset
;
code
=
-
1
;
goto
END
;
}
tqDebug
(
"vgId:%d, consumer:0x%"
PRIx64
" taosx get msg ver %"
PRId64
", type: %s, reqId:0x%"
PRIx64
,
vgId
,
pHandle
->
consumerId
,
offset
,
TMSG_INFO
((
*
ppCkHead
)
->
head
.
msgType
),
reqId
);
if
((
*
ppCkHead
)
->
head
.
msgType
==
TDMT_VND_SUBMIT
)
{
code
=
walFetchBody
(
pHandle
->
pWalReader
,
ppCkHead
);
pHandle
->
consumerId
,
offset
,
TMSG_INFO
(
pHandle
->
pWalReader
->
pHead
->
head
.
msgType
),
reqId
);
if
(
code
<
0
)
{
*
fetchOffset
=
offset
;
code
=
-
1
;
goto
END
;
}
*
fetchOffset
=
offset
;
code
=
0
;
if
(
pHandle
->
pWalReader
->
pHead
->
head
.
msgType
==
TDMT_VND_SUBMIT
)
{
code
=
walFetchBody
(
pHandle
->
pWalReader
);
goto
END
;
}
else
{
if
(
pHandle
->
fetchMeta
!=
WITH_DATA
)
{
SWalCont
*
pHead
=
&
(
(
*
ppCkHead
)
->
head
);
SWalCont
*
pHead
=
&
(
pHandle
->
pWalReader
->
pHead
->
head
);
if
(
IS_META_MSG
(
pHead
->
msgType
)
&&
!
(
pHead
->
msgType
==
TDMT_VND_DELETE
&&
pHandle
->
fetchMeta
==
ONLY_META
))
{
code
=
walFetchBody
(
pHandle
->
pWalReader
,
ppCkHead
);
code
=
walFetchBody
(
pHandle
->
pWalReader
);
if
(
code
<
0
)
{
*
fetchOffset
=
offset
;
code
=
-
1
;
goto
END
;
}
pHead
=
&
(
pHandle
->
pWalReader
->
pHead
->
head
);
if
(
isValValidForTable
(
pHandle
,
pHead
))
{
*
fetchOffset
=
offset
;
code
=
0
;
goto
END
;
}
else
{
offset
++
;
code
=
-
1
;
continue
;
}
}
}
code
=
walSkipFetchBody
(
pHandle
->
pWalReader
,
*
ppCkHead
);
code
=
walSkipFetchBody
(
pHandle
->
pWalReader
);
if
(
code
<
0
)
{
*
fetchOffset
=
offset
;
code
=
-
1
;
goto
END
;
}
offset
++
;
}
code
=
-
1
;
}
END:
taosThreadMutexUnlock
(
&
pHandle
->
pWalReader
->
mutex
)
;
*
fetchOffset
=
offset
;
return
code
;
}
...
...
source/dnode/vnode/src/tq/tqUtil.c
浏览文件 @
ff17763e
...
...
@@ -179,7 +179,6 @@ static int32_t extractDataAndRspForDbStbSubscribe(STQ* pTq, STqHandle* pHandle,
SRpcMsg
*
pMsg
,
STqOffsetVal
*
offset
)
{
int
code
=
0
;
int32_t
vgId
=
TD_VID
(
pTq
->
pVnode
);
SWalCkHead
*
pCkHead
=
NULL
;
SMqMetaRsp
metaRsp
=
{
0
};
STaosxRsp
taosxRsp
=
{
0
};
tqInitTaosxRsp
(
&
taosxRsp
,
*
offset
);
...
...
@@ -216,14 +215,7 @@ static int32_t extractDataAndRspForDbStbSubscribe(STQ* pTq, STqHandle* pHandle,
if
(
offset
->
type
==
TMQ_OFFSET__LOG
)
{
walReaderVerifyOffset
(
pHandle
->
pWalReader
,
offset
);
int64_t
fetchVer
=
offset
->
version
;
pCkHead
=
taosMemoryMalloc
(
sizeof
(
SWalCkHead
)
+
2048
);
if
(
pCkHead
==
NULL
)
{
terrno
=
TSDB_CODE_OUT_OF_MEMORY
;
code
=
-
1
;
goto
end
;
}
walSetReaderCapacity
(
pHandle
->
pWalReader
,
2048
);
int
totalRows
=
0
;
while
(
1
)
{
int32_t
savedEpoch
=
atomic_load_32
(
&
pHandle
->
epoch
);
...
...
@@ -234,14 +226,14 @@ static int32_t extractDataAndRspForDbStbSubscribe(STQ* pTq, STqHandle* pHandle,
break
;
}
if
(
tqFetchLog
(
pTq
,
pHandle
,
&
fetchVer
,
&
pCkHead
,
pRequest
->
reqId
)
<
0
)
{
if
(
tqFetchLog
(
pTq
,
pHandle
,
&
fetchVer
,
pRequest
->
reqId
)
<
0
)
{
tqOffsetResetToLog
(
&
taosxRsp
.
rspOffset
,
fetchVer
);
// setRequestVersion(&taosxRsp.reqOffset, offset->version);
code
=
tqSendDataRsp
(
pHandle
,
pMsg
,
pRequest
,
(
SMqDataRsp
*
)
&
taosxRsp
,
TMQ_MSG_TYPE__POLL_DATA_RSP
,
vgId
);
goto
end
;
}
SWalCont
*
pHead
=
&
p
Ck
Head
->
head
;
SWalCont
*
pHead
=
&
p
Handle
->
pWalReader
->
p
Head
->
head
;
tqDebug
(
"tmq poll: consumer:0x%"
PRIx64
" (epoch %d) iter log, vgId:%d offset %"
PRId64
" msgType %d"
,
pRequest
->
consumerId
,
pRequest
->
epoch
,
vgId
,
fetchVer
,
pHead
->
msgType
);
...
...
@@ -291,7 +283,6 @@ static int32_t extractDataAndRspForDbStbSubscribe(STQ* pTq, STqHandle* pHandle,
end:
tDeleteSTaosxRsp
(
&
taosxRsp
);
taosMemoryFreeClear
(
pCkHead
);
return
code
;
}
...
...
source/libs/wal/src/walRead.c
浏览文件 @
ff17763e
...
...
@@ -16,10 +16,6 @@
#include "taoserror.h"
#include "walInt.h"
static
int32_t
walFetchHeadNew
(
SWalReader
*
pRead
,
int64_t
fetchVer
);
static
int32_t
walFetchBodyNew
(
SWalReader
*
pRead
);
static
int32_t
walSkipFetchBodyNew
(
SWalReader
*
pRead
);
SWalReader
*
walOpenReader
(
SWal
*
pWal
,
SWalFilterCond
*
cond
)
{
SWalReader
*
pReader
=
taosMemoryCalloc
(
1
,
sizeof
(
SWalReader
));
if
(
pReader
==
NULL
)
{
...
...
@@ -80,19 +76,19 @@ int32_t walNextValidMsg(SWalReader *pReader) {
return
-
1
;
}
while
(
fetchVer
<=
appliedVer
)
{
if
(
walFetchHead
New
(
pReader
,
fetchVer
)
<
0
)
{
if
(
walFetchHead
(
pReader
,
fetchVer
)
<
0
)
{
return
-
1
;
}
int32_t
type
=
pReader
->
pHead
->
head
.
msgType
;
if
(
type
==
TDMT_VND_SUBMIT
||
((
type
==
TDMT_VND_DELETE
)
&&
(
pReader
->
cond
.
deleteMsg
==
1
))
||
(
IS_META_MSG
(
type
)
&&
pReader
->
cond
.
scanMeta
))
{
if
(
walFetchBody
New
(
pReader
)
<
0
)
{
if
(
walFetchBody
(
pReader
)
<
0
)
{
return
-
1
;
}
return
0
;
}
else
{
if
(
walSkipFetchBody
New
(
pReader
)
<
0
)
{
if
(
walSkipFetchBody
(
pReader
)
<
0
)
{
return
-
1
;
}
...
...
@@ -254,104 +250,8 @@ int32_t walReaderSeekVer(SWalReader *pReader, int64_t ver) {
return
0
;
}
void
walSetReaderCapacity
(
SWalReader
*
pRead
,
int32_t
capacity
)
{
pRead
->
capacity
=
capacity
;
}
static
int32_t
walFetchHeadNew
(
SWalReader
*
pRead
,
int64_t
fetchVer
)
{
int64_t
contLen
;
bool
seeked
=
false
;
wDebug
(
"vgId:%d, wal starts to fetch head, index:%"
PRId64
,
pRead
->
pWal
->
cfg
.
vgId
,
fetchVer
);
if
(
pRead
->
curVersion
!=
fetchVer
)
{
if
(
walReaderSeekVer
(
pRead
,
fetchVer
)
<
0
)
{
return
-
1
;
}
seeked
=
true
;
}
while
(
1
)
{
contLen
=
taosReadFile
(
pRead
->
pLogFile
,
pRead
->
pHead
,
sizeof
(
SWalCkHead
));
if
(
contLen
==
sizeof
(
SWalCkHead
))
{
break
;
}
else
if
(
contLen
==
0
&&
!
seeked
)
{
if
(
walReadSeekVerImpl
(
pRead
,
fetchVer
)
<
0
){
return
-
1
;
}
seeked
=
true
;
continue
;
}
else
{
if
(
contLen
<
0
)
{
terrno
=
TAOS_SYSTEM_ERROR
(
errno
);
}
else
{
terrno
=
TSDB_CODE_WAL_FILE_CORRUPTED
;
}
return
-
1
;
}
}
// pRead->curInvalid = 0;
return
0
;
}
static
int32_t
walFetchBodyNew
(
SWalReader
*
pReader
)
{
SWalCont
*
pReadHead
=
&
pReader
->
pHead
->
head
;
int64_t
ver
=
pReadHead
->
version
;
wDebug
(
"vgId:%d, wal starts to fetch body, ver:%"
PRId64
" ,len:%d, total"
,
pReader
->
pWal
->
cfg
.
vgId
,
ver
,
pReadHead
->
bodyLen
);
if
(
pReader
->
capacity
<
pReadHead
->
bodyLen
)
{
SWalCkHead
*
ptr
=
(
SWalCkHead
*
)
taosMemoryRealloc
(
pReader
->
pHead
,
sizeof
(
SWalCkHead
)
+
pReadHead
->
bodyLen
);
if
(
ptr
==
NULL
)
{
terrno
=
TSDB_CODE_OUT_OF_MEMORY
;
return
-
1
;
}
pReader
->
pHead
=
ptr
;
pReadHead
=
&
pReader
->
pHead
->
head
;
pReader
->
capacity
=
pReadHead
->
bodyLen
;
}
if
(
pReadHead
->
bodyLen
!=
taosReadFile
(
pReader
->
pLogFile
,
pReadHead
->
body
,
pReadHead
->
bodyLen
))
{
if
(
pReadHead
->
bodyLen
<
0
)
{
terrno
=
TAOS_SYSTEM_ERROR
(
errno
);
wError
(
"vgId:%d, wal fetch body error:%"
PRId64
", read request index:%"
PRId64
", since %s"
,
pReader
->
pWal
->
cfg
.
vgId
,
pReader
->
pHead
->
head
.
version
,
ver
,
tstrerror
(
terrno
));
}
else
{
wError
(
"vgId:%d, wal fetch body error:%"
PRId64
", read request index:%"
PRId64
", since file corrupted"
,
pReader
->
pWal
->
cfg
.
vgId
,
pReader
->
pHead
->
head
.
version
,
ver
);
terrno
=
TSDB_CODE_WAL_FILE_CORRUPTED
;
}
return
-
1
;
}
if
(
walValidBodyCksum
(
pReader
->
pHead
)
!=
0
)
{
wError
(
"vgId:%d, wal fetch body error:%"
PRId64
", since body checksum not passed"
,
pReader
->
pWal
->
cfg
.
vgId
,
ver
);
terrno
=
TSDB_CODE_WAL_FILE_CORRUPTED
;
return
-
1
;
}
wDebug
(
"vgId:%d, index:%"
PRId64
" is fetched, type:%d, cursor advance"
,
pReader
->
pWal
->
cfg
.
vgId
,
ver
,
pReader
->
pHead
->
head
.
msgType
);
pReader
->
curVersion
=
ver
+
1
;
return
0
;
}
static
int32_t
walSkipFetchBodyNew
(
SWalReader
*
pRead
)
{
int64_t
code
;
code
=
taosLSeekFile
(
pRead
->
pLogFile
,
pRead
->
pHead
->
head
.
bodyLen
,
SEEK_CUR
);
if
(
code
<
0
)
{
terrno
=
TAOS_SYSTEM_ERROR
(
errno
);
// pRead->curInvalid = 1;
return
-
1
;
}
pRead
->
curVersion
++
;
wDebug
(
"vgId:%d, version advance to %"
PRId64
", skip fetch"
,
pRead
->
pWal
->
cfg
.
vgId
,
pRead
->
curVersion
);
return
0
;
}
int32_t
walFetchHead
(
SWalReader
*
pRead
,
int64_t
ver
,
SWalCkHead
*
pHead
)
{
int32_t
walFetchHead
(
SWalReader
*
pRead
,
int64_t
ver
)
{
int64_t
code
;
int64_t
contLen
;
bool
seeked
=
false
;
...
...
@@ -369,15 +269,13 @@ int32_t walFetchHead(SWalReader *pRead, int64_t ver, SWalCkHead *pHead) {
if
(
pRead
->
curVersion
!=
ver
)
{
code
=
walReaderSeekVer
(
pRead
,
ver
);
if
(
code
<
0
)
{
// pRead->curVersion = ver;
// pRead->curInvalid = 1;
return
-
1
;
}
seeked
=
true
;
}
while
(
1
)
{
contLen
=
taosReadFile
(
pRead
->
pLogFile
,
pHead
,
sizeof
(
SWalCkHead
));
contLen
=
taosReadFile
(
pRead
->
pLogFile
,
p
Read
->
p
Head
,
sizeof
(
SWalCkHead
));
if
(
contLen
==
sizeof
(
SWalCkHead
))
{
break
;
}
else
if
(
contLen
==
0
&&
!
seeked
)
{
...
...
@@ -392,12 +290,11 @@ int32_t walFetchHead(SWalReader *pRead, int64_t ver, SWalCkHead *pHead) {
}
else
{
terrno
=
TSDB_CODE_WAL_FILE_CORRUPTED
;
}
// pRead->curInvalid = 1;
return
-
1
;
}
}
code
=
walValidHeadCksum
(
pHead
);
code
=
walValidHeadCksum
(
p
Read
->
p
Head
);
if
(
code
!=
0
)
{
wError
(
"vgId:%d, unexpected wal log index:%"
PRId64
", since head checksum not passed"
,
pRead
->
pWal
->
cfg
.
vgId
,
ver
);
...
...
@@ -405,32 +302,27 @@ int32_t walFetchHead(SWalReader *pRead, int64_t ver, SWalCkHead *pHead) {
return
-
1
;
}
// pRead->curInvalid = 0;
return
0
;
}
int32_t
walSkipFetchBody
(
SWalReader
*
pRead
,
const
SWalCkHead
*
pHead
)
{
int64_t
code
;
int32_t
walSkipFetchBody
(
SWalReader
*
pRead
)
{
wDebug
(
"vgId:%d, skip fetch body %"
PRId64
", first ver:%"
PRId64
", commit ver:%"
PRId64
", last ver:%"
PRId64
", applied ver:%"
PRId64
,
pRead
->
pWal
->
cfg
.
vgId
,
pHead
->
head
.
version
,
pRead
->
pWal
->
vers
.
firstVer
,
pRead
->
pWal
->
vers
.
commitVer
,
pRead
->
pWal
->
cfg
.
vgId
,
p
Read
->
p
Head
->
head
.
version
,
pRead
->
pWal
->
vers
.
firstVer
,
pRead
->
pWal
->
vers
.
commitVer
,
pRead
->
pWal
->
vers
.
lastVer
,
pRead
->
pWal
->
vers
.
appliedVer
);
code
=
taosLSeekFile
(
pRead
->
pLogFile
,
pHead
->
head
.
bodyLen
,
SEEK_CUR
);
int64_t
code
=
taosLSeekFile
(
pRead
->
pLogFile
,
pRead
->
pHead
->
head
.
bodyLen
,
SEEK_CUR
);
if
(
code
<
0
)
{
terrno
=
TAOS_SYSTEM_ERROR
(
errno
);
// pRead->curInvalid = 1;
return
-
1
;
}
pRead
->
curVersion
++
;
return
0
;
}
int32_t
walFetchBody
(
SWalReader
*
pRead
,
SWalCkHead
**
ppHead
)
{
SWalCont
*
pReadHead
=
&
((
*
ppHead
)
->
head
)
;
int32_t
walFetchBody
(
SWalReader
*
pRead
)
{
SWalCont
*
pReadHead
=
&
pRead
->
pHead
->
head
;
int64_t
ver
=
pReadHead
->
version
;
wDebug
(
"vgId:%d, fetch body %"
PRId64
", first ver:%"
PRId64
", commit ver:%"
PRId64
", last ver:%"
PRId64
...
...
@@ -439,13 +331,13 @@ int32_t walFetchBody(SWalReader *pRead, SWalCkHead **ppHead) {
pRead
->
pWal
->
vers
.
appliedVer
);
if
(
pRead
->
capacity
<
pReadHead
->
bodyLen
)
{
SWalCkHead
*
ptr
=
(
SWalCkHead
*
)
taosMemoryRealloc
(
*
p
pHead
,
sizeof
(
SWalCkHead
)
+
pReadHead
->
bodyLen
);
SWalCkHead
*
ptr
=
(
SWalCkHead
*
)
taosMemoryRealloc
(
pRead
->
pHead
,
sizeof
(
SWalCkHead
)
+
pReadHead
->
bodyLen
);
if
(
ptr
==
NULL
)
{
terrno
=
TSDB_CODE_OUT_OF_MEMORY
;
return
-
1
;
}
*
p
pHead
=
ptr
;
pReadHead
=
&
((
*
ppHead
)
->
head
)
;
pRead
->
pHead
=
ptr
;
pReadHead
=
&
pRead
->
pHead
->
head
;
pRead
->
capacity
=
pReadHead
->
bodyLen
;
}
...
...
@@ -459,27 +351,24 @@ int32_t walFetchBody(SWalReader *pRead, SWalCkHead **ppHead) {
pRead
->
pWal
->
cfg
.
vgId
,
pReadHead
->
version
,
ver
);
terrno
=
TSDB_CODE_WAL_FILE_CORRUPTED
;
}
// pRead->curInvalid = 1;
return
-
1
;
}
if
(
pReadHead
->
version
!=
ver
)
{
wError
(
"vgId:%d, wal fetch body error, index:%"
PRId64
", read request index:%"
PRId64
,
pRead
->
pWal
->
cfg
.
vgId
,
pReadHead
->
version
,
ver
);
// pRead->curInvalid = 1;
terrno
=
TSDB_CODE_WAL_FILE_CORRUPTED
;
return
-
1
;
}
if
(
walValidBodyCksum
(
*
p
pHead
)
!=
0
)
{
if
(
walValidBodyCksum
(
pRead
->
pHead
)
!=
0
)
{
wError
(
"vgId:%d, wal fetch body error, index:%"
PRId64
", since body checksum not passed"
,
pRead
->
pWal
->
cfg
.
vgId
,
ver
);
// pRead->curInvalid = 1;
terrno
=
TSDB_CODE_WAL_FILE_CORRUPTED
;
return
-
1
;
}
pRead
->
curVersion
=
ver
+
1
;
pRead
->
curVersion
++
;
return
0
;
}
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录