Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
50729f0c
T
TDengine
项目概览
taosdata
/
TDengine
1 年多 前同步成功
通知
1185
Star
22016
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
50729f0c
编写于
12月 21, 2022
作者:
L
Liu Jicong
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
refactor: remove assert
上级
2a7de0cd
变更
6
隐藏空白更改
内联
并排
Showing
6 changed file
with
24 addition
and
84 deletion
+24
-84
source/client/src/clientTmq.c
source/client/src/clientTmq.c
+0
-9
source/libs/wal/src/walMeta.c
source/libs/wal/src/walMeta.c
+12
-19
source/libs/wal/src/walRead.c
source/libs/wal/src/walRead.c
+0
-19
source/libs/wal/src/walRef.c
source/libs/wal/src/walRef.c
+2
-2
source/libs/wal/src/walSeek.c
source/libs/wal/src/walSeek.c
+2
-5
source/libs/wal/src/walWrite.c
source/libs/wal/src/walWrite.c
+8
-30
未找到文件。
source/client/src/clientTmq.c
浏览文件 @
50729f0c
...
...
@@ -530,7 +530,6 @@ static int32_t tmqSendCommitReq(tmq_t* tmq, SMqClientVg* pVg, SMqClientTopic* pT
int32_t
tmqCommitMsgImpl
(
tmq_t
*
tmq
,
const
TAOS_RES
*
msg
,
int8_t
async
,
tmq_commit_cb
*
userCb
,
void
*
userParam
)
{
char
*
topic
;
int32_t
vgId
;
ASSERT
(
msg
!=
NULL
);
if
(
TD_RES_TMQ
(
msg
))
{
SMqRspObj
*
pRspObj
=
(
SMqRspObj
*
)
msg
;
topic
=
pRspObj
->
topic
;
...
...
@@ -809,8 +808,6 @@ int32_t tmqHandleAllDelayedTask(tmq_t* tmq) {
taosTmrReset
(
tmqAssignDelayedCommitTask
,
tmq
->
autoCommitInterval
,
pRefId
,
tmqMgmt
.
timer
,
&
tmq
->
commitTimer
);
}
else
if
(
*
pTaskType
==
TMQ_DELAYED_TASK__REPORT
)
{
}
else
{
ASSERT
(
0
);
}
taosFreeQitem
(
pTaskType
);
}
...
...
@@ -953,10 +950,6 @@ tmq_t* tmq_consumer_new(tmq_conf_t* conf, char* errstr, int32_t errstrLen) {
const
char
*
user
=
conf
->
user
==
NULL
?
TSDB_DEFAULT_USER
:
conf
->
user
;
const
char
*
pass
=
conf
->
pass
==
NULL
?
TSDB_DEFAULT_PASS
:
conf
->
pass
;
ASSERT
(
user
);
ASSERT
(
pass
);
ASSERT
(
conf
->
groupId
[
0
]);
pTmq
->
clientTopics
=
taosArrayInit
(
0
,
sizeof
(
SMqClientTopic
));
pTmq
->
mqueue
=
taosOpenQueue
();
pTmq
->
qall
=
taosAllocateQall
();
...
...
@@ -1247,8 +1240,6 @@ int32_t tmqPollCb(void* param, SDataBuf* pMsg, int32_t code) {
tDecodeSTaosxRsp
(
&
decoder
,
&
pRspWrapper
->
taosxRsp
);
tDecoderClear
(
&
decoder
);
memcpy
(
&
pRspWrapper
->
taosxRsp
,
pMsg
->
pData
,
sizeof
(
SMqRspHead
));
}
else
{
ASSERT
(
0
);
}
taosMemoryFree
(
pMsg
->
pData
);
...
...
source/libs/wal/src/walMeta.c
浏览文件 @
50729f0c
...
...
@@ -25,7 +25,7 @@ bool FORCE_INLINE walLogExist(SWal* pWal, int64_t ver) {
}
bool
FORCE_INLINE
walIsEmpty
(
SWal
*
pWal
)
{
return
(
pWal
->
vers
.
firstVer
==
-
1
||
pWal
->
vers
.
lastVer
<
pWal
->
vers
.
firstVer
);
// [firstVer, lastVer + 1)
return
(
pWal
->
vers
.
firstVer
==
-
1
||
pWal
->
vers
.
lastVer
<
pWal
->
vers
.
firstVer
);
// [firstVer, lastVer + 1)
}
int64_t
FORCE_INLINE
walGetFirstVer
(
SWal
*
pWal
)
{
return
pWal
->
vers
.
firstVer
;
}
...
...
@@ -49,7 +49,6 @@ static FORCE_INLINE int walBuildTmpMetaName(SWal* pWal, char* buf) {
static
FORCE_INLINE
int64_t
walScanLogGetLastVer
(
SWal
*
pWal
,
int32_t
fileIdx
)
{
int32_t
sz
=
taosArrayGetSize
(
pWal
->
fileInfoSet
);
terrno
=
TSDB_CODE_SUCCESS
;
ASSERT
(
fileIdx
>=
0
&&
fileIdx
<
sz
);
SWalFileInfo
*
pFileInfo
=
taosArrayGet
(
pWal
->
fileInfoSet
,
fileIdx
);
char
fnameStr
[
WAL_FILE_LEN
];
...
...
@@ -101,7 +100,6 @@ static FORCE_INLINE int64_t walScanLogGetLastVer(SWal* pWal, int32_t fileIdx) {
offsetBackward
=
offset
;
}
ASSERT
(
offset
<=
end
);
readSize
=
end
-
offset
;
capacity
=
readSize
+
sizeof
(
magic
);
...
...
@@ -257,7 +255,6 @@ static void walRebuildFileInfoSet(SArray* metaLogList, SArray* actualLogList) {
SWalFileInfo
*
pLogInfo
=
taosArrayGet
(
actualLogList
,
i
);
while
(
j
<
metaFileNum
)
{
SWalFileInfo
*
pMetaInfo
=
taosArrayGet
(
metaLogList
,
j
);
ASSERT
(
pMetaInfo
!=
NULL
);
if
(
pMetaInfo
->
firstVer
<
pLogInfo
->
firstVer
)
{
j
++
;
}
else
if
(
pMetaInfo
->
firstVer
==
pLogInfo
->
firstVer
)
{
...
...
@@ -385,7 +382,6 @@ int walCheckAndRepairMeta(SWal* pWal) {
taosArrayDestroy
(
actualLog
);
int32_t
sz
=
taosArrayGetSize
(
pWal
->
fileInfoSet
);
ASSERT
(
sz
==
actualFileNum
);
// scan and determine the lastVer
int32_t
fileIdx
=
sz
;
...
...
@@ -403,8 +399,6 @@ int walCheckAndRepairMeta(SWal* pWal) {
return
-
1
;
}
ASSERT
(
pFileInfo
->
firstVer
>=
0
);
if
(
pFileInfo
->
lastVer
>=
pFileInfo
->
firstVer
&&
fileSize
==
pFileInfo
->
fileSize
)
{
totSize
+=
pFileInfo
->
fileSize
;
continue
;
...
...
@@ -417,7 +411,6 @@ int walCheckAndRepairMeta(SWal* pWal) {
wError
(
"failed to scan wal last ver since %s"
,
terrstr
());
return
-
1
;
}
ASSERT
(
pFileInfo
->
fileSize
==
0
);
// remove the empty wal log, and its idx
wInfo
(
"vgId:%d, wal remove empty file %s"
,
pWal
->
cfg
.
vgId
,
fnameStr
);
taosRemoveFile
(
fnameStr
);
...
...
@@ -477,8 +470,7 @@ int walReadLogHead(TdFilePtr pLogFile, int64_t offset, SWalCkHead* pCkHead) {
}
int
walCheckAndRepairIdxFile
(
SWal
*
pWal
,
int32_t
fileIdx
)
{
int32_t
sz
=
taosArrayGetSize
(
pWal
->
fileInfoSet
);
ASSERT
(
fileIdx
>=
0
&&
fileIdx
<
sz
);
int32_t
sz
=
taosArrayGetSize
(
pWal
->
fileInfoSet
);
SWalFileInfo
*
pFileInfo
=
taosArrayGet
(
pWal
->
fileInfoSet
,
fileIdx
);
char
fnameStr
[
WAL_FILE_LEN
];
walBuildIdxName
(
pWal
,
pFileInfo
->
firstVer
,
fnameStr
);
...
...
@@ -492,7 +484,6 @@ int walCheckAndRepairIdxFile(SWal* pWal, int32_t fileIdx) {
return
-
1
;
}
ASSERT
(
pFileInfo
->
fileSize
>
0
&&
pFileInfo
->
firstVer
>=
0
&&
pFileInfo
->
lastVer
>=
pFileInfo
->
firstVer
);
if
(
fileSize
==
(
pFileInfo
->
lastVer
-
pFileInfo
->
firstVer
+
1
)
*
sizeof
(
SWalIdxEntry
))
{
return
0
;
}
...
...
@@ -556,7 +547,7 @@ int walCheckAndRepairIdxFile(SWal* pWal, int32_t fileIdx) {
}
offset
+=
sizeof
(
SWalIdxEntry
);
ASSERT
(
offset
==
(
idxEntry
.
ver
-
pFileInfo
->
firstVer
+
1
)
*
sizeof
(
SWalIdxEntry
));
/*A(offset == (idxEntry.ver - pFileInfo->firstVer + 1) * sizeof(SWalIdxEntry));*/
// ftruncate idx file
if
(
offset
<
fileSize
)
{
...
...
@@ -577,7 +568,7 @@ int walCheckAndRepairIdxFile(SWal* pWal, int32_t fileIdx) {
}
while
(
idxEntry
.
ver
<
pFileInfo
->
lastVer
)
{
ASSERT
(
idxEntry
.
ver
==
ckHead
.
head
.
version
);
/*A(idxEntry.ver == ckHead.head.version);*/
idxEntry
.
ver
+=
1
;
idxEntry
.
offset
+=
sizeof
(
SWalCkHead
)
+
ckHead
.
head
.
bodyLen
;
...
...
@@ -649,8 +640,7 @@ int walRollFileInfo(SWal* pWal) {
}
char
*
walMetaSerialize
(
SWal
*
pWal
)
{
char
buf
[
30
];
ASSERT
(
pWal
->
fileInfoSet
);
char
buf
[
30
];
int
sz
=
taosArrayGetSize
(
pWal
->
fileInfoSet
);
cJSON
*
pRoot
=
cJSON_CreateObject
();
cJSON
*
pMeta
=
cJSON_CreateObject
();
...
...
@@ -707,7 +697,7 @@ char* walMetaSerialize(SWal* pWal) {
}
int
walMetaDeserialize
(
SWal
*
pWal
,
const
char
*
bytes
)
{
ASSERT
(
taosArrayGetSize
(
pWal
->
fileInfoSet
)
==
0
);
/*A(taosArrayGetSize(pWal->fileInfoSet) == 0);*/
cJSON
*
pRoot
,
*
pMeta
,
*
pFiles
,
*
pInfoJson
,
*
pField
;
pRoot
=
cJSON_Parse
(
bytes
);
if
(
!
pRoot
)
goto
_err
;
...
...
@@ -823,7 +813,9 @@ int walSaveMeta(SWal* pWal) {
// flush to a tmpfile
n
=
walBuildTmpMetaName
(
pWal
,
tmpFnameStr
);
ASSERT
(
n
<
sizeof
(
tmpFnameStr
)
&&
"Buffer overflow of file name"
);
if
(
n
<
sizeof
(
tmpFnameStr
))
{
return
-
1
;
}
TdFilePtr
pMetaFile
=
taosOpenFile
(
tmpFnameStr
,
TD_FILE_CREATE
|
TD_FILE_WRITE
|
TD_FILE_TRUNC
);
if
(
pMetaFile
==
NULL
)
{
...
...
@@ -854,7 +846,9 @@ int walSaveMeta(SWal* pWal) {
// rename it
n
=
walBuildMetaName
(
pWal
,
metaVer
+
1
,
fnameStr
);
ASSERT
(
n
<
sizeof
(
fnameStr
)
&&
"Buffer overflow of file name"
);
if
(
n
<
sizeof
(
fnameStr
))
{
goto
_err
;
}
if
(
taosRenameFile
(
tmpFnameStr
,
fnameStr
)
<
0
)
{
wError
(
"failed to rename file due to %s. dest:%s"
,
strerror
(
errno
),
fnameStr
);
...
...
@@ -877,7 +871,6 @@ _err:
}
int
walLoadMeta
(
SWal
*
pWal
)
{
ASSERT
(
pWal
->
fileInfoSet
->
size
==
0
);
// find existing meta file
int
metaVer
=
walFindCurMetaVer
(
pWal
);
if
(
metaVer
==
-
1
)
{
...
...
source/libs/wal/src/walRead.c
浏览文件 @
50729f0c
...
...
@@ -97,7 +97,6 @@ int32_t walNextValidMsg(SWalReader *pReader) {
return
-
1
;
}
fetchVer
++
;
ASSERT
(
fetchVer
==
pReader
->
curVersion
);
}
}
pReader
->
curStopped
=
1
;
...
...
@@ -132,7 +131,6 @@ static int64_t walReadSeekFilePos(SWalReader *pReader, int64_t fileFirstVer, int
return
-
1
;
}
ASSERT
(
entry
.
ver
==
ver
);
ret
=
taosLSeekFile
(
pLogTFile
,
entry
.
offset
,
SEEK_SET
);
if
(
ret
<
0
)
{
terrno
=
TAOS_SYSTEM_ERROR
(
errno
);
...
...
@@ -241,7 +239,6 @@ static int32_t walFetchHeadNew(SWalReader *pRead, int64_t fetchVer) {
if
(
pRead
->
curInvalid
||
pRead
->
curVersion
!=
fetchVer
)
{
if
(
walReadSeekVer
(
pRead
,
fetchVer
)
<
0
)
{
ASSERT
(
0
);
pRead
->
curVersion
=
fetchVer
;
pRead
->
curInvalid
=
1
;
return
-
1
;
...
...
@@ -262,7 +259,6 @@ static int32_t walFetchHeadNew(SWalReader *pRead, int64_t fetchVer) {
}
else
{
terrno
=
TSDB_CODE_WAL_FILE_CORRUPTED
;
}
ASSERT
(
0
);
pRead
->
curInvalid
=
1
;
return
-
1
;
}
...
...
@@ -299,7 +295,6 @@ static int32_t walFetchBodyNew(SWalReader *pRead) {
terrno
=
TSDB_CODE_WAL_FILE_CORRUPTED
;
}
pRead
->
curInvalid
=
1
;
ASSERT
(
0
);
return
-
1
;
}
...
...
@@ -308,7 +303,6 @@ static int32_t walFetchBodyNew(SWalReader *pRead) {
pRead
->
pHead
->
head
.
version
,
ver
);
pRead
->
curInvalid
=
1
;
terrno
=
TSDB_CODE_WAL_FILE_CORRUPTED
;
ASSERT
(
0
);
return
-
1
;
}
...
...
@@ -316,7 +310,6 @@ static int32_t walFetchBodyNew(SWalReader *pRead) {
wError
(
"vgId:%d, wal fetch body error:%"
PRId64
", since body checksum not passed"
,
pRead
->
pWal
->
cfg
.
vgId
,
ver
);
pRead
->
curInvalid
=
1
;
terrno
=
TSDB_CODE_WAL_FILE_CORRUPTED
;
ASSERT
(
0
);
return
-
1
;
}
...
...
@@ -328,14 +321,10 @@ static int32_t walFetchBodyNew(SWalReader *pRead) {
static
int32_t
walSkipFetchBodyNew
(
SWalReader
*
pRead
)
{
int64_t
code
;
ASSERT
(
pRead
->
curVersion
==
pRead
->
pHead
->
head
.
version
);
ASSERT
(
pRead
->
curInvalid
==
0
);
code
=
taosLSeekFile
(
pRead
->
pLogFile
,
pRead
->
pHead
->
head
.
bodyLen
,
SEEK_CUR
);
if
(
code
<
0
)
{
terrno
=
TAOS_SYSTEM_ERROR
(
errno
);
pRead
->
curInvalid
=
1
;
ASSERT
(
0
);
return
-
1
;
}
...
...
@@ -384,7 +373,6 @@ int32_t walFetchHead(SWalReader *pRead, int64_t ver, SWalCkHead *pHead) {
}
else
{
terrno
=
TSDB_CODE_WAL_FILE_CORRUPTED
;
}
ASSERT
(
0
);
pRead
->
curInvalid
=
1
;
return
-
1
;
}
...
...
@@ -410,9 +398,6 @@ int32_t walSkipFetchBody(SWalReader *pRead, const SWalCkHead *pHead) {
pRead
->
pWal
->
cfg
.
vgId
,
pHead
->
head
.
version
,
pRead
->
pWal
->
vers
.
firstVer
,
pRead
->
pWal
->
vers
.
commitVer
,
pRead
->
pWal
->
vers
.
lastVer
,
pRead
->
pWal
->
vers
.
appliedVer
);
ASSERT
(
pRead
->
curVersion
==
pHead
->
head
.
version
);
ASSERT
(
pRead
->
curInvalid
==
0
);
code
=
taosLSeekFile
(
pRead
->
pLogFile
,
pHead
->
head
.
bodyLen
,
SEEK_CUR
);
if
(
code
<
0
)
{
terrno
=
TAOS_SYSTEM_ERROR
(
errno
);
...
...
@@ -447,7 +432,6 @@ int32_t walFetchBody(SWalReader *pRead, SWalCkHead **ppHead) {
if
(
pReadHead
->
bodyLen
!=
taosReadFile
(
pRead
->
pLogFile
,
pReadHead
->
body
,
pReadHead
->
bodyLen
))
{
if
(
pReadHead
->
bodyLen
<
0
)
{
ASSERT
(
0
);
terrno
=
TAOS_SYSTEM_ERROR
(
errno
);
wError
(
"vgId:%d, wal fetch body error:%"
PRId64
", read request index:%"
PRId64
", since %s"
,
pRead
->
pWal
->
cfg
.
vgId
,
pReadHead
->
version
,
ver
,
tstrerror
(
terrno
));
...
...
@@ -457,12 +441,10 @@ int32_t walFetchBody(SWalReader *pRead, SWalCkHead **ppHead) {
terrno
=
TSDB_CODE_WAL_FILE_CORRUPTED
;
}
pRead
->
curInvalid
=
1
;
ASSERT
(
0
);
return
-
1
;
}
if
(
pReadHead
->
version
!=
ver
)
{
ASSERT
(
0
);
wError
(
"vgId:%d, wal fetch body error, index:%"
PRId64
", read request index:%"
PRId64
,
pRead
->
pWal
->
cfg
.
vgId
,
pReadHead
->
version
,
ver
);
pRead
->
curInvalid
=
1
;
...
...
@@ -471,7 +453,6 @@ int32_t walFetchBody(SWalReader *pRead, SWalCkHead **ppHead) {
}
if
(
walValidBodyCksum
(
*
ppHead
)
!=
0
)
{
ASSERT
(
0
);
wError
(
"vgId:%d, wal fetch body error, index:%"
PRId64
", since body checksum not passed"
,
pRead
->
pWal
->
cfg
.
vgId
,
ver
);
pRead
->
curInvalid
=
1
;
...
...
source/libs/wal/src/walRef.c
浏览文件 @
50729f0c
...
...
@@ -61,7 +61,7 @@ int32_t walRefVer(SWalRef *pRef, int64_t ver) {
SWalFileInfo
tmpInfo
;
tmpInfo
.
firstVer
=
ver
;
SWalFileInfo
*
pRet
=
taosArraySearch
(
pWal
->
fileInfoSet
,
&
tmpInfo
,
compareWalFileInfo
,
TD_LE
);
ASSERT
(
pRet
!=
NULL
);
/*A(pRet != NULL);*/
pRef
->
refFile
=
pRet
->
firstVer
;
taosThreadMutexUnlock
(
&
pWal
->
mutex
);
...
...
@@ -92,7 +92,7 @@ SWalRef *walRefCommittedVer(SWal *pWal) {
SWalFileInfo
tmpInfo
;
tmpInfo
.
firstVer
=
ver
;
SWalFileInfo
*
pRet
=
taosArraySearch
(
pWal
->
fileInfoSet
,
&
tmpInfo
,
compareWalFileInfo
,
TD_LE
);
ASSERT
(
pRet
!=
NULL
);
/*A(pRet != NULL);*/
pRef
->
refFile
=
pRet
->
firstVer
;
taosThreadMutexUnlock
(
&
pWal
->
mutex
);
...
...
source/libs/wal/src/walSeek.c
浏览文件 @
50729f0c
...
...
@@ -40,7 +40,6 @@ static int64_t walSeekWritePos(SWal* pWal, int64_t ver) {
terrno = TAOS_SYSTEM_ERROR(errno);
return -1;
}
ASSERT(entry.ver == ver);
code = taosLSeekFile(pLogTFile, entry.offset, SEEK_SET);
if (code < 0) {
terrno = TAOS_SYSTEM_ERROR(errno);
...
...
@@ -53,8 +52,7 @@ static int64_t walSeekWritePos(SWal* pWal, int64_t ver) {
int
walInitWriteFile
(
SWal
*
pWal
)
{
TdFilePtr
pIdxTFile
,
pLogTFile
;
SWalFileInfo
*
pRet
=
taosArrayGetLast
(
pWal
->
fileInfoSet
);
ASSERT
(
pRet
!=
NULL
);
int64_t
fileFirstVer
=
pRet
->
firstVer
;
int64_t
fileFirstVer
=
pRet
->
firstVer
;
char
fnameStr
[
WAL_FILE_LEN
];
walBuildIdxName
(
pWal
,
fileFirstVer
,
fnameStr
);
...
...
@@ -109,9 +107,8 @@ int64_t walChangeWrite(SWal* pWal, int64_t ver) {
tmpInfo
.
firstVer
=
ver
;
// bsearch in fileSet
int32_t
idx
=
taosArraySearchIdx
(
pWal
->
fileInfoSet
,
&
tmpInfo
,
compareWalFileInfo
,
TD_LE
);
ASSERT
(
idx
!=
-
1
);
/*A(idx != -1);*/
SWalFileInfo
*
pFileInfo
=
taosArrayGet
(
pWal
->
fileInfoSet
,
idx
);
/*ASSERT(pFileInfo != NULL);*/
int64_t
fileFirstVer
=
pFileInfo
->
firstVer
;
walBuildIdxName
(
pWal
,
fileFirstVer
,
fnameStr
);
...
...
source/libs/wal/src/walWrite.c
浏览文件 @
50729f0c
...
...
@@ -87,12 +87,10 @@ int32_t walApplyVer(SWal *pWal, int64_t ver) {
}
int32_t
walCommit
(
SWal
*
pWal
,
int64_t
ver
)
{
ASSERT
(
pWal
->
vers
.
commitVer
>=
pWal
->
vers
.
snapshotVer
);
ASSERT
(
pWal
->
vers
.
commitVer
<=
pWal
->
vers
.
lastVer
);
if
(
ver
<
pWal
->
vers
.
commitVer
)
{
return
0
;
}
if
(
ver
>
pWal
->
vers
.
lastVer
)
{
if
(
ver
>
pWal
->
vers
.
lastVer
||
pWal
->
vers
.
commitVer
<
pWal
->
vers
.
snapshotVer
)
{
terrno
=
TSDB_CODE_WAL_INVALID_VER
;
return
-
1
;
}
...
...
@@ -138,25 +136,21 @@ int32_t walRollback(SWal *pWal, int64_t ver) {
TdFilePtr
pIdxFile
=
taosOpenFile
(
fnameStr
,
TD_FILE_WRITE
|
TD_FILE_READ
|
TD_FILE_APPEND
);
if
(
pIdxFile
==
NULL
)
{
ASSERT
(
0
);
taosThreadMutexUnlock
(
&
pWal
->
mutex
);
return
-
1
;
}
int64_t
idxOff
=
walGetVerIdxOffset
(
pWal
,
ver
);
code
=
taosLSeekFile
(
pIdxFile
,
idxOff
,
SEEK_SET
);
if
(
code
<
0
)
{
ASSERT
(
0
);
taosThreadMutexUnlock
(
&
pWal
->
mutex
);
return
-
1
;
}
// read idx file and get log file pos
SWalIdxEntry
entry
;
if
(
taosReadFile
(
pIdxFile
,
&
entry
,
sizeof
(
SWalIdxEntry
))
!=
sizeof
(
SWalIdxEntry
))
{
ASSERT
(
0
);
taosThreadMutexUnlock
(
&
pWal
->
mutex
);
return
-
1
;
}
ASSERT
(
entry
.
ver
==
ver
);
walBuildLogName
(
pWal
,
walGetCurFileFirstVer
(
pWal
),
fnameStr
);
TdFilePtr
pLogFile
=
taosOpenFile
(
fnameStr
,
TD_FILE_WRITE
|
TD_FILE_READ
|
TD_FILE_APPEND
);
...
...
@@ -176,24 +170,19 @@ int32_t walRollback(SWal *pWal, int64_t ver) {
}
// validate offset
SWalCkHead
head
;
ASSERT
(
taosValidFile
(
pLogFile
));
int64_t
size
=
taosReadFile
(
pLogFile
,
&
head
,
sizeof
(
SWalCkHead
));
int64_t
size
=
taosReadFile
(
pLogFile
,
&
head
,
sizeof
(
SWalCkHead
));
if
(
size
!=
sizeof
(
SWalCkHead
))
{
ASSERT
(
0
);
taosThreadMutexUnlock
(
&
pWal
->
mutex
);
return
-
1
;
}
code
=
walValidHeadCksum
(
&
head
);
ASSERT
(
code
==
0
);
if
(
code
!=
0
)
{
terrno
=
TSDB_CODE_WAL_FILE_CORRUPTED
;
ASSERT
(
0
);
taosThreadMutexUnlock
(
&
pWal
->
mutex
);
return
-
1
;
}
if
(
head
.
head
.
version
!=
ver
)
{
ASSERT
(
0
);
terrno
=
TSDB_CODE_WAL_FILE_CORRUPTED
;
taosThreadMutexUnlock
(
&
pWal
->
mutex
);
return
-
1
;
...
...
@@ -202,22 +191,22 @@ int32_t walRollback(SWal *pWal, int64_t ver) {
// truncate old files
code
=
taosFtruncateFile
(
pLogFile
,
entry
.
offset
);
if
(
code
<
0
)
{
ASSERT
(
0
);
terrno
=
TAOS_SYSTEM_ERROR
(
errno
);
taosThreadMutexUnlock
(
&
pWal
->
mutex
);
return
-
1
;
}
code
=
taosFtruncateFile
(
pIdxFile
,
idxOff
);
if
(
code
<
0
)
{
ASSERT
(
0
);
terrno
=
TAOS_SYSTEM_ERROR
(
errno
);
taosThreadMutexUnlock
(
&
pWal
->
mutex
);
return
-
1
;
}
pWal
->
vers
.
lastVer
=
ver
-
1
;
#if 0
if (pWal->vers.lastVer < pWal->vers.firstVer) {
A
SSERT
(
pWal
->
vers
.
lastVer
==
pWal
->
vers
.
firstVer
-
1
);
A(pWal->vers.lastVer == pWal->vers.firstVer - 1);
}
#endif
((
SWalFileInfo
*
)
taosArrayGetLast
(
pWal
->
fileInfoSet
))
->
lastVer
=
ver
-
1
;
((
SWalFileInfo
*
)
taosArrayGetLast
(
pWal
->
fileInfoSet
))
->
fileSize
=
entry
.
offset
;
taosCloseFile
(
&
pIdxFile
);
...
...
@@ -386,7 +375,8 @@ int32_t walEndSnapshot(SWal *pWal) {
walBuildIdxName
(
pWal
,
pInfo
->
firstVer
,
fnameStr
);
wDebug
(
"vgId:%d, wal remove file %s"
,
pWal
->
cfg
.
vgId
,
fnameStr
);
if
(
taosRemoveFile
(
fnameStr
)
<
0
&&
errno
!=
ENOENT
)
{
ASSERT
(
0
);
wError
(
"vgId:%d, failed to remove idx file %s due to %s"
,
pWal
->
cfg
.
vgId
,
fnameStr
,
strerror
(
errno
));
goto
END
;
}
}
taosArrayClear
(
pWal
->
toDeleteFiles
);
...
...
@@ -441,7 +431,6 @@ int32_t walRollImpl(SWal *pWal) {
pWal
->
pIdxFile
=
pIdxFile
;
pWal
->
pLogFile
=
pLogFile
;
pWal
->
writeCur
=
taosArrayGetSize
(
pWal
->
fileInfoSet
)
-
1
;
ASSERT
(
pWal
->
writeCur
>=
0
);
pWal
->
lastRollSeq
=
walGetSeq
();
...
...
@@ -458,8 +447,7 @@ END:
static
int32_t
walWriteIndex
(
SWal
*
pWal
,
int64_t
ver
,
int64_t
offset
)
{
SWalIdxEntry
entry
=
{.
ver
=
ver
,
.
offset
=
offset
};
SWalFileInfo
*
pFileInfo
=
walGetCurFileInfo
(
pWal
);
ASSERT
(
pFileInfo
!=
NULL
);
ASSERT
(
pFileInfo
->
firstVer
>=
0
);
int64_t
idxOffset
=
(
entry
.
ver
-
pFileInfo
->
firstVer
)
*
sizeof
(
SWalIdxEntry
);
wDebug
(
"vgId:%d, write index, index:%"
PRId64
", offset:%"
PRId64
", at %"
PRId64
,
pWal
->
cfg
.
vgId
,
ver
,
offset
,
idxOffset
);
...
...
@@ -476,7 +464,6 @@ static int32_t walWriteIndex(SWal *pWal, int64_t ver, int64_t offset) {
if
(
endOffset
<
0
)
{
wFatal
(
"vgId:%d, failed to seek end of idxfile due to %s. ver:%"
PRId64
""
,
pWal
->
cfg
.
vgId
,
strerror
(
errno
),
ver
);
}
ASSERT
(
endOffset
==
idxOffset
+
sizeof
(
SWalIdxEntry
)
&&
"Offset of idx entries misaligned"
);
return
0
;
}
...
...
@@ -486,9 +473,7 @@ static FORCE_INLINE int32_t walWriteImpl(SWal *pWal, int64_t index, tmsg_t msgTy
int64_t
offset
=
walGetCurFileOffset
(
pWal
);
SWalFileInfo
*
pFileInfo
=
walGetCurFileInfo
(
pWal
);
ASSERT
(
pFileInfo
!=
NULL
);
ASSERT
(
pFileInfo
->
firstVer
!=
-
1
);
pWal
->
writeHead
.
head
.
version
=
index
;
pWal
->
writeHead
.
head
.
bodyLen
=
bodyLen
;
pWal
->
writeHead
.
head
.
msgType
=
msgType
;
...
...
@@ -525,7 +510,6 @@ static FORCE_INLINE int32_t walWriteImpl(SWal *pWal, int64_t index, tmsg_t msgTy
// set status
if
(
pWal
->
vers
.
firstVer
==
-
1
)
{
ASSERT
(
index
==
0
);
pWal
->
vers
.
firstVer
=
0
;
}
pWal
->
vers
.
lastVer
=
index
;
...
...
@@ -541,7 +525,6 @@ END:
wFatal
(
"vgId:%d, failed to ftruncate logfile to offset:%"
PRId64
" during recovery due to %s"
,
pWal
->
cfg
.
vgId
,
offset
,
strerror
(
errno
));
terrno
=
TAOS_SYSTEM_ERROR
(
errno
);
ASSERT
(
0
&&
"failed to recover from error"
);
}
int64_t
idxOffset
=
(
index
-
pFileInfo
->
firstVer
)
*
sizeof
(
SWalIdxEntry
);
...
...
@@ -549,7 +532,6 @@ END:
wFatal
(
"vgId:%d, failed to ftruncate idxfile to offset:%"
PRId64
"during recovery due to %s"
,
pWal
->
cfg
.
vgId
,
idxOffset
,
strerror
(
errno
));
terrno
=
TAOS_SYSTEM_ERROR
(
errno
);
ASSERT
(
0
&&
"failed to recover from error"
);
}
return
-
1
;
}
...
...
@@ -576,8 +558,6 @@ int64_t walAppendLog(SWal *pWal, int64_t index, tmsg_t msgType, SWalSyncInfo syn
}
}
ASSERT
(
pWal
->
pLogFile
!=
NULL
&&
pWal
->
pIdxFile
!=
NULL
&&
pWal
->
writeCur
>=
0
);
if
(
walWriteImpl
(
pWal
,
index
,
msgType
,
syncMeta
,
body
,
bodyLen
)
<
0
)
{
taosThreadMutexUnlock
(
&
pWal
->
mutex
);
return
-
1
;
...
...
@@ -614,8 +594,6 @@ int32_t walWriteWithSyncInfo(SWal *pWal, int64_t index, tmsg_t msgType, SWalSync
}
}
ASSERT
(
pWal
->
pIdxFile
!=
NULL
&&
pWal
->
pLogFile
!=
NULL
&&
pWal
->
writeCur
>=
0
);
if
(
walWriteImpl
(
pWal
,
index
,
msgType
,
syncMeta
,
body
,
bodyLen
)
<
0
)
{
taosThreadMutexUnlock
(
&
pWal
->
mutex
);
return
-
1
;
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录