From dbe3b6f8f6663e1fab22b54ee11694b4a1680da0 Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Fri, 27 Nov 2020 23:27:13 +0800 Subject: [PATCH] TD-1895 --- src/dnode/src/dnodeMain.c | 2 ++ src/inc/twal.h | 5 ++- src/os/inc/osFile.h | 19 ++++------- src/os/src/detail/osFile.c | 2 +- src/util/inc/tfile.h | 21 +++++++----- src/util/src/tfile.c | 68 ++++++++++++++++++++++++++++++-------- src/wal/inc/walInt.h | 2 +- src/wal/src/walMgmt.c | 9 ++--- src/wal/src/walWrite.c | 55 +++++++++++++++--------------- 9 files changed, 113 insertions(+), 70 deletions(-) diff --git a/src/dnode/src/dnodeMain.c b/src/dnode/src/dnodeMain.c index 130be0af20..9f52dbd331 100644 --- a/src/dnode/src/dnodeMain.c +++ b/src/dnode/src/dnodeMain.c @@ -19,6 +19,7 @@ #include "tutil.h" #include "tconfig.h" #include "tglobal.h" +#include "tfile.h" #include "twal.h" #include "trpc.h" #include "dnode.h" @@ -55,6 +56,7 @@ typedef struct { } SDnodeComponent; static const SDnodeComponent tsDnodeComponents[] = { + {"tfile", tfInit, tfCleanup}, {"rpc", rpcInit, rpcCleanup}, {"storage", dnodeInitStorage, dnodeCleanupStorage}, {"dnodecfg", dnodeInitCfg, dnodeCleanupCfg}, diff --git a/src/inc/twal.h b/src/inc/twal.h index 8dd3a8a912..1645de77aa 100644 --- a/src/inc/twal.h +++ b/src/inc/twal.h @@ -51,9 +51,8 @@ typedef struct { typedef void * twalh; // WAL HANDLE typedef int32_t FWalWrite(void *ahandle, void *pHead, int32_t qtype, void *pMsg); -int32_t walInit(); -void walCleanUp(); - +int32_t walInit(); +void walCleanUp(); twalh walOpen(char *path, SWalCfg *pCfg); int32_t walAlter(twalh pWal, SWalCfg *pCfg); void walStop(twalh); diff --git a/src/os/inc/osFile.h b/src/os/inc/osFile.h index 62e44d8eb0..c9b3b9cd76 100644 --- a/src/os/inc/osFile.h +++ b/src/os/inc/osFile.h @@ -20,17 +20,6 @@ extern "C" { #endif -#define tread(fd, buf, count) read(fd, buf, count) -#define twrite(fd, buf, count) write(fd, buf, count) -#define tlseek(fd, offset, whence) lseek(fd, offset, whence) -#define tclose(fd) \ - { \ - if (FD_VALID(fd)) { \ - close(fd); \ - fd = FD_INITIALIZER; \ - } \ - } - int64_t taosReadImp(int32_t fd, void *buf, int64_t count); int64_t taosWriteImp(int32_t fd, void *buf, int64_t count); int64_t taosLSeekImp(int32_t fd, int64_t offset, int32_t whence); @@ -39,7 +28,13 @@ int32_t taosRenameFile(char *fullPath, char *suffix, char delimiter, char **dstP #define taosRead(fd, buf, count) taosReadImp(fd, buf, count) #define taosWrite(fd, buf, count) taosWriteImp(fd, buf, count) #define taosLSeek(fd, offset, whence) taosLSeekImp(fd, offset, whence) -#define taosClose(x) tclose(x) +#define taosClose(fd) \ + { \ + if (FD_VALID(fd)) { \ + close(fd); \ + fd = FD_INITIALIZER; \ + } \ + } // TAOS_OS_FUNC_FILE_SENDIFLE int64_t taosSendFile(int32_t dfd, int32_t sfd, int64_t *offset, int64_t size); diff --git a/src/os/src/detail/osFile.c b/src/os/src/detail/osFile.c index 23fc88b8e1..2e6886aa21 100644 --- a/src/os/src/detail/osFile.c +++ b/src/os/src/detail/osFile.c @@ -116,7 +116,7 @@ int64_t taosWriteImp(int32_t fd, void *buf, int64_t n) { } int64_t taosLSeekImp(int32_t fd, int64_t offset, int32_t whence) { - return (int64_t)tlseek(fd, (long)offset, whence); + return (int64_t)lseek(fd, (long)offset, whence); } #ifndef TAOS_OS_FUNC_FILE_SENDIFLE diff --git a/src/util/inc/tfile.h b/src/util/inc/tfile.h index 10b7c1df35..b32f297d9c 100644 --- a/src/util/inc/tfile.h +++ b/src/util/inc/tfile.h @@ -20,23 +20,26 @@ extern "C" { #endif -#include - // init taos file module -int32_t tfinit(); +int32_t tfInit(); // clean up taos file module -void tfcleanup(); +void tfCleanup(); // the same syntax as UNIX standard open/close/read/write // but FD is int64_t and will never be reused -int64_t tfopen(const char *pathname, int32_t flags); -int64_t tfclose(int64_t tfd); -int64_t tfwrite(int64_t tfd, void *buf, int64_t count); -int64_t tfread(int64_t tfd, void *buf, int64_t count); +int64_t tfOpen(const char *pathname, int32_t flags); +int64_t tfOpenM(const char *pathname, int32_t flags, mode_t mode); +int64_t tfClose(int64_t tfd); +int64_t tfWrite(int64_t tfd, void *buf, int64_t count); +int64_t tfRead(int64_t tfd, void *buf, int64_t count); +int32_t tfFsync(int64_t tfd); +bool tfValid(int64_t tfd); +int32_t tfLseek(int64_t tfd, int64_t offset, int32_t whence); +int32_t tfFtruncate(int64_t tfd, int64_t length); #ifdef __cplusplus } #endif -#endif // TDENGINE_TREF_H +#endif // TDENGINE_TFILE_H diff --git a/src/util/src/tfile.c b/src/util/src/tfile.c index 27ba30fe81..49b28dad6f 100644 --- a/src/util/src/tfile.c +++ b/src/util/src/tfile.c @@ -13,6 +13,7 @@ * along with this program. If not, see . */ +#define _DEFAULT_SOURCE #include "os.h" #include "taoserror.h" #include "tulog.h" @@ -21,40 +22,52 @@ static int32_t tsFileRsetId = -1; -static void taosCloseFile(void *p) { +static void tfCloseFile(void *p) { close((int32_t)(uintptr_t)p); } -int32_t tfinit() { - tsFileRsetId = taosOpenRef(2000, taosCloseFile); - return tsFileRsetId; +int32_t tfInit() { + tsFileRsetId = taosOpenRef(2000, tfCloseFile); + if (tsFileRsetId > 0) { + return 0; + } else { + return -1; + } } -void tfcleanup() { +void tfCleanup() { if (tsFileRsetId >= 0) taosCloseRef(tsFileRsetId); tsFileRsetId = -1; } -int64_t tfopen(const char *pathname, int32_t flags) { - int32_t fd = open(pathname, flags); - +static int64_t tfOpenImp(int32_t fd) { if (fd < 0) { terrno = TAOS_SYSTEM_ERROR(errno); return -1; - } + } - void *p = (void *)(int64_t)fd; + void * p = (void *)(int64_t)fd; int64_t rid = taosAddRef(tsFileRsetId, p); if (rid < 0) close(fd); return rid; } -int64_t tfclose(int64_t tfd) { +int64_t tfOpen(const char *pathname, int32_t flags) { + int32_t fd = open(pathname, flags); + return tfOpenImp(fd); +} + +int64_t tfOpenM(const char *pathname, int32_t flags, mode_t mode) { + int32_t fd = open(pathname, flags, mode); + return tfOpenImp(fd); +} + +int64_t tfClose(int64_t tfd) { return taosRemoveRef(tsFileRsetId, tfd); } -int64_t tfwrite(int64_t tfd, void *buf, int64_t count) { +int64_t tfWrite(int64_t tfd, void *buf, int64_t count) { void *p = taosAcquireRef(tsFileRsetId, tfd); if (p == NULL) return -1; @@ -67,7 +80,7 @@ int64_t tfwrite(int64_t tfd, void *buf, int64_t count) { return ret; } -int64_t tfread(int64_t tfd, void *buf, int64_t count) { +int64_t tfRead(int64_t tfd, void *buf, int64_t count) { void *p = taosAcquireRef(tsFileRsetId, tfd); if (p == NULL) return -1; @@ -79,3 +92,32 @@ int64_t tfread(int64_t tfd, void *buf, int64_t count) { taosReleaseRef(tsFileRsetId, tfd); return ret; } + +int64_t tfFsync(int64_t tfd) { + void *p = taosAcquireRef(tsFileRsetId, tfd); + if (p == NULL) return -1; + + int32_t fd = (int32_t)(uintptr_t)p; + return fsync(fd); +} + +bool tfValid(int64_t tfd) { + void *p = taosAcquireRef(tsFileRsetId, tfd); + return p != NULL; +} + +int32_t tfLseek(int64_t tfd, int64_t offset, int32_t whence) { + void *p = taosAcquireRef(tsFileRsetId, tfd); + if (p == NULL) return -1; + + int32_t fd = (int32_t)(uintptr_t)p; + return taosLSeek(fd, offset, whence); +} + +int32_t tfFtruncate(int64_t tfd, int64_t length) { + void *p = taosAcquireRef(tsFileRsetId, tfd); + if (p == NULL) return -1; + + int32_t fd = (int32_t)(uintptr_t)p; + return taosFtruncate(fd, length); +} diff --git a/src/wal/inc/walInt.h b/src/wal/inc/walInt.h index b0edabfbd8..06748d885f 100644 --- a/src/wal/inc/walInt.h +++ b/src/wal/inc/walInt.h @@ -44,8 +44,8 @@ typedef struct { uint64_t version; int64_t fileId; int64_t rid; + int64_t tfd; int32_t vgId; - int32_t fd; int32_t keep; int32_t level; int32_t fsyncPeriod; diff --git a/src/wal/src/walMgmt.c b/src/wal/src/walMgmt.c index 36c190be3e..72ea239817 100644 --- a/src/wal/src/walMgmt.c +++ b/src/wal/src/walMgmt.c @@ -17,6 +17,7 @@ #include "os.h" #include "taoserror.h" #include "tref.h" +#include "tfile.h" #include "twal.h" #include "walInt.h" @@ -61,7 +62,7 @@ void *walOpen(char *path, SWalCfg *pCfg) { } pWal->vgId = pCfg->vgId; - pWal->fd = -1; + pWal->tfd = -1; pWal->fileId = -1; pWal->level = pCfg->walLevel; pWal->keep = pCfg->keep; @@ -124,7 +125,7 @@ void walClose(void *handle) { SWal *pWal = handle; pthread_mutex_lock(&pWal->mutex); - taosClose(pWal->fd); + tfClose(pWal->tfd); pthread_mutex_unlock(&pWal->mutex); taosRemoveRef(tsWal.refId, pWal->rid); } @@ -143,7 +144,7 @@ static void walFreeObj(void *wal) { SWal *pWal = wal; wDebug("vgId:%d, wal:%p is freed", pWal->vgId, pWal); - taosClose(pWal->fd); + tfClose(pWal->tfd); pthread_mutex_destroy(&pWal->mutex); tfree(pWal); } @@ -172,7 +173,7 @@ static void walFsyncAll() { while (pWal) { if (walNeedFsync(pWal)) { wTrace("vgId:%d, do fsync, level:%d seq:%d rseq:%d", pWal->vgId, pWal->level, pWal->fsyncSeq, tsWal.seq); - int32_t code = fsync(pWal->fd); + int32_t code = tfFsync(pWal->tfd); if (code != 0) { wError("vgId:%d, file:%s, failed to fsync since %s", pWal->vgId, pWal->name, strerror(code)); } diff --git a/src/wal/src/walWrite.c b/src/wal/src/walWrite.c index 8a532c8164..10e1b4dd61 100644 --- a/src/wal/src/walWrite.c +++ b/src/wal/src/walWrite.c @@ -18,6 +18,7 @@ #include "os.h" #include "taoserror.h" #include "tchecksum.h" +#include "tfile.h" #include "twal.h" #include "walInt.h" @@ -36,8 +37,8 @@ int32_t walRenew(void *handle) { pthread_mutex_lock(&pWal->mutex); - if (pWal->fd >= 0) { - tclose(pWal->fd); + if (tfValid(pWal->tfd)) { + tfClose(pWal->tfd); wDebug("vgId:%d, file:%s, it is closed", pWal->vgId, pWal->name); } @@ -49,9 +50,9 @@ int32_t walRenew(void *handle) { } snprintf(pWal->name, sizeof(pWal->name), "%s/%s%" PRId64, pWal->path, WAL_PREFIX, pWal->fileId); - pWal->fd = open(pWal->name, O_WRONLY | O_CREAT, S_IRWXU | S_IRWXG | S_IRWXO); + pWal->tfd = tfOpenM(pWal->name, O_WRONLY | O_CREAT, S_IRWXU | S_IRWXG | S_IRWXO); - if (pWal->fd < 0) { + if (!tfValid(pWal->tfd)) { code = TAOS_SYSTEM_ERROR(errno); wError("vgId:%d, file:%s, failed to open since %s", pWal->vgId, pWal->name, strerror(errno)); } else { @@ -67,7 +68,7 @@ void walRemoveOneOldFile(void *handle) { SWal *pWal = handle; if (pWal == NULL) return; if (pWal->keep == TAOS_WAL_KEEP) return; - if (pWal->fd <= 0) return; + if (!tfValid(pWal->tfd)) return; pthread_mutex_lock(&pWal->mutex); @@ -113,7 +114,7 @@ int32_t walWrite(void *handle, SWalHead *pHead) { int32_t code = 0; // no wal - if (pWal->fd <= 0) return 0; + if (!tfValid(pWal->tfd)) return 0; if (pWal->level == TAOS_WAL_NOLOG) return 0; if (pHead->version <= pWal->version) return 0; @@ -123,12 +124,12 @@ int32_t walWrite(void *handle, SWalHead *pHead) { pthread_mutex_lock(&pWal->mutex); - if (taosWrite(pWal->fd, pHead, contLen) != contLen) { + if (tfWrite(pWal->tfd, pHead, contLen) != contLen) { code = TAOS_SYSTEM_ERROR(errno); wError("vgId:%d, file:%s, failed to write since %s", pWal->vgId, pWal->name, strerror(errno)); } else { - wTrace("vgId:%d, write wal, fileId:%" PRId64 " fd:%d hver:%" PRId64 " wver:%" PRIu64 " len:%d", pWal->vgId, - pWal->fileId, pWal->fd, pHead->version, pWal->version, pHead->len); + wTrace("vgId:%d, write wal, fileId:%" PRId64 " tfd:%" PRId64 " hver:%" PRId64 " wver:%" PRIu64 " len:%d", pWal->vgId, + pWal->fileId, pWal->tfd, pHead->version, pWal->version, pHead->len); pWal->version = pHead->version; } @@ -141,11 +142,11 @@ int32_t walWrite(void *handle, SWalHead *pHead) { void walFsync(void *handle, bool forceFsync) { SWal *pWal = handle; - if (pWal == NULL || pWal->fd < 0) return; + if (pWal == NULL || !tfValid(pWal->tfd)) return; if (forceFsync || (pWal->level == TAOS_WAL_FSYNC && pWal->fsyncPeriod == 0)) { wTrace("vgId:%d, fileId:%" PRId64 ", do fsync", pWal->vgId, pWal->fileId); - if (fsync(pWal->fd) < 0) { + if (tfFsync(pWal->tfd) < 0) { wError("vgId:%d, fileId:%" PRId64 ", fsync failed since %s", pWal->vgId, pWal->fileId, strerror(errno)); } } @@ -186,8 +187,8 @@ int32_t walRestore(void *handle, void *pVnode, FWalWrite writeFp) { // open the existing WAL file in append mode pWal->fileId = 0; snprintf(pWal->name, sizeof(pWal->name), "%s/%s%" PRId64, pWal->path, WAL_PREFIX, pWal->fileId); - pWal->fd = open(pWal->name, O_WRONLY | O_CREAT | O_APPEND, S_IRWXU | S_IRWXG | S_IRWXO); - if (pWal->fd < 0) { + pWal->tfd = tfOpenM(pWal->name, O_WRONLY | O_CREAT | O_APPEND, S_IRWXU | S_IRWXG | S_IRWXO); + if (!tfValid(pWal->tfd)) { wError("vgId:%d, file:%s, failed to open since %s", pWal->vgId, pWal->name, strerror(errno)); return TAOS_SYSTEM_ERROR(errno); } @@ -217,22 +218,22 @@ int32_t walGetWalFile(void *handle, char *fileName, int64_t *fileId) { return code; } -static void walFtruncate(SWal *pWal, int32_t fd, int64_t offset) { - taosFtruncate(fd, offset); - fsync(fd); +static void walFtruncate(SWal *pWal, int64_t tfd, int64_t offset) { + tfFtruncate(tfd, offset); + tfFsync(tfd); } -static int32_t walSkipCorruptedRecord(SWal *pWal, SWalHead *pHead, int32_t fd, int64_t *offset) { +static int32_t walSkipCorruptedRecord(SWal *pWal, SWalHead *pHead, int64_t tfd, int64_t *offset) { int64_t pos = *offset; while (1) { pos++; - if (lseek(fd, pos, SEEK_SET) < 0) { + if (tfLseek(tfd, pos, SEEK_SET) < 0) { wError("vgId:%d, failed to seek from corrupted wal file since %s", pWal->vgId, strerror(errno)); return TSDB_CODE_WAL_FILE_CORRUPTED; } - if (taosRead(fd, pHead, sizeof(SWalHead)) <= 0) { + if (tfRead(tfd, pHead, sizeof(SWalHead)) <= 0) { wError("vgId:%d, read to end of corrupted wal file, offset:%" PRId64, pWal->vgId, pos); return TSDB_CODE_WAL_FILE_CORRUPTED; } @@ -259,8 +260,8 @@ static int32_t walRestoreWalFile(SWal *pWal, void *pVnode, FWalWrite writeFp, ch return TAOS_SYSTEM_ERROR(errno); } - int32_t fd = open(name, O_RDWR); - if (fd < 0) { + int64_t tfd = tfOpen(name, O_RDWR); + if (!tfValid(tfd)) { wError("vgId:%d, file:%s, failed to open for restore since %s", pWal->vgId, name, strerror(errno)); tfree(buffer); return TAOS_SYSTEM_ERROR(errno); @@ -273,7 +274,7 @@ static int32_t walRestoreWalFile(SWal *pWal, void *pVnode, FWalWrite writeFp, ch SWalHead *pHead = buffer; while (1) { - int32_t ret = taosRead(fd, pHead, sizeof(SWalHead)); + int32_t ret = tfRead(tfd, pHead, sizeof(SWalHead)); if (ret == 0) break; if (ret < 0) { @@ -284,16 +285,16 @@ static int32_t walRestoreWalFile(SWal *pWal, void *pVnode, FWalWrite writeFp, ch if (ret < sizeof(SWalHead)) { wError("vgId:%d, file:%s, failed to read wal head, ret is %d", pWal->vgId, name, ret); - walFtruncate(pWal, fd, offset); + walFtruncate(pWal, tfd, offset); break; } if (!taosCheckChecksumWhole((uint8_t *)pHead, sizeof(SWalHead))) { wError("vgId:%d, file:%s, wal head cksum is messed up, hver:%" PRIu64 " len:%d offset:%" PRId64, pWal->vgId, name, pHead->version, pHead->len, offset); - code = walSkipCorruptedRecord(pWal, pHead, fd, &offset); + code = walSkipCorruptedRecord(pWal, pHead, tfd, &offset); if (code != TSDB_CODE_SUCCESS) { - walFtruncate(pWal, fd, offset); + walFtruncate(pWal, tfd, offset); break; } } @@ -310,7 +311,7 @@ static int32_t walRestoreWalFile(SWal *pWal, void *pVnode, FWalWrite writeFp, ch pHead = buffer; } - ret = taosRead(fd, pHead->cont, pHead->len); + ret = tfRead(tfd, pHead->cont, pHead->len); if (ret < 0) { wError("vgId:%d, file:%s, failed to read wal body since %s", pWal->vgId, name, strerror(errno)); code = TAOS_SYSTEM_ERROR(errno); @@ -332,7 +333,7 @@ static int32_t walRestoreWalFile(SWal *pWal, void *pVnode, FWalWrite writeFp, ch (*writeFp)(pVnode, pHead, TAOS_QTYPE_WAL, NULL); } - tclose(fd); + tfClose(tfd); tfree(buffer); return code; -- GitLab