From d4fbd7bbab95c3b5f9e85b82d492813b74957ff0 Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Fri, 30 Oct 2020 18:08:48 +0800 Subject: [PATCH] TD-1846 --- src/dnode/src/dnodeMain.c | 2 + src/inc/tsync.h | 2 +- src/inc/twal.h | 10 +- src/mnode/src/mnodeSdb.c | 4 +- src/sync/src/syncRetrieve.c | 4 +- src/sync/test/syncServer.c | 2 +- src/util/inc/talloc.h | 3 +- src/util/src/talloc.c | 10 ++ src/vnode/src/vnodeMain.c | 5 +- src/wal/inc/walInt.h | 21 ++- src/wal/src/walMgmt.c | 91 ++++++------- src/wal/src/walWrite.c | 261 ++++++++++++++++-------------------- src/wal/test/waltest.c | 6 +- 13 files changed, 199 insertions(+), 222 deletions(-) diff --git a/src/dnode/src/dnodeMain.c b/src/dnode/src/dnodeMain.c index 97e6f2ce6d..c46099bd14 100644 --- a/src/dnode/src/dnodeMain.c +++ b/src/dnode/src/dnodeMain.c @@ -19,6 +19,7 @@ #include "tutil.h" #include "tconfig.h" #include "tglobal.h" +#include "twal.h" #include "dnode.h" #include "dnodeInt.h" #include "dnodeMgmt.h" @@ -50,6 +51,7 @@ typedef struct { static const SDnodeComponent tsDnodeComponents[] = { {"storage", dnodeInitStorage, dnodeCleanupStorage}, + {"wal", walInit, walCleanUp}, {"check", dnodeInitCheck, dnodeCleanupCheck}, // NOTES: dnodeInitCheck must be behind the dnodeinitStorage component !!! {"vread", dnodeInitVnodeRead, dnodeCleanupVnodeRead}, {"vwrite", dnodeInitVnodeWrite, dnodeCleanupVnodeWrite}, diff --git a/src/inc/tsync.h b/src/inc/tsync.h index ca0f70d104..11b81f9379 100644 --- a/src/inc/tsync.h +++ b/src/inc/tsync.h @@ -68,7 +68,7 @@ typedef uint32_t (*FGetFileInfo)(void *ahandle, char *name, uint32_t *index, uin // get the wal file from index or after // return value, -1: error, 1:more wal files, 0:last WAL. if name[0]==0, no WAL file -typedef int (*FGetWalInfo)(void *ahandle, char *name, uint32_t *index); +typedef int (*FGetWalInfo)(void *ahandle, char *name, int64_t *index); // when a forward pkt is received, call this to handle data typedef int (*FWriteToCache)(void *ahandle, void *pHead, int type); diff --git a/src/inc/twal.h b/src/inc/twal.h index 626c4434e9..b87831381d 100644 --- a/src/inc/twal.h +++ b/src/inc/twal.h @@ -51,13 +51,13 @@ int32_t walInit(); void walCleanUp(); twalh walOpen(char *path, SWalCfg *pCfg); -int walAlter(twalh pWal, SWalCfg *pCfg); +int32_t walAlter(twalh pWal, SWalCfg *pCfg); void walClose(twalh); -int walRenew(twalh); -int walWrite(twalh, SWalHead *); +int32_t walRenew(twalh); +int32_t walWrite(twalh, SWalHead *); void walFsync(twalh); -int walRestore(twalh, void *pVnode, FWalWrite writeFp); -int walGetWalFile(twalh, char *name, uint32_t *index); +int32_t walRestore(twalh, void *pVnode, FWalWrite writeFp); +int32_t walGetWalFile(twalh, char *fileName, int64_t *fileId); int64_t walGetVersion(twalh); #ifdef __cplusplus diff --git a/src/mnode/src/mnodeSdb.c b/src/mnode/src/mnodeSdb.c index 8c61c61a10..a04c161599 100644 --- a/src/mnode/src/mnodeSdb.c +++ b/src/mnode/src/mnodeSdb.c @@ -175,7 +175,7 @@ static void *sdbGetTableFromId(int32_t tableId) { } static int32_t sdbInitWal() { - SWalCfg walCfg = {.walLevel = 2, .wals = 2, .keep = 1, .fsyncPeriod = 0}; + SWalCfg walCfg = {.vgId = 1, .walLevel = 2, .wals = 2, .keep = 1, .fsyncPeriod = 0}; char temp[TSDB_FILENAME_LEN]; sprintf(temp, "%s/wal", tsMnodeDir); tsSdbObj.wal = walOpen(temp, &walCfg); @@ -237,7 +237,7 @@ static uint32_t sdbGetFileInfo(void *ahandle, char *name, uint32_t *index, uint3 return 0; } -static int sdbGetWalInfo(void *ahandle, char *name, uint32_t *index) { +static int sdbGetWalInfo(void *ahandle, char *name, int64_t *index) { return walGetWalFile(tsSdbObj.wal, name, index); } diff --git a/src/sync/src/syncRetrieve.c b/src/sync/src/syncRetrieve.c index 60625d75ec..0137794d18 100644 --- a/src/sync/src/syncRetrieve.c +++ b/src/sync/src/syncRetrieve.c @@ -287,7 +287,7 @@ static int syncRetrieveLastWal(SSyncPeer *pPeer, char *name, uint64_t fversion, return -1; } -static int syncProcessLastWal(SSyncPeer *pPeer, char *wname, uint32_t index) { +static int syncProcessLastWal(SSyncPeer *pPeer, char *wname, int64_t index) { SSyncNode *pNode = pPeer->pSyncNode; int code = -1; char fname[TSDB_FILENAME_LEN * 2]; // full path to wal file @@ -377,7 +377,7 @@ static int syncRetrieveWal(SSyncPeer *pPeer) { int32_t size; struct stat fstat; int code = -1; - uint32_t index = 0; + int64_t index = 0; while (1) { // retrieve wal info diff --git a/src/sync/test/syncServer.c b/src/sync/test/syncServer.c index 380b971fa8..9ae45b25e3 100644 --- a/src/sync/test/syncServer.c +++ b/src/sync/test/syncServer.c @@ -254,7 +254,7 @@ uint32_t getFileInfo(void *ahandle, char *name, uint32_t *index, uint32_t eindex return magic; } -int getWalInfo(void *ahandle, char *name, uint32_t *index) { +int getWalInfo(void *ahandle, char *name, uint64_t *index) { struct stat fstat; char aname[280]; diff --git a/src/util/inc/talloc.h b/src/util/inc/talloc.h index 5f04f4b99c..a59e88cf13 100644 --- a/src/util/inc/talloc.h +++ b/src/util/inc/talloc.h @@ -23,18 +23,19 @@ extern "C" { #ifdef TSDB_USE_SYS_MEM #define tmalloc(size) malloc(size) #define tcalloc(size) calloc(1, size) + #define trealloc(p, size) realloc(p, size) #define tmemalign(alignment, size) malloc(size) #define tfree(p) free(p) #define tmemzero(p, size) memset(p, 0, size) #else void *tmalloc(int32_t size); void *tcalloc(int32_t size); + void *trealloc(void *p, int32_t size); void *tmemalign(int32_t alignment, int32_t size); void tfree(void *p); void tmemzero(void *p, int32_t size); #endif - #ifdef __cplusplus } #endif diff --git a/src/util/src/talloc.c b/src/util/src/talloc.c index 4f7067a26a..6f23b39393 100644 --- a/src/util/src/talloc.c +++ b/src/util/src/talloc.c @@ -40,6 +40,16 @@ void *tcalloc(int32_t size) { return p; } +void *trealloc(void *p, int32_t size) { + p = realloc(p, size); + if (p == NULL) { + terrno = TAOS_SYSTEM_ERROR(errno); + uError("failed to realloc memory, size:%d reason:%s", size, strerror(errno)); + } + + return p; +} + void tfree(void *p) { free(p); } void tmemzero(void *p, int32_t size) { memset(p, 0, size); } diff --git a/src/vnode/src/vnodeMain.c b/src/vnode/src/vnodeMain.c index 9c283dd9c4..7308027817 100644 --- a/src/vnode/src/vnodeMain.c +++ b/src/vnode/src/vnodeMain.c @@ -42,7 +42,7 @@ static int32_t vnodeSaveVersion(SVnodeObj *pVnode); static int32_t vnodeReadVersion(SVnodeObj *pVnode); static int vnodeProcessTsdbStatus(void *arg, int status); static uint32_t vnodeGetFileInfo(void *ahandle, char *name, uint32_t *index, uint32_t eindex, int64_t *size, uint64_t *fversion); -static int vnodeGetWalInfo(void *ahandle, char *name, uint32_t *index); +static int vnodeGetWalInfo(void *ahandle, char *name, int64_t *index); static void vnodeNotifyRole(void *ahandle, int8_t role); static void vnodeCtrlFlow(void *handle, int32_t mseconds); static int vnodeNotifyFileSynced(void *ahandle, uint64_t fversion); @@ -304,6 +304,7 @@ int32_t vnodeOpen(int32_t vnode, char *rootDir) { sprintf(temp, "%s/wal", rootDir); pVnode->wal = walOpen(temp, &pVnode->walCfg); + pVnode->walCfg.vgId = pVnode->vgId; if (pVnode->wal == NULL) { vnodeCleanUp(pVnode); return terrno; @@ -621,7 +622,7 @@ static uint32_t vnodeGetFileInfo(void *ahandle, char *name, uint32_t *index, uin return tsdbGetFileInfo(pVnode->tsdb, name, index, eindex, size); } -static int vnodeGetWalInfo(void *ahandle, char *name, uint32_t *index) { +static int vnodeGetWalInfo(void *ahandle, char *name, int64_t *index) { SVnodeObj *pVnode = ahandle; return walGetWalFile(pVnode->wal, name, index); } diff --git a/src/wal/inc/walInt.h b/src/wal/inc/walInt.h index fc6955ab97..3c34c68f83 100644 --- a/src/wal/inc/walInt.h +++ b/src/wal/inc/walInt.h @@ -31,9 +31,13 @@ extern int32_t wDebugFlag; #define wDebug(...) { if (wDebugFlag & DEBUG_DEBUG) { taosPrintLog("WAL ", wDebugFlag, __VA_ARGS__); }} #define wTrace(...) { if (wDebugFlag & DEBUG_TRACE) { taosPrintLog("WAL ", wDebugFlag, __VA_ARGS__); }} -#define walPrefix "wal" -#define walRefreshIntervalMs 1000 -#define walSignature (uint32_t)(0xFAFBFDFE) +#define WAL_PREFIX "wal" +#define WAL_PREFIX_LEN 4 +#define WAL_REFRESH_MS 1000 +#define WAL_MAX_SIZE (1024 * 1024) +#define WAL_SIGNATURE ((uint32_t)(0xFAFBFDFE)) +#define WAL_PATH_LEN (TSDB_FILENAME_LEN + 12) +#define WAL_FILE_LEN (TSDB_FILENAME_LEN + 32) typedef struct { uint64_t version; @@ -43,14 +47,9 @@ typedef struct { int32_t level; int32_t fsyncPeriod; int32_t fsyncSeq; - int32_t fileIndex; - void* timer; - void* signature; - int max; // maximum number of wal files - uint32_t id; // increase continuously - int num; // number of wal files - char path[TSDB_FILENAME_LEN]; - char name[TSDB_FILENAME_LEN + 16]; + int64_t fileId; + char path[WAL_PATH_LEN]; + char name[WAL_FILE_LEN]; pthread_mutex_t mutex; } SWal; diff --git a/src/wal/src/walMgmt.c b/src/wal/src/walMgmt.c index ebd1328c1b..b1e32496cc 100644 --- a/src/wal/src/walMgmt.c +++ b/src/wal/src/walMgmt.c @@ -27,7 +27,6 @@ typedef struct { int32_t num; int32_t seq; int8_t stop; - int8_t reserved[3]; pthread_t thread; pthread_mutex_t mutex; } SWalMgmt; @@ -48,7 +47,7 @@ int32_t walInit() { return code; } - wInfo("wal module is initialized"); + wInfo("wal module is initialized, refId:%d", tsWal.refId); return code; } @@ -65,15 +64,13 @@ void *walOpen(char *path, SWalCfg *pCfg) { return NULL; } + pWal->vgId = pCfg->vgId; pWal->fd = -1; - pWal->max = pCfg->wals; - pWal->id = 0; - pWal->num = 0; + pWal->fileId = -1; pWal->level = pCfg->walLevel; pWal->keep = pCfg->keep; pWal->fsyncPeriod = pCfg->fsyncPeriod; - pWal->signature = pWal; - tstrncpy(pWal->path, path, sizeof(path)); + tstrncpy(pWal->path, path, sizeof(pWal->path)); pthread_mutex_init(&pWal->mutex, NULL); pWal->fsyncSeq = pCfg->fsyncPeriod % 1000; @@ -90,12 +87,14 @@ void *walOpen(char *path, SWalCfg *pCfg) { } atomic_add_fetch_32(&tsWal.num, 1); - wDebug("vgId:%d, wal is opened, level:%d period:%d path:%s", pWal->vgId, pWal->level, pWal->fsyncPeriod, pWal->path); + wDebug("vgId:%d, wal:%p is opened, level:%d period:%d path:%s", pWal->vgId, pWal, pWal->level, pWal->fsyncPeriod, + pWal->path); return pWal; } int32_t walAlter(void *handle, SWalCfg *pCfg) { + if (handle == NULL) return TSDB_CODE_WAL_APP_ERROR; SWal *pWal = handle; if (pWal->level == pCfg->walLevel && pWal->fsyncPeriod == pCfg->fsyncPeriod) { @@ -109,26 +108,27 @@ int32_t walAlter(void *handle, SWalCfg *pCfg) { pWal->level = pCfg->walLevel; pWal->fsyncPeriod = pCfg->fsyncPeriod; + pWal->fsyncSeq = pCfg->fsyncPeriod % 1000; + if (pWal->fsyncSeq <= 0) pWal->fsyncSeq = 1; return TSDB_CODE_SUCCESS; } void walClose(void *handle) { + if (handle == NULL) return; + SWal *pWal = handle; taosClose(pWal->fd); if (pWal->keep == 0) { - // remove all files in the directory - for (int32_t i = 0; i < pWal->num; ++i) { - snprintf(pWal->name, sizeof(pWal->name), "%s/%s%d", pWal->path, walPrefix, pWal->id - i); - if (remove(pWal->name) < 0) { - wError("vgId:%d, wal:%s, failed to remove", pWal->vgId, pWal->name); - } else { - wDebug("vgId:%d, wal:%s, it is removed", pWal->vgId, pWal->name); - } + snprintf(pWal->name, sizeof(pWal->name), "%s/%s%" PRIu64, pWal->path, WAL_PREFIX, pWal->fileId); + if (remove(pWal->name) < 0) { + wError("vgId:%d, wal:%p file:%s, failed to remove", pWal->vgId, pWal, pWal->name); + } else { + wDebug("vgId:%d, wal:%p file:%s, it is removed", pWal->vgId, pWal, pWal->name); } } else { - wDebug("vgId:%d, wal:%s, it is closed and kept", pWal->vgId, pWal->name); + wDebug("vgId:%d, wal:%p file:%s, it is closed and kept", pWal->vgId, pWal, pWal->name); } taosRemoveRef(tsWal.refId, pWal); @@ -145,6 +145,8 @@ static int32_t walInitObj(SWal *pWal) { return TSDB_CODE_SUCCESS; } + walRenew(pWal); + if (pWal && pWal->fd < 0) { terrno = TAOS_SYSTEM_ERROR(errno); wError("vgId:%d, wal:%s, failed to open file, reason:%s", pWal->vgId, pWal->path, strerror(errno)); @@ -156,53 +158,45 @@ static int32_t walInitObj(SWal *pWal) { } static void walFreeObj(void *wal) { - SWal *pWal = pWal; - wDebug("vgId:%d, wal is freed", pWal->vgId); + SWal *pWal = wal; + wDebug("vgId:%d, wal:%p is freed", pWal->vgId, pWal); taosClose(pWal->fd); pthread_mutex_destroy(&pWal->mutex); tfree(pWal); } -// static bool walNeedFsync(SWal *pWal) { -// if (pWal->fsyncPeriod <= 0 || pWal->level != TAOS_WAL_FSYNC) { -// return false; -// } +static bool walNeedFsync(SWal *pWal) { + if (pWal->fsyncPeriod <= 0 || pWal->level != TAOS_WAL_FSYNC) { + return false; + } -// if (tsWal.seq % pWal->fsyncSeq == 0) { -// return true; -// } + if (tsWal.seq % pWal->fsyncSeq == 0) { + return true; + } -// return false; -// } + return false; +} static void walUpdateSeq() { - taosMsleep(walRefreshIntervalMs); + taosMsleep(WAL_REFRESH_MS); if (++tsWal.seq <= 0) { tsWal.seq = 1; } } static void walFsyncAll() { - // int32_t code; - // void * pIter = taosRefCreateIter(tsWal.refId); - - // while (taosRefIterNext(pIter)) { - // SWal *pWal = taosRefIterGet(pIter); - // if (pWal == NULL) break; - - // if (!walNeedFsync(pWal)) { - // wTrace("wal:%s, do fsync, level:%d seq:%d rseq:%d", pWal->name, pWal->level, pWal->fsyncSeq, tsWal.refreshSeq); - // code = walFsync(pWal); - // if (code != TSDB_CODE_SUCCESS) { - // wError("wal:%s, fsync failed(%s)", pWal->name, strerror(code)); - // } - // } - - // taosReleaseRef(pWal); - // } - - // taosRefDestroyIter(pIter); + SWal *pWal = taosIterateRef(tsWal.refId, NULL); + while (pWal) { + if (walNeedFsync(pWal)) { + wTrace("vgId:%d, do fsync, level:%d seq:%d rseq:%d", pWal->vgId, pWal->level, pWal->fsyncSeq, tsWal.seq); + int32_t code = fsync(pWal->fd); + if (code != 0) { + wError("vgId:%d, wal:%s, fsync failed, reason:%s", pWal->vgId, pWal->name, strerror(code)); + } + } + pWal = taosIterateRef(tsWal.refId, pWal); + } } static void *walThreadFunc(void *param) { @@ -232,6 +226,7 @@ static int32_t walCreateThread() { } static void walStopThread() { + tsWal.stop = 1; if (tsWal.thread) { pthread_join(tsWal.thread, NULL); } diff --git a/src/wal/src/walWrite.c b/src/wal/src/walWrite.c index 2eea80e20a..6c3c97a0e9 100644 --- a/src/wal/src/walWrite.c +++ b/src/wal/src/walWrite.c @@ -23,77 +23,79 @@ #include "twal.h" #include "walInt.h" -static int32_t walRestoreWalFile(SWal *pWal, void *pVnode, FWalWrite writeFp); -static int32_t walRemoveWalFiles(const char *path); +static int32_t walRestoreWalFile(SWal *pWal, void *pVnode, FWalWrite writeFp, char *name); +static int32_t walGetNextFile(SWal *pWal, int64_t lastFileId, int64_t *nexFileId, char *nextFileName); int32_t walRenew(void *handle) { if (handle == NULL) return 0; - SWal *pWal = handle; - terrno = 0; + SWal * pWal = handle; + int32_t code = 0; pthread_mutex_lock(&pWal->mutex); if (pWal->fd >= 0) { close(pWal->fd); - pWal->id++; wDebug("vgId:%d, wal:%s, it is closed", pWal->vgId, pWal->name); } - pWal->num++; + uint64_t lastId = pWal->fileId; + if (pWal->keep) { + pWal->fileId = 0; + } else { + pWal->fileId = taosGetTimestampUs(); + } - snprintf(pWal->name, sizeof(pWal->name), "%s/%s%d", pWal->path, walPrefix, pWal->id); + snprintf(pWal->name, sizeof(pWal->name), "%s/%s%" PRIu64, pWal->path, WAL_PREFIX, pWal->fileId); pWal->fd = open(pWal->name, O_WRONLY | O_CREAT, S_IRWXU | S_IRWXG | S_IRWXO); if (pWal->fd < 0) { - terrno = TAOS_SYSTEM_ERROR(errno); - wError("vgId:%d, wal:%s, failed to open, reason:%s", pWal->vgId, pWal->name, strerror(errno)); + code = TAOS_SYSTEM_ERROR(errno); + wError("vgId:%d, wal:%s, failed to open since %s", pWal->vgId, pWal->name, strerror(errno)); } else { wDebug("vgId:%d, wal:%s, it is created", pWal->vgId, pWal->name); + } - if (pWal->num > pWal->max) { - // remove the oldest wal file - char name[TSDB_FILENAME_LEN * 3]; - snprintf(name, sizeof(name), "%s/%s%d", pWal->path, walPrefix, pWal->id - pWal->max); - if (remove(name) < 0) { - wError("vgId:%d, wal:%s, failed to remove(%s)", pWal->vgId, name, strerror(errno)); - } else { - wDebug("vgId:%d, wal:%s, it is removed", pWal->vgId, name); - } - - pWal->num--; + if (pWal->keep != 1 && lastId != -1) { + // remove last wal file + char name[TSDB_FILENAME_LEN + 20]; + snprintf(name, sizeof(name), "%s/%s%" PRIu64, pWal->path, WAL_PREFIX, lastId); + if (remove(name) < 0) { + wError("vgId:%d, wal:%s, failed to remove since %s", pWal->vgId, pWal->name, strerror(errno)); + } else { + wDebug("vgId:%d, wal:%s, it is removed", pWal->vgId, pWal->name); } } pthread_mutex_unlock(&pWal->mutex); - return terrno; + return code; } int32_t walWrite(void *handle, SWalHead *pHead) { - SWal *pWal = handle; - if (pWal == NULL) return -1; + if (handle == NULL) return -1; - terrno = 0; + SWal * pWal = handle; + int32_t code = 0; // no wal if (pWal->level == TAOS_WAL_NOLOG) return 0; if (pHead->version <= pWal->version) return 0; - pHead->signature = walSignature; + pHead->signature = WAL_SIGNATURE; taosCalcChecksumAppend(0, (uint8_t *)pHead, sizeof(SWalHead)); int32_t contLen = pHead->len + sizeof(SWalHead); if (taosTWrite(pWal->fd, pHead, contLen) != contLen) { - terrno = TAOS_SYSTEM_ERROR(errno); - wError("vgId:%d, wal:%s, failed to write(%s)", pWal->vgId, pWal->name, strerror(errno)); - return terrno; + code = TAOS_SYSTEM_ERROR(errno); + wError("vgId:%d, wal:%s, failed to write since %s", pWal->vgId, pWal->name, strerror(errno)); } else { pWal->version = pHead->version; } + ASSERT(contLen == pHead->len + sizeof(SWalHead)); - return 0; + return code; } void walFsync(void *handle) { @@ -102,156 +104,116 @@ void walFsync(void *handle) { if (pWal->fsyncPeriod == 0) { if (fsync(pWal->fd) < 0) { - wError("vgId:%d, wal:%s, fsync failed(%s)", pWal->vgId, pWal->name, strerror(errno)); + wError("vgId:%d, wal:%s, fsync failed since %s", pWal->vgId, pWal->name, strerror(errno)); } } } int32_t walRestore(void *handle, void *pVnode, int32_t (*writeFp)(void *, void *, int32_t)) { - SWal * pWal = handle; - int32_t count = 0; - uint32_t maxId = 0; - uint32_t minId = -1; - uint32_t index = 0; - int32_t code = 0; - struct dirent *ent; + if (handle == NULL) return -1; - terrno = 0; - int32_t plen = strlen(walPrefix); - char opath[TSDB_FILENAME_LEN + 5]; - snprintf(opath, sizeof(opath), "%s", pWal->path); + SWal * pWal = handle; + int32_t count = 0; - DIR *dir = opendir(opath); + DIR *dir = opendir(pWal->path); if (dir == NULL && errno == ENOENT) return 0; - if (dir == NULL) { - code = TAOS_SYSTEM_ERROR(errno); - return code; - } + if (dir == NULL) return TAOS_SYSTEM_ERROR(errno); + struct dirent *ent; while ((ent = readdir(dir)) != NULL) { - if (strncmp(ent->d_name, walPrefix, plen) == 0) { - index = atol(ent->d_name + plen); - if (index > maxId) maxId = index; - if (index < minId) minId = index; - count++; - } - } + char *fileName = ent->d_name; - closedir(dir); - pWal->fileIndex = maxId; + if (strncmp(fileName, WAL_PREFIX, WAL_PREFIX_LEN) == 0) { + uint64_t fileId = atoll(fileName + WAL_PREFIX_LEN); + if (fileId == pWal->fileId) continue; - if (count == 0) { - if (pWal->keep) terrno = walRenew(pWal); - return terrno; - } + wDebug("vgId:%d, wal:%s, will be restored", pWal->vgId, fileName); - if (count != (maxId - minId + 1)) { - wError("vgId:%d, wal:%s, messed up, count:%d max:%d min:%d", pWal->vgId, opath, count, maxId, minId); - terrno = TSDB_CODE_WAL_APP_ERROR; - } else { - wDebug("vgId:%d, wal:%s, %d files will be restored", pWal->vgId, opath, count); + int32_t code = walRestoreWalFile(pWal, pVnode, writeFp, fileName); + if (code != TSDB_CODE_SUCCESS) continue; - for (index = minId; index <= maxId; ++index) { - snprintf(pWal->name, sizeof(pWal->name), "%s/%s%d", opath, walPrefix, index); - terrno = walRestoreWalFile(pWal, pVnode, writeFp); - if (terrno < 0) continue; + wInfo("vgId:%d, wal:%s, restore success, remove this file", pWal->vgId, fileName); + remove(fileName); + + count++; } } + closedir(dir); - if (terrno == 0) { - if (pWal->keep == 0) { - terrno = walRemoveWalFiles(opath); - if (terrno == 0) { - if (remove(opath) < 0) { - wError("vgId:%d, wal:%s, failed to remove directory, reason:%s", pWal->vgId, opath, strerror(errno)); - terrno = TAOS_SYSTEM_ERROR(errno); - } - } + if (pWal->keep) { + if (count == 0) { + return walRenew(pWal); } else { // open the existing WAL file in append mode - pWal->num = count; - pWal->id = maxId; - snprintf(pWal->name, sizeof(pWal->name), "%s/%s%d", opath, walPrefix, maxId); + pWal->fileId = 0; + snprintf(pWal->name, sizeof(pWal->name), "%s/%s%" PRId64, pWal->path, WAL_PREFIX, pWal->fileId); pWal->fd = open(pWal->name, O_WRONLY | O_CREAT | O_APPEND, S_IRWXU | S_IRWXG | S_IRWXO); if (pWal->fd < 0) { wError("vgId:%d, wal:%s, failed to open file, reason:%s", pWal->vgId, pWal->name, strerror(errno)); - terrno = TAOS_SYSTEM_ERROR(errno); + return TAOS_SYSTEM_ERROR(errno); } } } - return terrno; + return TSDB_CODE_SUCCESS; } -int32_t walGetWalFile(void *handle, char *name, uint32_t *index) { - SWal * pWal = handle; - int32_t code = 1; - int32_t first = 0; - - name[0] = 0; - if (pWal == NULL || pWal->num == 0) return 0; +int32_t walGetWalFile(void *handle, char *fileName, int64_t *fileId) { + if (handle == NULL) return -1; + SWal *pWal = handle; pthread_mutex_lock(&(pWal->mutex)); - - first = pWal->id + 1 - pWal->num; - if (*index == 0) *index = first; // set to first one - - if (*index < first && *index > pWal->id) { - code = -1; // index out of range - } else { - sprintf(name, "wal/%s%d", walPrefix, *index); - code = (*index == pWal->id) ? 0 : 1; + int32_t code = walGetNextFile(pWal, *fileId, fileId, fileName); + if (code == 0) { + code = (*fileId == pWal->fileId) ? 0 : 1; } - pthread_mutex_unlock(&(pWal->mutex)); return code; } -static int32_t walRestoreWalFile(SWal *pWal, void *pVnode, FWalWrite writeFp) { - char * name = pWal->name; - int32_t size = 1024 * 1024; // default 1M buffer size - - terrno = 0; - char *buffer = malloc(size); +static int32_t walRestoreWalFile(SWal *pWal, void *pVnode, FWalWrite writeFp, char *name) { + int32_t size = WAL_MAX_SIZE; + void * buffer = tmalloc(size); if (buffer == NULL) { - terrno = TAOS_SYSTEM_ERROR(errno); - return terrno; + wError("vgId:%d, wal:%s, failed to open for restore since %s", pWal->vgId, name, strerror(errno)); + return TAOS_SYSTEM_ERROR(errno); } - SWalHead *pHead = (SWalHead *)buffer; - int32_t fd = open(name, O_RDWR); if (fd < 0) { - wError("vgId:%d, wal:%s, failed to open for restore(%s)", pWal->vgId, name, strerror(errno)); - terrno = TAOS_SYSTEM_ERROR(errno); - free(buffer); - return terrno; + wError("vgId:%d, wal:%s, failed to open for restore since %s", pWal->vgId, name, strerror(errno)); + tfree(buffer); + return TAOS_SYSTEM_ERROR(errno); } wDebug("vgId:%d, wal:%s, start to restore", pWal->vgId, name); - size_t offset = 0; + int32_t code = TSDB_CODE_SUCCESS; + size_t offset = 0; + SWalHead *pHead = buffer; + while (1) { int32_t ret = taosTRead(fd, pHead, sizeof(SWalHead)); if (ret == 0) break; if (ret < 0) { wError("vgId:%d, wal:%s, failed to read wal head part since %s", pWal->vgId, name, strerror(errno)); - terrno = TAOS_SYSTEM_ERROR(errno); + code = TAOS_SYSTEM_ERROR(errno); break; } if (ret < sizeof(SWalHead)) { - wError("vgId:%d, wal:%s, failed to read head, ret:%d, skip the rest of file", pWal->vgId, name, ret); + wError("vgId:%d, wal:%s, failed to read wal head since %s, read size:%d, skip the rest of file", pWal->vgId, + name, strerror(errno), ret); taosFtruncate(fd, offset); fsync(fd); break; } if (!taosCheckChecksumWhole((uint8_t *)pHead, sizeof(SWalHead))) { - wWarn("vgId:%d, wal:%s, cksum is messed up, skip the rest of file", pWal->vgId, name); - terrno = TSDB_CODE_WAL_FILE_CORRUPTED; + wError("vgId:%d, wal:%s, wal head cksum is messed up, skip the rest of file", pWal->vgId, name); + code = TSDB_CODE_WAL_FILE_CORRUPTED; ASSERT(false); break; } @@ -260,23 +222,24 @@ static int32_t walRestoreWalFile(SWal *pWal, void *pVnode, FWalWrite writeFp) { size = sizeof(SWalHead) + pHead->len; buffer = realloc(buffer, size); if (buffer == NULL) { - terrno = TAOS_SYSTEM_ERROR(errno); + wError("vgId:%d, wal:%s, failed to open for restore since %s", pWal->vgId, name, strerror(errno)); + code = TAOS_SYSTEM_ERROR(errno); break; } - pHead = (SWalHead *)buffer; + pHead = buffer; } ret = taosTRead(fd, pHead->cont, pHead->len); if (ret < 0) { wError("vgId:%d, wal:%s failed to read wal body part since %s", pWal->vgId, name, strerror(errno)); - terrno = TAOS_SYSTEM_ERROR(errno); + code = TAOS_SYSTEM_ERROR(errno); break; } if (ret < pHead->len) { - wError("vgId:%d, wal:%s, failed to read body, len:%d ret:%d, skip the rest of file", pWal->vgId, name, pHead->len, - ret); + wError("vgId:%d, wal:%s, failed to read body since %s, read size:%d len:%d , skip the rest of file", pWal->vgId, + name, strerror(errno), ret, pHead->len); taosFtruncate(fd, offset); fsync(fd); break; @@ -289,43 +252,49 @@ static int32_t walRestoreWalFile(SWal *pWal, void *pVnode, FWalWrite writeFp) { } close(fd); - free(buffer); + tfree(buffer); - return terrno; + return code; } -static int32_t walRemoveWalFiles(const char *path) { - int32_t plen = strlen(walPrefix); - char name[TSDB_FILENAME_LEN * 3]; +int64_t walGetVersion(twalh param) { + SWal *pWal = param; + if (pWal == 0) return 0; + + return pWal->version; +} - terrno = 0; +static int32_t walGetNextFile(SWal *pWal, int64_t lastFileId, int64_t *nexFileId, char *nextFileName) { + int64_t nearFileId = INT64_MAX; + char nearFileName[WAL_FILE_LEN] = {0}; - struct dirent *ent; - DIR *dir = opendir(path); - if (dir == NULL && errno == ENOENT) return 0; + DIR *dir = opendir(pWal->path); if (dir == NULL) { - terrno = TAOS_SYSTEM_ERROR(errno); - return terrno; + wError("vgId:%d, path:%s, failed to open since %s", pWal->vgId, pWal->path, strerror(errno)); + return -1; } + struct dirent *ent; while ((ent = readdir(dir)) != NULL) { - if (strncmp(ent->d_name, walPrefix, plen) == 0) { - snprintf(name, sizeof(name), "%s/%s", path, ent->d_name); - if (remove(name) < 0) { - wError("wal:%s, failed to remove(%s)", name, strerror(errno)); - terrno = TAOS_SYSTEM_ERROR(errno); + char *fileName = ent->d_name; + + if (strncmp(fileName, WAL_PREFIX, WAL_PREFIX_LEN) == 0) { + uint64_t fileId = atoll(fileName + WAL_PREFIX_LEN); + if (fileId <= lastFileId) continue; + + if (fileId < nearFileId) { + nearFileId = fileId; + tstrncpy(nearFileName, fileName, WAL_FILE_LEN); } } } - closedir(dir); - return terrno; -} + if (nearFileId == INT64_MAX) return -1; -int64_t walGetVersion(twalh param) { - SWal *pWal = param; - if (pWal == 0) return 0; + *nexFileId = nearFileId; + tstrncpy(nextFileName, nearFileName, WAL_FILE_LEN); + wTrace("vgId:%d, path:%s, lastfile %" PRId64 ", nextfile is %s", pWal->vgId, pWal->path, lastFileId, nextFileName); - return pWal->version; + return 0; } diff --git a/src/wal/test/waltest.c b/src/wal/test/waltest.c index bbee1347b8..ba0011af29 100644 --- a/src/wal/test/waltest.c +++ b/src/wal/test/waltest.c @@ -115,17 +115,17 @@ int main(int argc, char *argv[]) { printf("%d wal files are written\n", total); - uint32_t index = 0; + int64_t index = 0; char name[256]; while (1) { int code = walGetWalFile(pWal, name, &index); if (code == -1) { - printf("failed to get wal file, index:%d\n", index); + printf("failed to get wal file, index:%" PRId64 "\n", index); break; } - printf("index:%d wal:%s\n", index, name); + printf("index:%" PRId64 " wal:%s\n", index, name); if (code == 0) break; index++; -- GitLab