Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
cf93f4c2
T
TDengine
项目概览
taosdata
/
TDengine
1 年多 前同步成功
通知
1185
Star
22016
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
cf93f4c2
编写于
12月 09, 2021
作者:
L
Liu Jicong
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
add unit test for wal
上级
0237a2ba
变更
12
隐藏空白更改
内联
并排
Showing
12 changed file
with
661 addition
and
253 deletion
+661
-253
include/libs/wal/wal.h
include/libs/wal/wal.h
+24
-11
include/util/tfile.h
include/util/tfile.h
+2
-0
source/libs/wal/CMakeLists.txt
source/libs/wal/CMakeLists.txt
+5
-0
source/libs/wal/inc/walInt.h
source/libs/wal/inc/walInt.h
+65
-1
source/libs/wal/src/walIndex.c
source/libs/wal/src/walIndex.c
+13
-14
source/libs/wal/src/walMeta.c
source/libs/wal/src/walMeta.c
+193
-0
source/libs/wal/src/walMgmt.c
source/libs/wal/src/walMgmt.c
+21
-32
source/libs/wal/src/walRead.c
source/libs/wal/src/walRead.c
+44
-4
source/libs/wal/src/walWrite.c
source/libs/wal/src/walWrite.c
+117
-54
source/libs/wal/test/CMakeLists.txt
source/libs/wal/test/CMakeLists.txt
+20
-0
source/libs/wal/test/walMetaTest.cpp
source/libs/wal/test/walMetaTest.cpp
+157
-0
source/libs/wal/test/walTests.cpp
source/libs/wal/test/walTests.cpp
+0
-137
未找到文件。
include/libs/wal/wal.h
浏览文件 @
cf93f4c2
...
...
@@ -55,12 +55,14 @@ typedef struct {
uint32_t
signature
;
uint32_t
cksumHead
;
uint32_t
cksumBody
;
//
char cont[];
char
cont
[];
}
SWalHead
;
typedef
struct
{
int32_t
vgId
;
int32_t
fsyncPeriod
;
// millisecond
int32_t
rollPeriod
;
int64_t
segSize
;
EWalType
walLevel
;
// wal level
}
SWalCfg
;
...
...
@@ -87,10 +89,14 @@ typedef struct SWal {
// cfg
int32_t
vgId
;
int32_t
fsyncPeriod
;
// millisecond
int32_t
fsyncSeq
;
int32_t
rollPeriod
;
// second
int64_t
segSize
;
int64_t
rtSize
;
EWalType
level
;
//total size
int64_t
totSize
;
//fsync seq
int32_t
fsyncSeq
;
//reference
int64_t
refId
;
//current tfd
...
...
@@ -98,25 +104,32 @@ typedef struct SWal {
int64_t
curIdxTfd
;
//current version
int64_t
curVersion
;
int64_t
curLogOffset
;
//current file version
int64_t
curFileFirstVersion
;
int64_t
curFileLastVersion
;
//wal fileset version
//int64_t curFileFirstVersion;
//int64_t curFileLastVersion;
//wal lifecycle
int64_t
firstVersion
;
int64_t
snapshotVersion
;
int64_t
commitVersion
;
int64_t
lastVersion
;
int64_t
lastFileName
;
//last file
//int64_t lastFileName;
//roll status
int64_t
lastRollSeq
;
int64_t
lastFileWriteSize
;
//int64_t lastFileWriteSize;
//file set
int32_t
fileCursor
;
SArray
*
fileInfoSet
;
//ctl
int32_t
curStatus
;
pthread_mutex_t
mutex
;
//path
char
path
[
WAL_PATH_LEN
];
//file set
SArray
*
fileSet
;
//reusable write head
SWalHead
head
;
}
SWal
;
// WAL HANDLE
...
...
@@ -133,7 +146,7 @@ int32_t walAlter(SWal *, SWalCfg *pCfg);
void
walClose
(
SWal
*
);
// write
int64_t
walWrite
(
SWal
*
,
int64_t
index
,
uint8_t
msgType
,
void
*
body
,
int32_t
bodyLen
);
int64_t
walWrite
(
SWal
*
,
int64_t
index
,
uint8_t
msgType
,
const
void
*
body
,
int32_t
bodyLen
);
void
walFsync
(
SWal
*
,
bool
force
);
// apis for lifecycle management
...
...
include/util/tfile.h
浏览文件 @
cf93f4c2
...
...
@@ -16,6 +16,8 @@
#ifndef _TD_UTIL_FILE_H
#define _TD_UTIL_FILE_H
#include "os.h"
#ifdef __cplusplus
extern
"C"
{
#endif
...
...
source/libs/wal/CMakeLists.txt
浏览文件 @
cf93f4c2
...
...
@@ -8,6 +8,11 @@ target_include_directories(
target_link_libraries
(
wal
PUBLIC cjson
PUBLIC os
PUBLIC util
)
if
(
${
BUILD_TEST
}
)
add_subdirectory
(
test
)
endif
(
${
BUILD_TEST
}
)
source/libs/wal/inc/walInt.h
浏览文件 @
cf93f4c2
...
...
@@ -23,9 +23,73 @@
extern
"C"
{
#endif
int
walGetFile
(
SWal
*
pWal
,
int32_t
version
);
//meta section begin
typedef
struct
WalFileInfo
{
int64_t
firstVer
;
int64_t
lastVer
;
int64_t
createTs
;
int64_t
closeTs
;
int64_t
fileSize
;
}
WalFileInfo
;
static
inline
int32_t
compareWalFileInfo
(
const
void
*
pLeft
,
const
void
*
pRight
)
{
WalFileInfo
*
pInfoLeft
=
(
WalFileInfo
*
)
pLeft
;
WalFileInfo
*
pInfoRight
=
(
WalFileInfo
*
)
pRight
;
return
compareInt64Val
(
&
pInfoLeft
->
firstVer
,
&
pInfoRight
->
firstVer
);
}
static
inline
int64_t
walGetLastFileSize
(
SWal
*
pWal
)
{
WalFileInfo
*
pInfo
=
(
WalFileInfo
*
)
taosArrayGetLast
(
pWal
->
fileInfoSet
);
return
pInfo
->
fileSize
;
}
static
inline
int64_t
walGetLastFileFirstVer
(
SWal
*
pWal
)
{
WalFileInfo
*
pInfo
=
(
WalFileInfo
*
)
taosArrayGetLast
(
pWal
->
fileInfoSet
);
return
pInfo
->
firstVer
;
}
static
inline
int64_t
walGetCurFileFirstVer
(
SWal
*
pWal
)
{
WalFileInfo
*
pInfo
=
(
WalFileInfo
*
)
taosArrayGet
(
pWal
->
fileInfoSet
,
pWal
->
fileCursor
);
return
pInfo
->
firstVer
;
}
static
inline
int64_t
walGetCurFileLastVer
(
SWal
*
pWal
)
{
WalFileInfo
*
pInfo
=
(
WalFileInfo
*
)
taosArrayGet
(
pWal
->
fileInfoSet
,
pWal
->
fileCursor
);
return
pInfo
->
firstVer
;
}
static
inline
int64_t
walGetCurFileOffset
(
SWal
*
pWal
)
{
WalFileInfo
*
pInfo
=
(
WalFileInfo
*
)
taosArrayGet
(
pWal
->
fileInfoSet
,
pWal
->
fileCursor
);
return
pInfo
->
fileSize
;
}
static
inline
bool
walCurFileClosed
(
SWal
*
pWal
)
{
return
taosArrayGetSize
(
pWal
->
fileInfoSet
)
!=
pWal
->
fileCursor
;
}
static
inline
WalFileInfo
*
walGetCurFileInfo
(
SWal
*
pWal
)
{
return
(
WalFileInfo
*
)
taosArrayGet
(
pWal
->
fileInfoSet
,
pWal
->
fileCursor
);
}
static
inline
int
walBuildLogName
(
SWal
*
pWal
,
int64_t
fileFirstVer
,
char
*
buf
)
{
return
sprintf
(
buf
,
"%s/%"
PRId64
"."
WAL_LOG_SUFFIX
,
pWal
->
path
,
fileFirstVer
);
}
static
inline
int
walBuildIdxName
(
SWal
*
pWal
,
int64_t
fileFirstVer
,
char
*
buf
)
{
return
sprintf
(
buf
,
"%s/%"
PRId64
"."
WAL_INDEX_SUFFIX
,
pWal
->
path
,
fileFirstVer
);
}
int
walReadMeta
(
SWal
*
pWal
);
int
walWriteMeta
(
SWal
*
pWal
);
int
walRollFileInfo
(
SWal
*
pWal
);
char
*
walFileInfoSerialize
(
SWal
*
pWal
);
SArray
*
walFileInfoDeserialize
(
const
char
*
bytes
);
//meta section end
int64_t
walGetSeq
();
int
walSeekVer
(
SWal
*
pWal
,
int64_t
ver
);
int
walRoll
(
SWal
*
pWal
);
#ifdef __cplusplus
}
...
...
source/libs/wal/src/walIndex.c
浏览文件 @
cf93f4c2
...
...
@@ -27,7 +27,7 @@ static int walSeekFilePos(SWal* pWal, int64_t ver) {
int64_t
logTfd
=
pWal
->
curLogTfd
;
//seek position
int64_t
offset
=
(
ver
-
pWal
->
curFileFirstVersion
)
*
WAL_IDX_ENTRY_SIZE
;
int64_t
offset
=
(
ver
-
walGetCurFileFirstVer
(
pWal
)
)
*
WAL_IDX_ENTRY_SIZE
;
code
=
tfLseek
(
idxTfd
,
offset
,
SEEK_SET
);
if
(
code
!=
0
)
{
...
...
@@ -43,7 +43,7 @@ static int walSeekFilePos(SWal* pWal, int64_t ver) {
if
(
code
!=
0
)
{
}
pWal
->
curLogOffset
=
readBuf
[
1
];
/*pWal->curLogOffset = readBuf[1];*/
pWal
->
curVersion
=
ver
;
return
code
;
}
...
...
@@ -60,27 +60,27 @@ static int walChangeFile(SWal *pWal, int64_t ver) {
if
(
code
!=
0
)
{
//TODO
}
WalFileInfo
tmpInfo
;
tmpInfo
.
firstVer
=
ver
;
//bsearch in fileSet
int64_t
*
pRet
=
taosArraySearch
(
pWal
->
fileSet
,
&
ver
,
compareInt64Val
,
TD_LE
);
WalFileInfo
*
pRet
=
taosArraySearch
(
pWal
->
fileInfoSet
,
&
tmpInfo
,
compareWalFileInfo
,
TD_LE
);
ASSERT
(
pRet
!=
NULL
);
int64_t
fname
=
*
pRet
;
if
(
fname
<
pWal
->
lastFileName
)
{
int64_t
fileFirstVer
=
pRet
->
firstVer
;
//closed
if
(
taosArrayGetLast
(
pWal
->
fileInfoSet
)
!=
pRet
)
{
pWal
->
curStatus
&=
~
WAL_CUR_FILE_WRITABLE
;
pWal
->
curFileLastVersion
=
pRet
[
1
]
-
1
;
sprintf
(
fnameStr
,
"%"
PRId64
"."
WAL_INDEX_SUFFIX
,
fname
);
walBuildIdxName
(
pWal
,
fileFirstVer
,
fnameStr
);
idxTfd
=
tfOpenRead
(
fnameStr
);
sprintf
(
fnameStr
,
"%"
PRId64
"."
WAL_LOG_SUFFIX
,
fname
);
walBuildLogName
(
pWal
,
fileFirstVer
,
fnameStr
);
logTfd
=
tfOpenRead
(
fnameStr
);
}
else
{
pWal
->
curStatus
|=
WAL_CUR_FILE_WRITABLE
;
pWal
->
curFileLastVersion
=
-
1
;
sprintf
(
fnameStr
,
"%"
PRId64
"."
WAL_INDEX_SUFFIX
,
fname
);
walBuildIdxName
(
pWal
,
fileFirstVer
,
fnameStr
);
idxTfd
=
tfOpenReadWrite
(
fnameStr
);
sprintf
(
fnameStr
,
"%"
PRId64
"."
WAL_LOG_SUFFIX
,
fname
);
walBuildLogName
(
pWal
,
fileFirstVer
,
fnameStr
);
logTfd
=
tfOpenReadWrite
(
fnameStr
);
}
pWal
->
curFileFirstVersion
=
fname
;
pWal
->
curLogTfd
=
logTfd
;
pWal
->
curIdxTfd
=
idxTfd
;
return
code
;
...
...
@@ -102,8 +102,7 @@ int walSeekVer(SWal *pWal, int64_t ver) {
if
(
ver
<
pWal
->
snapshotVersion
)
{
//TODO: seek snapshotted log, invalid in some cases
}
if
(
ver
<
pWal
->
curFileFirstVersion
||
(
pWal
->
curFileLastVersion
!=
-
1
&&
ver
>
pWal
->
curFileLastVersion
))
{
if
(
ver
<
walGetCurFileFirstVer
(
pWal
)
||
(
ver
>
walGetCurFileLastVer
(
pWal
)))
{
walChangeFile
(
pWal
,
ver
);
}
walSeekFilePos
(
pWal
,
ver
);
...
...
source/libs/wal/src/walMeta.c
0 → 100644
浏览文件 @
cf93f4c2
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#define _DEFAULT_SOURCE
#include "os.h"
#include "taoserror.h"
#include "tref.h"
#include "tfile.h"
#include "cJSON.h"
#include "walInt.h"
#include <libgen.h>
#include <regex.h>
int
walRollFileInfo
(
SWal
*
pWal
)
{
int64_t
ts
=
taosGetTimestampSec
();
SArray
*
pArray
=
pWal
->
fileInfoSet
;
if
(
taosArrayGetSize
(
pArray
)
!=
0
)
{
WalFileInfo
*
pInfo
=
taosArrayGetLast
(
pArray
);
pInfo
->
lastVer
=
pWal
->
lastVersion
;
pInfo
->
closeTs
=
ts
;
}
WalFileInfo
*
pNewInfo
=
malloc
(
sizeof
(
WalFileInfo
));
if
(
pNewInfo
==
NULL
)
{
return
-
1
;
}
pNewInfo
->
firstVer
=
pWal
->
lastVersion
+
1
;
pNewInfo
->
lastVer
=
-
1
;
pNewInfo
->
createTs
=
ts
;
pNewInfo
->
closeTs
=
-
1
;
pNewInfo
->
fileSize
=
0
;
taosArrayPush
(
pWal
->
fileInfoSet
,
pNewInfo
);
return
0
;
}
char
*
walFileInfoSerialize
(
SWal
*
pWal
)
{
char
buf
[
30
];
if
(
pWal
==
NULL
||
pWal
->
fileInfoSet
==
NULL
)
return
0
;
int
sz
=
pWal
->
fileInfoSet
->
size
;
cJSON
*
root
=
cJSON_CreateArray
();
cJSON
*
field
;
if
(
root
==
NULL
)
{
//TODO
return
NULL
;
}
WalFileInfo
*
pData
=
pWal
->
fileInfoSet
->
pData
;
for
(
int
i
=
0
;
i
<
sz
;
i
++
)
{
WalFileInfo
*
pInfo
=
&
pData
[
i
];
cJSON_AddItemToArray
(
root
,
field
=
cJSON_CreateObject
());
if
(
field
==
NULL
)
{
cJSON_Delete
(
root
);
return
NULL
;
}
//cjson only support int32_t or double
//string are used to prohibit the loss of precision
sprintf
(
buf
,
"%ld"
,
pInfo
->
firstVer
);
cJSON_AddStringToObject
(
field
,
"firstVer"
,
buf
);
sprintf
(
buf
,
"%ld"
,
pInfo
->
lastVer
);
cJSON_AddStringToObject
(
field
,
"lastVer"
,
buf
);
sprintf
(
buf
,
"%ld"
,
pInfo
->
createTs
);
cJSON_AddStringToObject
(
field
,
"createTs"
,
buf
);
sprintf
(
buf
,
"%ld"
,
pInfo
->
closeTs
);
cJSON_AddStringToObject
(
field
,
"closeTs"
,
buf
);
sprintf
(
buf
,
"%ld"
,
pInfo
->
fileSize
);
cJSON_AddStringToObject
(
field
,
"fileSize"
,
buf
);
}
return
cJSON_Print
(
root
);
}
SArray
*
walFileInfoDeserialize
(
const
char
*
bytes
)
{
cJSON
*
root
,
*
pInfoJson
,
*
pField
;
root
=
cJSON_Parse
(
bytes
);
int
sz
=
cJSON_GetArraySize
(
root
);
//deserialize
SArray
*
pArray
=
taosArrayInit
(
sz
,
sizeof
(
WalFileInfo
));
WalFileInfo
*
pData
=
pArray
->
pData
;
for
(
int
i
=
0
;
i
<
sz
;
i
++
)
{
cJSON
*
pInfoJson
=
cJSON_GetArrayItem
(
root
,
i
);
WalFileInfo
*
pInfo
=
&
pData
[
i
];
pField
=
cJSON_GetObjectItem
(
pInfoJson
,
"firstVer"
);
pInfo
->
firstVer
=
atoll
(
cJSON_GetStringValue
(
pField
));
pField
=
cJSON_GetObjectItem
(
pInfoJson
,
"lastVer"
);
pInfo
->
lastVer
=
atoll
(
cJSON_GetStringValue
(
pField
));
pField
=
cJSON_GetObjectItem
(
pInfoJson
,
"createTs"
);
pInfo
->
createTs
=
atoll
(
cJSON_GetStringValue
(
pField
));
pField
=
cJSON_GetObjectItem
(
pInfoJson
,
"closeTs"
);
pInfo
->
closeTs
=
atoll
(
cJSON_GetStringValue
(
pField
));
pField
=
cJSON_GetObjectItem
(
pInfoJson
,
"fileSize"
);
pInfo
->
fileSize
=
atoll
(
cJSON_GetStringValue
(
pField
));
}
taosArraySetSize
(
pArray
,
sz
);
return
pArray
;
}
static
inline
int
walBuildMetaName
(
SWal
*
pWal
,
int
metaVer
,
char
*
buf
)
{
return
sprintf
(
buf
,
"%s/meta-ver%d"
,
pWal
->
path
,
metaVer
);
}
static
int
walFindCurMetaVer
(
SWal
*
pWal
)
{
const
char
*
pattern
=
"^meta-ver[0-9]+$"
;
regex_t
walMetaRegexPattern
;
regcomp
(
&
walMetaRegexPattern
,
pattern
,
REG_EXTENDED
);
DIR
*
dir
=
opendir
(
pWal
->
path
);
if
(
dir
==
NULL
)
{
wError
(
"vgId:%d, path:%s, failed to open since %s"
,
pWal
->
vgId
,
pWal
->
path
,
strerror
(
errno
));
return
-
1
;
}
struct
dirent
*
ent
;
//find existing meta-ver[x].json
int
metaVer
=
-
1
;
while
((
ent
=
readdir
(
dir
))
!=
NULL
)
{
char
*
name
=
basename
(
ent
->
d_name
);
int
code
=
regexec
(
&
walMetaRegexPattern
,
name
,
0
,
NULL
,
0
);
if
(
code
==
0
)
{
sscanf
(
name
,
"meta-ver%d"
,
&
metaVer
);
break
;
}
}
return
metaVer
;
}
int
walWriteMeta
(
SWal
*
pWal
)
{
int
metaVer
=
walFindCurMetaVer
(
pWal
);
char
fnameStr
[
WAL_FILE_LEN
];
walBuildMetaName
(
pWal
,
metaVer
+
1
,
fnameStr
);
int
metaTfd
=
tfOpenCreateWrite
(
fnameStr
);
if
(
metaTfd
<
0
)
{
return
-
1
;
}
char
*
serialized
=
walFileInfoSerialize
(
pWal
);
int
len
=
strlen
(
serialized
);
if
(
len
!=
tfWrite
(
metaTfd
,
serialized
,
len
))
{
//TODO:clean file
return
-
1
;
}
tfClose
(
metaTfd
);
//delete old file
if
(
metaVer
>
-
1
)
{
walBuildMetaName
(
pWal
,
metaVer
,
fnameStr
);
remove
(
fnameStr
);
}
return
0
;
}
int
walReadMeta
(
SWal
*
pWal
)
{
ASSERT
(
pWal
->
fileInfoSet
->
size
==
0
);
//find existing meta file
int
metaVer
=
walFindCurMetaVer
(
pWal
);
if
(
metaVer
==
-
1
)
{
return
0
;
}
char
fnameStr
[
WAL_FILE_LEN
];
walBuildMetaName
(
pWal
,
metaVer
,
fnameStr
);
//read metafile
struct
stat
statbuf
;
stat
(
fnameStr
,
&
statbuf
);
int
size
=
statbuf
.
st_size
;
char
*
buf
=
malloc
(
size
+
5
);
if
(
buf
==
NULL
)
{
return
-
1
;
}
int
tfd
=
tfOpenRead
(
fnameStr
);
if
(
tfRead
(
tfd
,
buf
,
size
)
!=
size
)
{
free
(
buf
);
return
-
1
;
}
//load into fileInfoSet
pWal
->
fileInfoSet
=
walFileInfoDeserialize
(
buf
);
if
(
pWal
->
fileInfoSet
==
NULL
)
{
free
(
buf
);
return
-
1
;
}
free
(
buf
);
return
0
;
}
source/libs/wal/src/walMgmt.c
浏览文件 @
cf93f4c2
...
...
@@ -64,43 +64,31 @@ int32_t walInit() {
void
walCleanUp
()
{
walStopThread
();
taosCloseRef
(
tsWal
.
refSetId
);
atomic_store_8
(
&
tsWal
.
inited
,
0
);
wInfo
(
"wal module is cleaned up"
);
}
static
int
walLoadFileset
(
SWal
*
pWal
)
{
DIR
*
dir
=
opendir
(
pWal
->
path
);
if
(
dir
==
NULL
)
{
wError
(
"vgId:%d, path:%s, failed to open since %s"
,
pWal
->
vgId
,
pWal
->
path
,
strerror
(
errno
));
return
-
1
;
}
struct
dirent
*
ent
;
while
((
ent
=
readdir
(
dir
))
!=
NULL
)
{
char
*
name
=
ent
->
d_name
;
name
[
WAL_NOSUFFIX_LEN
]
=
0
;
//validate file name by regex matching
if
(
1
/* TODO:regex match */
)
{
int64_t
fnameInt64
=
atoll
(
name
);
taosArrayPush
(
pWal
->
fileSet
,
&
fnameInt64
);
}
}
taosArraySort
(
pWal
->
fileSet
,
compareInt64Val
);
return
0
;
}
SWal
*
walOpen
(
const
char
*
path
,
SWalCfg
*
pCfg
)
{
SWal
*
pWal
=
malloc
(
sizeof
(
SWal
));
if
(
pWal
==
NULL
)
{
terrno
=
TAOS_SYSTEM_ERROR
(
errno
);
return
NULL
;
}
pWal
->
vgId
=
pCfg
->
vgId
;
pWal
->
curLogTfd
=
-
1
;
pWal
->
curIdxTfd
=
-
1
;
pWal
->
level
=
pCfg
->
walLevel
;
//set config
pWal
->
vgId
=
pCfg
->
vgId
;
pWal
->
fsyncPeriod
=
pCfg
->
fsyncPeriod
;
pWal
->
rollPeriod
=
pCfg
->
rollPeriod
;
pWal
->
segSize
=
pCfg
->
segSize
;
pWal
->
level
=
pCfg
->
walLevel
;
//init status
pWal
->
lastVersion
=
-
1
;
pWal
->
lastRollSeq
=
-
1
;
//init write buffer
memset
(
&
pWal
->
head
,
0
,
sizeof
(
SWalHead
));
pWal
->
head
.
sver
=
0
;
...
...
@@ -120,7 +108,6 @@ SWal *walOpen(const char *path, SWalCfg *pCfg) {
walFreeObj
(
pWal
);
return
NULL
;
}
walLoadFileset
(
pWal
);
wDebug
(
"vgId:%d, wal:%p is opened, level:%d fsyncPeriod:%d"
,
pWal
->
vgId
,
pWal
,
pWal
->
level
,
pWal
->
fsyncPeriod
);
...
...
@@ -153,8 +140,8 @@ void walClose(SWal *pWal) {
pthread_mutex_lock
(
&
pWal
->
mutex
);
tfClose
(
pWal
->
curLogTfd
);
tfClose
(
pWal
->
curIdxTfd
);
taosArrayDestroy
(
pWal
->
fileSet
);
pWal
->
fileSet
=
NULL
;
/*taosArrayDestroy(pWal->fileInfoSet);*/
/*pWal->fileInfoSet = NULL;*/
pthread_mutex_unlock
(
&
pWal
->
mutex
);
taosRemoveRef
(
tsWal
.
refSetId
,
pWal
->
refId
);
}
...
...
@@ -164,8 +151,8 @@ static int32_t walInitObj(SWal *pWal) {
wError
(
"vgId:%d, path:%s, failed to create directory since %s"
,
pWal
->
vgId
,
pWal
->
path
,
strerror
(
errno
));
return
TAOS_SYSTEM_ERROR
(
errno
);
}
pWal
->
file
Set
=
taosArrayInit
(
0
,
sizeof
(
int64_t
));
if
(
pWal
->
fileSet
==
NULL
)
{
pWal
->
file
InfoSet
=
taosArrayInit
(
0
,
sizeof
(
WalFileInfo
));
if
(
pWal
->
file
Info
Set
==
NULL
)
{
wError
(
"vgId:%d, path:%s, failed to init taosArray %s"
,
pWal
->
vgId
,
pWal
->
path
,
strerror
(
errno
));
return
TAOS_SYSTEM_ERROR
(
errno
);
}
...
...
@@ -180,8 +167,10 @@ static void walFreeObj(void *wal) {
tfClose
(
pWal
->
curLogTfd
);
tfClose
(
pWal
->
curIdxTfd
);
taosArrayDestroy
(
pWal
->
fileSet
);
pWal
->
fileSet
=
NULL
;
taosArrayDestroy
(
pWal
->
fileInfoSet
);
pWal
->
fileInfoSet
=
NULL
;
taosArrayDestroy
(
pWal
->
fileInfoSet
);
pWal
->
fileInfoSet
=
NULL
;
pthread_mutex_destroy
(
&
pWal
->
mutex
);
tfree
(
pWal
);
}
...
...
@@ -210,7 +199,7 @@ static void walFsyncAll() {
wTrace
(
"vgId:%d, do fsync, level:%d seq:%d rseq:%d"
,
pWal
->
vgId
,
pWal
->
level
,
pWal
->
fsyncSeq
,
atomic_load_32
(
&
tsWal
.
seq
));
int32_t
code
=
tfFsync
(
pWal
->
curLogTfd
);
if
(
code
!=
0
)
{
wError
(
"vgId:%d, file:%"
PRId64
".log, failed to fsync since %s"
,
pWal
->
vgId
,
pWal
->
curFileFirstVersion
,
strerror
(
code
));
wError
(
"vgId:%d, file:%"
PRId64
".log, failed to fsync since %s"
,
pWal
->
vgId
,
walGetLastFileFirstVer
(
pWal
)
,
strerror
(
code
));
}
}
pWal
=
taosIterateRef
(
tsWal
.
refSetId
,
pWal
->
refId
);
...
...
source/libs/wal/src/walRead.c
浏览文件 @
cf93f4c2
...
...
@@ -13,16 +13,56 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "wal.h"
#include "walInt.h"
#include "tfile.h"
#include "tchecksum.h"
static
int
walValidateChecksum
(
SWalHead
*
pHead
,
void
*
body
,
int64_t
bodyLen
)
{
return
taosCheckChecksum
((
uint8_t
*
)
pHead
,
sizeof
(
SWalHead
)
-
sizeof
(
uint32_t
)
*
2
,
pHead
->
cksumHead
)
&&
taosCheckChecksum
(
body
,
bodyLen
,
pHead
->
cksumBody
);
static
inline
int
walValidHeadCksum
(
SWalHead
*
pHead
)
{
return
taosCheckChecksum
((
uint8_t
*
)
pHead
,
sizeof
(
SWalHead
)
-
sizeof
(
uint32_t
)
*
2
,
pHead
->
cksumHead
);
}
static
inline
int
walValidBodyCksum
(
SWalHead
*
pHead
)
{
return
taosCheckChecksum
((
uint8_t
*
)
pHead
->
cont
,
pHead
->
len
,
pHead
->
cksumBody
);
}
static
int
walValidCksum
(
SWalHead
*
pHead
,
void
*
body
,
int64_t
bodyLen
)
{
return
walValidHeadCksum
(
pHead
)
&&
walValidBodyCksum
(
pHead
);
}
int32_t
walRead
(
SWal
*
pWal
,
SWalHead
**
ppHead
,
int64_t
ver
)
{
int
code
;
code
=
walSeekVer
(
pWal
,
ver
);
if
(
code
!=
0
)
{
return
code
;
}
if
(
*
ppHead
==
NULL
)
{
void
*
ptr
=
realloc
(
*
ppHead
,
sizeof
(
SWalHead
));
if
(
ptr
==
NULL
)
{
return
-
1
;
}
*
ppHead
=
ptr
;
}
if
(
tfRead
(
pWal
->
curLogTfd
,
*
ppHead
,
sizeof
(
SWalHead
))
!=
sizeof
(
SWalHead
))
{
return
-
1
;
}
//TODO: endian compatibility processing after read
if
(
walValidHeadCksum
(
*
ppHead
)
!=
0
)
{
return
-
1
;
}
void
*
ptr
=
realloc
(
*
ppHead
,
sizeof
(
SWalHead
)
+
(
*
ppHead
)
->
len
);
if
(
ptr
==
NULL
)
{
free
(
*
ppHead
);
*
ppHead
=
NULL
;
return
-
1
;
}
if
(
tfRead
(
pWal
->
curLogTfd
,
(
*
ppHead
)
->
cont
,
(
*
ppHead
)
->
len
)
!=
(
*
ppHead
)
->
len
)
{
return
-
1
;
}
//TODO: endian compatibility processing after read
if
(
walValidBodyCksum
(
*
ppHead
)
!=
0
)
{
return
-
1
;
}
return
0
;
}
...
...
source/libs/wal/src/walWrite.c
浏览文件 @
cf93f4c2
...
...
@@ -21,29 +21,42 @@
#include "tfile.h"
#include "walInt.h"
static
void
walFtruncate
(
SWal
*
pWal
,
int64_t
ver
);
int32_t
walCommit
(
SWal
*
pWal
,
int64_t
ver
)
{
ASSERT
(
pWal
->
snapshotVersion
<=
pWal
->
commitVersion
);
ASSERT
(
pWal
->
commitVersion
<=
pWal
->
lastVersion
);
ASSERT
(
ver
>=
pWal
->
commitVersion
);
ASSERT
(
ver
<=
pWal
->
lastVersion
);
pWal
->
commitVersion
=
ver
;
return
0
;
}
int32_t
walRollback
(
SWal
*
pWal
,
int64_t
ver
)
{
//TODO: ftruncate
ASSERT
(
ver
>
pWal
->
commitVersion
);
ASSERT
(
ver
<=
pWal
->
lastVersion
);
//seek position
walSeekVer
(
pWal
,
ver
);
walFtruncate
(
pWal
,
ver
);
return
0
;
}
int32_t
walTakeSnapshot
(
SWal
*
pWal
,
int64_t
ver
)
{
pWal
->
snapshotVersion
=
ver
;
WalFileInfo
tmp
;
tmp
.
firstVer
=
ver
;
//mark files safe to delete
int64_t
*
pRet
=
taosArraySearch
(
pWal
->
fileSet
,
&
ver
,
compareInt64Val
,
TD_LE
);
if
(
pRet
!=
pWal
->
fileSet
->
pData
)
{
//delete files until less than retention size
//find first file that exceeds retention time
}
WalFileInfo
*
pInfo
=
taosArraySearch
(
pWal
->
fileInfoSet
,
&
tmp
,
compareWalFileInfo
,
TD_LE
);
//iterate files, until the searched result
//if totSize > rtSize, delete
//if createTs > retentionTs, delete
//save snapshot ver, commit ver
//make new array, remove files
//delete files living longer than retention limit
//remove file from fileset
return
0
;
}
...
...
@@ -138,105 +151,123 @@ void walRemoveAllOldFiles(void *handle) {
}
#endif
static
int
walRoll
(
SWal
*
pWal
)
{
int
walRoll
(
SWal
*
pWal
)
{
int
code
=
0
;
code
=
tfClose
(
pWal
->
curIdxTfd
);
if
(
code
!=
0
)
{
return
code
;
if
(
pWal
->
curIdxTfd
!=
-
1
)
{
code
=
tfClose
(
pWal
->
curIdxTfd
);
if
(
code
!=
0
)
{
return
-
1
;
}
}
code
=
tfClose
(
pWal
->
curLogTfd
);
if
(
code
!=
0
)
{
return
code
;
if
(
pWal
->
curLogTfd
!=
-
1
)
{
code
=
tfClose
(
pWal
->
curLogTfd
);
if
(
code
!=
0
)
{
return
-
1
;
}
}
int64_t
idxTfd
,
logTfd
;
//create new file
int64_t
newFileFirstVersion
=
pWal
->
lastVersion
+
1
;
char
fnameStr
[
WAL_FILE_LEN
];
sprintf
(
fnameStr
,
"%"
PRId64
"."
WAL_INDEX_SUFFIX
,
newFileFirstVersion
);
walBuildIdxName
(
pWal
,
newFileFirstVersion
,
fnameStr
);
idxTfd
=
tfOpenCreateWrite
(
fnameStr
);
sprintf
(
fnameStr
,
"%"
PRId64
"."
WAL_LOG_SUFFIX
,
newFileFirstVersion
);
if
(
idxTfd
<
0
)
{
ASSERT
(
0
);
return
-
1
;
}
walBuildLogName
(
pWal
,
newFileFirstVersion
,
fnameStr
);
logTfd
=
tfOpenCreateWrite
(
fnameStr
);
if
(
logTfd
<
0
)
{
ASSERT
(
0
);
return
-
1
;
}
code
=
walRollFileInfo
(
pWal
);
if
(
code
!=
0
)
{
ASSERT
(
0
);
return
-
1
;
}
taosArrayPush
(
pWal
->
fileSet
,
&
newFileFirstVersion
);
//switch file
pWal
->
curIdxTfd
=
idxTfd
;
pWal
->
curLogTfd
=
logTfd
;
//change status
pWal
->
curFileLastVersion
=
-
1
;
pWal
->
curFileFirstVersion
=
newFileFirstVersion
;
pWal
->
curVersion
=
newFileFirstVersion
;
pWal
->
curLogOffset
=
0
;
pWal
->
curStatus
=
WAL_CUR_FILE_WRITABLE
&
WAL_CUR_POS_WRITABLE
;
pWal
->
lastFileName
=
newFileFirstVersion
;
pWal
->
lastFileWriteSize
=
0
;
pWal
->
lastRollSeq
=
walGetSeq
();
return
0
;
}
int
walChangeFileToLast
(
SWal
*
pWal
)
{
int64_t
idxTfd
,
logTfd
;
int64_t
*
pRet
=
taosArrayGetLast
(
pWal
->
file
Set
);
WalFileInfo
*
pRet
=
taosArrayGetLast
(
pWal
->
fileInfo
Set
);
ASSERT
(
pRet
!=
NULL
);
int64_t
f
name
=
*
pRet
;
int64_t
f
ileFirstVer
=
pRet
->
firstVer
;
char
fnameStr
[
WAL_FILE_LEN
];
sprintf
(
fnameStr
,
"%"
PRId64
"."
WAL_INDEX_SUFFIX
,
fname
);
walBuildIdxName
(
pWal
,
fileFirstVer
,
fnameStr
);
idxTfd
=
tfOpenReadWrite
(
fnameStr
);
sprintf
(
fnameStr
,
"%"
PRId64
"."
WAL_LOG_SUFFIX
,
fname
);
if
(
idxTfd
<
0
)
{
return
-
1
;
}
walBuildLogName
(
pWal
,
fileFirstVer
,
fnameStr
);
logTfd
=
tfOpenReadWrite
(
fnameStr
);
if
(
logTfd
<
0
)
{
return
-
1
;
}
//switch file
pWal
->
curIdxTfd
=
idxTfd
;
pWal
->
curLogTfd
=
logTfd
;
//change status
pWal
->
curFileLastVersion
=
-
1
;
pWal
->
curFileFirstVersion
=
fname
;
pWal
->
curVersion
=
fname
;
pWal
->
curLogOffset
=
0
;
pWal
->
curVersion
=
fileFirstVer
;
pWal
->
curStatus
=
WAL_CUR_FILE_WRITABLE
;
return
0
;
}
int
walWriteIndex
(
SWal
*
pWal
,
int64_t
ver
,
int64_t
offset
)
{
static
int
walWriteIndex
(
SWal
*
pWal
,
int64_t
ver
,
int64_t
offset
)
{
int
code
=
0
;
//get index file
if
(
!
tfValid
(
pWal
->
curIdxTfd
))
{
code
=
TAOS_SYSTEM_ERROR
(
errno
);
wError
(
"vgId:%d, file:%"
PRId64
".idx, failed to open since %s"
,
pWal
->
vgId
,
pWal
->
curFileFirstVersion
,
strerror
(
errno
));
WalFileInfo
*
pInfo
=
taosArrayGet
(
pWal
->
fileInfoSet
,
pWal
->
fileCursor
);
wError
(
"vgId:%d, file:%"
PRId64
".idx, failed to open since %s"
,
pWal
->
vgId
,
pInfo
->
firstVer
,
strerror
(
errno
));
return
code
;
}
int64_t
writeBuf
[
2
]
=
{
ver
,
offset
};
int
size
=
tfWrite
(
pWal
->
curIdxTfd
,
writeBuf
,
sizeof
(
writeBuf
));
if
(
size
!=
sizeof
(
writeBuf
))
{
//TODO:
return
-
1
;
}
return
0
;
}
int64_t
walWrite
(
SWal
*
pWal
,
int64_t
index
,
uint8_t
msgType
,
void
*
body
,
int32_t
bodyLen
)
{
int64_t
walWrite
(
SWal
*
pWal
,
int64_t
index
,
uint8_t
msgType
,
const
void
*
body
,
int32_t
bodyLen
)
{
if
(
pWal
==
NULL
)
return
-
1
;
int
code
=
0
;
// no wal
if
(
pWal
->
level
==
TAOS_WAL_NOLOG
)
return
0
;
if
(
index
==
pWal
->
lastVersion
+
1
)
{
int64_t
passed
=
walGetSeq
()
-
pWal
->
lastRollSeq
;
if
(
passed
>
pWal
->
rollPeriod
)
{
walRoll
(
pWal
);
}
else
if
(
pWal
->
lastFileWriteSize
>
pWal
->
segSize
)
{
walRoll
(
pWal
);
if
(
taosArrayGetSize
(
pWal
->
fileInfoSet
)
==
0
)
{
code
=
walRoll
(
pWal
);
ASSERT
(
code
==
0
);
}
else
{
walChangeFileToLast
(
pWal
);
int64_t
passed
=
walGetSeq
()
-
pWal
->
lastRollSeq
;
if
(
pWal
->
rollPeriod
!=
-
1
&&
passed
>
pWal
->
rollPeriod
)
{
walRoll
(
pWal
);
}
else
if
(
pWal
->
segSize
!=
-
1
&&
walGetLastFileSize
(
pWal
)
>
pWal
->
segSize
)
{
walRoll
(
pWal
);
}
}
}
else
{
//reject skip log or rewrite log
//must truncate explicitly first
return
-
1
;
}
if
(
!
tfValid
(
pWal
->
curLogTfd
))
return
0
;
/*if (!tfValid(pWal->curLogTfd)) return 0;*/
pWal
->
head
.
version
=
index
;
int32_t
code
=
0
;
pWal
->
head
.
signature
=
WAL_SIGNATURE
;
pWal
->
head
.
len
=
bodyLen
;
...
...
@@ -250,19 +281,23 @@ int64_t walWrite(SWal *pWal, int64_t index, uint8_t msgType, void *body, int32_t
if
(
tfWrite
(
pWal
->
curLogTfd
,
&
pWal
->
head
,
sizeof
(
SWalHead
))
!=
sizeof
(
SWalHead
))
{
//ftruncate
code
=
TAOS_SYSTEM_ERROR
(
errno
);
wError
(
"vgId:%d, file:%"
PRId64
".log, failed to write since %s"
,
pWal
->
vgId
,
pWal
->
curFileFirstVersion
,
strerror
(
errno
));
wError
(
"vgId:%d, file:%"
PRId64
".log, failed to write since %s"
,
pWal
->
vgId
,
walGetLastFileFirstVer
(
pWal
)
,
strerror
(
errno
));
}
if
(
tfWrite
(
pWal
->
curLogTfd
,
&
body
,
bodyLen
)
!=
bodyLen
)
{
//ftruncate
code
=
TAOS_SYSTEM_ERROR
(
errno
);
wError
(
"vgId:%d, file:%"
PRId64
".log, failed to write since %s"
,
pWal
->
vgId
,
pWal
->
curFileFirstVersion
,
strerror
(
errno
));
wError
(
"vgId:%d, file:%"
PRId64
".log, failed to write since %s"
,
pWal
->
vgId
,
walGetLastFileFirstVer
(
pWal
),
strerror
(
errno
));
}
code
=
walWriteIndex
(
pWal
,
index
,
walGetCurFileOffset
(
pWal
));
if
(
code
!=
0
)
{
//TODO
}
walWriteIndex
(
pWal
,
index
,
pWal
->
curLogOffset
);
pWal
->
curLogOffset
+=
sizeof
(
SWalHead
)
+
bodyLen
;
//set status
pWal
->
lastVersion
=
index
;
walGetCurFileInfo
(
pWal
)
->
lastVer
=
index
;
walGetCurFileInfo
(
pWal
)
->
fileSize
+=
sizeof
(
SWalHead
)
+
bodyLen
;
pthread_mutex_unlock
(
&
pWal
->
mutex
);
...
...
@@ -273,9 +308,9 @@ void walFsync(SWal *pWal, bool forceFsync) {
if
(
pWal
==
NULL
||
!
tfValid
(
pWal
->
curLogTfd
))
return
;
if
(
forceFsync
||
(
pWal
->
level
==
TAOS_WAL_FSYNC
&&
pWal
->
fsyncPeriod
==
0
))
{
wTrace
(
"vgId:%d, fileId:%"
PRId64
".log, do fsync"
,
pWal
->
vgId
,
pWal
->
curFileFirstVersion
);
wTrace
(
"vgId:%d, fileId:%"
PRId64
".log, do fsync"
,
pWal
->
vgId
,
walGetCurFileFirstVer
(
pWal
)
);
if
(
tfFsync
(
pWal
->
curLogTfd
)
<
0
)
{
wError
(
"vgId:%d, file:%"
PRId64
".log, fsync failed since %s"
,
pWal
->
vgId
,
pWal
->
curFileFirstVersion
,
strerror
(
errno
));
wError
(
"vgId:%d, file:%"
PRId64
".log, fsync failed since %s"
,
pWal
->
vgId
,
walGetCurFileFirstVer
(
pWal
)
,
strerror
(
errno
));
}
}
}
...
...
@@ -348,8 +383,36 @@ int32_t walGetWalFile(void *handle, char *fileName, int64_t *fileId) {
}
#endif
static
void
walFtruncate
(
SWal
*
pWal
,
int64_t
tfd
,
int64_t
offset
)
{
tfFtruncate
(
tfd
,
offset
);
static
int
walValidateOffset
(
SWal
*
pWal
,
int64_t
ver
)
{
int
code
=
0
;
SWalHead
*
pHead
=
NULL
;
code
=
(
int
)
walRead
(
pWal
,
&
pHead
,
ver
);
if
(
pHead
->
version
!=
ver
)
{
return
-
1
;
}
return
0
;
}
static
int64_t
walGetOffset
(
SWal
*
pWal
,
int64_t
ver
)
{
int
code
=
walSeekVer
(
pWal
,
ver
);
if
(
code
!=
0
)
{
return
-
1
;
}
code
=
walValidateOffset
(
pWal
,
ver
);
if
(
code
!=
0
)
{
return
-
1
;
}
return
0
;
}
static
void
walFtruncate
(
SWal
*
pWal
,
int64_t
ver
)
{
int64_t
tfd
=
pWal
->
curLogTfd
;
tfFtruncate
(
tfd
,
ver
);
tfFsync
(
tfd
);
tfd
=
pWal
->
curIdxTfd
;
tfFtruncate
(
tfd
,
ver
*
WAL_IDX_ENTRY_SIZE
);
tfFsync
(
tfd
);
}
...
...
source/libs/wal/test/CMakeLists.txt
0 → 100644
浏览文件 @
cf93f4c2
add_executable
(
walTest
""
)
target_sources
(
walTest
PRIVATE
"walMetaTest.cpp"
)
target_include_directories
(
walTest
PUBLIC
"
${
CMAKE_SOURCE_DIR
}
/include/libs/wal"
"
${
CMAKE_CURRENT_SOURCE_DIR
}
/../inc"
)
target_link_libraries
(
walTest
wal
gtest_main
)
enable_testing
()
add_test
(
NAME wal_test
COMMAND walTest
)
source/libs/wal/test/walMetaTest.cpp
0 → 100644
浏览文件 @
cf93f4c2
#include <gtest/gtest.h>
#include <cstring>
#include <iostream>
#include <queue>
#include "tfile.h"
#include "walInt.h"
class
WalCleanEnv
:
public
::
testing
::
Test
{
protected:
static
void
SetUpTestCase
()
{
int
code
=
walInit
();
ASSERT
(
code
==
0
);
code
=
tfInit
();
ASSERT
(
code
==
0
);
}
static
void
TearDownTestCase
()
{
walCleanUp
();
tfCleanup
();
}
void
SetUp
()
override
{
taosRemoveDir
(
pathName
);
SWalCfg
*
pCfg
=
(
SWalCfg
*
)
malloc
(
sizeof
(
SWal
));
memset
(
pCfg
,
0
,
sizeof
(
SWalCfg
));
pCfg
->
rollPeriod
=
-
1
;
pCfg
->
segSize
=
-
1
;
pCfg
->
walLevel
=
TAOS_WAL_FSYNC
;
pWal
=
walOpen
(
pathName
,
pCfg
);
ASSERT
(
pWal
!=
NULL
);
}
void
TearDown
()
override
{
walClose
(
pWal
);
pWal
=
NULL
;
}
SWal
*
pWal
=
NULL
;
const
char
*
pathName
=
"/tmp/wal_test"
;
};
class
WalKeepEnv
:
public
::
testing
::
Test
{
protected:
static
void
SetUpTestCase
()
{
int
code
=
walInit
();
ASSERT
(
code
==
0
);
code
=
tfInit
();
ASSERT
(
code
==
0
);
}
static
void
TearDownTestCase
()
{
walCleanUp
();
tfCleanup
();
}
void
SetUp
()
override
{
SWalCfg
*
pCfg
=
(
SWalCfg
*
)
malloc
(
sizeof
(
SWal
));
memset
(
pCfg
,
0
,
sizeof
(
SWalCfg
));
pCfg
->
rollPeriod
=
-
1
;
pCfg
->
segSize
=
-
1
;
pCfg
->
walLevel
=
TAOS_WAL_FSYNC
;
pWal
=
walOpen
(
pathName
,
pCfg
);
ASSERT
(
pWal
!=
NULL
);
}
void
TearDown
()
override
{
walClose
(
pWal
);
pWal
=
NULL
;
}
SWal
*
pWal
=
NULL
;
const
char
*
pathName
=
"/tmp/wal_test"
;
};
TEST_F
(
WalCleanEnv
,
createNew
)
{
walRollFileInfo
(
pWal
);
ASSERT
(
pWal
->
fileInfoSet
!=
NULL
);
ASSERT_EQ
(
pWal
->
fileInfoSet
->
size
,
1
);
WalFileInfo
*
pInfo
=
(
WalFileInfo
*
)
taosArrayGetLast
(
pWal
->
fileInfoSet
);
ASSERT_EQ
(
pInfo
->
firstVer
,
0
);
ASSERT_EQ
(
pInfo
->
lastVer
,
-
1
);
ASSERT_EQ
(
pInfo
->
closeTs
,
-
1
);
ASSERT_EQ
(
pInfo
->
fileSize
,
0
);
}
TEST_F
(
WalCleanEnv
,
serialize
)
{
int
code
=
walRollFileInfo
(
pWal
);
ASSERT
(
code
==
0
);
ASSERT
(
pWal
->
fileInfoSet
!=
NULL
);
code
=
walRollFileInfo
(
pWal
);
ASSERT
(
code
==
0
);
code
=
walRollFileInfo
(
pWal
);
ASSERT
(
code
==
0
);
code
=
walRollFileInfo
(
pWal
);
ASSERT
(
code
==
0
);
code
=
walRollFileInfo
(
pWal
);
ASSERT
(
code
==
0
);
code
=
walRollFileInfo
(
pWal
);
ASSERT
(
code
==
0
);
char
*
ss
=
walFileInfoSerialize
(
pWal
);
printf
(
"%s
\n
"
,
ss
);
code
=
walWriteMeta
(
pWal
);
ASSERT
(
code
==
0
);
}
TEST_F
(
WalCleanEnv
,
removeOldMeta
)
{
int
code
=
walRollFileInfo
(
pWal
);
ASSERT
(
code
==
0
);
ASSERT
(
pWal
->
fileInfoSet
!=
NULL
);
code
=
walWriteMeta
(
pWal
);
ASSERT
(
code
==
0
);
code
=
walRollFileInfo
(
pWal
);
ASSERT
(
code
==
0
);
code
=
walWriteMeta
(
pWal
);
ASSERT
(
code
==
0
);
}
TEST_F
(
WalKeepEnv
,
readOldMeta
)
{
int
code
=
walRollFileInfo
(
pWal
);
ASSERT
(
code
==
0
);
ASSERT
(
pWal
->
fileInfoSet
!=
NULL
);
code
=
walWriteMeta
(
pWal
);
ASSERT
(
code
==
0
);
code
=
walRollFileInfo
(
pWal
);
ASSERT
(
code
==
0
);
code
=
walWriteMeta
(
pWal
);
ASSERT
(
code
==
0
);
char
*
oldss
=
walFileInfoSerialize
(
pWal
);
TearDown
();
SetUp
();
code
=
walReadMeta
(
pWal
);
ASSERT
(
code
==
0
);
char
*
newss
=
walFileInfoSerialize
(
pWal
);
int
len
=
strlen
(
oldss
);
ASSERT_EQ
(
len
,
strlen
(
newss
));
for
(
int
i
=
0
;
i
<
len
;
i
++
)
{
EXPECT_EQ
(
oldss
[
i
],
newss
[
i
]);
}
}
TEST_F
(
WalKeepEnv
,
write
)
{
const
char
*
ranStr
=
"tvapq02tcp"
;
const
int
len
=
strlen
(
ranStr
);
int
code
;
for
(
int
i
=
0
;
i
<
10
;
i
++
)
{
code
=
walWrite
(
pWal
,
i
,
i
+
1
,
(
void
*
)
ranStr
,
len
);
ASSERT_EQ
(
code
,
0
);
code
=
walWrite
(
pWal
,
i
+
2
,
i
,
(
void
*
)
ranStr
,
len
);
ASSERT_EQ
(
code
,
-
1
);
}
code
=
walWriteMeta
(
pWal
);
ASSERT_EQ
(
code
,
0
);
}
source/libs/wal/test/walTests.cpp
已删除
100644 → 0
浏览文件 @
0237a2ba
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
//#define _DEFAULT_SOURCE
#include "os.h"
#include "tutil.h"
#include "tglobal.h"
#include "tlog.h"
#include "twal.h"
#include "tfile.h"
int64_t
ver
=
0
;
void
*
pWal
=
NULL
;
int
writeToQueue
(
void
*
pVnode
,
void
*
data
,
int
type
,
void
*
pMsg
)
{
// do nothing
SWalHead
*
pHead
=
data
;
if
(
pHead
->
version
>
ver
)
ver
=
pHead
->
version
;
walWrite
(
pWal
,
pHead
);
return
0
;
}
int
main
(
int
argc
,
char
*
argv
[])
{
char
path
[
128
]
=
"/tmp/wal"
;
int
level
=
2
;
int
total
=
5
;
int
rows
=
10000
;
int
size
=
128
;
int
keep
=
0
;
for
(
int
i
=
1
;
i
<
argc
;
++
i
)
{
if
(
strcmp
(
argv
[
i
],
"-p"
)
==
0
&&
i
<
argc
-
1
)
{
tstrncpy
(
path
,
argv
[
++
i
],
sizeof
(
path
));
}
else
if
(
strcmp
(
argv
[
i
],
"-l"
)
==
0
&&
i
<
argc
-
1
)
{
level
=
atoi
(
argv
[
++
i
]);
}
else
if
(
strcmp
(
argv
[
i
],
"-r"
)
==
0
&&
i
<
argc
-
1
)
{
rows
=
atoi
(
argv
[
++
i
]);
}
else
if
(
strcmp
(
argv
[
i
],
"-k"
)
==
0
&&
i
<
argc
-
1
)
{
keep
=
atoi
(
argv
[
++
i
]);
}
else
if
(
strcmp
(
argv
[
i
],
"-t"
)
==
0
&&
i
<
argc
-
1
)
{
total
=
atoi
(
argv
[
++
i
]);
}
else
if
(
strcmp
(
argv
[
i
],
"-s"
)
==
0
&&
i
<
argc
-
1
)
{
size
=
atoi
(
argv
[
++
i
]);
}
else
if
(
strcmp
(
argv
[
i
],
"-v"
)
==
0
&&
i
<
argc
-
1
)
{
ver
=
atoll
(
argv
[
++
i
]);
}
else
if
(
strcmp
(
argv
[
i
],
"-d"
)
==
0
&&
i
<
argc
-
1
)
{
dDebugFlag
=
atoi
(
argv
[
++
i
]);
}
else
{
printf
(
"
\n
usage: %s [options]
\n
"
,
argv
[
0
]);
printf
(
" [-p path]: wal file path default is:%s
\n
"
,
path
);
printf
(
" [-l level]: log level, default is:%d
\n
"
,
level
);
printf
(
" [-t total]: total wal files, default is:%d
\n
"
,
total
);
printf
(
" [-r rows]: rows of records per wal file, default is:%d
\n
"
,
rows
);
printf
(
" [-k keep]: keep the wal after closing, default is:%d
\n
"
,
keep
);
printf
(
" [-v version]: initial version, default is:%"
PRId64
"
\n
"
,
ver
);
printf
(
" [-d debugFlag]: debug flag, default:%d
\n
"
,
dDebugFlag
);
printf
(
" [-h help]: print out this help
\n\n
"
);
exit
(
0
);
}
}
taosInitLog
(
"wal.log"
,
100000
,
10
);
tfInit
();
walInit
();
SWalCfg
walCfg
=
{
0
};
walCfg
.
walLevel
=
level
;
walCfg
.
keep
=
keep
;
pWal
=
walOpen
(
path
,
&
walCfg
);
if
(
pWal
==
NULL
)
{
printf
(
"failed to open wal
\n
"
);
exit
(
-
1
);
}
int
ret
=
walRestore
(
pWal
,
NULL
,
writeToQueue
);
if
(
ret
<
0
)
{
printf
(
"failed to restore wal
\n
"
);
exit
(
-
1
);
}
printf
(
"version starts from:%"
PRId64
"
\n
"
,
ver
);
int
contLen
=
sizeof
(
SWalHead
)
+
size
;
SWalHead
*
pHead
=
(
SWalHead
*
)
malloc
(
contLen
);
for
(
int
i
=
0
;
i
<
total
;
++
i
)
{
for
(
int
k
=
0
;
k
<
rows
;
++
k
)
{
pHead
->
version
=
++
ver
;
pHead
->
len
=
size
;
walWrite
(
pWal
,
pHead
);
}
printf
(
"renew a wal, i:%d
\n
"
,
i
);
walRenew
(
pWal
);
}
printf
(
"%d wal files are written
\n
"
,
total
);
int64_t
index
=
0
;
char
name
[
256
];
while
(
1
)
{
int
code
=
walGetWalFile
(
pWal
,
name
,
&
index
);
if
(
code
==
-
1
)
{
printf
(
"failed to get wal file, index:%"
PRId64
"
\n
"
,
index
);
break
;
}
printf
(
"index:%"
PRId64
" wal:%s
\n
"
,
index
,
name
);
if
(
code
==
0
)
break
;
}
getchar
();
walClose
(
pWal
);
walCleanUp
();
tfCleanup
();
return
0
;
}
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录