From d103af8ff825fcb001a35922bf42c7a3c27ad457 Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Sat, 31 Oct 2020 16:14:27 +0800 Subject: [PATCH] TD-1872 according to the suggestion of review --- src/wal/inc/walInt.h | 3 ++- src/wal/src/walMgmt.c | 21 +++++++++++------ src/wal/src/walUtil.c | 53 ++++++++++++++++++++++++++++++++++-------- src/wal/src/walWrite.c | 11 ++++++++- 4 files changed, 69 insertions(+), 19 deletions(-) diff --git a/src/wal/inc/walInt.h b/src/wal/inc/walInt.h index 319b652dc7..511f8320f0 100644 --- a/src/wal/inc/walInt.h +++ b/src/wal/inc/walInt.h @@ -32,7 +32,7 @@ extern int32_t wDebugFlag; #define wTrace(...) { if (wDebugFlag & DEBUG_TRACE) { taosPrintLog("WAL ", wDebugFlag, __VA_ARGS__); }} #define WAL_PREFIX "wal" -#define WAL_PREFIX_LEN 3 +#define WAL_PREFIX_LEN 2 #define WAL_REFRESH_MS 1000 #define WAL_MAX_SIZE (1024 * 1024) #define WAL_SIGNATURE ((uint32_t)(0xFAFBFDFE)) @@ -56,6 +56,7 @@ typedef struct { int32_t walGetNextFile(SWal *pWal, int64_t *nextFileId); int32_t walGetOldFile(SWal *pWal, int64_t curFileId, int32_t minDiff, int64_t *oldFileId); +int32_t walGetNewFile(SWal *pWal, int64_t *newFileId); #ifdef __cplusplus } diff --git a/src/wal/src/walMgmt.c b/src/wal/src/walMgmt.c index 52a8d9dbf0..15f74370ba 100644 --- a/src/wal/src/walMgmt.c +++ b/src/wal/src/walMgmt.c @@ -23,7 +23,6 @@ typedef struct { int32_t refId; - int32_t num; int32_t seq; int8_t stop; pthread_t thread; @@ -85,7 +84,6 @@ void *walOpen(char *path, SWalCfg *pCfg) { return NULL; } - atomic_add_fetch_32(&tsWal.num, 1); wDebug("vgId:%d, wal:%p is opened, level:%d fsyncPeriod:%d", pWal->vgId, pWal, pWal->level, pWal->fsyncPeriod); return pWal; @@ -116,19 +114,28 @@ void walClose(void *handle) { if (handle == NULL) return; SWal *pWal = handle; + pthread_mutex_lock(&pWal->mutex); + taosClose(pWal->fd); if (!pWal->keep) { - snprintf(pWal->name, sizeof(pWal->name), "%s/%s%" PRId64, pWal->path, WAL_PREFIX, pWal->fileId); - if (remove(pWal->name) < 0) { - wError("vgId:%d, wal:%p file:%s, failed to remove", pWal->vgId, pWal, pWal->name); - } else { - wDebug("vgId:%d, wal:%p file:%s, it is removed", pWal->vgId, pWal, pWal->name); + int64_t fileId = -1; + while (walGetNextFile(pWal, &fileId) >= 0) { + snprintf(pWal->name, sizeof(pWal->name), "%s/%s%" PRId64, pWal->path, WAL_PREFIX, fileId); + + if (fileId == pWal->fileId) { + wDebug("vgId:%d, wal:%p file:%s, it is closed and kept", pWal->vgId, pWal, pWal->name); + } else if (remove(pWal->name) < 0) { + wError("vgId:%d, wal:%p file:%s, failed to remove", pWal->vgId, pWal, pWal->name); + } else { + wDebug("vgId:%d, wal:%p file:%s, it is removed", pWal->vgId, pWal, pWal->name); + } } } else { wDebug("vgId:%d, wal:%p file:%s, it is closed and kept", pWal->vgId, pWal, pWal->name); } + pthread_mutex_unlock(&pWal->mutex); taosRemoveRef(tsWal.refId, pWal); } diff --git a/src/wal/src/walUtil.c b/src/wal/src/walUtil.c index c4ef6df195..7d79a39f85 100644 --- a/src/wal/src/walUtil.c +++ b/src/wal/src/walUtil.c @@ -19,7 +19,7 @@ int32_t walGetNextFile(SWal *pWal, int64_t *nextFileId) { int64_t curFileId = *nextFileId; - int64_t nearFileId = INT64_MAX; + int64_t minFileId = INT64_MAX; DIR *dir = opendir(pWal->path); if (dir == NULL) { @@ -35,23 +35,23 @@ int32_t walGetNextFile(SWal *pWal, int64_t *nextFileId) { int64_t id = atoll(name + WAL_PREFIX_LEN); if (id <= curFileId) continue; - if (id < nearFileId) { - nearFileId = id; + if (id < minFileId) { + minFileId = id; } } } closedir(dir); - if (nearFileId == INT64_MAX) return -1; + if (minFileId == INT64_MAX) return -1; - *nextFileId = nearFileId; + *nextFileId = minFileId; wTrace("vgId:%d, path:%s, curFileId:%" PRId64 " nextFileId:%" PRId64, pWal->vgId, pWal->path, curFileId, *nextFileId); return 0; } int32_t walGetOldFile(SWal *pWal, int64_t curFileId, int32_t minDiff, int64_t *oldFileId) { - int64_t nearFileId = INT64_MAX; + int64_t minFileId = INT64_MAX; DIR *dir = opendir(pWal->path); if (dir == NULL) { @@ -68,18 +68,51 @@ int32_t walGetOldFile(SWal *pWal, int64_t curFileId, int32_t minDiff, int64_t *o if (id >= curFileId) continue; minDiff--; - if (id < nearFileId) { - nearFileId = id; + if (id < minFileId) { + minFileId = id; } } } closedir(dir); - if (nearFileId == INT64_MAX) return -1; + if (minFileId == INT64_MAX) return -1; if (minDiff > 0) return -1; - *oldFileId = nearFileId; + *oldFileId = minFileId; wTrace("vgId:%d, path:%s, curFileId:%" PRId64 " oldFildId:%" PRId64, pWal->vgId, pWal->path, curFileId, *oldFileId); return 0; } + +int32_t walGetNewFile(SWal *pWal, int64_t *newFileId) { + int64_t maxFileId = INT64_MIN; + + DIR *dir = opendir(pWal->path); + if (dir == NULL) { + wError("vgId:%d, path:%s, failed to open since %s", pWal->vgId, pWal->path, strerror(errno)); + return -1; + } + + struct dirent *ent; + while ((ent = readdir(dir)) != NULL) { + char *name = ent->d_name; + + if (strncmp(name, WAL_PREFIX, WAL_PREFIX_LEN) == 0) { + int64_t id = atoll(name + WAL_PREFIX_LEN); + if (id > maxFileId) { + maxFileId = id; + } + } + } + closedir(dir); + + if (maxFileId == INT64_MAX) { + *newFileId = 0; + } else { + *newFileId = maxFileId; + } + + wTrace("vgId:%d, path:%s, newFileId:%" PRId64, pWal->vgId, pWal->path, *newFileId); + + return 0; +} \ No newline at end of file diff --git a/src/wal/src/walWrite.c b/src/wal/src/walWrite.c index 39a7edb240..68ea5e2b72 100644 --- a/src/wal/src/walWrite.c +++ b/src/wal/src/walWrite.c @@ -36,7 +36,12 @@ int32_t walRenew(void *handle) { wDebug("vgId:%d, file:%s, it is closed", pWal->vgId, pWal->name); } - pWal->fileId = (pWal->keep ? 0 : taosGetTimestampUs()); + if (pWal->keep) { + pWal->fileId = 0; + } else { + if (walGetNewFile(pWal, &pWal->fileId) != 0) pWal->fileId = 0; + pWal->fileId++; + } snprintf(pWal->name, sizeof(pWal->name), "%s/%s%" PRId64, pWal->path, WAL_PREFIX, pWal->fileId); pWal->fd = open(pWal->name, O_WRONLY | O_CREAT, S_IRWXU | S_IRWXG | S_IRWXO); @@ -82,6 +87,8 @@ int32_t walWrite(void *handle, SWalHead *pHead) { taosCalcChecksumAppend(0, (uint8_t *)pHead, sizeof(SWalHead)); int32_t contLen = pHead->len + sizeof(SWalHead); + pthread_mutex_lock(&pWal->mutex); + if (taosTWrite(pWal->fd, pHead, contLen) != contLen) { code = TAOS_SYSTEM_ERROR(errno); wError("vgId:%d, file:%s, failed to write since %s", pWal->vgId, pWal->name, strerror(errno)); @@ -90,6 +97,8 @@ int32_t walWrite(void *handle, SWalHead *pHead) { wTrace("vgId:%d, write version:%" PRId64 ", fileId:%" PRId64, pWal->vgId, pWal->version, pWal->fileId); } + pthread_mutex_unlock(&pWal->mutex); + ASSERT(contLen == pHead->len + sizeof(SWalHead)); return code; -- GitLab