From d0e60fdec1ce9de57027b51b3d2d3162b4de6c2e Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Thu, 29 Oct 2020 12:17:56 +0000 Subject: [PATCH] TD-1856 TD-1847 --- src/inc/twal.h | 19 +- src/{wal/inc/walMgmt.h => util/inc/talloc.h} | 19 +- src/util/src/talloc.c | 66 ++++ src/wal/inc/walInt.h | 20 ++ src/wal/src/walMgmt.c | 221 +++++++++++- src/wal/src/walWrite.c | 351 +++---------------- 6 files changed, 387 insertions(+), 309 deletions(-) rename src/{wal/inc/walMgmt.h => util/inc/talloc.h} (59%) create mode 100644 src/util/src/talloc.c diff --git a/src/inc/twal.h b/src/inc/twal.h index 1ce7b132b0..626c4434e9 100644 --- a/src/inc/twal.h +++ b/src/inc/twal.h @@ -19,9 +19,11 @@ extern "C" { #endif -#define TAOS_WAL_NOLOG 0 -#define TAOS_WAL_WRITE 1 -#define TAOS_WAL_FSYNC 2 +typedef enum { + TAOS_WAL_NOLOG = 0, + TAOS_WAL_WRITE = 1, + TAOS_WAL_FSYNC = 2 +} EWalType; typedef struct { int8_t msgType; @@ -34,17 +36,22 @@ typedef struct { } SWalHead; typedef struct { - int8_t walLevel; // wal level + int32_t vgId; int32_t fsyncPeriod; // millisecond + int8_t walLevel; // wal level int8_t wals; // number of WAL files; int8_t keep; // keep the wal file when closed + int8_t reserved[5]; } SWalCfg; typedef void* twalh; // WAL HANDLE typedef int (*FWalWrite)(void *ahandle, void *pHead, int type); -twalh walOpen(const char *path, const SWalCfg *pCfg); -int walAlter(twalh pWal, const SWalCfg *pCfg); +int32_t walInit(); +void walCleanUp(); + +twalh walOpen(char *path, SWalCfg *pCfg); +int walAlter(twalh pWal, SWalCfg *pCfg); void walClose(twalh); int walRenew(twalh); int walWrite(twalh, SWalHead *); diff --git a/src/wal/inc/walMgmt.h b/src/util/inc/talloc.h similarity index 59% rename from src/wal/inc/walMgmt.h rename to src/util/inc/talloc.h index a23c7f8ec3..5f04f4b99c 100644 --- a/src/wal/inc/walMgmt.h +++ b/src/util/inc/talloc.h @@ -13,13 +13,28 @@ * along with this program. If not, see . */ -#ifndef TDENGINE_WAL_MGMT_H -#define TDENGINE_WAL_MGMT_H +#ifndef TDENGINE_UTIL_ALLOC_H +#define TDENGINE_UTIL_ALLOC_H #ifdef __cplusplus extern "C" { #endif +#ifdef TSDB_USE_SYS_MEM + #define tmalloc(size) malloc(size) + #define tcalloc(size) calloc(1, size) + #define tmemalign(alignment, size) malloc(size) + #define tfree(p) free(p) + #define tmemzero(p, size) memset(p, 0, size) +#else + void *tmalloc(int32_t size); + void *tcalloc(int32_t size); + void *tmemalign(int32_t alignment, int32_t size); + void tfree(void *p); + void tmemzero(void *p, int32_t size); +#endif + + #ifdef __cplusplus } #endif diff --git a/src/util/src/talloc.c b/src/util/src/talloc.c new file mode 100644 index 0000000000..4f7067a26a --- /dev/null +++ b/src/util/src/talloc.c @@ -0,0 +1,66 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#define _DEFAULT_SOURCE +#include "os.h" +#include "taoserror.h" +#include "tulog.h" + +#define TSDB_HAVE_MEMALIGN + +void *tmalloc(int32_t size) { + void *p = malloc(size); + if (p == NULL) { + terrno = TAOS_SYSTEM_ERROR(errno); + uError("failed to malloc memory, size:%d reason:%s", size, strerror(errno)); + } + + return p; +} + +void *tcalloc(int32_t size) { + void *p = calloc(1, size); + if (p == NULL) { + terrno = TAOS_SYSTEM_ERROR(errno); + uError("failed to calloc memory, size:%d reason:%s", size, strerror(errno)); + } + + return p; +} + +void tfree(void *p) { free(p); } + +void tmemzero(void *p, int32_t size) { memset(p, 0, size); } + +#ifdef TSDB_HAVE_MEMALIGN + +void *tmemalign(int32_t alignment, int32_t size) { + void *p; + + int err = posix_memalign(&p, alignment, size); + if (err) { + terrno = TAOS_SYSTEM_ERROR(errno); + uError("failed to memalign memory, alignment:%d size:%d reason:%s", alignment, size, strerror(err)); + p = NULL; + } + + return p; +} + +#else + +void *tmemalign(int32_t alignment, int32_t size) { return tmalloc(size); } + +#endif diff --git a/src/wal/inc/walInt.h b/src/wal/inc/walInt.h index 593611589d..fc6955ab97 100644 --- a/src/wal/inc/walInt.h +++ b/src/wal/inc/walInt.h @@ -32,8 +32,28 @@ extern int32_t wDebugFlag; #define wTrace(...) { if (wDebugFlag & DEBUG_TRACE) { taosPrintLog("WAL ", wDebugFlag, __VA_ARGS__); }} #define walPrefix "wal" +#define walRefreshIntervalMs 1000 #define walSignature (uint32_t)(0xFAFBFDFE) +typedef struct { + uint64_t version; + int32_t vgId; + int32_t fd; + int32_t keep; + int32_t level; + int32_t fsyncPeriod; + int32_t fsyncSeq; + int32_t fileIndex; + void* timer; + void* signature; + int max; // maximum number of wal files + uint32_t id; // increase continuously + int num; // number of wal files + char path[TSDB_FILENAME_LEN]; + char name[TSDB_FILENAME_LEN + 16]; + pthread_mutex_t mutex; +} SWal; + #ifdef __cplusplus } #endif diff --git a/src/wal/src/walMgmt.c b/src/wal/src/walMgmt.c index 2dd094d860..ebd1328c1b 100644 --- a/src/wal/src/walMgmt.c +++ b/src/wal/src/walMgmt.c @@ -16,6 +16,225 @@ #define _DEFAULT_SOURCE #include "os.h" #include "taoserror.h" +#include "talloc.h" +#include "tref.h" +#include "tutil.h" #include "twal.h" #include "walInt.h" -#include "walMgmt.h" \ No newline at end of file + +typedef struct { + int32_t refId; + int32_t num; + int32_t seq; + int8_t stop; + int8_t reserved[3]; + pthread_t thread; + pthread_mutex_t mutex; +} SWalMgmt; + +static SWalMgmt tsWal; +static int32_t walCreateThread(); +static void walStopThread(); +static int32_t walInitObj(SWal *pWal); +static void walFreeObj(void *pWal); + +int32_t walInit() { + tmemzero(&tsWal, sizeof(SWalMgmt)); + tsWal.refId = taosOpenRef(TSDB_MIN_VNODES, walFreeObj); + + int32_t code = walCreateThread(); + if (code != TSDB_CODE_SUCCESS) { + wError("failed to init wal module, reason:%s", tstrerror(code)); + return code; + } + + wInfo("wal module is initialized"); + return code; +} + +void walCleanUp() { + walStopThread(); + taosCloseRef(tsWal.refId); + wInfo("wal module is cleaned up"); +} + +void *walOpen(char *path, SWalCfg *pCfg) { + SWal *pWal = tcalloc(sizeof(SWal)); + if (pWal == NULL) { + terrno = TAOS_SYSTEM_ERROR(errno); + return NULL; + } + + pWal->fd = -1; + pWal->max = pCfg->wals; + pWal->id = 0; + pWal->num = 0; + pWal->level = pCfg->walLevel; + pWal->keep = pCfg->keep; + pWal->fsyncPeriod = pCfg->fsyncPeriod; + pWal->signature = pWal; + tstrncpy(pWal->path, path, sizeof(path)); + pthread_mutex_init(&pWal->mutex, NULL); + + pWal->fsyncSeq = pCfg->fsyncPeriod % 1000; + if (pWal->fsyncSeq <= 0) pWal->fsyncSeq = 1; + + if (walInitObj(pWal) != TSDB_CODE_SUCCESS) { + walFreeObj(pWal); + return NULL; + } + + if (taosAddRef(tsWal.refId, pWal) != TSDB_CODE_SUCCESS) { + walFreeObj(pWal); + return NULL; + } + + atomic_add_fetch_32(&tsWal.num, 1); + wDebug("vgId:%d, wal is opened, level:%d period:%d path:%s", pWal->vgId, pWal->level, pWal->fsyncPeriod, pWal->path); + + return pWal; +} + +int32_t walAlter(void *handle, SWalCfg *pCfg) { + SWal *pWal = handle; + + if (pWal->level == pCfg->walLevel && pWal->fsyncPeriod == pCfg->fsyncPeriod) { + wDebug("vgId:%d, old walLevel:%d fsync:%d, new walLevel:%d fsync:%d not change", pWal->vgId, pWal->level, + pWal->fsyncPeriod, pCfg->walLevel, pCfg->fsyncPeriod); + return TSDB_CODE_SUCCESS; + } + + wInfo("vgId:%d, change old walLevel:%d fsync:%d, new walLevel:%d fsync:%d", pWal->vgId, pWal->level, + pWal->fsyncPeriod, pCfg->walLevel, pCfg->fsyncPeriod); + + pWal->level = pCfg->walLevel; + pWal->fsyncPeriod = pCfg->fsyncPeriod; + + return TSDB_CODE_SUCCESS; +} + +void walClose(void *handle) { + SWal *pWal = handle; + taosClose(pWal->fd); + + if (pWal->keep == 0) { + // remove all files in the directory + for (int32_t i = 0; i < pWal->num; ++i) { + snprintf(pWal->name, sizeof(pWal->name), "%s/%s%d", pWal->path, walPrefix, pWal->id - i); + if (remove(pWal->name) < 0) { + wError("vgId:%d, wal:%s, failed to remove", pWal->vgId, pWal->name); + } else { + wDebug("vgId:%d, wal:%s, it is removed", pWal->vgId, pWal->name); + } + } + } else { + wDebug("vgId:%d, wal:%s, it is closed and kept", pWal->vgId, pWal->name); + } + + taosRemoveRef(tsWal.refId, pWal); +} + +static int32_t walInitObj(SWal *pWal) { + if (taosMkDir(pWal->path, 0755) != 0) { + terrno = TAOS_SYSTEM_ERROR(errno); + wError("vgId:%d, wal:%s, failed to create directory, reason:%s", pWal->vgId, pWal->path, strerror(errno)); + return terrno; + } + + if (pWal->keep == 1) { + return TSDB_CODE_SUCCESS; + } + + if (pWal && pWal->fd < 0) { + terrno = TAOS_SYSTEM_ERROR(errno); + wError("vgId:%d, wal:%s, failed to open file, reason:%s", pWal->vgId, pWal->path, strerror(errno)); + return terrno; + } + + wDebug("vgId:%d, wal:%s, is initialized", pWal->vgId, pWal->name); + return TSDB_CODE_SUCCESS; +} + +static void walFreeObj(void *wal) { + SWal *pWal = pWal; + wDebug("vgId:%d, wal is freed", pWal->vgId); + + taosClose(pWal->fd); + pthread_mutex_destroy(&pWal->mutex); + tfree(pWal); +} + +// static bool walNeedFsync(SWal *pWal) { +// if (pWal->fsyncPeriod <= 0 || pWal->level != TAOS_WAL_FSYNC) { +// return false; +// } + +// if (tsWal.seq % pWal->fsyncSeq == 0) { +// return true; +// } + +// return false; +// } + +static void walUpdateSeq() { + taosMsleep(walRefreshIntervalMs); + if (++tsWal.seq <= 0) { + tsWal.seq = 1; + } +} + +static void walFsyncAll() { + // int32_t code; + // void * pIter = taosRefCreateIter(tsWal.refId); + + // while (taosRefIterNext(pIter)) { + // SWal *pWal = taosRefIterGet(pIter); + // if (pWal == NULL) break; + + // if (!walNeedFsync(pWal)) { + // wTrace("wal:%s, do fsync, level:%d seq:%d rseq:%d", pWal->name, pWal->level, pWal->fsyncSeq, tsWal.refreshSeq); + // code = walFsync(pWal); + // if (code != TSDB_CODE_SUCCESS) { + // wError("wal:%s, fsync failed(%s)", pWal->name, strerror(code)); + // } + // } + + // taosReleaseRef(pWal); + // } + + // taosRefDestroyIter(pIter); +} + +static void *walThreadFunc(void *param) { + while (1) { + walUpdateSeq(); + walFsyncAll(); + if (tsWal.stop) break; + } + + return NULL; +} + +static int32_t walCreateThread() { + pthread_attr_t thAttr; + pthread_attr_init(&thAttr); + pthread_attr_setdetachstate(&thAttr, PTHREAD_CREATE_JOINABLE); + + if (pthread_create(&tsWal.thread, &thAttr, walThreadFunc, NULL) != 0) { + wError("failed to create wal thread, reason:%s", strerror(errno)); + return TAOS_SYSTEM_ERROR(errno); + } + + pthread_attr_destroy(&thAttr); + wDebug("wal thread is launched"); + + return TSDB_CODE_SUCCESS; +} + +static void walStopThread() { + if (tsWal.thread) { + pthread_join(tsWal.thread, NULL); + } + + wDebug("wal thread is stopped"); +} diff --git a/src/wal/src/walWrite.c b/src/wal/src/walWrite.c index 95587caa14..2eea80e20a 100644 --- a/src/wal/src/walWrite.c +++ b/src/wal/src/walWrite.c @@ -15,171 +15,18 @@ #define _DEFAULT_SOURCE #include "os.h" -#include "twal.h" -#include "walInt.h" -#include "walMgmt.h" +#include "talloc.h" +#include "taoserror.h" #include "tchecksum.h" #include "tutil.h" -#include "ttimer.h" -#include "taoserror.h" -#include "twal.h" #include "tqueue.h" +#include "twal.h" +#include "walInt.h" +static int32_t walRestoreWalFile(SWal *pWal, void *pVnode, FWalWrite writeFp); +static int32_t walRemoveWalFiles(const char *path); -typedef struct { - uint64_t version; - int fd; - int keep; - int level; - int32_t fsyncPeriod; - void *timer; - void *signature; - int max; // maximum number of wal files - uint32_t id; // increase continuously - int num; // number of wal files - char path[TSDB_FILENAME_LEN]; - char name[TSDB_FILENAME_LEN+16]; - pthread_mutex_t mutex; -} SWal; - -static void *walTmrCtrl = NULL; -static int tsWalNum = 0; -static pthread_once_t walModuleInit = PTHREAD_ONCE_INIT; -static int walHandleExistingFiles(const char *path); -static int walRestoreWalFile(SWal *pWal, void *pVnode, FWalWrite writeFp); -static int walRemoveWalFiles(const char *path); -static void walProcessFsyncTimer(void *param, void *tmrId); -static void walRelease(SWal *pWal); -static int walGetMaxOldFileId(char *odir); - -static void walModuleInitFunc() { - walTmrCtrl = taosTmrInit(1000, 100, 300000, "WAL"); - if (walTmrCtrl == NULL) - walModuleInit = PTHREAD_ONCE_INIT; - else - wDebug("WAL module is initialized"); -} - -static inline bool walNeedFsyncTimer(SWal *pWal) { - if (pWal->fsyncPeriod > 0 && pWal->level == TAOS_WAL_FSYNC) { - return true; - } - return false; -} - -void *walOpen(const char *path, const SWalCfg *pCfg) { - SWal *pWal = calloc(sizeof(SWal), 1); - if (pWal == NULL) { - terrno = TAOS_SYSTEM_ERROR(errno); - return NULL; - } - - pthread_once(&walModuleInit, walModuleInitFunc); - if (walTmrCtrl == NULL) { - free(pWal); - terrno = TAOS_SYSTEM_ERROR(errno); - return NULL; - } - - atomic_add_fetch_32(&tsWalNum, 1); - pWal->fd = -1; - pWal->max = pCfg->wals; - pWal->id = 0; - pWal->num = 0; - pWal->level = pCfg->walLevel; - pWal->keep = pCfg->keep; - pWal->fsyncPeriod = pCfg->fsyncPeriod; - pWal->signature = pWal; - tstrncpy(pWal->path, path, sizeof(pWal->path)); - pthread_mutex_init(&pWal->mutex, NULL); - - if (walNeedFsyncTimer(pWal)) { - pWal->timer = taosTmrStart(walProcessFsyncTimer, pWal->fsyncPeriod, pWal, walTmrCtrl); - if (pWal->timer == NULL) { - terrno = TAOS_SYSTEM_ERROR(errno); - walRelease(pWal); - return NULL; - } - } - - if (taosMkDir(path, 0755) != 0) { - terrno = TAOS_SYSTEM_ERROR(errno); - wError("wal:%s, failed to create directory(%s)", path, strerror(errno)); - walRelease(pWal); - pWal = NULL; - } - - if (pCfg->keep == 1) return pWal; - - if (walHandleExistingFiles(path) == 0) walRenew(pWal); - - if (pWal && pWal->fd < 0) { - terrno = TAOS_SYSTEM_ERROR(errno); - wError("wal:%s, failed to open(%s)", path, strerror(errno)); - walRelease(pWal); - pWal = NULL; - } - - if (pWal) wDebug("wal:%s, it is open, level:%d fsyncPeriod:%d", path, pWal->level, pWal->fsyncPeriod); - return pWal; -} - -int walAlter(twalh wal, const SWalCfg *pCfg) { - SWal *pWal = wal; - if (pWal == NULL) { - return TSDB_CODE_WAL_APP_ERROR; - } - - if (pWal->level == pCfg->walLevel && pWal->fsyncPeriod == pCfg->fsyncPeriod) { - wDebug("wal:%s, old walLevel:%d fsync:%d, new walLevel:%d fsync:%d not change", pWal->name, pWal->level, - pWal->fsyncPeriod, pCfg->walLevel, pCfg->fsyncPeriod); - return TSDB_CODE_SUCCESS; - } - - wInfo("wal:%s, change old walLevel:%d fsync:%d, new walLevel:%d fsync:%d", pWal->name, pWal->level, pWal->fsyncPeriod, - pCfg->walLevel, pCfg->fsyncPeriod); - - pthread_mutex_lock(&pWal->mutex); - pWal->level = pCfg->walLevel; - pWal->fsyncPeriod = pCfg->fsyncPeriod; - if (walNeedFsyncTimer(pWal)) { - wInfo("wal:%s, reset fsync timer, walLevel:%d fsyncPeriod:%d", pWal->name, pWal->level, pWal->fsyncPeriod); - taosTmrReset(walProcessFsyncTimer, pWal->fsyncPeriod, pWal, &pWal->timer, walTmrCtrl); - } else { - wInfo("wal:%s, stop fsync timer, walLevel:%d fsyncPeriod:%d", pWal->name, pWal->level, pWal->fsyncPeriod); - taosTmrStop(pWal->timer); - pWal->timer = NULL; - } - pthread_mutex_unlock(&pWal->mutex); - - return TSDB_CODE_SUCCESS; -} - -void walClose(void *handle) { - if (handle == NULL) return; - - SWal *pWal = handle; - taosClose(pWal->fd); - if (pWal->timer) taosTmrStopA(&pWal->timer); - - if (pWal->keep == 0) { - // remove all files in the directory - for (int i = 0; i < pWal->num; ++i) { - snprintf(pWal->name, sizeof(pWal->name), "%s/%s%d", pWal->path, walPrefix, pWal->id - i); - if (remove(pWal->name) < 0) { - wError("wal:%s, failed to remove", pWal->name); - } else { - wDebug("wal:%s, it is removed", pWal->name); - } - } - } else { - wDebug("wal:%s, it is closed and kept", pWal->name); - } - - walRelease(pWal); -} - -int walRenew(void *handle) { +int32_t walRenew(void *handle) { if (handle == NULL) return 0; SWal *pWal = handle; @@ -190,7 +37,7 @@ int walRenew(void *handle) { if (pWal->fd >= 0) { close(pWal->fd); pWal->id++; - wDebug("wal:%s, it is closed", pWal->name); + wDebug("vgId:%d, wal:%s, it is closed", pWal->vgId, pWal->name); } pWal->num++; @@ -199,19 +46,19 @@ int walRenew(void *handle) { pWal->fd = open(pWal->name, O_WRONLY | O_CREAT, S_IRWXU | S_IRWXG | S_IRWXO); if (pWal->fd < 0) { - wError("wal:%s, failed to open(%s)", pWal->name, strerror(errno)); terrno = TAOS_SYSTEM_ERROR(errno); + wError("vgId:%d, wal:%s, failed to open, reason:%s", pWal->vgId, pWal->name, strerror(errno)); } else { - wDebug("wal:%s, it is created", pWal->name); + wDebug("vgId:%d, wal:%s, it is created", pWal->vgId, pWal->name); if (pWal->num > pWal->max) { // remove the oldest wal file char name[TSDB_FILENAME_LEN * 3]; snprintf(name, sizeof(name), "%s/%s%d", pWal->path, walPrefix, pWal->id - pWal->max); if (remove(name) < 0) { - wError("wal:%s, failed to remove(%s)", name, strerror(errno)); + wError("vgId:%d, wal:%s, failed to remove(%s)", pWal->vgId, name, strerror(errno)); } else { - wDebug("wal:%s, it is removed", name); + wDebug("vgId:%d, wal:%s, it is removed", pWal->vgId, name); } pWal->num--; @@ -223,7 +70,7 @@ int walRenew(void *handle) { return terrno; } -int walWrite(void *handle, SWalHead *pHead) { +int32_t walWrite(void *handle, SWalHead *pHead) { SWal *pWal = handle; if (pWal == NULL) return -1; @@ -235,11 +82,11 @@ int walWrite(void *handle, SWalHead *pHead) { pHead->signature = walSignature; taosCalcChecksumAppend(0, (uint8_t *)pHead, sizeof(SWalHead)); - int contLen = pHead->len + sizeof(SWalHead); + int32_t contLen = pHead->len + sizeof(SWalHead); if (taosTWrite(pWal->fd, pHead, contLen) != contLen) { - wError("wal:%s, failed to write(%s)", pWal->name, strerror(errno)); terrno = TAOS_SYSTEM_ERROR(errno); + wError("vgId:%d, wal:%s, failed to write(%s)", pWal->vgId, pWal->name, strerror(errno)); return terrno; } else { pWal->version = pHead->version; @@ -255,29 +102,30 @@ void walFsync(void *handle) { if (pWal->fsyncPeriod == 0) { if (fsync(pWal->fd) < 0) { - wError("wal:%s, fsync failed(%s)", pWal->name, strerror(errno)); + wError("vgId:%d, wal:%s, fsync failed(%s)", pWal->vgId, pWal->name, strerror(errno)); } } } -int walRestore(void *handle, void *pVnode, int (*writeFp)(void *, void *, int)) { - SWal *pWal = handle; - struct dirent *ent; - int count = 0; - uint32_t maxId = 0, minId = -1, index =0; +int32_t walRestore(void *handle, void *pVnode, int32_t (*writeFp)(void *, void *, int32_t)) { + SWal * pWal = handle; + int32_t count = 0; + uint32_t maxId = 0; + uint32_t minId = -1; + uint32_t index = 0; + int32_t code = 0; + struct dirent *ent; terrno = 0; - int plen = strlen(walPrefix); + int32_t plen = strlen(walPrefix); char opath[TSDB_FILENAME_LEN + 5]; - - int slen = snprintf(opath, sizeof(opath), "%s", pWal->path); - if (pWal->keep == 0) strcpy(opath + slen, "/old"); + snprintf(opath, sizeof(opath), "%s", pWal->path); DIR *dir = opendir(opath); if (dir == NULL && errno == ENOENT) return 0; if (dir == NULL) { - terrno = TAOS_SYSTEM_ERROR(errno); - return terrno; + code = TAOS_SYSTEM_ERROR(errno); + return code; } while ((ent = readdir(dir)) != NULL) { @@ -290,6 +138,7 @@ int walRestore(void *handle, void *pVnode, int (*writeFp)(void *, void *, int)) } closedir(dir); + pWal->fileIndex = maxId; if (count == 0) { if (pWal->keep) terrno = walRenew(pWal); @@ -297,10 +146,10 @@ int walRestore(void *handle, void *pVnode, int (*writeFp)(void *, void *, int)) } if (count != (maxId - minId + 1)) { - wError("wal:%s, messed up, count:%d max:%d min:%d", opath, count, maxId, minId); + wError("vgId:%d, wal:%s, messed up, count:%d max:%d min:%d", pWal->vgId, opath, count, maxId, minId); terrno = TSDB_CODE_WAL_APP_ERROR; } else { - wDebug("wal:%s, %d files will be restored", opath, count); + wDebug("vgId:%d, wal:%s, %d files will be restored", pWal->vgId, opath, count); for (index = minId; index <= maxId; ++index) { snprintf(pWal->name, sizeof(pWal->name), "%s/%s%d", opath, walPrefix, index); @@ -314,7 +163,7 @@ int walRestore(void *handle, void *pVnode, int (*writeFp)(void *, void *, int)) terrno = walRemoveWalFiles(opath); if (terrno == 0) { if (remove(opath) < 0) { - wError("wal:%s, failed to remove directory(%s)", opath, strerror(errno)); + wError("vgId:%d, wal:%s, failed to remove directory, reason:%s", pWal->vgId, opath, strerror(errno)); terrno = TAOS_SYSTEM_ERROR(errno); } } @@ -325,7 +174,7 @@ int walRestore(void *handle, void *pVnode, int (*writeFp)(void *, void *, int)) snprintf(pWal->name, sizeof(pWal->name), "%s/%s%d", opath, walPrefix, maxId); pWal->fd = open(pWal->name, O_WRONLY | O_CREAT | O_APPEND, S_IRWXU | S_IRWXG | S_IRWXO); if (pWal->fd < 0) { - wError("wal:%s, failed to open file(%s)", pWal->name, strerror(errno)); + wError("vgId:%d, wal:%s, failed to open file, reason:%s", pWal->vgId, pWal->name, strerror(errno)); terrno = TAOS_SYSTEM_ERROR(errno); } } @@ -334,9 +183,9 @@ int walRestore(void *handle, void *pVnode, int (*writeFp)(void *, void *, int)) return terrno; } -int walGetWalFile(void *handle, char *name, uint32_t *index) { +int32_t walGetWalFile(void *handle, char *name, uint32_t *index) { SWal * pWal = handle; - int code = 1; + int32_t code = 1; int32_t first = 0; name[0] = 0; @@ -359,22 +208,9 @@ int walGetWalFile(void *handle, char *name, uint32_t *index) { return code; } -static void walRelease(SWal *pWal) { - pthread_mutex_destroy(&pWal->mutex); - pWal->signature = NULL; - free(pWal); - - if (atomic_sub_fetch_32(&tsWalNum, 1) == 0) { - if (walTmrCtrl) taosTmrCleanUp(walTmrCtrl); - walTmrCtrl = NULL; - walModuleInit = PTHREAD_ONCE_INIT; - wDebug("WAL module is cleaned up"); - } -} - -static int walRestoreWalFile(SWal *pWal, void *pVnode, FWalWrite writeFp) { - char *name = pWal->name; - int size = 1024 * 1024; // default 1M buffer size +static int32_t walRestoreWalFile(SWal *pWal, void *pVnode, FWalWrite writeFp) { + char * name = pWal->name; + int32_t size = 1024 * 1024; // default 1M buffer size terrno = 0; char *buffer = malloc(size); @@ -385,36 +221,36 @@ static int walRestoreWalFile(SWal *pWal, void *pVnode, FWalWrite writeFp) { SWalHead *pHead = (SWalHead *)buffer; - int fd = open(name, O_RDWR); + int32_t fd = open(name, O_RDWR); if (fd < 0) { - wError("wal:%s, failed to open for restore(%s)", name, strerror(errno)); + wError("vgId:%d, wal:%s, failed to open for restore(%s)", pWal->vgId, name, strerror(errno)); terrno = TAOS_SYSTEM_ERROR(errno); free(buffer); return terrno; } - wDebug("wal:%s, start to restore", name); + wDebug("vgId:%d, wal:%s, start to restore", pWal->vgId, name); size_t offset = 0; while (1) { - int ret = taosTRead(fd, pHead, sizeof(SWalHead)); + int32_t ret = taosTRead(fd, pHead, sizeof(SWalHead)); if (ret == 0) break; if (ret < 0) { - wError("wal:%s, failed to read wal head part since %s", name, strerror(errno)); + wError("vgId:%d, wal:%s, failed to read wal head part since %s", pWal->vgId, name, strerror(errno)); terrno = TAOS_SYSTEM_ERROR(errno); break; } if (ret < sizeof(SWalHead)) { - wError("wal:%s, failed to read head, ret:%d, skip the rest of file", name, ret); + wError("vgId:%d, wal:%s, failed to read head, ret:%d, skip the rest of file", pWal->vgId, name, ret); taosFtruncate(fd, offset); fsync(fd); break; } if (!taosCheckChecksumWhole((uint8_t *)pHead, sizeof(SWalHead))) { - wWarn("wal:%s, cksum is messed up, skip the rest of file", name); + wWarn("vgId:%d, wal:%s, cksum is messed up, skip the rest of file", pWal->vgId, name); terrno = TSDB_CODE_WAL_FILE_CORRUPTED; ASSERT(false); break; @@ -433,13 +269,14 @@ static int walRestoreWalFile(SWal *pWal, void *pVnode, FWalWrite writeFp) { ret = taosTRead(fd, pHead->cont, pHead->len); if (ret < 0) { - wError("wal:%s failed to read wal body part since %s", name, strerror(errno)); + wError("vgId:%d, wal:%s failed to read wal body part since %s", pWal->vgId, name, strerror(errno)); terrno = TAOS_SYSTEM_ERROR(errno); break; } if (ret < pHead->len) { - wError("wal:%s, failed to read body, len:%d ret:%d, skip the rest of file", name, pHead->len, ret); + wError("vgId:%d, wal:%s, failed to read body, len:%d ret:%d, skip the rest of file", pWal->vgId, name, pHead->len, + ret); taosFtruncate(fd, offset); fsync(fd); break; @@ -457,50 +294,9 @@ static int walRestoreWalFile(SWal *pWal, void *pVnode, FWalWrite writeFp) { return terrno; } -int walHandleExistingFiles(const char *path) { - char oname[TSDB_FILENAME_LEN * 3]; - char nname[TSDB_FILENAME_LEN * 3]; - char opath[TSDB_FILENAME_LEN]; - - snprintf(opath, sizeof(opath), "%s/old", path); - - struct dirent *ent; - DIR *dir = opendir(path); - int plen = strlen(walPrefix); - terrno = 0; - - int midx = walGetMaxOldFileId(opath); - int count = 0; - while ((ent = readdir(dir)) != NULL) { - if (strncmp(ent->d_name, walPrefix, plen) == 0) { - midx++; - snprintf(oname, sizeof(oname), "%s/%s", path, ent->d_name); - snprintf(nname, sizeof(nname), "%s/old/wal%d", path, midx); - if (taosMkDir(opath, 0755) != 0) { - wError("wal:%s, failed to create directory:%s(%s)", oname, opath, strerror(errno)); - terrno = TAOS_SYSTEM_ERROR(errno); - break; - } - - if (rename(oname, nname) < 0) { - wError("wal:%s, failed to move to new:%s", oname, nname); - terrno = TAOS_SYSTEM_ERROR(errno); - break; - } - - count++; - } - - wDebug("wal:%s, %d files are moved for restoration", path, count); - } - - closedir(dir); - return terrno; -} - -static int walRemoveWalFiles(const char *path) { - int plen = strlen(walPrefix); - char name[TSDB_FILENAME_LEN * 3]; +static int32_t walRemoveWalFiles(const char *path) { + int32_t plen = strlen(walPrefix); + char name[TSDB_FILENAME_LEN * 3]; terrno = 0; @@ -527,54 +323,9 @@ static int walRemoveWalFiles(const char *path) { return terrno; } -static void walProcessFsyncTimer(void *param, void *tmrId) { - SWal *pWal = param; - - if (pWal->signature != pWal) return; - if (pWal->fd < 0) return; - - if (fsync(pWal->fd) < 0) { - wError("wal:%s, fsync failed(%s)", pWal->name, strerror(errno)); - } - - if (walNeedFsyncTimer(pWal)) { - pWal->timer = taosTmrStart(walProcessFsyncTimer, pWal->fsyncPeriod, pWal, walTmrCtrl); - } else { - wInfo("wal:%s, stop fsync timer for walLevel:%d fsyncPeriod:%d", pWal->name, pWal->level, pWal->fsyncPeriod); - taosTmrStop(pWal->timer); - pWal->timer = NULL; - } -} - int64_t walGetVersion(twalh param) { SWal *pWal = param; if (pWal == 0) return 0; return pWal->version; } - -static int walGetMaxOldFileId(char *odir) { - int midx = 0; - DIR * dir = NULL; - struct dirent *dp = NULL; - int plen = strlen(walPrefix); - - if (access(odir, F_OK) != 0) return midx; - - dir = opendir(odir); - if (dir == NULL) { - wError("failed to open directory %s since %s", odir, strerror(errno)); - terrno = TAOS_SYSTEM_ERROR(errno); - return -1; - } - - while ((dp = readdir(dir)) != NULL) { - if (strncmp(dp->d_name, walPrefix, plen) == 0) { - int idx = atol(dp->d_name + plen); - if (midx < idx) midx = idx; - } - } - - closedir(dir); - return midx; -} \ No newline at end of file -- GitLab