Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
慢慢CG
TDengine
提交
d0e60fde
T
TDengine
项目概览
慢慢CG
/
TDengine
与 Fork 源项目一致
Fork自
taosdata / TDengine
通知
1
Star
0
Fork
0
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
0
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
0
Issue
0
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
d0e60fde
编写于
10月 29, 2020
作者:
S
Shengliang Guan
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
TD-1856 TD-1847
上级
43edc037
变更
6
隐藏空白更改
内联
并排
Showing
6 changed file
with
387 addition
and
309 deletion
+387
-309
src/inc/twal.h
src/inc/twal.h
+13
-6
src/util/inc/talloc.h
src/util/inc/talloc.h
+17
-2
src/util/src/talloc.c
src/util/src/talloc.c
+66
-0
src/wal/inc/walInt.h
src/wal/inc/walInt.h
+20
-0
src/wal/src/walMgmt.c
src/wal/src/walMgmt.c
+220
-1
src/wal/src/walWrite.c
src/wal/src/walWrite.c
+51
-300
未找到文件。
src/inc/twal.h
浏览文件 @
d0e60fde
...
...
@@ -19,9 +19,11 @@
extern
"C"
{
#endif
#define TAOS_WAL_NOLOG 0
#define TAOS_WAL_WRITE 1
#define TAOS_WAL_FSYNC 2
typedef
enum
{
TAOS_WAL_NOLOG
=
0
,
TAOS_WAL_WRITE
=
1
,
TAOS_WAL_FSYNC
=
2
}
EWalType
;
typedef
struct
{
int8_t
msgType
;
...
...
@@ -34,17 +36,22 @@ typedef struct {
}
SWalHead
;
typedef
struct
{
int
8_t
walLevel
;
// wal level
int
32_t
vgId
;
int32_t
fsyncPeriod
;
// millisecond
int8_t
walLevel
;
// wal level
int8_t
wals
;
// number of WAL files;
int8_t
keep
;
// keep the wal file when closed
int8_t
reserved
[
5
];
}
SWalCfg
;
typedef
void
*
twalh
;
// WAL HANDLE
typedef
int
(
*
FWalWrite
)(
void
*
ahandle
,
void
*
pHead
,
int
type
);
twalh
walOpen
(
const
char
*
path
,
const
SWalCfg
*
pCfg
);
int
walAlter
(
twalh
pWal
,
const
SWalCfg
*
pCfg
);
int32_t
walInit
();
void
walCleanUp
();
twalh
walOpen
(
char
*
path
,
SWalCfg
*
pCfg
);
int
walAlter
(
twalh
pWal
,
SWalCfg
*
pCfg
);
void
walClose
(
twalh
);
int
walRenew
(
twalh
);
int
walWrite
(
twalh
,
SWalHead
*
);
...
...
src/
wal/inc/walMgmt
.h
→
src/
util/inc/talloc
.h
浏览文件 @
d0e60fde
...
...
@@ -13,13 +13,28 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef TDENGINE_
WAL_MGMT
_H
#define TDENGINE_
WAL_MGMT
_H
#ifndef TDENGINE_
UTIL_ALLOC
_H
#define TDENGINE_
UTIL_ALLOC
_H
#ifdef __cplusplus
extern
"C"
{
#endif
#ifdef TSDB_USE_SYS_MEM
#define tmalloc(size) malloc(size)
#define tcalloc(size) calloc(1, size)
#define tmemalign(alignment, size) malloc(size)
#define tfree(p) free(p)
#define tmemzero(p, size) memset(p, 0, size)
#else
void
*
tmalloc
(
int32_t
size
);
void
*
tcalloc
(
int32_t
size
);
void
*
tmemalign
(
int32_t
alignment
,
int32_t
size
);
void
tfree
(
void
*
p
);
void
tmemzero
(
void
*
p
,
int32_t
size
);
#endif
#ifdef __cplusplus
}
#endif
...
...
src/util/src/talloc.c
0 → 100644
浏览文件 @
d0e60fde
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#define _DEFAULT_SOURCE
#include "os.h"
#include "taoserror.h"
#include "tulog.h"
#define TSDB_HAVE_MEMALIGN
void
*
tmalloc
(
int32_t
size
)
{
void
*
p
=
malloc
(
size
);
if
(
p
==
NULL
)
{
terrno
=
TAOS_SYSTEM_ERROR
(
errno
);
uError
(
"failed to malloc memory, size:%d reason:%s"
,
size
,
strerror
(
errno
));
}
return
p
;
}
void
*
tcalloc
(
int32_t
size
)
{
void
*
p
=
calloc
(
1
,
size
);
if
(
p
==
NULL
)
{
terrno
=
TAOS_SYSTEM_ERROR
(
errno
);
uError
(
"failed to calloc memory, size:%d reason:%s"
,
size
,
strerror
(
errno
));
}
return
p
;
}
void
tfree
(
void
*
p
)
{
free
(
p
);
}
void
tmemzero
(
void
*
p
,
int32_t
size
)
{
memset
(
p
,
0
,
size
);
}
#ifdef TSDB_HAVE_MEMALIGN
void
*
tmemalign
(
int32_t
alignment
,
int32_t
size
)
{
void
*
p
;
int
err
=
posix_memalign
(
&
p
,
alignment
,
size
);
if
(
err
)
{
terrno
=
TAOS_SYSTEM_ERROR
(
errno
);
uError
(
"failed to memalign memory, alignment:%d size:%d reason:%s"
,
alignment
,
size
,
strerror
(
err
));
p
=
NULL
;
}
return
p
;
}
#else
void
*
tmemalign
(
int32_t
alignment
,
int32_t
size
)
{
return
tmalloc
(
size
);
}
#endif
src/wal/inc/walInt.h
浏览文件 @
d0e60fde
...
...
@@ -32,8 +32,28 @@ extern int32_t wDebugFlag;
#define wTrace(...) { if (wDebugFlag & DEBUG_TRACE) { taosPrintLog("WAL ", wDebugFlag, __VA_ARGS__); }}
#define walPrefix "wal"
#define walRefreshIntervalMs 1000
#define walSignature (uint32_t)(0xFAFBFDFE)
typedef
struct
{
uint64_t
version
;
int32_t
vgId
;
int32_t
fd
;
int32_t
keep
;
int32_t
level
;
int32_t
fsyncPeriod
;
int32_t
fsyncSeq
;
int32_t
fileIndex
;
void
*
timer
;
void
*
signature
;
int
max
;
// maximum number of wal files
uint32_t
id
;
// increase continuously
int
num
;
// number of wal files
char
path
[
TSDB_FILENAME_LEN
];
char
name
[
TSDB_FILENAME_LEN
+
16
];
pthread_mutex_t
mutex
;
}
SWal
;
#ifdef __cplusplus
}
#endif
...
...
src/wal/src/walMgmt.c
浏览文件 @
d0e60fde
...
...
@@ -16,6 +16,225 @@
#define _DEFAULT_SOURCE
#include "os.h"
#include "taoserror.h"
#include "talloc.h"
#include "tref.h"
#include "tutil.h"
#include "twal.h"
#include "walInt.h"
#include "walMgmt.h"
\ No newline at end of file
typedef
struct
{
int32_t
refId
;
int32_t
num
;
int32_t
seq
;
int8_t
stop
;
int8_t
reserved
[
3
];
pthread_t
thread
;
pthread_mutex_t
mutex
;
}
SWalMgmt
;
static
SWalMgmt
tsWal
;
static
int32_t
walCreateThread
();
static
void
walStopThread
();
static
int32_t
walInitObj
(
SWal
*
pWal
);
static
void
walFreeObj
(
void
*
pWal
);
int32_t
walInit
()
{
tmemzero
(
&
tsWal
,
sizeof
(
SWalMgmt
));
tsWal
.
refId
=
taosOpenRef
(
TSDB_MIN_VNODES
,
walFreeObj
);
int32_t
code
=
walCreateThread
();
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
wError
(
"failed to init wal module, reason:%s"
,
tstrerror
(
code
));
return
code
;
}
wInfo
(
"wal module is initialized"
);
return
code
;
}
void
walCleanUp
()
{
walStopThread
();
taosCloseRef
(
tsWal
.
refId
);
wInfo
(
"wal module is cleaned up"
);
}
void
*
walOpen
(
char
*
path
,
SWalCfg
*
pCfg
)
{
SWal
*
pWal
=
tcalloc
(
sizeof
(
SWal
));
if
(
pWal
==
NULL
)
{
terrno
=
TAOS_SYSTEM_ERROR
(
errno
);
return
NULL
;
}
pWal
->
fd
=
-
1
;
pWal
->
max
=
pCfg
->
wals
;
pWal
->
id
=
0
;
pWal
->
num
=
0
;
pWal
->
level
=
pCfg
->
walLevel
;
pWal
->
keep
=
pCfg
->
keep
;
pWal
->
fsyncPeriod
=
pCfg
->
fsyncPeriod
;
pWal
->
signature
=
pWal
;
tstrncpy
(
pWal
->
path
,
path
,
sizeof
(
path
));
pthread_mutex_init
(
&
pWal
->
mutex
,
NULL
);
pWal
->
fsyncSeq
=
pCfg
->
fsyncPeriod
%
1000
;
if
(
pWal
->
fsyncSeq
<=
0
)
pWal
->
fsyncSeq
=
1
;
if
(
walInitObj
(
pWal
)
!=
TSDB_CODE_SUCCESS
)
{
walFreeObj
(
pWal
);
return
NULL
;
}
if
(
taosAddRef
(
tsWal
.
refId
,
pWal
)
!=
TSDB_CODE_SUCCESS
)
{
walFreeObj
(
pWal
);
return
NULL
;
}
atomic_add_fetch_32
(
&
tsWal
.
num
,
1
);
wDebug
(
"vgId:%d, wal is opened, level:%d period:%d path:%s"
,
pWal
->
vgId
,
pWal
->
level
,
pWal
->
fsyncPeriod
,
pWal
->
path
);
return
pWal
;
}
int32_t
walAlter
(
void
*
handle
,
SWalCfg
*
pCfg
)
{
SWal
*
pWal
=
handle
;
if
(
pWal
->
level
==
pCfg
->
walLevel
&&
pWal
->
fsyncPeriod
==
pCfg
->
fsyncPeriod
)
{
wDebug
(
"vgId:%d, old walLevel:%d fsync:%d, new walLevel:%d fsync:%d not change"
,
pWal
->
vgId
,
pWal
->
level
,
pWal
->
fsyncPeriod
,
pCfg
->
walLevel
,
pCfg
->
fsyncPeriod
);
return
TSDB_CODE_SUCCESS
;
}
wInfo
(
"vgId:%d, change old walLevel:%d fsync:%d, new walLevel:%d fsync:%d"
,
pWal
->
vgId
,
pWal
->
level
,
pWal
->
fsyncPeriod
,
pCfg
->
walLevel
,
pCfg
->
fsyncPeriod
);
pWal
->
level
=
pCfg
->
walLevel
;
pWal
->
fsyncPeriod
=
pCfg
->
fsyncPeriod
;
return
TSDB_CODE_SUCCESS
;
}
void
walClose
(
void
*
handle
)
{
SWal
*
pWal
=
handle
;
taosClose
(
pWal
->
fd
);
if
(
pWal
->
keep
==
0
)
{
// remove all files in the directory
for
(
int32_t
i
=
0
;
i
<
pWal
->
num
;
++
i
)
{
snprintf
(
pWal
->
name
,
sizeof
(
pWal
->
name
),
"%s/%s%d"
,
pWal
->
path
,
walPrefix
,
pWal
->
id
-
i
);
if
(
remove
(
pWal
->
name
)
<
0
)
{
wError
(
"vgId:%d, wal:%s, failed to remove"
,
pWal
->
vgId
,
pWal
->
name
);
}
else
{
wDebug
(
"vgId:%d, wal:%s, it is removed"
,
pWal
->
vgId
,
pWal
->
name
);
}
}
}
else
{
wDebug
(
"vgId:%d, wal:%s, it is closed and kept"
,
pWal
->
vgId
,
pWal
->
name
);
}
taosRemoveRef
(
tsWal
.
refId
,
pWal
);
}
static
int32_t
walInitObj
(
SWal
*
pWal
)
{
if
(
taosMkDir
(
pWal
->
path
,
0755
)
!=
0
)
{
terrno
=
TAOS_SYSTEM_ERROR
(
errno
);
wError
(
"vgId:%d, wal:%s, failed to create directory, reason:%s"
,
pWal
->
vgId
,
pWal
->
path
,
strerror
(
errno
));
return
terrno
;
}
if
(
pWal
->
keep
==
1
)
{
return
TSDB_CODE_SUCCESS
;
}
if
(
pWal
&&
pWal
->
fd
<
0
)
{
terrno
=
TAOS_SYSTEM_ERROR
(
errno
);
wError
(
"vgId:%d, wal:%s, failed to open file, reason:%s"
,
pWal
->
vgId
,
pWal
->
path
,
strerror
(
errno
));
return
terrno
;
}
wDebug
(
"vgId:%d, wal:%s, is initialized"
,
pWal
->
vgId
,
pWal
->
name
);
return
TSDB_CODE_SUCCESS
;
}
static
void
walFreeObj
(
void
*
wal
)
{
SWal
*
pWal
=
pWal
;
wDebug
(
"vgId:%d, wal is freed"
,
pWal
->
vgId
);
taosClose
(
pWal
->
fd
);
pthread_mutex_destroy
(
&
pWal
->
mutex
);
tfree
(
pWal
);
}
// static bool walNeedFsync(SWal *pWal) {
// if (pWal->fsyncPeriod <= 0 || pWal->level != TAOS_WAL_FSYNC) {
// return false;
// }
// if (tsWal.seq % pWal->fsyncSeq == 0) {
// return true;
// }
// return false;
// }
static
void
walUpdateSeq
()
{
taosMsleep
(
walRefreshIntervalMs
);
if
(
++
tsWal
.
seq
<=
0
)
{
tsWal
.
seq
=
1
;
}
}
static
void
walFsyncAll
()
{
// int32_t code;
// void * pIter = taosRefCreateIter(tsWal.refId);
// while (taosRefIterNext(pIter)) {
// SWal *pWal = taosRefIterGet(pIter);
// if (pWal == NULL) break;
// if (!walNeedFsync(pWal)) {
// wTrace("wal:%s, do fsync, level:%d seq:%d rseq:%d", pWal->name, pWal->level, pWal->fsyncSeq, tsWal.refreshSeq);
// code = walFsync(pWal);
// if (code != TSDB_CODE_SUCCESS) {
// wError("wal:%s, fsync failed(%s)", pWal->name, strerror(code));
// }
// }
// taosReleaseRef(pWal);
// }
// taosRefDestroyIter(pIter);
}
static
void
*
walThreadFunc
(
void
*
param
)
{
while
(
1
)
{
walUpdateSeq
();
walFsyncAll
();
if
(
tsWal
.
stop
)
break
;
}
return
NULL
;
}
static
int32_t
walCreateThread
()
{
pthread_attr_t
thAttr
;
pthread_attr_init
(
&
thAttr
);
pthread_attr_setdetachstate
(
&
thAttr
,
PTHREAD_CREATE_JOINABLE
);
if
(
pthread_create
(
&
tsWal
.
thread
,
&
thAttr
,
walThreadFunc
,
NULL
)
!=
0
)
{
wError
(
"failed to create wal thread, reason:%s"
,
strerror
(
errno
));
return
TAOS_SYSTEM_ERROR
(
errno
);
}
pthread_attr_destroy
(
&
thAttr
);
wDebug
(
"wal thread is launched"
);
return
TSDB_CODE_SUCCESS
;
}
static
void
walStopThread
()
{
if
(
tsWal
.
thread
)
{
pthread_join
(
tsWal
.
thread
,
NULL
);
}
wDebug
(
"wal thread is stopped"
);
}
src/wal/src/walWrite.c
浏览文件 @
d0e60fde
...
...
@@ -15,171 +15,18 @@
#define _DEFAULT_SOURCE
#include "os.h"
#include "twal.h"
#include "walInt.h"
#include "walMgmt.h"
#include "talloc.h"
#include "taoserror.h"
#include "tchecksum.h"
#include "tutil.h"
#include "ttimer.h"
#include "taoserror.h"
#include "twal.h"
#include "tqueue.h"
#include "twal.h"
#include "walInt.h"
static
int32_t
walRestoreWalFile
(
SWal
*
pWal
,
void
*
pVnode
,
FWalWrite
writeFp
);
static
int32_t
walRemoveWalFiles
(
const
char
*
path
);
typedef
struct
{
uint64_t
version
;
int
fd
;
int
keep
;
int
level
;
int32_t
fsyncPeriod
;
void
*
timer
;
void
*
signature
;
int
max
;
// maximum number of wal files
uint32_t
id
;
// increase continuously
int
num
;
// number of wal files
char
path
[
TSDB_FILENAME_LEN
];
char
name
[
TSDB_FILENAME_LEN
+
16
];
pthread_mutex_t
mutex
;
}
SWal
;
static
void
*
walTmrCtrl
=
NULL
;
static
int
tsWalNum
=
0
;
static
pthread_once_t
walModuleInit
=
PTHREAD_ONCE_INIT
;
static
int
walHandleExistingFiles
(
const
char
*
path
);
static
int
walRestoreWalFile
(
SWal
*
pWal
,
void
*
pVnode
,
FWalWrite
writeFp
);
static
int
walRemoveWalFiles
(
const
char
*
path
);
static
void
walProcessFsyncTimer
(
void
*
param
,
void
*
tmrId
);
static
void
walRelease
(
SWal
*
pWal
);
static
int
walGetMaxOldFileId
(
char
*
odir
);
static
void
walModuleInitFunc
()
{
walTmrCtrl
=
taosTmrInit
(
1000
,
100
,
300000
,
"WAL"
);
if
(
walTmrCtrl
==
NULL
)
walModuleInit
=
PTHREAD_ONCE_INIT
;
else
wDebug
(
"WAL module is initialized"
);
}
static
inline
bool
walNeedFsyncTimer
(
SWal
*
pWal
)
{
if
(
pWal
->
fsyncPeriod
>
0
&&
pWal
->
level
==
TAOS_WAL_FSYNC
)
{
return
true
;
}
return
false
;
}
void
*
walOpen
(
const
char
*
path
,
const
SWalCfg
*
pCfg
)
{
SWal
*
pWal
=
calloc
(
sizeof
(
SWal
),
1
);
if
(
pWal
==
NULL
)
{
terrno
=
TAOS_SYSTEM_ERROR
(
errno
);
return
NULL
;
}
pthread_once
(
&
walModuleInit
,
walModuleInitFunc
);
if
(
walTmrCtrl
==
NULL
)
{
free
(
pWal
);
terrno
=
TAOS_SYSTEM_ERROR
(
errno
);
return
NULL
;
}
atomic_add_fetch_32
(
&
tsWalNum
,
1
);
pWal
->
fd
=
-
1
;
pWal
->
max
=
pCfg
->
wals
;
pWal
->
id
=
0
;
pWal
->
num
=
0
;
pWal
->
level
=
pCfg
->
walLevel
;
pWal
->
keep
=
pCfg
->
keep
;
pWal
->
fsyncPeriod
=
pCfg
->
fsyncPeriod
;
pWal
->
signature
=
pWal
;
tstrncpy
(
pWal
->
path
,
path
,
sizeof
(
pWal
->
path
));
pthread_mutex_init
(
&
pWal
->
mutex
,
NULL
);
if
(
walNeedFsyncTimer
(
pWal
))
{
pWal
->
timer
=
taosTmrStart
(
walProcessFsyncTimer
,
pWal
->
fsyncPeriod
,
pWal
,
walTmrCtrl
);
if
(
pWal
->
timer
==
NULL
)
{
terrno
=
TAOS_SYSTEM_ERROR
(
errno
);
walRelease
(
pWal
);
return
NULL
;
}
}
if
(
taosMkDir
(
path
,
0755
)
!=
0
)
{
terrno
=
TAOS_SYSTEM_ERROR
(
errno
);
wError
(
"wal:%s, failed to create directory(%s)"
,
path
,
strerror
(
errno
));
walRelease
(
pWal
);
pWal
=
NULL
;
}
if
(
pCfg
->
keep
==
1
)
return
pWal
;
if
(
walHandleExistingFiles
(
path
)
==
0
)
walRenew
(
pWal
);
if
(
pWal
&&
pWal
->
fd
<
0
)
{
terrno
=
TAOS_SYSTEM_ERROR
(
errno
);
wError
(
"wal:%s, failed to open(%s)"
,
path
,
strerror
(
errno
));
walRelease
(
pWal
);
pWal
=
NULL
;
}
if
(
pWal
)
wDebug
(
"wal:%s, it is open, level:%d fsyncPeriod:%d"
,
path
,
pWal
->
level
,
pWal
->
fsyncPeriod
);
return
pWal
;
}
int
walAlter
(
twalh
wal
,
const
SWalCfg
*
pCfg
)
{
SWal
*
pWal
=
wal
;
if
(
pWal
==
NULL
)
{
return
TSDB_CODE_WAL_APP_ERROR
;
}
if
(
pWal
->
level
==
pCfg
->
walLevel
&&
pWal
->
fsyncPeriod
==
pCfg
->
fsyncPeriod
)
{
wDebug
(
"wal:%s, old walLevel:%d fsync:%d, new walLevel:%d fsync:%d not change"
,
pWal
->
name
,
pWal
->
level
,
pWal
->
fsyncPeriod
,
pCfg
->
walLevel
,
pCfg
->
fsyncPeriod
);
return
TSDB_CODE_SUCCESS
;
}
wInfo
(
"wal:%s, change old walLevel:%d fsync:%d, new walLevel:%d fsync:%d"
,
pWal
->
name
,
pWal
->
level
,
pWal
->
fsyncPeriod
,
pCfg
->
walLevel
,
pCfg
->
fsyncPeriod
);
pthread_mutex_lock
(
&
pWal
->
mutex
);
pWal
->
level
=
pCfg
->
walLevel
;
pWal
->
fsyncPeriod
=
pCfg
->
fsyncPeriod
;
if
(
walNeedFsyncTimer
(
pWal
))
{
wInfo
(
"wal:%s, reset fsync timer, walLevel:%d fsyncPeriod:%d"
,
pWal
->
name
,
pWal
->
level
,
pWal
->
fsyncPeriod
);
taosTmrReset
(
walProcessFsyncTimer
,
pWal
->
fsyncPeriod
,
pWal
,
&
pWal
->
timer
,
walTmrCtrl
);
}
else
{
wInfo
(
"wal:%s, stop fsync timer, walLevel:%d fsyncPeriod:%d"
,
pWal
->
name
,
pWal
->
level
,
pWal
->
fsyncPeriod
);
taosTmrStop
(
pWal
->
timer
);
pWal
->
timer
=
NULL
;
}
pthread_mutex_unlock
(
&
pWal
->
mutex
);
return
TSDB_CODE_SUCCESS
;
}
void
walClose
(
void
*
handle
)
{
if
(
handle
==
NULL
)
return
;
SWal
*
pWal
=
handle
;
taosClose
(
pWal
->
fd
);
if
(
pWal
->
timer
)
taosTmrStopA
(
&
pWal
->
timer
);
if
(
pWal
->
keep
==
0
)
{
// remove all files in the directory
for
(
int
i
=
0
;
i
<
pWal
->
num
;
++
i
)
{
snprintf
(
pWal
->
name
,
sizeof
(
pWal
->
name
),
"%s/%s%d"
,
pWal
->
path
,
walPrefix
,
pWal
->
id
-
i
);
if
(
remove
(
pWal
->
name
)
<
0
)
{
wError
(
"wal:%s, failed to remove"
,
pWal
->
name
);
}
else
{
wDebug
(
"wal:%s, it is removed"
,
pWal
->
name
);
}
}
}
else
{
wDebug
(
"wal:%s, it is closed and kept"
,
pWal
->
name
);
}
walRelease
(
pWal
);
}
int
walRenew
(
void
*
handle
)
{
int32_t
walRenew
(
void
*
handle
)
{
if
(
handle
==
NULL
)
return
0
;
SWal
*
pWal
=
handle
;
...
...
@@ -190,7 +37,7 @@ int walRenew(void *handle) {
if
(
pWal
->
fd
>=
0
)
{
close
(
pWal
->
fd
);
pWal
->
id
++
;
wDebug
(
"
wal:%s, it is closed"
,
pWal
->
name
);
wDebug
(
"
vgId:%d, wal:%s, it is closed"
,
pWal
->
vgId
,
pWal
->
name
);
}
pWal
->
num
++
;
...
...
@@ -199,19 +46,19 @@ int walRenew(void *handle) {
pWal
->
fd
=
open
(
pWal
->
name
,
O_WRONLY
|
O_CREAT
,
S_IRWXU
|
S_IRWXG
|
S_IRWXO
);
if
(
pWal
->
fd
<
0
)
{
wError
(
"wal:%s, failed to open(%s)"
,
pWal
->
name
,
strerror
(
errno
));
terrno
=
TAOS_SYSTEM_ERROR
(
errno
);
wError
(
"vgId:%d, wal:%s, failed to open, reason:%s"
,
pWal
->
vgId
,
pWal
->
name
,
strerror
(
errno
));
}
else
{
wDebug
(
"
wal:%s, it is created"
,
pWal
->
name
);
wDebug
(
"
vgId:%d, wal:%s, it is created"
,
pWal
->
vgId
,
pWal
->
name
);
if
(
pWal
->
num
>
pWal
->
max
)
{
// remove the oldest wal file
char
name
[
TSDB_FILENAME_LEN
*
3
];
snprintf
(
name
,
sizeof
(
name
),
"%s/%s%d"
,
pWal
->
path
,
walPrefix
,
pWal
->
id
-
pWal
->
max
);
if
(
remove
(
name
)
<
0
)
{
wError
(
"
wal:%s, failed to remove(%s)"
,
name
,
strerror
(
errno
));
wError
(
"
vgId:%d, wal:%s, failed to remove(%s)"
,
pWal
->
vgId
,
name
,
strerror
(
errno
));
}
else
{
wDebug
(
"
wal:%s, it is removed"
,
name
);
wDebug
(
"
vgId:%d, wal:%s, it is removed"
,
pWal
->
vgId
,
name
);
}
pWal
->
num
--
;
...
...
@@ -223,7 +70,7 @@ int walRenew(void *handle) {
return
terrno
;
}
int
walWrite
(
void
*
handle
,
SWalHead
*
pHead
)
{
int
32_t
walWrite
(
void
*
handle
,
SWalHead
*
pHead
)
{
SWal
*
pWal
=
handle
;
if
(
pWal
==
NULL
)
return
-
1
;
...
...
@@ -235,11 +82,11 @@ int walWrite(void *handle, SWalHead *pHead) {
pHead
->
signature
=
walSignature
;
taosCalcChecksumAppend
(
0
,
(
uint8_t
*
)
pHead
,
sizeof
(
SWalHead
));
int
contLen
=
pHead
->
len
+
sizeof
(
SWalHead
);
int
32_t
contLen
=
pHead
->
len
+
sizeof
(
SWalHead
);
if
(
taosTWrite
(
pWal
->
fd
,
pHead
,
contLen
)
!=
contLen
)
{
wError
(
"wal:%s, failed to write(%s)"
,
pWal
->
name
,
strerror
(
errno
));
terrno
=
TAOS_SYSTEM_ERROR
(
errno
);
wError
(
"vgId:%d, wal:%s, failed to write(%s)"
,
pWal
->
vgId
,
pWal
->
name
,
strerror
(
errno
));
return
terrno
;
}
else
{
pWal
->
version
=
pHead
->
version
;
...
...
@@ -255,29 +102,30 @@ void walFsync(void *handle) {
if
(
pWal
->
fsyncPeriod
==
0
)
{
if
(
fsync
(
pWal
->
fd
)
<
0
)
{
wError
(
"
wal:%s, fsync failed(%s)"
,
pWal
->
name
,
strerror
(
errno
));
wError
(
"
vgId:%d, wal:%s, fsync failed(%s)"
,
pWal
->
vgId
,
pWal
->
name
,
strerror
(
errno
));
}
}
}
int
walRestore
(
void
*
handle
,
void
*
pVnode
,
int
(
*
writeFp
)(
void
*
,
void
*
,
int
))
{
SWal
*
pWal
=
handle
;
struct
dirent
*
ent
;
int
count
=
0
;
uint32_t
maxId
=
0
,
minId
=
-
1
,
index
=
0
;
int32_t
walRestore
(
void
*
handle
,
void
*
pVnode
,
int32_t
(
*
writeFp
)(
void
*
,
void
*
,
int32_t
))
{
SWal
*
pWal
=
handle
;
int32_t
count
=
0
;
uint32_t
maxId
=
0
;
uint32_t
minId
=
-
1
;
uint32_t
index
=
0
;
int32_t
code
=
0
;
struct
dirent
*
ent
;
terrno
=
0
;
int
plen
=
strlen
(
walPrefix
);
int
32_t
plen
=
strlen
(
walPrefix
);
char
opath
[
TSDB_FILENAME_LEN
+
5
];
int
slen
=
snprintf
(
opath
,
sizeof
(
opath
),
"%s"
,
pWal
->
path
);
if
(
pWal
->
keep
==
0
)
strcpy
(
opath
+
slen
,
"/old"
);
snprintf
(
opath
,
sizeof
(
opath
),
"%s"
,
pWal
->
path
);
DIR
*
dir
=
opendir
(
opath
);
if
(
dir
==
NULL
&&
errno
==
ENOENT
)
return
0
;
if
(
dir
==
NULL
)
{
terrno
=
TAOS_SYSTEM_ERROR
(
errno
);
return
terrno
;
code
=
TAOS_SYSTEM_ERROR
(
errno
);
return
code
;
}
while
((
ent
=
readdir
(
dir
))
!=
NULL
)
{
...
...
@@ -290,6 +138,7 @@ int walRestore(void *handle, void *pVnode, int (*writeFp)(void *, void *, int))
}
closedir
(
dir
);
pWal
->
fileIndex
=
maxId
;
if
(
count
==
0
)
{
if
(
pWal
->
keep
)
terrno
=
walRenew
(
pWal
);
...
...
@@ -297,10 +146,10 @@ int walRestore(void *handle, void *pVnode, int (*writeFp)(void *, void *, int))
}
if
(
count
!=
(
maxId
-
minId
+
1
))
{
wError
(
"
wal:%s, messed up, count:%d max:%d min:%d"
,
opath
,
count
,
maxId
,
minId
);
wError
(
"
vgId:%d, wal:%s, messed up, count:%d max:%d min:%d"
,
pWal
->
vgId
,
opath
,
count
,
maxId
,
minId
);
terrno
=
TSDB_CODE_WAL_APP_ERROR
;
}
else
{
wDebug
(
"
wal:%s, %d files will be restored"
,
opath
,
count
);
wDebug
(
"
vgId:%d, wal:%s, %d files will be restored"
,
pWal
->
vgId
,
opath
,
count
);
for
(
index
=
minId
;
index
<=
maxId
;
++
index
)
{
snprintf
(
pWal
->
name
,
sizeof
(
pWal
->
name
),
"%s/%s%d"
,
opath
,
walPrefix
,
index
);
...
...
@@ -314,7 +163,7 @@ int walRestore(void *handle, void *pVnode, int (*writeFp)(void *, void *, int))
terrno
=
walRemoveWalFiles
(
opath
);
if
(
terrno
==
0
)
{
if
(
remove
(
opath
)
<
0
)
{
wError
(
"
wal:%s, failed to remove directory(%s)"
,
opath
,
strerror
(
errno
));
wError
(
"
vgId:%d, wal:%s, failed to remove directory, reason:%s"
,
pWal
->
vgId
,
opath
,
strerror
(
errno
));
terrno
=
TAOS_SYSTEM_ERROR
(
errno
);
}
}
...
...
@@ -325,7 +174,7 @@ int walRestore(void *handle, void *pVnode, int (*writeFp)(void *, void *, int))
snprintf
(
pWal
->
name
,
sizeof
(
pWal
->
name
),
"%s/%s%d"
,
opath
,
walPrefix
,
maxId
);
pWal
->
fd
=
open
(
pWal
->
name
,
O_WRONLY
|
O_CREAT
|
O_APPEND
,
S_IRWXU
|
S_IRWXG
|
S_IRWXO
);
if
(
pWal
->
fd
<
0
)
{
wError
(
"
wal:%s, failed to open file(%s)"
,
pWal
->
name
,
strerror
(
errno
));
wError
(
"
vgId:%d, wal:%s, failed to open file, reason:%s"
,
pWal
->
vgId
,
pWal
->
name
,
strerror
(
errno
));
terrno
=
TAOS_SYSTEM_ERROR
(
errno
);
}
}
...
...
@@ -334,9 +183,9 @@ int walRestore(void *handle, void *pVnode, int (*writeFp)(void *, void *, int))
return
terrno
;
}
int
walGetWalFile
(
void
*
handle
,
char
*
name
,
uint32_t
*
index
)
{
int
32_t
walGetWalFile
(
void
*
handle
,
char
*
name
,
uint32_t
*
index
)
{
SWal
*
pWal
=
handle
;
int
code
=
1
;
int
32_t
code
=
1
;
int32_t
first
=
0
;
name
[
0
]
=
0
;
...
...
@@ -359,22 +208,9 @@ int walGetWalFile(void *handle, char *name, uint32_t *index) {
return
code
;
}
static
void
walRelease
(
SWal
*
pWal
)
{
pthread_mutex_destroy
(
&
pWal
->
mutex
);
pWal
->
signature
=
NULL
;
free
(
pWal
);
if
(
atomic_sub_fetch_32
(
&
tsWalNum
,
1
)
==
0
)
{
if
(
walTmrCtrl
)
taosTmrCleanUp
(
walTmrCtrl
);
walTmrCtrl
=
NULL
;
walModuleInit
=
PTHREAD_ONCE_INIT
;
wDebug
(
"WAL module is cleaned up"
);
}
}
static
int
walRestoreWalFile
(
SWal
*
pWal
,
void
*
pVnode
,
FWalWrite
writeFp
)
{
char
*
name
=
pWal
->
name
;
int
size
=
1024
*
1024
;
// default 1M buffer size
static
int32_t
walRestoreWalFile
(
SWal
*
pWal
,
void
*
pVnode
,
FWalWrite
writeFp
)
{
char
*
name
=
pWal
->
name
;
int32_t
size
=
1024
*
1024
;
// default 1M buffer size
terrno
=
0
;
char
*
buffer
=
malloc
(
size
);
...
...
@@ -385,36 +221,36 @@ static int walRestoreWalFile(SWal *pWal, void *pVnode, FWalWrite writeFp) {
SWalHead
*
pHead
=
(
SWalHead
*
)
buffer
;
int
fd
=
open
(
name
,
O_RDWR
);
int
32_t
fd
=
open
(
name
,
O_RDWR
);
if
(
fd
<
0
)
{
wError
(
"
wal:%s, failed to open for restore(%s)"
,
name
,
strerror
(
errno
));
wError
(
"
vgId:%d, wal:%s, failed to open for restore(%s)"
,
pWal
->
vgId
,
name
,
strerror
(
errno
));
terrno
=
TAOS_SYSTEM_ERROR
(
errno
);
free
(
buffer
);
return
terrno
;
}
wDebug
(
"
wal:%s, start to restore"
,
name
);
wDebug
(
"
vgId:%d, wal:%s, start to restore"
,
pWal
->
vgId
,
name
);
size_t
offset
=
0
;
while
(
1
)
{
int
ret
=
taosTRead
(
fd
,
pHead
,
sizeof
(
SWalHead
));
int
32_t
ret
=
taosTRead
(
fd
,
pHead
,
sizeof
(
SWalHead
));
if
(
ret
==
0
)
break
;
if
(
ret
<
0
)
{
wError
(
"
wal:%s, failed to read wal head part since %s"
,
name
,
strerror
(
errno
));
wError
(
"
vgId:%d, wal:%s, failed to read wal head part since %s"
,
pWal
->
vgId
,
name
,
strerror
(
errno
));
terrno
=
TAOS_SYSTEM_ERROR
(
errno
);
break
;
}
if
(
ret
<
sizeof
(
SWalHead
))
{
wError
(
"
wal:%s, failed to read head, ret:%d, skip the rest of file"
,
name
,
ret
);
wError
(
"
vgId:%d, wal:%s, failed to read head, ret:%d, skip the rest of file"
,
pWal
->
vgId
,
name
,
ret
);
taosFtruncate
(
fd
,
offset
);
fsync
(
fd
);
break
;
}
if
(
!
taosCheckChecksumWhole
((
uint8_t
*
)
pHead
,
sizeof
(
SWalHead
)))
{
wWarn
(
"
wal:%s, cksum is messed up, skip the rest of file"
,
name
);
wWarn
(
"
vgId:%d, wal:%s, cksum is messed up, skip the rest of file"
,
pWal
->
vgId
,
name
);
terrno
=
TSDB_CODE_WAL_FILE_CORRUPTED
;
ASSERT
(
false
);
break
;
...
...
@@ -433,13 +269,14 @@ static int walRestoreWalFile(SWal *pWal, void *pVnode, FWalWrite writeFp) {
ret
=
taosTRead
(
fd
,
pHead
->
cont
,
pHead
->
len
);
if
(
ret
<
0
)
{
wError
(
"
wal:%s failed to read wal body part since %s"
,
name
,
strerror
(
errno
));
wError
(
"
vgId:%d, wal:%s failed to read wal body part since %s"
,
pWal
->
vgId
,
name
,
strerror
(
errno
));
terrno
=
TAOS_SYSTEM_ERROR
(
errno
);
break
;
}
if
(
ret
<
pHead
->
len
)
{
wError
(
"wal:%s, failed to read body, len:%d ret:%d, skip the rest of file"
,
name
,
pHead
->
len
,
ret
);
wError
(
"vgId:%d, wal:%s, failed to read body, len:%d ret:%d, skip the rest of file"
,
pWal
->
vgId
,
name
,
pHead
->
len
,
ret
);
taosFtruncate
(
fd
,
offset
);
fsync
(
fd
);
break
;
...
...
@@ -457,50 +294,9 @@ static int walRestoreWalFile(SWal *pWal, void *pVnode, FWalWrite writeFp) {
return
terrno
;
}
int
walHandleExistingFiles
(
const
char
*
path
)
{
char
oname
[
TSDB_FILENAME_LEN
*
3
];
char
nname
[
TSDB_FILENAME_LEN
*
3
];
char
opath
[
TSDB_FILENAME_LEN
];
snprintf
(
opath
,
sizeof
(
opath
),
"%s/old"
,
path
);
struct
dirent
*
ent
;
DIR
*
dir
=
opendir
(
path
);
int
plen
=
strlen
(
walPrefix
);
terrno
=
0
;
int
midx
=
walGetMaxOldFileId
(
opath
);
int
count
=
0
;
while
((
ent
=
readdir
(
dir
))
!=
NULL
)
{
if
(
strncmp
(
ent
->
d_name
,
walPrefix
,
plen
)
==
0
)
{
midx
++
;
snprintf
(
oname
,
sizeof
(
oname
),
"%s/%s"
,
path
,
ent
->
d_name
);
snprintf
(
nname
,
sizeof
(
nname
),
"%s/old/wal%d"
,
path
,
midx
);
if
(
taosMkDir
(
opath
,
0755
)
!=
0
)
{
wError
(
"wal:%s, failed to create directory:%s(%s)"
,
oname
,
opath
,
strerror
(
errno
));
terrno
=
TAOS_SYSTEM_ERROR
(
errno
);
break
;
}
if
(
rename
(
oname
,
nname
)
<
0
)
{
wError
(
"wal:%s, failed to move to new:%s"
,
oname
,
nname
);
terrno
=
TAOS_SYSTEM_ERROR
(
errno
);
break
;
}
count
++
;
}
wDebug
(
"wal:%s, %d files are moved for restoration"
,
path
,
count
);
}
closedir
(
dir
);
return
terrno
;
}
static
int
walRemoveWalFiles
(
const
char
*
path
)
{
int
plen
=
strlen
(
walPrefix
);
char
name
[
TSDB_FILENAME_LEN
*
3
];
static
int32_t
walRemoveWalFiles
(
const
char
*
path
)
{
int32_t
plen
=
strlen
(
walPrefix
);
char
name
[
TSDB_FILENAME_LEN
*
3
];
terrno
=
0
;
...
...
@@ -527,54 +323,9 @@ static int walRemoveWalFiles(const char *path) {
return
terrno
;
}
static
void
walProcessFsyncTimer
(
void
*
param
,
void
*
tmrId
)
{
SWal
*
pWal
=
param
;
if
(
pWal
->
signature
!=
pWal
)
return
;
if
(
pWal
->
fd
<
0
)
return
;
if
(
fsync
(
pWal
->
fd
)
<
0
)
{
wError
(
"wal:%s, fsync failed(%s)"
,
pWal
->
name
,
strerror
(
errno
));
}
if
(
walNeedFsyncTimer
(
pWal
))
{
pWal
->
timer
=
taosTmrStart
(
walProcessFsyncTimer
,
pWal
->
fsyncPeriod
,
pWal
,
walTmrCtrl
);
}
else
{
wInfo
(
"wal:%s, stop fsync timer for walLevel:%d fsyncPeriod:%d"
,
pWal
->
name
,
pWal
->
level
,
pWal
->
fsyncPeriod
);
taosTmrStop
(
pWal
->
timer
);
pWal
->
timer
=
NULL
;
}
}
int64_t
walGetVersion
(
twalh
param
)
{
SWal
*
pWal
=
param
;
if
(
pWal
==
0
)
return
0
;
return
pWal
->
version
;
}
static
int
walGetMaxOldFileId
(
char
*
odir
)
{
int
midx
=
0
;
DIR
*
dir
=
NULL
;
struct
dirent
*
dp
=
NULL
;
int
plen
=
strlen
(
walPrefix
);
if
(
access
(
odir
,
F_OK
)
!=
0
)
return
midx
;
dir
=
opendir
(
odir
);
if
(
dir
==
NULL
)
{
wError
(
"failed to open directory %s since %s"
,
odir
,
strerror
(
errno
));
terrno
=
TAOS_SYSTEM_ERROR
(
errno
);
return
-
1
;
}
while
((
dp
=
readdir
(
dir
))
!=
NULL
)
{
if
(
strncmp
(
dp
->
d_name
,
walPrefix
,
plen
)
==
0
)
{
int
idx
=
atol
(
dp
->
d_name
+
plen
);
if
(
midx
<
idx
)
midx
=
idx
;
}
}
closedir
(
dir
);
return
midx
;
}
\ No newline at end of file
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录