Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
77657f5c
T
TDengine
项目概览
taosdata
/
TDengine
大约 1 年 前同步成功
通知
1184
Star
22015
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
体验新版 GitCode,发现更多精彩内容 >>
提交
77657f5c
编写于
11月 26, 2021
作者:
L
Liu Jicong
浏览文件
操作
浏览文件
下载
差异文件
Merge branch 'feature/tq' into 3.0
上级
437985c4
9f75ad06
变更
6
隐藏空白更改
内联
并排
Showing
6 changed file
with
206 addition
and
170 deletion
+206
-170
include/libs/wal/wal.h
include/libs/wal/wal.h
+43
-25
source/libs/wal/inc/walInt.h
source/libs/wal/inc/walInt.h
+3
-0
source/libs/wal/src/wal.c
source/libs/wal/src/wal.c
+1
-2
source/libs/wal/src/walIndex.c
source/libs/wal/src/walIndex.c
+85
-0
source/libs/wal/src/walMgmt.c
source/libs/wal/src/walMgmt.c
+22
-33
source/libs/wal/src/walWrite.c
source/libs/wal/src/walWrite.c
+52
-110
未找到文件。
include/libs/wal/wal.h
浏览文件 @
77657f5c
...
...
@@ -53,45 +53,63 @@ typedef struct {
EWalType
walLevel
;
// wal level
}
SWalCfg
;
#define WAL_PREFIX "wal"
#define WAL_PREFIX_LEN 3
#define WAL_REFRESH_MS 1000
#define WAL_MAX_SIZE (TSDB_MAX_WAL_SIZE + sizeof(SWalHead) + 16)
#define WAL_SIGNATURE ((uint32_t)(0xFAFBFDFE))
#define WAL_PATH_LEN (TSDB_FILENAME_LEN + 12)
#define WAL_FILE_LEN (WAL_PATH_LEN + 32)
#define WAL_FILE_NUM 1 // 3
#define WAL_PREFIX "wal"
#define WAL_LOG_SUFFIX "log"
#define WAL_INDEX_SUFFIX "idx"
#define WAL_PREFIX_LEN 3
#define WAL_REFRESH_MS 1000
#define WAL_MAX_SIZE (TSDB_MAX_WAL_SIZE + sizeof(SWalHead) + 16)
#define WAL_SIGNATURE ((uint32_t)(0xFAFBFDFEUL))
#define WAL_PATH_LEN (TSDB_FILENAME_LEN + 12)
#define WAL_FILE_LEN (WAL_PATH_LEN + 32)
//#define WAL_FILE_NUM 1 // 3
#define WAL_CUR_POS_READ_ONLY 1
#define WAL_CUR_FILE_READ_ONLY 2
typedef
struct
SWal
{
int64_t
version
;
int64_t
fileId
;
int64_t
rId
;
int64_t
tfd
;
int32_t
vgId
;
int32_t
keep
;
int32_t
level
;
int32_t
fsyncPeriod
;
// cfg
int32_t
vgId
;
int32_t
fsyncPeriod
;
// millisecond
EWalType
level
;
//reference
int64_t
refId
;
//current tfd
int64_t
curLogTfd
;
int64_t
curIdxTfd
;
//current version
int64_t
curVersion
;
int64_t
curOffset
;
//current file version
int64_t
curFileFirstVersion
;
int64_t
curFileLastVersion
;
//wal fileset version
int64_t
firstVersion
;
int64_t
snapshotVersion
;
int64_t
lastVersion
;
//fsync status
int32_t
fsyncSeq
;
int8_t
stop
;
int8_t
reseved
[
3
];
char
path
[
WAL_PATH_LEN
];
char
name
[
WAL_FILE_LEN
];
//ctl
int32_t
curStatus
;
pthread_mutex_t
mutex
;
//path
char
path
[
WAL_PATH_LEN
];
}
SWal
;
// WAL HANDLE
typedef
int32_t
(
*
FWalWrite
)(
void
*
ahandle
,
void
*
pHead
,
void
*
pMsg
);
typedef
int32_t
(
*
FWalWrite
)(
void
*
ahandle
,
void
*
pHead
);
// module initialization
int32_t
walInit
();
void
walCleanUp
();
// handle open and ctl
SWal
*
walOpen
(
char
*
path
,
SWalCfg
*
pCfg
);
SWal
*
walOpen
(
const
char
*
path
,
SWalCfg
*
pCfg
);
void
walStop
(
SWal
*
pWal
);
int32_t
walAlter
(
SWal
*
,
SWalCfg
*
pCfg
);
void
walClose
(
SWal
*
);
// write
//
int64_t walWriteWithMsgType(SWal*, int8_t msgType, void* body, int32_t bodyLen);
//int64_t walWriteWithMsgType(SWal*, int8_t msgType, void* body, int32_t bodyLen);
int64_t
walWrite
(
SWal
*
,
int64_t
index
,
void
*
body
,
int32_t
bodyLen
);
int64_t
walWriteBatch
(
SWal
*
,
void
**
bodies
,
int32_t
*
bodyLen
,
int32_t
batchSize
);
...
...
@@ -101,7 +119,8 @@ int32_t walCommit(SWal *, int64_t ver);
// truncate after
int32_t
walRollback
(
SWal
*
,
int64_t
ver
);
// notify that previous log can be pruned safely
int32_t
walPrune
(
SWal
*
,
int64_t
ver
);
int32_t
walTakeSnapshot
(
SWal
*
,
int64_t
ver
);
//int32_t walDataCorrupted(SWal*);
// read
int32_t
walRead
(
SWal
*
,
SWalHead
**
,
int64_t
ver
);
...
...
@@ -111,7 +130,6 @@ int32_t walReadWithFp(SWal *, FWalWrite writeFp, int64_t verStart, int32_t readN
int64_t
walGetFirstVer
(
SWal
*
);
int64_t
walGetSnapshotVer
(
SWal
*
);
int64_t
walGetLastVer
(
SWal
*
);
// int32_t walDataCorrupted(SWal*);
//internal
int32_t
walGetNextFile
(
SWal
*
pWal
,
int64_t
*
nextFileId
);
...
...
source/libs/wal/inc/walInt.h
浏览文件 @
77657f5c
...
...
@@ -22,6 +22,9 @@
extern
"C"
{
#endif
int
walRotate
(
SWal
*
pWal
);
int
walGetFile
(
SWal
*
pWal
,
int32_t
version
);
#ifdef __cplusplus
}
#endif
...
...
source/libs/wal/src/wal.c
浏览文件 @
77657f5c
...
...
@@ -23,11 +23,10 @@ int32_t walRollback(SWal *pWal, int64_t ver) {
return
0
;
}
int32_t
wal
Prune
(
SWal
*
pWal
,
int64_t
ver
)
{
int32_t
wal
TakeSnapshot
(
SWal
*
pWal
,
int64_t
ver
)
{
return
0
;
}
int32_t
walRead
(
SWal
*
pWal
,
SWalHead
**
ppHead
,
int64_t
ver
)
{
return
0
;
}
...
...
source/libs/wal/src/walIndex.c
0 → 100644
浏览文件 @
77657f5c
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#define _DEFAULT_SOURCE
#include "os.h"
#include "taoserror.h"
#include "tref.h"
#include "tfile.h"
#include "walInt.h"
int
walSetCurVerImpl
(
SWal
*
pWal
,
int64_t
ver
)
{
//close old file
//iterate all files
//open right file
//set cur version, cur file version and cur status
return
0
;
}
int
walSetCurVer
(
SWal
*
pWal
,
int64_t
ver
)
{
if
(
ver
>
pWal
->
lastVersion
+
1
)
{
//TODO: some records are skipped
return
-
1
;
}
if
(
ver
<
pWal
->
firstVersion
)
{
//TODO: try to seek pruned log
return
-
1
;
}
if
(
ver
<
pWal
->
snapshotVersion
)
{
//TODO: seek snapshotted log
}
if
(
ver
<
pWal
->
curFileFirstVersion
||
(
pWal
->
curFileLastVersion
!=
-
1
&&
ver
>
pWal
->
curFileLastVersion
))
{
//back up to avoid inconsistency
int64_t
curVersion
=
pWal
->
curVersion
;
int64_t
curOffset
=
pWal
->
curOffset
;
int64_t
curFileFirstVersion
=
pWal
->
curFileFirstVersion
;
int64_t
curFileLastVersion
=
pWal
->
curFileLastVersion
;
if
(
walSetCurVerImpl
(
pWal
,
ver
)
<
0
)
{
//TODO: errno
pWal
->
curVersion
=
curVersion
;
pWal
->
curOffset
=
curOffset
;
pWal
->
curFileFirstVersion
=
curFileFirstVersion
;
pWal
->
curFileLastVersion
=
curFileLastVersion
;
return
-
1
;
}
}
return
0
;
}
int
walWriteIndex
(
SWal
*
pWal
,
int64_t
ver
,
int64_t
offset
)
{
int
code
=
0
;
//get index file
if
(
!
tfValid
(
pWal
->
curIdxTfd
))
{
code
=
TAOS_SYSTEM_ERROR
(
errno
);
wError
(
"vgId:%d, file:%"
PRId64
".idx, failed to open since %s"
,
pWal
->
vgId
,
pWal
->
curFileFirstVersion
,
strerror
(
errno
));
}
if
(
pWal
->
curVersion
!=
ver
)
{
if
(
walSetCurVer
(
pWal
,
ver
)
!=
0
)
{
//TODO: some records are skipped
return
-
1
;
}
}
//check file checksum
//append index
return
0
;
}
int
walRotateIndex
(
SWal
*
pWal
)
{
//check file checksum
//create new file
//switch file
return
0
;
}
source/libs/wal/src/walMgmt.c
浏览文件 @
77657f5c
...
...
@@ -21,7 +21,7 @@
#include "walInt.h"
typedef
struct
{
int32_t
refId
;
int32_t
ref
Set
Id
;
int32_t
seq
;
int8_t
stop
;
pthread_t
thread
;
...
...
@@ -36,7 +36,7 @@ static void walFreeObj(void *pWal);
int32_t
walInit
()
{
int32_t
code
=
0
;
tsWal
.
refId
=
taosOpenRef
(
TSDB_MIN_VNODES
,
walFreeObj
);
tsWal
.
ref
Set
Id
=
taosOpenRef
(
TSDB_MIN_VNODES
,
walFreeObj
);
code
=
pthread_mutex_init
(
&
tsWal
.
mutex
,
NULL
);
if
(
code
)
{
...
...
@@ -45,23 +45,23 @@ int32_t walInit() {
}
code
=
walCreateThread
();
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
if
(
code
!=
0
)
{
wError
(
"failed to init wal module since %s"
,
tstrerror
(
code
));
return
code
;
}
wInfo
(
"wal module is initialized, rsetId:%d"
,
tsWal
.
refId
);
wInfo
(
"wal module is initialized, rsetId:%d"
,
tsWal
.
ref
Set
Id
);
return
code
;
}
void
walCleanUp
()
{
walStopThread
();
taosCloseRef
(
tsWal
.
refId
);
taosCloseRef
(
tsWal
.
ref
Set
Id
);
pthread_mutex_destroy
(
&
tsWal
.
mutex
);
wInfo
(
"wal module is cleaned up"
);
}
SWal
*
walOpen
(
char
*
path
,
SWalCfg
*
pCfg
)
{
SWal
*
walOpen
(
c
onst
c
har
*
path
,
SWalCfg
*
pCfg
)
{
SWal
*
pWal
=
malloc
(
sizeof
(
SWal
));
if
(
pWal
==
NULL
)
{
terrno
=
TAOS_SYSTEM_ERROR
(
errno
);
...
...
@@ -69,10 +69,9 @@ SWal *walOpen(char *path, SWalCfg *pCfg) {
}
pWal
->
vgId
=
pCfg
->
vgId
;
pWal
->
t
fd
=
-
1
;
pWal
->
fileId
=
-
1
;
pWal
->
curLogT
fd
=
-
1
;
/*pWal->curFileId = -1;*/
pWal
->
level
=
pCfg
->
walLevel
;
/*pWal->keep = pCfg->keep;*/
pWal
->
fsyncPeriod
=
pCfg
->
fsyncPeriod
;
tstrncpy
(
pWal
->
path
,
path
,
sizeof
(
pWal
->
path
));
pthread_mutex_init
(
&
pWal
->
mutex
,
NULL
);
...
...
@@ -80,13 +79,13 @@ SWal *walOpen(char *path, SWalCfg *pCfg) {
pWal
->
fsyncSeq
=
pCfg
->
fsyncPeriod
/
1000
;
if
(
pWal
->
fsyncSeq
<=
0
)
pWal
->
fsyncSeq
=
1
;
if
(
walInitObj
(
pWal
)
!=
TSDB_CODE_SUCCESS
)
{
if
(
walInitObj
(
pWal
)
!=
0
)
{
walFreeObj
(
pWal
);
return
NULL
;
}
pWal
->
r
Id
=
taosAddRef
(
tsWal
.
ref
Id
,
pWal
);
if
(
pWal
->
rId
<
0
)
{
pWal
->
r
efId
=
taosAddRef
(
tsWal
.
refSet
Id
,
pWal
);
if
(
pWal
->
r
ef
Id
<
0
)
{
walFreeObj
(
pWal
);
return
NULL
;
}
...
...
@@ -102,7 +101,7 @@ int32_t walAlter(SWal *pWal, SWalCfg *pCfg) {
if
(
pWal
->
level
==
pCfg
->
walLevel
&&
pWal
->
fsyncPeriod
==
pCfg
->
fsyncPeriod
)
{
wDebug
(
"vgId:%d, old walLevel:%d fsync:%d, new walLevel:%d fsync:%d not change"
,
pWal
->
vgId
,
pWal
->
level
,
pWal
->
fsyncPeriod
,
pCfg
->
walLevel
,
pCfg
->
fsyncPeriod
);
return
TSDB_CODE_SUCCESS
;
return
0
;
}
wInfo
(
"vgId:%d, change old walLevel:%d fsync:%d, new walLevel:%d fsync:%d"
,
pWal
->
vgId
,
pWal
->
level
,
...
...
@@ -113,26 +112,16 @@ int32_t walAlter(SWal *pWal, SWalCfg *pCfg) {
pWal
->
fsyncSeq
=
pCfg
->
fsyncPeriod
/
1000
;
if
(
pWal
->
fsyncSeq
<=
0
)
pWal
->
fsyncSeq
=
1
;
return
TSDB_CODE_SUCCESS
;
}
void
walStop
(
void
*
handle
)
{
if
(
handle
==
NULL
)
return
;
SWal
*
pWal
=
handle
;
pthread_mutex_lock
(
&
pWal
->
mutex
);
pWal
->
stop
=
1
;
pthread_mutex_unlock
(
&
pWal
->
mutex
);
wDebug
(
"vgId:%d, stop write wal"
,
pWal
->
vgId
);
return
0
;
}
void
walClose
(
SWal
*
pWal
)
{
if
(
pWal
==
NULL
)
return
;
pthread_mutex_lock
(
&
pWal
->
mutex
);
tfClose
(
pWal
->
t
fd
);
tfClose
(
pWal
->
curLogT
fd
);
pthread_mutex_unlock
(
&
pWal
->
mutex
);
taosRemoveRef
(
tsWal
.
ref
Id
,
pWal
->
r
Id
);
taosRemoveRef
(
tsWal
.
ref
SetId
,
pWal
->
ref
Id
);
}
static
int32_t
walInitObj
(
SWal
*
pWal
)
{
...
...
@@ -142,14 +131,14 @@ static int32_t walInitObj(SWal *pWal) {
}
wDebug
(
"vgId:%d, object is initialized"
,
pWal
->
vgId
);
return
TSDB_CODE_SUCCESS
;
return
0
;
}
static
void
walFreeObj
(
void
*
wal
)
{
SWal
*
pWal
=
wal
;
wDebug
(
"vgId:%d, wal:%p is freed"
,
pWal
->
vgId
,
pWal
);
tfClose
(
pWal
->
t
fd
);
tfClose
(
pWal
->
curLogT
fd
);
pthread_mutex_destroy
(
&
pWal
->
mutex
);
tfree
(
pWal
);
}
...
...
@@ -174,16 +163,16 @@ static void walUpdateSeq() {
}
static
void
walFsyncAll
()
{
SWal
*
pWal
=
taosIterateRef
(
tsWal
.
refId
,
0
);
SWal
*
pWal
=
taosIterateRef
(
tsWal
.
ref
Set
Id
,
0
);
while
(
pWal
)
{
if
(
walNeedFsync
(
pWal
))
{
wTrace
(
"vgId:%d, do fsync, level:%d seq:%d rseq:%d"
,
pWal
->
vgId
,
pWal
->
level
,
pWal
->
fsyncSeq
,
tsWal
.
seq
);
int32_t
code
=
tfFsync
(
pWal
->
t
fd
);
int32_t
code
=
tfFsync
(
pWal
->
curLogT
fd
);
if
(
code
!=
0
)
{
wError
(
"vgId:%d, file:%
s, failed to fsync since %s"
,
pWal
->
vgId
,
pWal
->
name
,
strerror
(
code
));
wError
(
"vgId:%d, file:%
"
PRId64
".log, failed to fsync since %s"
,
pWal
->
vgId
,
pWal
->
curFileFirstVersion
,
strerror
(
code
));
}
}
pWal
=
taosIterateRef
(
tsWal
.
ref
Id
,
pWal
->
r
Id
);
pWal
=
taosIterateRef
(
tsWal
.
ref
SetId
,
pWal
->
ref
Id
);
}
}
...
...
@@ -216,7 +205,7 @@ static int32_t walCreateThread() {
pthread_attr_destroy
(
&
thAttr
);
wDebug
(
"wal thread is launched, thread:0x%08"
PRIx64
,
taosGetPthreadId
(
tsWal
.
thread
));
return
TSDB_CODE_SUCCESS
;
return
0
;
}
static
void
walStopThread
()
{
...
...
source/libs/wal/src/walWrite.c
浏览文件 @
77657f5c
...
...
@@ -21,6 +21,7 @@
#include "tfile.h"
#include "walInt.h"
#if 0
static int32_t walRestoreWalFile(SWal *pWal, void *pVnode, FWalWrite writeFp, char *name, int64_t fileId);
int32_t walRenew(void *handle) {
...
...
@@ -29,16 +30,16 @@ int32_t walRenew(void *handle) {
SWal * pWal = handle;
int32_t code = 0;
if
(
pWal
->
stop
)
{
wDebug
(
"vgId:%d, do not create a new wal file"
,
pWal
->
vgId
);
return
0
;
}
/*if (pWal->stop) {*/
/*wDebug("vgId:%d, do not create a new wal file", pWal->vgId);*/
/*return 0;*/
/*}*/
pthread_mutex_lock(&pWal->mutex);
if
(
tfValid
(
pWal
->
t
fd
))
{
tfClose
(
pWal
->
t
fd
);
wDebug
(
"vgId:%d, file:%s, it is closed while renew"
,
pWal
->
vgId
,
pWal
->
n
ame
);
if (tfValid(pWal->
logT
fd)) {
tfClose(pWal->
logT
fd);
wDebug("vgId:%d, file:%s, it is closed while renew", pWal->vgId, pWal->
logN
ame);
}
/*if (pWal->keep == TAOS_WAL_KEEP) {*/
...
...
@@ -48,14 +49,14 @@ int32_t walRenew(void *handle) {
/*pWal->fileId++;*/
/*}*/
snprintf
(
pWal
->
name
,
sizeof
(
pWal
->
name
),
"%s/%s%"
PRId64
,
pWal
->
path
,
WAL_PREFIX
,
pWal
->
f
ileId
);
pWal
->
tfd
=
tfOpenCreateWrite
(
pWal
->
n
ame
);
snprintf(pWal->
logName, sizeof(pWal->logName), "%s/%s%" PRId64, pWal->path, WAL_PREFIX, pWal->curF
ileId);
pWal->
logTfd = tfOpenCreateWrite(pWal->logN
ame);
if
(
!
tfValid
(
pWal
->
t
fd
))
{
if (!tfValid(pWal->
logT
fd)) {
code = TAOS_SYSTEM_ERROR(errno);
wError
(
"vgId:%d, file:%s, failed to open since %s"
,
pWal
->
vgId
,
pWal
->
n
ame
,
strerror
(
errno
));
wError("vgId:%d, file:%s, failed to open since %s", pWal->vgId, pWal->
logN
ame, strerror(errno));
} else {
wDebug
(
"vgId:%d, file:%s, it is created and open while renew"
,
pWal
->
vgId
,
pWal
->
n
ame
);
wDebug("vgId:%d, file:%s, it is created and open while renew", pWal->vgId, pWal->
logN
ame);
}
pthread_mutex_unlock(&pWal->mutex);
...
...
@@ -67,13 +68,13 @@ void walRemoveOneOldFile(void *handle) {
SWal *pWal = handle;
if (pWal == NULL) return;
/*if (pWal->keep == TAOS_WAL_KEEP) return;*/
if
(
!
tfValid
(
pWal
->
t
fd
))
return
;
if (!tfValid(pWal->
logT
fd)) return;
pthread_mutex_lock(&pWal->mutex);
// remove the oldest wal file
int64_t oldFileId = -1;
if
(
walGetOldFile
(
pWal
,
pWal
->
f
ileId
,
WAL_FILE_NUM
,
&
oldFileId
)
==
0
)
{
if (walGetOldFile(pWal, pWal->
curF
ileId, WAL_FILE_NUM, &oldFileId) == 0) {
char walName[WAL_FILE_LEN] = {0};
snprintf(walName, sizeof(walName), "%s/%s%" PRId64, pWal->path, WAL_PREFIX, oldFileId);
...
...
@@ -95,26 +96,24 @@ void walRemoveAllOldFiles(void *handle) {
pthread_mutex_lock(&pWal->mutex);
tfClose
(
pWal
->
t
fd
);
wDebug
(
"vgId:%d, file:%s, it is closed before remove all wals"
,
pWal
->
vgId
,
pWal
->
n
ame
);
tfClose(pWal->
logT
fd);
wDebug("vgId:%d, file:%s, it is closed before remove all wals", pWal->vgId, pWal->
logN
ame);
while (walGetNextFile(pWal, &fileId) >= 0) {
snprintf
(
pWal
->
name
,
sizeof
(
pWal
->
n
ame
),
"%s/%s%"
PRId64
,
pWal
->
path
,
WAL_PREFIX
,
fileId
);
snprintf(pWal->
logName, sizeof(pWal->logN
ame), "%s/%s%" PRId64, pWal->path, WAL_PREFIX, fileId);
if
(
remove
(
pWal
->
n
ame
)
<
0
)
{
wError
(
"vgId:%d, wal:%p file:%s, failed to remove since %s"
,
pWal
->
vgId
,
pWal
,
pWal
->
n
ame
,
strerror
(
errno
));
if (remove(pWal->
logN
ame) < 0) {
wError("vgId:%d, wal:%p file:%s, failed to remove since %s", pWal->vgId, pWal, pWal->
logN
ame, strerror(errno));
} else {
wInfo
(
"vgId:%d, wal:%p file:%s, it is removed"
,
pWal
->
vgId
,
pWal
,
pWal
->
n
ame
);
wInfo("vgId:%d, wal:%p file:%s, it is removed", pWal->vgId, pWal, pWal->
logN
ame);
}
}
pthread_mutex_unlock(&pWal->mutex);
}
#if defined(WAL_CHECKSUM_WHOLE)
#endif
static
void
walUpdateChecksum
(
SWalHead
*
pHead
)
{
pHead
->
sver
=
2
;
pHead
->
cksum
=
0
;
pHead
->
cksum
=
taosCalcChecksum
(
0
,
(
uint8_t
*
)
pHead
,
sizeof
(
SWalHead
)
+
pHead
->
len
);
}
...
...
@@ -130,8 +129,6 @@ static int walValidateChecksum(SWalHead *pHead) {
return
0
;
}
#endif
int64_t
walWrite
(
SWal
*
pWal
,
int64_t
index
,
void
*
body
,
int32_t
bodyLen
)
{
if
(
pWal
==
NULL
)
return
-
1
;
...
...
@@ -143,32 +140,27 @@ int64_t walWrite(SWal *pWal, int64_t index, void *body, int32_t bodyLen) {
int32_t
code
=
0
;
// no wal
if
(
!
tfValid
(
pWal
->
t
fd
))
return
0
;
if
(
!
tfValid
(
pWal
->
curLogT
fd
))
return
0
;
if
(
pWal
->
level
==
TAOS_WAL_NOLOG
)
return
0
;
if
(
pHead
->
version
<=
pWal
->
v
ersion
)
return
0
;
if
(
pHead
->
version
<=
pWal
->
curV
ersion
)
return
0
;
pHead
->
signature
=
WAL_SIGNATURE
;
pHead
->
len
=
bodyLen
;
memcpy
(
pHead
->
cont
,
body
,
bodyLen
);
#if defined(WAL_CHECKSUM_WHOLE)
walUpdateChecksum
(
pHead
);
#else
pHead
->
sver
=
0
;
taosCalcChecksumAppend
(
0
,
(
uint8_t
*
)
pHead
,
sizeof
(
SWalHead
));
#endif
int32_t
contLen
=
pHead
->
len
+
sizeof
(
SWalHead
);
pthread_mutex_lock
(
&
pWal
->
mutex
);
if
(
tfWrite
(
pWal
->
t
fd
,
pHead
,
contLen
)
!=
contLen
)
{
if
(
tfWrite
(
pWal
->
curLogT
fd
,
pHead
,
contLen
)
!=
contLen
)
{
code
=
TAOS_SYSTEM_ERROR
(
errno
);
wError
(
"vgId:%d, file:%
s, failed to write since %s"
,
pWal
->
vgId
,
pWal
->
name
,
strerror
(
errno
));
wError
(
"vgId:%d, file:%
"
PRId64
".log, failed to write since %s"
,
pWal
->
vgId
,
pWal
->
curFileFirstVersion
,
strerror
(
errno
));
}
else
{
wTrace
(
"vgId:%d, write wal, fileId:%"
PRId64
" tfd:%"
PRId64
" hver:%"
PRId64
" wver:%"
PRIu64
" len:%d"
,
pWal
->
vgId
,
pWal
->
fileId
,
pWal
->
tfd
,
pHead
->
version
,
pWal
->
version
,
pHead
->
len
);
pWal
->
v
ersion
=
pHead
->
version
;
/*wTrace("vgId:%d, write wal, fileId:%" PRId64 " tfd:%" PRId64 " hver:%" PRId64 " wver:%" PRIu64 " len:%d", pWal->vgId,*/
/*pWal->curFileId, pWal->logTfd, pHead->version, pWal->curVersion, pHead->len);*/
pWal
->
curV
ersion
=
pHead
->
version
;
}
pthread_mutex_unlock
(
&
pWal
->
mutex
);
...
...
@@ -179,16 +171,17 @@ int64_t walWrite(SWal *pWal, int64_t index, void *body, int32_t bodyLen) {
}
void
walFsync
(
SWal
*
pWal
,
bool
forceFsync
)
{
if
(
pWal
==
NULL
||
!
tfValid
(
pWal
->
t
fd
))
return
;
if
(
pWal
==
NULL
||
!
tfValid
(
pWal
->
curLogT
fd
))
return
;
if
(
forceFsync
||
(
pWal
->
level
==
TAOS_WAL_FSYNC
&&
pWal
->
fsyncPeriod
==
0
))
{
wTrace
(
"vgId:%d, fileId:%"
PRId64
", do fsync"
,
pWal
->
vgId
,
pWal
->
fileId
);
if
(
tfFsync
(
pWal
->
t
fd
)
<
0
)
{
wError
(
"vgId:%d, file
Id:%"
PRId64
", fsync failed since %s"
,
pWal
->
vgId
,
pWal
->
fileId
,
strerror
(
errno
));
wTrace
(
"vgId:%d, fileId:%"
PRId64
".log, do fsync"
,
pWal
->
vgId
,
pWal
->
curFileFirstVersion
);
if
(
tfFsync
(
pWal
->
curLogT
fd
)
<
0
)
{
wError
(
"vgId:%d, file
:%"
PRId64
".log, fsync failed since %s"
,
pWal
->
vgId
,
pWal
->
curFileFirstVersion
,
strerror
(
errno
));
}
}
}
#if 0
int32_t walRestore(void *handle, void *pVnode, FWalWrite writeFp) {
if (handle == NULL) return -1;
...
...
@@ -198,10 +191,10 @@ int32_t walRestore(void *handle, void *pVnode, FWalWrite writeFp) {
int64_t fileId = -1;
while ((code = walGetNextFile(pWal, &fileId)) >= 0) {
if
(
fileId
==
pWal
->
fileId
)
continue
;
/*if (fileId == pWal->curFileId) continue;*/
char walName[WAL_FILE_LEN];
snprintf
(
walName
,
sizeof
(
pWal
->
n
ame
),
"%s/%s%"
PRId64
,
pWal
->
path
,
WAL_PREFIX
,
fileId
);
snprintf(walName, sizeof(pWal->
logN
ame), "%s/%s%" PRId64, pWal->path, WAL_PREFIX, fileId);
wInfo("vgId:%d, file:%s, will be restored", pWal->vgId, walName);
code = walRestoreWalFile(pWal, pVnode, writeFp, walName, fileId);
...
...
@@ -210,7 +203,7 @@ int32_t walRestore(void *handle, void *pVnode, FWalWrite writeFp) {
continue;
}
wInfo
(
"vgId:%d, file:%s, restore success, wver:%"
PRIu64
,
pWal
->
vgId
,
walName
,
pWal
->
v
ersion
);
wInfo("vgId:%d, file:%s, restore success, wver:%" PRIu64, pWal->vgId, walName, pWal->
curV
ersion);
count++;
}
...
...
@@ -222,14 +215,14 @@ int32_t walRestore(void *handle, void *pVnode, FWalWrite writeFp) {
return walRenew(pWal);
} else {
// open the existing WAL file in append mode
pWal
->
fileId
=
0
;
snprintf
(
pWal
->
name
,
sizeof
(
pWal
->
name
),
"%s/%s%"
PRId64
,
pWal
->
path
,
WAL_PREFIX
,
pWal
->
f
ileId
);
pWal
->
tfd
=
tfOpenCreateWriteAppend
(
pWal
->
n
ame
);
if
(
!
tfValid
(
pWal
->
t
fd
))
{
wError
(
"vgId:%d, file:%s, failed to open since %s"
,
pWal
->
vgId
,
pWal
->
n
ame
,
strerror
(
errno
));
/*pWal->curFileId = 0;*/
snprintf(pWal->
logName, sizeof(pWal->logName), "%s/%s%" PRId64, pWal->path, WAL_PREFIX, pWal->curF
ileId);
pWal->
logTfd = tfOpenCreateWriteAppend(pWal->logN
ame);
if (!tfValid(pWal->
logT
fd)) {
wError("vgId:%d, file:%s, failed to open since %s", pWal->vgId, pWal->
logN
ame, strerror(errno));
return TAOS_SYSTEM_ERROR(errno);
}
wDebug
(
"vgId:%d, file:%s, it is created and open while restore"
,
pWal
->
vgId
,
pWal
->
n
ame
);
wDebug("vgId:%d, file:%s, it is created and open while restore", pWal->vgId, pWal->
logN
ame);
}
return TSDB_CODE_SUCCESS;
...
...
@@ -246,14 +239,15 @@ int32_t walGetWalFile(void *handle, char *fileName, int64_t *fileId) {
int32_t code = walGetNextFile(pWal, fileId);
if (code >= 0) {
sprintf(fileName, "wal/%s%" PRId64, WAL_PREFIX, *fileId);
code
=
(
*
fileId
==
pWal
->
fileId
)
?
0
:
1
;
/*code = (*fileId == pWal->curFileId) ? 0 : 1;*/
}
wDebug
(
"vgId:%d, get wal file, code:%d curId:%"
PRId64
" outId:%"
PRId64
,
pWal
->
vgId
,
code
,
pWal
->
f
ileId
,
*
fileId
);
wDebug("vgId:%d, get wal file, code:%d curId:%" PRId64 " outId:%" PRId64, pWal->vgId, code, pWal->
curF
ileId, *fileId);
pthread_mutex_unlock(&(pWal->mutex));
return code;
}
#endif
static
void
walFtruncate
(
SWal
*
pWal
,
int64_t
tfd
,
int64_t
offset
)
{
tfFtruncate
(
tfd
,
offset
);
...
...
@@ -279,13 +273,6 @@ static int32_t walSkipCorruptedRecord(SWal *pWal, SWalHead *pHead, int64_t tfd,
continue
;
}
#if defined(WAL_CHECKSUM_WHOLE)
if
(
pHead
->
sver
==
0
&&
walValidateChecksum
(
pHead
))
{
wInfo
(
"vgId:%d, wal head cksum check passed, offset:%"
PRId64
,
pWal
->
vgId
,
pos
);
*
offset
=
pos
;
return
TSDB_CODE_SUCCESS
;
}
if
(
pHead
->
sver
>=
1
)
{
if
(
tfRead
(
tfd
,
pHead
->
cont
,
pHead
->
len
)
<
pHead
->
len
)
{
wError
(
"vgId:%d, read to end of corrupted wal file, offset:%"
PRId64
,
pWal
->
vgId
,
pos
);
...
...
@@ -298,15 +285,6 @@ static int32_t walSkipCorruptedRecord(SWal *pWal, SWalHead *pHead, int64_t tfd,
return
TSDB_CODE_SUCCESS
;
}
}
#else
if
(
taosCheckChecksumWhole
((
uint8_t
*
)
pHead
,
sizeof
(
SWalHead
)))
{
wInfo
(
"vgId:%d, wal head cksum check passed, offset:%"
PRId64
,
pWal
->
vgId
,
pos
);
*
offset
=
pos
;
return
TSDB_CODE_SUCCESS
;
}
#endif
}
return
TSDB_CODE_WAL_FILE_CORRUPTED
;
...
...
@@ -349,7 +327,6 @@ static int32_t walRestoreWalFile(SWal *pWal, void *pVnode, FWalWrite writeFp, ch
break
;
}
#if defined(WAL_CHECKSUM_WHOLE)
if
((
pHead
->
sver
==
0
&&
!
walValidateChecksum
(
pHead
))
||
pHead
->
sver
<
0
||
pHead
->
sver
>
2
)
{
wError
(
"vgId:%d, file:%s, wal head cksum is messed up, hver:%"
PRIu64
" len:%d offset:%"
PRId64
,
pWal
->
vgId
,
name
,
pHead
->
version
,
pHead
->
len
,
offset
);
...
...
@@ -393,50 +370,15 @@ static int32_t walRestoreWalFile(SWal *pWal, void *pVnode, FWalWrite writeFp, ch
}
}
#else
if
(
!
taosCheckChecksumWhole
((
uint8_t
*
)
pHead
,
sizeof
(
SWalHead
)))
{
wError
(
"vgId:%d, file:%s, wal head cksum is messed up, hver:%"
PRIu64
" len:%d offset:%"
PRId64
,
pWal
->
vgId
,
name
,
pHead
->
version
,
pHead
->
len
,
offset
);
code
=
walSkipCorruptedRecord
(
pWal
,
pHead
,
tfd
,
&
offset
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
walFtruncate
(
pWal
,
tfd
,
offset
);
break
;
}
}
if
(
pHead
->
len
<
0
||
pHead
->
len
>
size
-
sizeof
(
SWalHead
))
{
wError
(
"vgId:%d, file:%s, wal head len out of range, hver:%"
PRIu64
" len:%d offset:%"
PRId64
,
pWal
->
vgId
,
name
,
pHead
->
version
,
pHead
->
len
,
offset
);
code
=
walSkipCorruptedRecord
(
pWal
,
pHead
,
tfd
,
&
offset
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
walFtruncate
(
pWal
,
tfd
,
offset
);
break
;
}
}
ret
=
(
int32_t
)
tfRead
(
tfd
,
pHead
->
cont
,
pHead
->
len
);
if
(
ret
<
0
)
{
wError
(
"vgId:%d, file:%s, failed to read wal body since %s"
,
pWal
->
vgId
,
name
,
strerror
(
errno
));
code
=
TAOS_SYSTEM_ERROR
(
errno
);
break
;
}
if
(
ret
<
pHead
->
len
)
{
wError
(
"vgId:%d, file:%s, failed to read wal body, ret:%d len:%d"
,
pWal
->
vgId
,
name
,
ret
,
pHead
->
len
);
offset
+=
sizeof
(
SWalHead
);
continue
;
}
#endif
offset
=
offset
+
sizeof
(
SWalHead
)
+
pHead
->
len
;
wTrace
(
"vgId:%d, restore wal, fileId:%"
PRId64
" hver:%"
PRIu64
" wver:%"
PRIu64
" len:%d offset:%"
PRId64
,
pWal
->
vgId
,
fileId
,
pHead
->
version
,
pWal
->
v
ersion
,
pHead
->
len
,
offset
);
pWal
->
vgId
,
fileId
,
pHead
->
version
,
pWal
->
curV
ersion
,
pHead
->
len
,
offset
);
pWal
->
v
ersion
=
pHead
->
version
;
pWal
->
curV
ersion
=
pHead
->
version
;
// wInfo("writeFp: %ld", offset);
(
*
writeFp
)(
pVnode
,
pHead
,
NULL
);
(
*
writeFp
)(
pVnode
,
pHead
);
}
tfClose
(
tfd
);
...
...
@@ -449,7 +391,7 @@ static int32_t walRestoreWalFile(SWal *pWal, void *pVnode, FWalWrite writeFp, ch
uint64_t
walGetVersion
(
SWal
*
pWal
)
{
if
(
pWal
==
NULL
)
return
0
;
return
pWal
->
v
ersion
;
return
pWal
->
curV
ersion
;
}
// Wal version in slave (dnode1) must be reset.
...
...
@@ -459,7 +401,7 @@ uint64_t walGetVersion(SWal *pWal) {
void
walResetVersion
(
SWal
*
pWal
,
uint64_t
newVer
)
{
if
(
pWal
==
NULL
)
return
;
wInfo
(
"vgId:%d, version reset from %"
PRIu64
" to %"
PRIu64
,
pWal
->
vgId
,
pWal
->
v
ersion
,
newVer
);
wInfo
(
"vgId:%d, version reset from %"
PRIu64
" to %"
PRIu64
,
pWal
->
vgId
,
pWal
->
curV
ersion
,
newVer
);
pWal
->
v
ersion
=
newVer
;
pWal
->
curV
ersion
=
newVer
;
}
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录