From 87bf983d488ad915853d2e79e4e58ca248ef6c59 Mon Sep 17 00:00:00 2001 From: Jeff Tao Date: Sun, 29 Mar 2020 23:28:06 +0800 Subject: [PATCH] first draft for WAL --- src/vnode/wal/CMakeLists.txt | 15 +- src/vnode/wal/inc/{vnodeWal.h => twal.h} | 31 ++- src/vnode/wal/src/vnodeWal.c | 27 -- src/vnode/wal/src/walMain.c | 316 +++++++++++++++++++++++ src/vnode/wal/test/CMakeLists.txt | 16 ++ src/vnode/wal/test/waltest.c | 112 ++++++++ 6 files changed, 481 insertions(+), 36 deletions(-) rename src/vnode/wal/inc/{vnodeWal.h => twal.h} (56%) delete mode 100644 src/vnode/wal/src/vnodeWal.c create mode 100644 src/vnode/wal/src/walMain.c create mode 100644 src/vnode/wal/test/CMakeLists.txt create mode 100644 src/vnode/wal/test/waltest.c diff --git a/src/vnode/wal/CMakeLists.txt b/src/vnode/wal/CMakeLists.txt index 1de958f84e..d77a235bb9 100644 --- a/src/vnode/wal/CMakeLists.txt +++ b/src/vnode/wal/CMakeLists.txt @@ -1,4 +1,15 @@ +CMAKE_MINIMUM_REQUIRED(VERSION 2.8) +PROJECT(TDengine) + +INCLUDE_DIRECTORIES(${TD_OS_DIR}/inc) +INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/src/inc) +INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/src/util/inc) +INCLUDE_DIRECTORIES(inc) + AUX_SOURCE_DIRECTORY(${CMAKE_CURRENT_SOURCE_DIR}/src SRC) -ADD_LIBRARY(wal ${SRC}) -TARGET_INCLUDE_DIRECTORIES(wal PUBLIC ${CMAKE_CURRENT_SOURCE_DIR}/inc) \ No newline at end of file +ADD_LIBRARY(twal ${SRC}) +TARGET_LINK_LIBRARIES(twal tutil) + +ADD_SUBDIRECTORY(test) + diff --git a/src/vnode/wal/inc/vnodeWal.h b/src/vnode/wal/inc/twal.h similarity index 56% rename from src/vnode/wal/inc/vnodeWal.h rename to src/vnode/wal/inc/twal.h index 7753e4ecca..34491c993d 100644 --- a/src/vnode/wal/inc/vnodeWal.h +++ b/src/vnode/wal/inc/twal.h @@ -14,19 +14,36 @@ */ #ifndef _TD_WAL_H_ #define _TD_WAL_H_ -#include #ifdef __cplusplus extern "C" { #endif -typedef void walh; // WAL HANDLE +#define TAOS_WAL_NOLOG 0 +#define TAOS_WAL_WRITE 1 +#define TAOS_WAL_FSYNC 2 + +typedef struct { + uint32_t signature; + uint32_t cksum; + int8_t msgType; + int8_t reserved[3]; + int32_t len; + uint64_t version; + char cont[]; +} SWalHead; + +typedef void* twal_h; // WAL HANDLE + +twal_h walOpen(char *path, int max, int level); +void walClose(twal_h); +int walRenew(twal_h); +int walWrite(twal_h, SWalHead *); +void walFsync(twal_h); +int walRestore(twal_h, void *pVnode, int (*writeFp)(void *ahandle, void *pWalHead)); + +extern int wDebugFlag; -walh *vnodeOpenWal(int vnode, uint8_t op); -int vnodeCloseWal(walh *pWal); -int vnodeRenewWal(walh *pWal); -int vnodeWriteWal(walh *pWal, void *cont, int contLen); -int vnodeSyncWal(walh *pWal); #ifdef __cplusplus } diff --git a/src/vnode/wal/src/vnodeWal.c b/src/vnode/wal/src/vnodeWal.c deleted file mode 100644 index 528cc97ed6..0000000000 --- a/src/vnode/wal/src/vnodeWal.c +++ /dev/null @@ -1,27 +0,0 @@ -/* - * Copyright (c) 2019 TAOS Data, Inc. - * - * This program is free software: you can use, redistribute, and/or modify - * it under the terms of the GNU Affero General Public License, version 3 - * or later ("AGPL"), as published by the Free Software Foundation. - * - * This program is distributed in the hope that it will be useful, but WITHOUT - * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or - * FITNESS FOR A PARTICULAR PURPOSE. - * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see . - */ -#include - -#include "vnodeWal.h" - -typedef struct { - /* TODO */ -} SWal; - -walh *vnodeOpenWal(int vnode, uint8_t op) { return NULL; } -int vnodeCloseWal(walh *pWal) { return 0; } -int vnodeRenewWal(walh *pWal) { return 0; } -int vnodeWriteWal(walh *pWal, void *cont, int contLen) { return 0; } -int vnodeSyncWal(walh *pWal) { return 0; } \ No newline at end of file diff --git a/src/vnode/wal/src/walMain.c b/src/vnode/wal/src/walMain.c new file mode 100644 index 0000000000..b3d14eb179 --- /dev/null +++ b/src/vnode/wal/src/walMain.c @@ -0,0 +1,316 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#include +#include +#include +#include +#include +#include +#include + +#include "os.h" +#include "tlog.h" +#include "tutil.h" +#include "twal.h" + +#define walPrefix "wal" +#define wError(...) if (wDebugFlag & DEBUG_ERROR) {tprintf("ERROR WAL ", wDebugFlag, __VA_ARGS__);} +#define wWarn(...) if (wDebugFlag & DEBUG_WARN) {tprintf("WARN WAL ", wDebugFlag, __VA_ARGS__);} +#define wTrace(...) if (wDebugFlag & DEBUG_TRACE) {tprintf("WAL ", wDebugFlag, __VA_ARGS__);} +#define wPrint(...) {tprintf("WAL ", 255, __VA_ARGS__);} + +typedef struct { + int fd; + int level; + int max; // maximum number of wal files + uint32_t id; // increase continuously + int num; // number of wal files + char path[TSDB_FILENAME_LEN]; + char name[TSDB_FILENAME_LEN]; +} SWal; + +int wDebugFlag = 135; + +static uint32_t walSignature = 0xFAFBFDFE; +static int walHandleExistingFiles(char *path); +static int walRestoreWalFile(char *name, void *pVnode, int (*writeFp)(void *, void *)); +static int walRemoveWalFiles(char *path); + +void *walOpen(char *path, int max, int level) { + SWal *pWal = calloc(sizeof(SWal), 1); + if (pWal == NULL) return NULL; + + pWal->fd = -1; + pWal->max = max; + pWal->id = 0; + pWal->num = 0; + pWal->level = level; + strcpy(pWal->path, path); + + if (access(path, F_OK) != 0) mkdir(path, 0755); + + if (walHandleExistingFiles(path) == 0) + walRenew(pWal); + + if (pWal->fd <0) { + wError("wal:%s, failed to open", path); + free(pWal); + pWal = NULL; + } + + return pWal; +} + +void walClose(void *handle) { + + SWal *pWal = (SWal *)handle; + + close(pWal->fd); + + // remove all files in the directory + for (int i=0; inum; ++i) { + sprintf(pWal->name, "%s/%s%010d", pWal->path, walPrefix, pWal->id-i); + if (remove(pWal->name) <0) { + wError("wal:%s, failed to remove", pWal->name); + } else { + wTrace("wal:%s, it is removed", pWal->name); + } + } +} + +int walRenew(twal_h handle) { + SWal *pWal = (SWal *)handle; + + if (pWal->fd >=0) { + close(pWal->fd); + pWal->id++; + wTrace("wal:%s, it is closed", pWal->name); + } + + sprintf(pWal->name, "%s/%s%010d", pWal->path, walPrefix, pWal->id); + pWal->fd = open(pWal->name, O_WRONLY | O_CREAT, S_IRWXU | S_IRWXG | S_IRWXO); + if (pWal->fd < 0) { + wError("wal:%d, failed to open(%s)", pWal->name, strerror(errno)); + return -1; + } + + wTrace("wal:%s, it is open", pWal->name); + + pWal->num++; + if (pWal->num > pWal->max) { + // remove the oldest wal file + char name[TSDB_FILENAME_LEN]; + sprintf(name, "%s/%s%010d", pWal->path, walPrefix, pWal->id - pWal->max); + if (remove(name) <0) { + wError("wal:%s, failed to remove(%s)", name, strerror(errno)); + } else { + wTrace("wal:%s, it is removed", name); + } + + pWal->num--; + } + + return 0; +} + +int walWrite(void *handle, SWalHead *pHead) { + SWal *pWal = (SWal *)handle; + int code = 0; + + // no wal + if (pWal->level == TAOS_WAL_NOLOG) return 0; + + pHead->signature = walSignature; + int contLen = pHead->len + sizeof(SWalHead); + + if(write(pWal->fd, pHead, contLen) != contLen) { + wError("wal:%s, failed to write(%s)", pWal->name, strerror(errno)); + code = -1; + } + + return code; +} + +void walFsync(void *handle) { + + SWal *pWal = (SWal *)handle; + + if (pWal->level == TAOS_WAL_FSYNC) + fsync(pWal->fd); +} + +int walRestore(void *handle, void *pVnode, int (*writeFp)(void *, void *)) { + SWal *pWal = (SWal *)handle; + int code = 0; + struct dirent *ent; + int count = 0; + uint32_t maxId = 0, minId = -1, index =0; + + int plen = strlen(walPrefix); + char opath[TSDB_FILENAME_LEN]; + sprintf(opath, "%s/old", pWal->path); + + // is there old directory? + if (access(opath, F_OK)) return 0; + + DIR *dir = opendir(opath); + while ((ent = readdir(dir))!= NULL) { + if ( strncmp(ent->d_name, walPrefix, plen) == 0) { + index = atol(ent->d_name + plen); + if (index > maxId) maxId = index; + if (index < minId) minId = index; + count++; + } + } + + if ( count != (maxId-minId+1) ) { + wError("wal:%s, messed up, count:%d max:%ld min:%ld", opath, count, maxId, minId); + code = -1; + } else { + wTrace("wal:%s, %d files will be restored", opath, count); + + for (index = minId; index<=maxId; ++index) { + sprintf(pWal->name, "%s/old/%s%010d", pWal->path, walPrefix, index); + code = walRestoreWalFile(pWal->name, pVnode, writeFp); + if (code < 0) break; + } + } + + if (code == 0) { + code = walRemoveWalFiles(opath); + if (code == 0) { + if (remove(opath) < 0) { + wError("wal:%s, failed to remove directory(%s)", opath, strerror(errno)); + code = -1; + } + } + } + + closedir(dir); + + return code; +} + +static int walRestoreWalFile(char *name, void *pVnode, int (*writeFp)(void *, void *)) { + SWalHead walHead; + int code = -1; + + int fd = open(name, O_RDONLY); + if (fd < 0) { + wError("wal:%s, failed to open for restore(%s)", name, strerror(errno)); + return -1; + } + + wTrace("wal:%s, start to restore", name); + + while (1) { + int ret = read(fd, &walHead, sizeof(walHead)); + if ( ret == 0) { code = 0; break;} + + if (ret != sizeof(walHead)) { + wError("wal:%s, failed to read(%s)", name, strerror(errno)); + break; + } + + if (walHead.signature != walSignature) { + wError("wal:%s, file is messed up, signature:", name, walHead.signature); + break; + } + + char *buffer = malloc(sizeof(SWalHead) + walHead.len); + memcpy(buffer, &walHead, sizeof(walHead)); + + ret = read(fd, buffer+sizeof(walHead), walHead.len); + if ( ret != walHead.len) { + wError("wal:%s, failed to read(%s)", name, strerror(errno)); + break; + } + + // write into queue + (*writeFp)(pVnode, buffer); + } + + return code; +} + +int walHandleExistingFiles(char *path) { + int code = 0; + char oname[TSDB_FILENAME_LEN]; + char nname[TSDB_FILENAME_LEN]; + char opath[TSDB_FILENAME_LEN]; + + sprintf(opath, "%s/old", path); + + struct dirent *ent; + DIR *dir = opendir(path); + int plen = strlen(walPrefix); + + if (access(opath, F_OK) == 0) { + // old directory is there, it means restore process is not finished + walRemoveWalFiles(path); + + } else { + // move all files to old directory + int count = 0; + while ((ent = readdir(dir))!= NULL) { + if ( strncmp(ent->d_name, walPrefix, plen) == 0) { + if (access(opath, F_OK) != 0) mkdir(opath, 0755); + + sprintf(oname, "%s/%s", path, ent->d_name); + sprintf(nname, "%s/old/%s", path, ent->d_name); + if (rename(oname, nname) < 0) { + wError("wal:%s, failed to move to new:%s", oname, nname); + code = -1; + break; + } + + count++; + } + } + + wTrace("wal:%s, %d files are moved for restoration", path, count); + } + + closedir(dir); + return code; +} + +static int walRemoveWalFiles(char *path) { + int plen = strlen(walPrefix); + char name[TSDB_FILENAME_LEN]; + int code = 0; + + if (access(path, F_OK) != 0) return 0; + + struct dirent *ent; + DIR *dir = opendir(path); + + while ((ent = readdir(dir))!= NULL) { + if ( strncmp(ent->d_name, walPrefix, plen) == 0) { + sprintf(name, "%s/%s", path, ent->d_name); + if (remove(name) <0) { + wError("wal:%s, failed to remove(%s)", name, strerror(errno)); + code = -1; break; + } + } + } + + closedir(dir); + + return code; +} + + diff --git a/src/vnode/wal/test/CMakeLists.txt b/src/vnode/wal/test/CMakeLists.txt new file mode 100644 index 0000000000..06591def40 --- /dev/null +++ b/src/vnode/wal/test/CMakeLists.txt @@ -0,0 +1,16 @@ +CMAKE_MINIMUM_REQUIRED(VERSION 2.8) +PROJECT(TDengine) + +IF ((TD_LINUX_64) OR (TD_LINUX_32 AND TD_ARM)) + INCLUDE_DIRECTORIES(${TD_OS_DIR}/inc) + INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/src/inc) + INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/src/util/inc) + INCLUDE_DIRECTORIES(../inc) + + LIST(APPEND WALTEST_SRC ./waltest.c) + ADD_EXECUTABLE(waltest ${WALTEST_SRC}) + TARGET_LINK_LIBRARIES(waltest twal) + +ENDIF () + + diff --git a/src/vnode/wal/test/waltest.c b/src/vnode/wal/test/waltest.c new file mode 100644 index 0000000000..cdeed6b3d4 --- /dev/null +++ b/src/vnode/wal/test/waltest.c @@ -0,0 +1,112 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +//#define _DEFAULT_SOURCE +#include +#include "tlog.h" +#include "twal.h" + +int64_t ver = 0; +void *pWal = NULL; + +int writeToQueue(void *pVnode, void *data) { + SWalHead *pHead = (SWalHead *)data; + + // do nothing + if (pHead->version > ver) + ver = pHead->version; + + walWrite(pWal, pHead); + + free(data); + + return 0; +} + +int main(int argc, char *argv[]) { + char path[128] = "/home/jhtao/test/wal"; + int max = 3; + int level = 2; + int total = 5; + int rows = 10000; + int size = 128; + + for (int i=1; iversion = ++ver; + walWrite(pWal, pHead); + } + + printf("renew a wal, i:%d\n", i); + walRenew(pWal); + } + + printf("%d wal files are written\n", total); + getchar(); + + walClose(pWal); + + return 0; +} -- GitLab