From 08c82ac39697844ac278279c04ba354f37da43b0 Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Fri, 30 Oct 2020 22:56:24 +0800 Subject: [PATCH] TD-1846 --- src/wal/inc/walInt.h | 4 ++ src/wal/src/walUtil.c | 85 ++++++++++++++++++++++++ src/wal/src/walWrite.c | 143 ++++++++++++++--------------------------- 3 files changed, 138 insertions(+), 94 deletions(-) create mode 100644 src/wal/src/walUtil.c diff --git a/src/wal/inc/walInt.h b/src/wal/inc/walInt.h index 8b5784d1ce..74a0184c28 100644 --- a/src/wal/inc/walInt.h +++ b/src/wal/inc/walInt.h @@ -53,6 +53,10 @@ typedef struct { pthread_mutex_t mutex; } SWal; +// util +int32_t walGetNextFile(SWal *pWal, int64_t *nextFileId); +int32_t walGetOldFile(SWal *pWal, int64_t curFileId, int32_t minDiff, int64_t *oldFileId); + #ifdef __cplusplus } #endif diff --git a/src/wal/src/walUtil.c b/src/wal/src/walUtil.c new file mode 100644 index 0000000000..b190c703bd --- /dev/null +++ b/src/wal/src/walUtil.c @@ -0,0 +1,85 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#define _DEFAULT_SOURCE +#include "os.h" +#include "walInt.h" + +int32_t walGetNextFile(SWal *pWal, int64_t *nextFileId) { + int64_t curFileId = *nextFileId; + int64_t nearFileId = INT64_MAX; + + DIR *dir = opendir(pWal->path); + if (dir == NULL) { + wError("vgId:%d, path:%s, failed to open since %s", pWal->vgId, pWal->path, strerror(errno)); + return -1; + } + + struct dirent *ent; + while ((ent = readdir(dir)) != NULL) { + char *name = ent->d_name; + + if (strncmp(name, WAL_PREFIX, WAL_PREFIX_LEN) == 0) { + int64_t id = atoll(name + WAL_PREFIX_LEN); + if (id <= curFileId) continue; + + if (id < nearFileId) { + nearFileId = id; + } + } + } + closedir(dir); + + if (nearFileId == INT64_MAX) return -1; + + *nextFileId = nearFileId; + wTrace("vgId:%d, path:%s, curFileId:%" PRId64 " nextFileId:%" PRId64, pWal->vgId, pWal->path, curFileId, *nextFileId); + + return 0; +} + +int32_t walGetOldFile(SWal *pWal, int64_t curFileId, int32_t minDiff, int64_t *oldFileId) { + int64_t nearFileId = INT64_MAX; + + DIR *dir = opendir(pWal->path); + if (dir == NULL) { + wError("vgId:%d, path:%s, failed to open since %s", pWal->vgId, pWal->path, strerror(errno)); + return -1; + } + + struct dirent *ent; + while ((ent = readdir(dir)) != NULL) { + char *name = ent->d_name; + + if (strncmp(name, WAL_PREFIX, WAL_PREFIX_LEN) == 0) { + int64_t id = atoll(name + WAL_PREFIX_LEN); + if (id >= curFileId) continue; + + minDiff--; + if (id < nearFileId) { + nearFileId = id; + } + } + } + closedir(dir); + + if (nearFileId == INT64_MAX) return -1; + if (minDiff > 0) return -1; + + *oldFileId = nearFileId; + wTrace("vgId:%d, path:%s, curFileId:%" PRId64 " oldFildId:%" PRId64, pWal->vgId, pWal->path, curFileId, *oldFileId); + + return 0; +} \ No newline at end of file diff --git a/src/wal/src/walWrite.c b/src/wal/src/walWrite.c index 70bcc49593..54efdc1ce7 100644 --- a/src/wal/src/walWrite.c +++ b/src/wal/src/walWrite.c @@ -18,13 +18,10 @@ #include "talloc.h" #include "taoserror.h" #include "tchecksum.h" -#include "tutil.h" -#include "tqueue.h" #include "twal.h" #include "walInt.h" static int32_t walRestoreWalFile(SWal *pWal, void *pVnode, FWalWrite writeFp, char *name); -static int32_t walGetNextFile(SWal *pWal, int64_t lastFileId, int64_t *nexFileId, char *nextFileName); int32_t walRenew(void *handle) { if (handle == NULL) return 0; @@ -39,12 +36,7 @@ int32_t walRenew(void *handle) { wDebug("vgId:%d, file:%s, it is closed", pWal->vgId, pWal->name); } - int64_t lastId = pWal->fileId; - if (pWal->keep) { - pWal->fileId = 0; - } else { - pWal->fileId = taosGetTimestampUs(); - } + pWal->fileId = (pWal->keep ? 0 : taosGetTimestampUs()); snprintf(pWal->name, sizeof(pWal->name), "%s/%s%" PRId64, pWal->path, WAL_PREFIX, pWal->fileId); pWal->fd = open(pWal->name, O_WRONLY | O_CREAT, S_IRWXU | S_IRWXG | S_IRWXO); @@ -56,14 +48,18 @@ int32_t walRenew(void *handle) { wDebug("vgId:%d, file:%s, it is created", pWal->vgId, pWal->name); } - if (!pWal->keep && lastId != -1) { - // remove last wal file - char name[WAL_FILE_LEN]; - snprintf(name, sizeof(name), "%s/%s%" PRId64, pWal->path, WAL_PREFIX, lastId); - if (remove(name) < 0) { - wError("vgId:%d, file:%s, failed to remove since %s", pWal->vgId, name, strerror(errno)); - } else { - wDebug("vgId:%d, file:%s, it is removed", pWal->vgId, name); + if (!pWal->keep) { + // remove the oldest wal file + int64_t oldFileId = -1; + if (walGetOldFile(pWal, pWal->fileId, 2, &oldFileId) == 0) { + char walName[WAL_FILE_LEN] = {0}; + snprintf(walName, sizeof(walName), "%s/%s%" PRId64, pWal->path, WAL_PREFIX, oldFileId); + + if (remove(walName) < 0) { + wError("vgId:%d, file:%s, failed to remove since %s", pWal->vgId, walName, strerror(errno)); + } else { + wDebug("vgId:%d, file:%s, it is removed", pWal->vgId, walName); + } } } @@ -115,54 +111,47 @@ int32_t walRestore(void *handle, void *pVnode, int32_t (*writeFp)(void *, void * SWal * pWal = handle; int32_t count = 0; + int32_t code = 0; + int64_t fileId = -1; - DIR *dir = opendir(pWal->path); - if (dir == NULL && errno == ENOENT) return 0; - if (dir == NULL) return TAOS_SYSTEM_ERROR(errno); - - struct dirent *ent; - while ((ent = readdir(dir)) != NULL) { - char *fileName = ent->d_name; - - if (strncmp(fileName, WAL_PREFIX, WAL_PREFIX_LEN) == 0) { - int64_t fileId = atoll(fileName + WAL_PREFIX_LEN); - if (fileId == pWal->fileId) continue; - - char walName[WAL_FILE_LEN]; - snprintf(walName, sizeof(pWal->name), "%s/%s%" PRId64, pWal->path, WAL_PREFIX, fileId); + while ((code = walGetNextFile(pWal, &fileId)) >= 0) { + if (fileId == pWal->fileId) continue; - wDebug("vgId:%d, file:%s, will be restored", pWal->vgId, walName); - int32_t code = walRestoreWalFile(pWal, pVnode, writeFp, walName); - if (code != TSDB_CODE_SUCCESS) continue; + char walName[WAL_FILE_LEN]; + snprintf(walName, sizeof(pWal->name), "%s/%s%" PRId64, pWal->path, WAL_PREFIX, fileId); - - if (!pWal->keep) { - wDebug("vgId:%d, file:%s, restore success, remove this file", pWal->vgId, walName); - remove(walName); - } else { - wDebug("vgId:%d, file:%s, restore success and keep it", pWal->vgId, walName); - } + wDebug("vgId:%d, file:%s, will be restored", pWal->vgId, walName); + int32_t code = walRestoreWalFile(pWal, pVnode, writeFp, walName); + if (code != TSDB_CODE_SUCCESS) { + wDebug("vgId:%d, file:%s, failed to restore since %s", pWal->vgId, walName, tstrerror(code)); + continue; + } - count++; + if (!pWal->keep) { + wDebug("vgId:%d, file:%s, restore success, remove this file", pWal->vgId, walName); + remove(walName); + } else { + wDebug("vgId:%d, file:%s, restore success and keep it", pWal->vgId, walName); } + + count++; } - closedir(dir); - if (pWal->keep) { - if (count == 0) { - wDebug("vgId:%d, file:%s not exist, renew it", pWal->vgId, pWal->name); - return walRenew(pWal); - } else { - // open the existing WAL file in append mode - pWal->fileId = 0; - snprintf(pWal->name, sizeof(pWal->name), "%s/%s%" PRId64, pWal->path, WAL_PREFIX, pWal->fileId); - pWal->fd = open(pWal->name, O_WRONLY | O_CREAT | O_APPEND, S_IRWXU | S_IRWXG | S_IRWXO); - if (pWal->fd < 0) { - wError("vgId:%d, file:%s, failed to open since %s", pWal->vgId, pWal->name, strerror(errno)); - return TAOS_SYSTEM_ERROR(errno); - } - wDebug("vgId:%d, file:%s open success", pWal->vgId, pWal->name); + if (!pWal->keep) return TSDB_CODE_SUCCESS; + + if (count == 0) { + wDebug("vgId:%d, file:%s not exist, renew it", pWal->vgId, pWal->name); + return walRenew(pWal); + } else { + // open the existing WAL file in append mode + pWal->fileId = 0; + snprintf(pWal->name, sizeof(pWal->name), "%s/%s%" PRId64, pWal->path, WAL_PREFIX, pWal->fileId); + pWal->fd = open(pWal->name, O_WRONLY | O_CREAT | O_APPEND, S_IRWXU | S_IRWXG | S_IRWXO); + if (pWal->fd < 0) { + wError("vgId:%d, file:%s, failed to open since %s", pWal->vgId, pWal->name, strerror(errno)); + return TAOS_SYSTEM_ERROR(errno); } + wDebug("vgId:%d, file:%s open success", pWal->vgId, pWal->name); } return TSDB_CODE_SUCCESS; @@ -173,8 +162,9 @@ int32_t walGetWalFile(void *handle, char *fileName, int64_t *fileId) { SWal *pWal = handle; pthread_mutex_lock(&(pWal->mutex)); - int32_t code = walGetNextFile(pWal, *fileId, fileId, fileName); - if (code == 0) { + int32_t code = walGetNextFile(pWal, fileId); + if (code >= 0) { + sprintf(fileName, "wal/%s%" PRId64, WAL_PREFIX, *fileId); code = (*fileId == pWal->fileId) ? 0 : 1; } pthread_mutex_unlock(&(pWal->mutex)); @@ -276,38 +266,3 @@ int64_t walGetVersion(twalh param) { return pWal->version; } - -static int32_t walGetNextFile(SWal *pWal, int64_t lastFileId, int64_t *nexFileId, char *nextFileName) { - int64_t nearFileId = INT64_MAX; - char nearFileName[WAL_FILE_LEN] = {0}; - - DIR *dir = opendir(pWal->path); - if (dir == NULL) { - wError("vgId:%d, path:%s, failed to open since %s", pWal->vgId, pWal->path, strerror(errno)); - return -1; - } - - struct dirent *ent; - while ((ent = readdir(dir)) != NULL) { - char *fileName = ent->d_name; - - if (strncmp(fileName, WAL_PREFIX, WAL_PREFIX_LEN) == 0) { - int64_t fileId = atoll(fileName + WAL_PREFIX_LEN); - if (fileId <= lastFileId) continue; - - if (fileId < nearFileId) { - nearFileId = fileId; - tstrncpy(nearFileName, fileName, WAL_FILE_LEN); - } - } - } - closedir(dir); - - if (nearFileId == INT64_MAX) return -1; - - *nexFileId = nearFileId; - tstrncpy(nextFileName, nearFileName, WAL_FILE_LEN); - wTrace("vgId:%d, path:%s, lastfile %" PRId64 ", nextfile is %s", pWal->vgId, pWal->path, lastFileId, nextFileName); - - return 0; -} -- GitLab