diff --git a/src/dnode/src/dnodeMain.c b/src/dnode/src/dnodeMain.c
index 97e6f2ce6debe7ffc273ca103e3f55576da0eb31..c46099bd14159b1c58d66e746696bd36e608e9e0 100644
--- a/src/dnode/src/dnodeMain.c
+++ b/src/dnode/src/dnodeMain.c
@@ -19,6 +19,7 @@
#include "tutil.h"
#include "tconfig.h"
#include "tglobal.h"
+#include "twal.h"
#include "dnode.h"
#include "dnodeInt.h"
#include "dnodeMgmt.h"
@@ -50,6 +51,7 @@ typedef struct {
static const SDnodeComponent tsDnodeComponents[] = {
{"storage", dnodeInitStorage, dnodeCleanupStorage},
+ {"wal", walInit, walCleanUp},
{"check", dnodeInitCheck, dnodeCleanupCheck}, // NOTES: dnodeInitCheck must be behind the dnodeinitStorage component !!!
{"vread", dnodeInitVnodeRead, dnodeCleanupVnodeRead},
{"vwrite", dnodeInitVnodeWrite, dnodeCleanupVnodeWrite},
diff --git a/src/inc/tsync.h b/src/inc/tsync.h
index ca0f70d104d603d176d89dd5b92979433f390466..671adefab83fed42f7bc3fb0ad9591050f118e73 100644
--- a/src/inc/tsync.h
+++ b/src/inc/tsync.h
@@ -68,7 +68,7 @@ typedef uint32_t (*FGetFileInfo)(void *ahandle, char *name, uint32_t *index, uin
// get the wal file from index or after
// return value, -1: error, 1:more wal files, 0:last WAL. if name[0]==0, no WAL file
-typedef int (*FGetWalInfo)(void *ahandle, char *name, uint32_t *index);
+typedef int32_t (*FGetWalInfo)(void *ahandle, char *fileName, int64_t *fileId);
// when a forward pkt is received, call this to handle data
typedef int (*FWriteToCache)(void *ahandle, void *pHead, int type);
diff --git a/src/inc/twal.h b/src/inc/twal.h
index 1ce7b132b068526f87fa594845bb591ade6f4966..94bdcacfce69e63f3bb1775f47a06c702f587c10 100644
--- a/src/inc/twal.h
+++ b/src/inc/twal.h
@@ -19,9 +19,11 @@
extern "C" {
#endif
-#define TAOS_WAL_NOLOG 0
-#define TAOS_WAL_WRITE 1
-#define TAOS_WAL_FSYNC 2
+// WAL level stored in SWalCfg.walLevel / SWal.level.
+typedef enum {
+ TAOS_WAL_NOLOG = 0, // walWrite() returns immediately without writing a record
+ TAOS_WAL_WRITE = 1, // records are written; the background fsync thread skips this level
+ TAOS_WAL_FSYNC = 2 // records are written; the background fsync thread syncs the file when fsyncPeriod > 0
+} EWalType;
typedef struct {
int8_t msgType;
@@ -34,8 +36,9 @@ typedef struct {
} SWalHead;
typedef struct {
- int8_t walLevel; // wal level
+ int32_t vgId;
int32_t fsyncPeriod; // millisecond
+ int8_t walLevel; // wal level
int8_t wals; // number of WAL files;
int8_t keep; // keep the wal file when closed
} SWalCfg;
@@ -43,14 +46,17 @@ typedef struct {
typedef void* twalh; // WAL HANDLE
typedef int (*FWalWrite)(void *ahandle, void *pHead, int type);
-twalh walOpen(const char *path, const SWalCfg *pCfg);
-int walAlter(twalh pWal, const SWalCfg *pCfg);
+int32_t walInit();
+void walCleanUp();
+
+twalh walOpen(char *path, SWalCfg *pCfg);
+int32_t walAlter(twalh pWal, SWalCfg *pCfg);
void walClose(twalh);
-int walRenew(twalh);
-int walWrite(twalh, SWalHead *);
+int32_t walRenew(twalh);
+int32_t walWrite(twalh, SWalHead *);
void walFsync(twalh);
-int walRestore(twalh, void *pVnode, FWalWrite writeFp);
-int walGetWalFile(twalh, char *name, uint32_t *index);
+int32_t walRestore(twalh, void *pVnode, FWalWrite writeFp);
+int32_t walGetWalFile(twalh, char *fileName, int64_t *fileId);
int64_t walGetVersion(twalh);
#ifdef __cplusplus
diff --git a/src/mnode/src/mnodeSdb.c b/src/mnode/src/mnodeSdb.c
index 8c61c61a10f6a5dd46c188acf32ef1242f3e7c50..e3c13505e49c0ced3bf2f2d981518d811df6231c 100644
--- a/src/mnode/src/mnodeSdb.c
+++ b/src/mnode/src/mnodeSdb.c
@@ -175,7 +175,7 @@ static void *sdbGetTableFromId(int32_t tableId) {
}
static int32_t sdbInitWal() {
- SWalCfg walCfg = {.walLevel = 2, .wals = 2, .keep = 1, .fsyncPeriod = 0};
+ SWalCfg walCfg = {.vgId = 1, .walLevel = 2, .wals = 2, .keep = 1, .fsyncPeriod = 0};
char temp[TSDB_FILENAME_LEN];
sprintf(temp, "%s/wal", tsMnodeDir);
tsSdbObj.wal = walOpen(temp, &walCfg);
@@ -237,8 +237,8 @@ static uint32_t sdbGetFileInfo(void *ahandle, char *name, uint32_t *index, uint3
return 0;
}
-static int sdbGetWalInfo(void *ahandle, char *name, uint32_t *index) {
- return walGetWalFile(tsSdbObj.wal, name, index);
+static int32_t sdbGetWalInfo(void *ahandle, char *fileName, int64_t *fileId) {
+ return walGetWalFile(tsSdbObj.wal, fileName, fileId);
}
static void sdbNotifyRole(void *ahandle, int8_t role) {
diff --git a/src/sync/src/syncRetrieve.c b/src/sync/src/syncRetrieve.c
index 60625d75eccdbe6bbb29f97b31ecd8e9855480a7..0137794d184c12fe7922d9961cff64b61f846f8e 100644
--- a/src/sync/src/syncRetrieve.c
+++ b/src/sync/src/syncRetrieve.c
@@ -287,7 +287,7 @@ static int syncRetrieveLastWal(SSyncPeer *pPeer, char *name, uint64_t fversion,
return -1;
}
-static int syncProcessLastWal(SSyncPeer *pPeer, char *wname, uint32_t index) {
+static int syncProcessLastWal(SSyncPeer *pPeer, char *wname, int64_t index) {
SSyncNode *pNode = pPeer->pSyncNode;
int code = -1;
char fname[TSDB_FILENAME_LEN * 2]; // full path to wal file
@@ -377,7 +377,7 @@ static int syncRetrieveWal(SSyncPeer *pPeer) {
int32_t size;
struct stat fstat;
int code = -1;
- uint32_t index = 0;
+ int64_t index = 0;
while (1) {
// retrieve wal info
diff --git a/src/sync/test/syncServer.c b/src/sync/test/syncServer.c
index 380b971fa89bd1726e138c973a974bc995500693..0cf752da97848056a7e5e3892025e87c3aef4c5d 100644
--- a/src/sync/test/syncServer.c
+++ b/src/sync/test/syncServer.c
@@ -254,7 +254,7 @@ uint32_t getFileInfo(void *ahandle, char *name, uint32_t *index, uint32_t eindex
return magic;
}
-int getWalInfo(void *ahandle, char *name, uint32_t *index) {
+int getWalInfo(void *ahandle, char *name, int64_t *index) {
struct stat fstat;
char aname[280];
diff --git a/src/wal/inc/walMgmt.h b/src/util/inc/talloc.h
similarity index 54%
rename from src/wal/inc/walMgmt.h
rename to src/util/inc/talloc.h
index a23c7f8ec3d0064de23c03a62a81388b63b0f319..1fc4d759b0ee269520b0b85d1d3493c48a237d89 100644
--- a/src/wal/inc/walMgmt.h
+++ b/src/util/inc/talloc.h
@@ -13,13 +13,31 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
-#ifndef TDENGINE_WAL_MGMT_H
-#define TDENGINE_WAL_MGMT_H
+#ifndef TDENGINE_UTIL_ALLOC_H
+#define TDENGINE_UTIL_ALLOC_H
#ifdef __cplusplus
extern "C" {
#endif
+// Allocation wrappers. TSDB_USE_SYS_MEM is defined unconditionally here, so the
+// macro branch below is the one in effect and every wrapper maps straight onto libc.
+#define TSDB_USE_SYS_MEM
+
+#ifdef TSDB_USE_SYS_MEM
+ #define tmalloc(size) malloc(size)
+ // one zero-filled object of `size` bytes
+ #define tcalloc(size) calloc(1, size)
+ #define trealloc(p, size) realloc(p, size)
+ // NOTE: `alignment` is ignored in this mode; plain malloc alignment is what the caller gets
+ #define tmemalign(alignment, size) malloc(size)
+ #define tfree(p) free(p)
+ #define tmemzero(p, size) memset(p, 0, size)
+#else
+ // Function versions implemented in talloc.c; on failure they set terrno and log the error.
+ void *tmalloc(int32_t size);
+ void *tcalloc(int32_t size);
+ void *trealloc(void *p, int32_t size);
+ void *tmemalign(int32_t alignment, int32_t size);
+ void tfree(void *p);
+ void tmemzero(void *p, int32_t size);
+#endif
+
#ifdef __cplusplus
}
#endif
diff --git a/src/util/src/talloc.c b/src/util/src/talloc.c
new file mode 100644
index 0000000000000000000000000000000000000000..d3d8ee811692098ba3e58ad3c795f4a468d55bde
--- /dev/null
+++ b/src/util/src/talloc.c
@@ -0,0 +1,79 @@
+/*
+ * Copyright (c) 2019 TAOS Data, Inc.
+ *
+ * This program is free software: you can use, redistribute, and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3
+ * or later ("AGPL"), as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful, but WITHOUT
+ * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
+ * FITNESS FOR A PARTICULAR PURPOSE.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ */
+
+#define _DEFAULT_SOURCE
+#include "os.h"
+#include "taoserror.h"
+#include "tulog.h"
+#include "talloc.h"
+
+#define TSDB_HAVE_MEMALIGN
+#ifndef TSDB_USE_SYS_MEM
+
+// malloc() wrapper: returns the new block, or NULL after recording the system
+// error in terrno and logging it.
+void *tmalloc(int32_t size) {
+  void *ptr = malloc(size);
+  if (ptr != NULL) return ptr;
+
+  terrno = TAOS_SYSTEM_ERROR(errno);
+  uError("failed to malloc memory, size:%d reason:%s", size, strerror(errno));
+  return NULL;
+}
+
+// calloc() wrapper for a single zero-filled object of `size` bytes: returns the
+// block, or NULL after recording the system error in terrno and logging it.
+void *tcalloc(int32_t size) {
+  void *ptr = calloc(1, size);
+  if (ptr != NULL) return ptr;
+
+  terrno = TAOS_SYSTEM_ERROR(errno);
+  uError("failed to calloc memory, size:%d reason:%s", size, strerror(errno));
+  return NULL;
+}
+
+// realloc() wrapper: returns whatever realloc() returned; a NULL result is also
+// recorded in terrno and logged.
+void *trealloc(void *p, int32_t size) {
+  void *resized = realloc(p, size);
+  if (resized != NULL) return resized;
+
+  terrno = TAOS_SYSTEM_ERROR(errno);
+  uError("failed to realloc memory, size:%d reason:%s", size, strerror(errno));
+  return NULL;
+}
+
+// free() wrapper, kept as a function so it pairs with the other t* allocators.
+void tfree(void *p) {
+  free(p);
+}
+
+// Fills the first `size` bytes at `p` with zero.
+void tmemzero(void *p, int32_t size) {
+  memset(p, 0, size);
+}
+
+#ifdef TSDB_HAVE_MEMALIGN
+
+// Allocates `size` bytes aligned to `alignment` through posix_memalign().
+// Returns the block, or NULL after recording the failure in terrno and logging it.
+void *tmemalign(int32_t alignment, int32_t size) {
+  void *p = NULL;
+
+  // posix_memalign() reports failure through its return value and does not set
+  // errno, so the returned number is what has to be recorded and printed.
+  int err = posix_memalign(&p, alignment, size);
+  if (err != 0) {
+    terrno = TAOS_SYSTEM_ERROR(err);
+    uError("failed to memalign memory, alignment:%d size:%d reason:%s", alignment, size, strerror(err));
+    p = NULL;
+  }
+
+  return p;
+}
+
+#else
+
+// Fallback used when TSDB_HAVE_MEMALIGN is not defined: the alignment request
+// is dropped and a plain tmalloc() block is returned.
+void *tmemalign(int32_t alignment, int32_t size) {
+  return tmalloc(size);
+}
+
+#endif
+#endif
\ No newline at end of file
diff --git a/src/vnode/src/vnodeMain.c b/src/vnode/src/vnodeMain.c
index 9c283dd9c4a941fe2fbbfa43fe95e549b92ae898..40eeeb4ed7d54b70220b00ef947a68f700789627 100644
--- a/src/vnode/src/vnodeMain.c
+++ b/src/vnode/src/vnodeMain.c
@@ -42,7 +42,7 @@ static int32_t vnodeSaveVersion(SVnodeObj *pVnode);
static int32_t vnodeReadVersion(SVnodeObj *pVnode);
static int vnodeProcessTsdbStatus(void *arg, int status);
static uint32_t vnodeGetFileInfo(void *ahandle, char *name, uint32_t *index, uint32_t eindex, int64_t *size, uint64_t *fversion);
-static int vnodeGetWalInfo(void *ahandle, char *name, uint32_t *index);
+static int vnodeGetWalInfo(void *ahandle, char *fileName, int64_t *fileId);
static void vnodeNotifyRole(void *ahandle, int8_t role);
static void vnodeCtrlFlow(void *handle, int32_t mseconds);
static int vnodeNotifyFileSynced(void *ahandle, uint64_t fversion);
@@ -303,6 +303,7 @@ int32_t vnodeOpen(int32_t vnode, char *rootDir) {
}
sprintf(temp, "%s/wal", rootDir);
+ pVnode->walCfg.vgId = pVnode->vgId;
pVnode->wal = walOpen(temp, &pVnode->walCfg);
if (pVnode->wal == NULL) {
vnodeCleanUp(pVnode);
@@ -621,9 +622,9 @@ static uint32_t vnodeGetFileInfo(void *ahandle, char *name, uint32_t *index, uin
return tsdbGetFileInfo(pVnode->tsdb, name, index, eindex, size);
}
-static int vnodeGetWalInfo(void *ahandle, char *name, uint32_t *index) {
+static int vnodeGetWalInfo(void *ahandle, char *fileName, int64_t *fileId) {
SVnodeObj *pVnode = ahandle;
- return walGetWalFile(pVnode->wal, name, index);
+ return walGetWalFile(pVnode->wal, fileName, fileId);
}
static void vnodeNotifyRole(void *ahandle, int8_t role) {
diff --git a/src/wal/inc/walInt.h b/src/wal/inc/walInt.h
index 593611589d6e1eb5d90a0eb8986f21bb32d5d1c6..7e731d44db8506cead8d052ae8ce2476a207c86e 100644
--- a/src/wal/inc/walInt.h
+++ b/src/wal/inc/walInt.h
@@ -31,8 +31,32 @@ extern int32_t wDebugFlag;
#define wDebug(...) { if (wDebugFlag & DEBUG_DEBUG) { taosPrintLog("WAL ", wDebugFlag, __VA_ARGS__); }}
#define wTrace(...) { if (wDebugFlag & DEBUG_TRACE) { taosPrintLog("WAL ", wDebugFlag, __VA_ARGS__); }}
-#define walPrefix "wal"
-#define walSignature (uint32_t)(0xFAFBFDFE)
+// wal files are named <path>/<WAL_PREFIX><fileId>
+#define WAL_PREFIX "wal"
+// number of characters in WAL_PREFIX, used when matching directory entries
+#define WAL_PREFIX_LEN 3
+// sleep time of the wal thread between two fsync rounds
+#define WAL_REFRESH_MS 1000
+// NOTE(review): not referenced by the code visible in this change — confirm intended use
+#define WAL_MAX_SIZE (1024 * 1024)
+// stamped into SWalHead.signature by walWrite()
+#define WAL_SIGNATURE ((uint32_t)(0xFAFBFDFE))
+// size of SWal.path
+#define WAL_PATH_LEN (TSDB_FILENAME_LEN + 12)
+// size of SWal.name and of the local file-name buffers
+#define WAL_FILE_LEN (TSDB_FILENAME_LEN + 32)
+// passed by walRenew() to walGetOldFile() as minDiff
+#define WAL_FILE_NUM 3
+
+// One open wal; the twalh handle returned by walOpen() points to this object.
+typedef struct {
+ uint64_t version; // version of the last record written by walWrite()
+ int64_t fileId; // numeric suffix of the current wal file; -1 until walRenew() chooses one
+ int32_t vgId; // copied from SWalCfg.vgId, printed in every log message
+ int32_t fd; // descriptor of the current wal file; -1 when none is open
+ int32_t keep; // copied from SWalCfg.keep
+ int32_t level; // copied from SWalCfg.walLevel (see EWalType)
+ int32_t fsyncPeriod; // copied from SWalCfg.fsyncPeriod
+ int32_t fsyncSeq; // derived from fsyncPeriod; the wal thread fsyncs when its tick counter is a multiple of it
+ char path[WAL_PATH_LEN]; // wal directory
+ char name[WAL_FILE_LEN]; // full name of the wal file most recently opened or handled
+ pthread_mutex_t mutex; // taken by walRenew(), walWrite() and walClose()
+} SWal;
+
+// Directory scans over the <path>/wal<id> files (walUtil.c). Each returns 0 on success and -1 on failure.
+// walGetNextFile: stores the smallest id greater than *nextFileId; -1 when there is none.
+int32_t walGetNextFile(SWal *pWal, int64_t *nextFileId);
+// walGetOldFile: stores the smallest id below curFileId, but only if at least minDiff such files exist.
+int32_t walGetOldFile(SWal *pWal, int64_t curFileId, int32_t minDiff, int64_t *oldFileId);
+// walGetNewFile: stores the largest id found, or 0 when the directory holds no wal file.
+int32_t walGetNewFile(SWal *pWal, int64_t *newFileId);
#ifdef __cplusplus
}
diff --git a/src/wal/src/walMgmt.c b/src/wal/src/walMgmt.c
index 2dd094d860528fa95084b7d3a0f404007cb757ed..15f74370ba27621ce2add4258fb91331aaebd244 100644
--- a/src/wal/src/walMgmt.c
+++ b/src/wal/src/walMgmt.c
@@ -16,6 +16,223 @@
#define _DEFAULT_SOURCE
#include "os.h"
#include "taoserror.h"
+#include "talloc.h"
+#include "tref.h"
#include "twal.h"
#include "walInt.h"
-#include "walMgmt.h"
\ No newline at end of file
+
+// State shared by the whole wal module; the single instance is tsWal.
+typedef struct {
+ int32_t refId; // reference set returned by taosOpenRef(); every SWal opened by walOpen() is added to it
+ int32_t seq; // tick counter advanced by the wal thread after each WAL_REFRESH_MS sleep
+ int8_t stop; // set by walStopThread() to make the wal thread leave its loop
+ pthread_t thread; // the wal thread started by walCreateThread()
+ pthread_mutex_t mutex; // NOTE(review): not used by the code visible in this change
+} SWalMgmt;
+
+static SWalMgmt tsWal;
+static int32_t walCreateThread();
+static void walStopThread();
+static int32_t walInitObj(SWal *pWal);
+static void walFreeObj(void *pWal);
+
+// Initializes the wal module: clears the global state, opens the reference set
+// that tracks every SWal and starts the background wal thread.
+// Returns TSDB_CODE_SUCCESS, or the error code reported by walCreateThread().
+int32_t walInit() {
+  tmemzero(&tsWal, sizeof(SWalMgmt));
+  tsWal.refId = taosOpenRef(TSDB_MIN_VNODES, walFreeObj);
+
+  int32_t code = walCreateThread();
+  if (code == TSDB_CODE_SUCCESS) {
+    wInfo("wal module is initialized, refId:%d", tsWal.refId);
+  } else {
+    wError("failed to init wal module since %s", tstrerror(code));
+  }
+
+  return code;
+}
+
+// Shuts the wal module down: the background thread is stopped first, then the
+// reference set opened by walInit() is closed.
+void walCleanUp() {
+  walStopThread();
+
+  taosCloseRef(tsWal.refId);
+
+  wInfo("wal module is cleaned up");
+}
+
+// Opens the wal kept in directory `path` using the settings in `pCfg`.
+// Returns the handle to pass to the other wal functions, or NULL on failure.
+void *walOpen(char *path, SWalCfg *pCfg) {
+  SWal *pWal = tcalloc(sizeof(SWal));
+  if (pWal == NULL) {
+    terrno = TAOS_SYSTEM_ERROR(errno);
+    return NULL;
+  }
+
+  pWal->vgId = pCfg->vgId;
+  pWal->fd = -1;
+  pWal->fileId = -1;
+  pWal->level = pCfg->walLevel;
+  pWal->keep = pCfg->keep;
+  pWal->fsyncPeriod = pCfg->fsyncPeriod;
+  tstrncpy(pWal->path, path, sizeof(pWal->path));
+  pthread_mutex_init(&pWal->mutex, NULL);
+
+  // The wal thread ticks once every WAL_REFRESH_MS, so the fsync interval in ticks
+  // is the quotient of the period (not its remainder), and never less than one tick.
+  pWal->fsyncSeq = pCfg->fsyncPeriod / WAL_REFRESH_MS;
+  if (pWal->fsyncSeq <= 0) pWal->fsyncSeq = 1;
+
+  int32_t code = walInitObj(pWal);
+  if (code != TSDB_CODE_SUCCESS) {
+    walFreeObj(pWal);
+    terrno = code;  // report why the open failed, as the allocation path above does
+    return NULL;
+  }
+
+  if (taosAddRef(tsWal.refId, pWal) != TSDB_CODE_SUCCESS) {
+    walFreeObj(pWal);
+    return NULL;
+  }
+
+  wDebug("vgId:%d, wal:%p is opened, level:%d fsyncPeriod:%d", pWal->vgId, pWal, pWal->level, pWal->fsyncPeriod);
+
+  return pWal;
+}
+
+// Applies a new wal level and fsync period to an open wal.
+// Returns TSDB_CODE_WAL_APP_ERROR when `handle` is NULL, otherwise TSDB_CODE_SUCCESS
+// (also when the new settings equal the current ones and nothing is changed).
+int32_t walAlter(void *handle, SWalCfg *pCfg) {
+  if (handle == NULL) return TSDB_CODE_WAL_APP_ERROR;
+  SWal *pWal = handle;
+
+  if (pWal->level == pCfg->walLevel && pWal->fsyncPeriod == pCfg->fsyncPeriod) {
+    wDebug("vgId:%d, old walLevel:%d fsync:%d, new walLevel:%d fsync:%d not change", pWal->vgId, pWal->level,
+           pWal->fsyncPeriod, pCfg->walLevel, pCfg->fsyncPeriod);
+    return TSDB_CODE_SUCCESS;
+  }
+
+  wInfo("vgId:%d, change old walLevel:%d fsync:%d, new walLevel:%d fsync:%d", pWal->vgId, pWal->level,
+        pWal->fsyncPeriod, pCfg->walLevel, pCfg->fsyncPeriod);
+
+  pWal->level = pCfg->walLevel;
+  pWal->fsyncPeriod = pCfg->fsyncPeriod;
+
+  // The wal thread ticks once every WAL_REFRESH_MS, so the fsync interval in ticks
+  // is the quotient of the period (not its remainder), and never less than one tick.
+  pWal->fsyncSeq = pCfg->fsyncPeriod / WAL_REFRESH_MS;
+  if (pWal->fsyncSeq <= 0) pWal->fsyncSeq = 1;
+
+  return TSDB_CODE_SUCCESS;
+}
+
+// Closes an open wal. The current file descriptor is closed; when the wal is not
+// in keep mode every wal file in the directory except the current one is removed.
+// Finally the object is dropped from the module's reference set.
+void walClose(void *handle) {
+  SWal *pWal = handle;
+  if (pWal == NULL) return;
+
+  pthread_mutex_lock(&pWal->mutex);
+
+  taosClose(pWal->fd);
+
+  if (pWal->keep) {
+    wDebug("vgId:%d, wal:%p file:%s, it is closed and kept", pWal->vgId, pWal, pWal->name);
+  } else {
+    // Walk the wal files in ascending id order, starting below the smallest possible id.
+    for (int64_t fileId = -1; walGetNextFile(pWal, &fileId) >= 0;) {
+      snprintf(pWal->name, sizeof(pWal->name), "%s/%s%" PRId64, pWal->path, WAL_PREFIX, fileId);
+
+      if (fileId == pWal->fileId) {
+        wDebug("vgId:%d, wal:%p file:%s, it is closed and kept", pWal->vgId, pWal, pWal->name);
+        continue;
+      }
+
+      if (remove(pWal->name) < 0) {
+        wError("vgId:%d, wal:%p file:%s, failed to remove", pWal->vgId, pWal, pWal->name);
+      } else {
+        wDebug("vgId:%d, wal:%p file:%s, it is removed", pWal->vgId, pWal, pWal->name);
+      }
+    }
+  }
+
+  pthread_mutex_unlock(&pWal->mutex);
+  taosRemoveRef(tsWal.refId, pWal);
+}
+
+// Prepares the on-disk side of a freshly allocated wal: creates the directory and,
+// unless the wal is in keep mode, opens a new wal file through walRenew().
+// Returns TSDB_CODE_SUCCESS or the error code of the step that failed.
+static int32_t walInitObj(SWal *pWal) {
+  if (taosMkDir(pWal->path, 0755) != 0) {
+    // Convert errno before logging so the log call cannot disturb the value returned.
+    int32_t code = TAOS_SYSTEM_ERROR(errno);
+    wError("vgId:%d, file:%s, failed to create directory since %s", pWal->vgId, pWal->path, strerror(errno));
+    return code;
+  }
+
+  if (pWal->keep) {
+    return TSDB_CODE_SUCCESS;
+  }
+
+  // walRenew() converts the open() failure to an error code at the point of failure;
+  // errno is not reliable afterwards because walRenew() goes on to scan the directory.
+  int32_t code = walRenew(pWal);
+
+  if (pWal->fd < 0) {
+    if (code == TSDB_CODE_SUCCESS) code = TAOS_SYSTEM_ERROR(errno);
+    wError("vgId:%d, file:%s, failed to open file since %s", pWal->vgId, pWal->path, tstrerror(code));
+    return code;
+  }
+
+  wDebug("vgId:%d, file is initialized", pWal->vgId);
+  return TSDB_CODE_SUCCESS;
+}
+
+// Releases a wal object: closes its descriptor, destroys its mutex and frees the memory.
+// This is the function handed to taosOpenRef() in walInit(); walOpen() also calls it
+// directly on its failure paths.
+static void walFreeObj(void *wal) {
+  SWal *pObj = (SWal *)wal;
+
+  wDebug("vgId:%d, wal:%p is freed", pObj->vgId, pObj);
+
+  taosClose(pObj->fd);
+  pthread_mutex_destroy(&pObj->mutex);
+  tfree(pObj);
+}
+
+// Tells whether the wal thread should fsync this wal on the current tick: only for
+// level TAOS_WAL_FSYNC with a positive period, and only on ticks that are a
+// multiple of the wal's fsyncSeq.
+static bool walNeedFsync(SWal *pWal) {
+  bool periodicFsync = (pWal->fsyncPeriod > 0) && (pWal->level == TAOS_WAL_FSYNC);
+
+  // The modulo is only evaluated once periodic fsync is known to be enabled.
+  return periodicFsync && (tsWal.seq % pWal->fsyncSeq == 0);
+}
+
+// Sleeps for one refresh interval, then advances the module tick counter.
+// The counter stays in [1, INT32_MAX] and restarts from 1 after the maximum.
+static void walUpdateSeq() {
+  taosMsleep(WAL_REFRESH_MS);
+
+  // Wrap explicitly: incrementing a signed int past INT32_MAX is undefined behaviour,
+  // so the overflow must be prevented rather than detected afterwards.
+  if (tsWal.seq < 0 || tsWal.seq >= INT32_MAX) {
+    tsWal.seq = 1;
+  } else {
+    tsWal.seq++;
+  }
+}
+
+// Visits every wal in the module's reference set and fsyncs those that are due
+// on the current tick (see walNeedFsync()).
+static void walFsyncAll() {
+  SWal *pWal = taosIterateRef(tsWal.refId, NULL);
+  while (pWal) {
+    if (walNeedFsync(pWal)) {
+      wTrace("vgId:%d, do fsync, level:%d seq:%d rseq:%d", pWal->vgId, pWal->level, pWal->fsyncSeq, tsWal.seq);
+      // fsync() returns -1 on failure and reports the cause in errno; its return
+      // value is not an error number and must not be passed to strerror().
+      if (fsync(pWal->fd) != 0) {
+        wError("vgId:%d, file:%s, failed to fsync since %s", pWal->vgId, pWal->name, strerror(errno));
+      }
+    }
+    pWal = taosIterateRef(tsWal.refId, pWal);
+  }
+}
+
+// Body of the wal thread: sleep one tick, fsync the wals that are due, and repeat
+// until walStopThread() raises the stop flag. The flag is tested after each round,
+// so at least one round always runs.
+static void *walThreadFunc(void *param) {
+  do {
+    walUpdateSeq();
+    walFsyncAll();
+  } while (!tsWal.stop);
+
+  return NULL;
+}
+
+// Starts the joinable background wal thread and stores it in tsWal.thread.
+// Returns TSDB_CODE_SUCCESS, or the system error code when the thread cannot be created.
+static int32_t walCreateThread() {
+  pthread_attr_t thAttr;
+  pthread_attr_init(&thAttr);
+  pthread_attr_setdetachstate(&thAttr, PTHREAD_CREATE_JOINABLE);
+
+  // pthread_create() returns the error number directly and does not set errno.
+  int32_t ret = pthread_create(&tsWal.thread, &thAttr, walThreadFunc, NULL);
+
+  // The attribute object is no longer needed on either path; release it before branching.
+  pthread_attr_destroy(&thAttr);
+
+  if (ret != 0) {
+    wError("failed to create wal thread since %s", strerror(ret));
+    return TAOS_SYSTEM_ERROR(ret);
+  }
+
+  wDebug("wal thread is launched");
+
+  return TSDB_CODE_SUCCESS;
+}
+
+// Asks the wal thread to stop and waits for it to finish. The join is skipped
+// when tsWal.thread is zero.
+static void walStopThread() {
+  tsWal.stop = 1;
+
+  if (tsWal.thread) pthread_join(tsWal.thread, NULL);
+
+  wDebug("wal thread is stopped");
+}
diff --git a/src/wal/src/walUtil.c b/src/wal/src/walUtil.c
new file mode 100644
index 0000000000000000000000000000000000000000..e4d9a555b3a60cb6be1e6584652ec4a309b1c301
--- /dev/null
+++ b/src/wal/src/walUtil.c
@@ -0,0 +1,118 @@
+/*
+ * Copyright (c) 2019 TAOS Data, Inc.
+ *
+ * This program is free software: you can use, redistribute, and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3
+ * or later ("AGPL"), as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful, but WITHOUT
+ * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
+ * FITNESS FOR A PARTICULAR PURPOSE.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ */
+
+#define _DEFAULT_SOURCE
+#include "os.h"
+#include "walInt.h"
+
+// Scans the wal directory for the file with the smallest id that is greater than
+// *nextFileId and stores that id back into *nextFileId.
+// Returns 0 on success, -1 when the directory cannot be opened or no such file exists
+// (*nextFileId is left unchanged in that case).
+int32_t walGetNextFile(SWal *pWal, int64_t *nextFileId) {
+  const int64_t curFileId = *nextFileId;
+
+  DIR *pDir = opendir(pWal->path);
+  if (pDir == NULL) {
+    wError("vgId:%d, path:%s, failed to open since %s", pWal->vgId, pWal->path, strerror(errno));
+    return -1;
+  }
+
+  // INT64_MAX doubles as the "nothing found" marker.
+  int64_t candidate = INT64_MAX;
+  for (struct dirent *pEnt = readdir(pDir); pEnt != NULL; pEnt = readdir(pDir)) {
+    const char *fname = pEnt->d_name;
+    if (strncmp(fname, WAL_PREFIX, WAL_PREFIX_LEN) != 0) continue;
+
+    int64_t id = atoll(fname + WAL_PREFIX_LEN);
+    if (id > curFileId && id < candidate) {
+      candidate = id;
+    }
+  }
+  closedir(pDir);
+
+  if (candidate == INT64_MAX) return -1;
+
+  *nextFileId = candidate;
+  wTrace("vgId:%d, path:%s, curFileId:%" PRId64 " nextFileId:%" PRId64, pWal->vgId, pWal->path, curFileId, *nextFileId);
+
+  return 0;
+}
+
+// Finds the wal file with the smallest id below curFileId, provided that at least
+// minDiff files with an id below curFileId exist, and stores its id in *oldFileId.
+// Returns 0 on success, -1 when the directory cannot be opened, no older file exists,
+// or fewer than minDiff older files were found.
+int32_t walGetOldFile(SWal *pWal, int64_t curFileId, int32_t minDiff, int64_t *oldFileId) {
+  DIR *pDir = opendir(pWal->path);
+  if (pDir == NULL) {
+    wError("vgId:%d, path:%s, failed to open since %s", pWal->vgId, pWal->path, strerror(errno));
+    return -1;
+  }
+
+  int64_t oldest = INT64_MAX;   // INT64_MAX doubles as the "nothing found" marker
+  int32_t stillNeeded = minDiff;  // older files still required before one may be reported
+
+  struct dirent *pEnt = NULL;
+  while ((pEnt = readdir(pDir)) != NULL) {
+    const char *fname = pEnt->d_name;
+    if (strncmp(fname, WAL_PREFIX, WAL_PREFIX_LEN) != 0) continue;
+
+    int64_t id = atoll(fname + WAL_PREFIX_LEN);
+    if (id >= curFileId) continue;
+
+    stillNeeded--;
+    if (id < oldest) {
+      oldest = id;
+    }
+  }
+  closedir(pDir);
+
+  if (oldest == INT64_MAX || stillNeeded > 0) return -1;
+
+  *oldFileId = oldest;
+  wTrace("vgId:%d, path:%s, curFileId:%" PRId64 " oldFildId:%" PRId64, pWal->vgId, pWal->path, curFileId, *oldFileId);
+
+  return 0;
+}
+
+// Stores the largest wal file id found in the wal directory into *newFileId,
+// or 0 when the directory contains no wal file.
+// Returns 0 on success, -1 when the directory cannot be opened.
+int32_t walGetNewFile(SWal *pWal, int64_t *newFileId) {
+  DIR *pDir = opendir(pWal->path);
+  if (pDir == NULL) {
+    wError("vgId:%d, path:%s, failed to open since %s", pWal->vgId, pWal->path, strerror(errno));
+    return -1;
+  }
+
+  // INT64_MIN doubles as the "nothing found" marker.
+  int64_t newest = INT64_MIN;
+  for (struct dirent *pEnt = readdir(pDir); pEnt != NULL; pEnt = readdir(pDir)) {
+    const char *fname = pEnt->d_name;
+    if (strncmp(fname, WAL_PREFIX, WAL_PREFIX_LEN) != 0) continue;
+
+    int64_t id = atoll(fname + WAL_PREFIX_LEN);
+    if (id > newest) {
+      newest = id;
+    }
+  }
+  closedir(pDir);
+
+  *newFileId = (newest == INT64_MIN) ? 0 : newest;
+
+  wTrace("vgId:%d, path:%s, newFileId:%" PRId64, pWal->vgId, pWal->path, *newFileId);
+
+  return 0;
+}
\ No newline at end of file
diff --git a/src/wal/src/walWrite.c b/src/wal/src/walWrite.c
index 95587caa141a8964a845a9a0b47485bfc20fa998..68ea5e2b72cab68bc337ccb3aabf41fa45fc7bf1 100644
--- a/src/wal/src/walWrite.c
+++ b/src/wal/src/walWrite.c
@@ -15,238 +15,93 @@
#define _DEFAULT_SOURCE
#include "os.h"
-#include "twal.h"
-#include "walInt.h"
-#include "walMgmt.h"
-#include "tchecksum.h"
-#include "tutil.h"
-#include "ttimer.h"
+#include "talloc.h"
#include "taoserror.h"
+#include "tchecksum.h"
#include "twal.h"
-#include "tqueue.h"
-
-
-typedef struct {
- uint64_t version;
- int fd;
- int keep;
- int level;
- int32_t fsyncPeriod;
- void *timer;
- void *signature;
- int max; // maximum number of wal files
- uint32_t id; // increase continuously
- int num; // number of wal files
- char path[TSDB_FILENAME_LEN];
- char name[TSDB_FILENAME_LEN+16];
- pthread_mutex_t mutex;
-} SWal;
-
-static void *walTmrCtrl = NULL;
-static int tsWalNum = 0;
-static pthread_once_t walModuleInit = PTHREAD_ONCE_INIT;
-static int walHandleExistingFiles(const char *path);
-static int walRestoreWalFile(SWal *pWal, void *pVnode, FWalWrite writeFp);
-static int walRemoveWalFiles(const char *path);
-static void walProcessFsyncTimer(void *param, void *tmrId);
-static void walRelease(SWal *pWal);
-static int walGetMaxOldFileId(char *odir);
-
-static void walModuleInitFunc() {
- walTmrCtrl = taosTmrInit(1000, 100, 300000, "WAL");
- if (walTmrCtrl == NULL)
- walModuleInit = PTHREAD_ONCE_INIT;
- else
- wDebug("WAL module is initialized");
-}
-
-static inline bool walNeedFsyncTimer(SWal *pWal) {
- if (pWal->fsyncPeriod > 0 && pWal->level == TAOS_WAL_FSYNC) {
- return true;
- }
- return false;
-}
-
-void *walOpen(const char *path, const SWalCfg *pCfg) {
- SWal *pWal = calloc(sizeof(SWal), 1);
- if (pWal == NULL) {
- terrno = TAOS_SYSTEM_ERROR(errno);
- return NULL;
- }
-
- pthread_once(&walModuleInit, walModuleInitFunc);
- if (walTmrCtrl == NULL) {
- free(pWal);
- terrno = TAOS_SYSTEM_ERROR(errno);
- return NULL;
- }
-
- atomic_add_fetch_32(&tsWalNum, 1);
- pWal->fd = -1;
- pWal->max = pCfg->wals;
- pWal->id = 0;
- pWal->num = 0;
- pWal->level = pCfg->walLevel;
- pWal->keep = pCfg->keep;
- pWal->fsyncPeriod = pCfg->fsyncPeriod;
- pWal->signature = pWal;
- tstrncpy(pWal->path, path, sizeof(pWal->path));
- pthread_mutex_init(&pWal->mutex, NULL);
-
- if (walNeedFsyncTimer(pWal)) {
- pWal->timer = taosTmrStart(walProcessFsyncTimer, pWal->fsyncPeriod, pWal, walTmrCtrl);
- if (pWal->timer == NULL) {
- terrno = TAOS_SYSTEM_ERROR(errno);
- walRelease(pWal);
- return NULL;
- }
- }
-
- if (taosMkDir(path, 0755) != 0) {
- terrno = TAOS_SYSTEM_ERROR(errno);
- wError("wal:%s, failed to create directory(%s)", path, strerror(errno));
- walRelease(pWal);
- pWal = NULL;
- }
-
- if (pCfg->keep == 1) return pWal;
-
- if (walHandleExistingFiles(path) == 0) walRenew(pWal);
-
- if (pWal && pWal->fd < 0) {
- terrno = TAOS_SYSTEM_ERROR(errno);
- wError("wal:%s, failed to open(%s)", path, strerror(errno));
- walRelease(pWal);
- pWal = NULL;
- }
-
- if (pWal) wDebug("wal:%s, it is open, level:%d fsyncPeriod:%d", path, pWal->level, pWal->fsyncPeriod);
- return pWal;
-}
-
-int walAlter(twalh wal, const SWalCfg *pCfg) {
- SWal *pWal = wal;
- if (pWal == NULL) {
- return TSDB_CODE_WAL_APP_ERROR;
- }
-
- if (pWal->level == pCfg->walLevel && pWal->fsyncPeriod == pCfg->fsyncPeriod) {
- wDebug("wal:%s, old walLevel:%d fsync:%d, new walLevel:%d fsync:%d not change", pWal->name, pWal->level,
- pWal->fsyncPeriod, pCfg->walLevel, pCfg->fsyncPeriod);
- return TSDB_CODE_SUCCESS;
- }
-
- wInfo("wal:%s, change old walLevel:%d fsync:%d, new walLevel:%d fsync:%d", pWal->name, pWal->level, pWal->fsyncPeriod,
- pCfg->walLevel, pCfg->fsyncPeriod);
-
- pthread_mutex_lock(&pWal->mutex);
- pWal->level = pCfg->walLevel;
- pWal->fsyncPeriod = pCfg->fsyncPeriod;
- if (walNeedFsyncTimer(pWal)) {
- wInfo("wal:%s, reset fsync timer, walLevel:%d fsyncPeriod:%d", pWal->name, pWal->level, pWal->fsyncPeriod);
- taosTmrReset(walProcessFsyncTimer, pWal->fsyncPeriod, pWal, &pWal->timer, walTmrCtrl);
- } else {
- wInfo("wal:%s, stop fsync timer, walLevel:%d fsyncPeriod:%d", pWal->name, pWal->level, pWal->fsyncPeriod);
- taosTmrStop(pWal->timer);
- pWal->timer = NULL;
- }
- pthread_mutex_unlock(&pWal->mutex);
-
- return TSDB_CODE_SUCCESS;
-}
-
-void walClose(void *handle) {
- if (handle == NULL) return;
-
- SWal *pWal = handle;
- taosClose(pWal->fd);
- if (pWal->timer) taosTmrStopA(&pWal->timer);
-
- if (pWal->keep == 0) {
- // remove all files in the directory
- for (int i = 0; i < pWal->num; ++i) {
- snprintf(pWal->name, sizeof(pWal->name), "%s/%s%d", pWal->path, walPrefix, pWal->id - i);
- if (remove(pWal->name) < 0) {
- wError("wal:%s, failed to remove", pWal->name);
- } else {
- wDebug("wal:%s, it is removed", pWal->name);
- }
- }
- } else {
- wDebug("wal:%s, it is closed and kept", pWal->name);
- }
+#include "walInt.h"
- walRelease(pWal);
-}
+static int32_t walRestoreWalFile(SWal *pWal, void *pVnode, FWalWrite writeFp, char *name);
-int walRenew(void *handle) {
+int32_t walRenew(void *handle) {
if (handle == NULL) return 0;
- SWal *pWal = handle;
- terrno = 0;
+ SWal * pWal = handle;
+ int32_t code = 0;
pthread_mutex_lock(&pWal->mutex);
if (pWal->fd >= 0) {
close(pWal->fd);
- pWal->id++;
- wDebug("wal:%s, it is closed", pWal->name);
+ wDebug("vgId:%d, file:%s, it is closed", pWal->vgId, pWal->name);
}
- pWal->num++;
+ if (pWal->keep) {
+ pWal->fileId = 0;
+ } else {
+ if (walGetNewFile(pWal, &pWal->fileId) != 0) pWal->fileId = 0;
+ pWal->fileId++;
+ }
- snprintf(pWal->name, sizeof(pWal->name), "%s/%s%d", pWal->path, walPrefix, pWal->id);
+ snprintf(pWal->name, sizeof(pWal->name), "%s/%s%" PRId64, pWal->path, WAL_PREFIX, pWal->fileId);
pWal->fd = open(pWal->name, O_WRONLY | O_CREAT, S_IRWXU | S_IRWXG | S_IRWXO);
if (pWal->fd < 0) {
- wError("wal:%s, failed to open(%s)", pWal->name, strerror(errno));
- terrno = TAOS_SYSTEM_ERROR(errno);
+ code = TAOS_SYSTEM_ERROR(errno);
+ wError("vgId:%d, file:%s, failed to open since %s", pWal->vgId, pWal->name, strerror(errno));
} else {
- wDebug("wal:%s, it is created", pWal->name);
-
- if (pWal->num > pWal->max) {
- // remove the oldest wal file
- char name[TSDB_FILENAME_LEN * 3];
- snprintf(name, sizeof(name), "%s/%s%d", pWal->path, walPrefix, pWal->id - pWal->max);
- if (remove(name) < 0) {
- wError("wal:%s, failed to remove(%s)", name, strerror(errno));
+ wDebug("vgId:%d, file:%s, it is created", pWal->vgId, pWal->name);
+ }
+
+ if (!pWal->keep) {
+ // remove the oldest wal file
+ int64_t oldFileId = -1;
+ if (walGetOldFile(pWal, pWal->fileId, WAL_FILE_NUM, &oldFileId) == 0) {
+ char walName[WAL_FILE_LEN] = {0};
+ snprintf(walName, sizeof(walName), "%s/%s%" PRId64, pWal->path, WAL_PREFIX, oldFileId);
+
+ if (remove(walName) < 0) {
+ wError("vgId:%d, file:%s, failed to remove since %s", pWal->vgId, walName, strerror(errno));
} else {
- wDebug("wal:%s, it is removed", name);
+ wDebug("vgId:%d, file:%s, it is removed", pWal->vgId, walName);
}
-
- pWal->num--;
}
}
pthread_mutex_unlock(&pWal->mutex);
- return terrno;
+ return code;
}
-int walWrite(void *handle, SWalHead *pHead) {
- SWal *pWal = handle;
- if (pWal == NULL) return -1;
+int32_t walWrite(void *handle, SWalHead *pHead) {
+ if (handle == NULL) return -1;
- terrno = 0;
+ SWal * pWal = handle;
+ int32_t code = 0;
// no wal
if (pWal->level == TAOS_WAL_NOLOG) return 0;
if (pHead->version <= pWal->version) return 0;
- pHead->signature = walSignature;
+ pHead->signature = WAL_SIGNATURE;
taosCalcChecksumAppend(0, (uint8_t *)pHead, sizeof(SWalHead));
- int contLen = pHead->len + sizeof(SWalHead);
+ int32_t contLen = pHead->len + sizeof(SWalHead);
+
+ pthread_mutex_lock(&pWal->mutex);
if (taosTWrite(pWal->fd, pHead, contLen) != contLen) {
- wError("wal:%s, failed to write(%s)", pWal->name, strerror(errno));
- terrno = TAOS_SYSTEM_ERROR(errno);
- return terrno;
+ code = TAOS_SYSTEM_ERROR(errno);
+ wError("vgId:%d, file:%s, failed to write since %s", pWal->vgId, pWal->name, strerror(errno));
} else {
pWal->version = pHead->version;
+ wTrace("vgId:%d, write version:%" PRId64 ", fileId:%" PRId64, pWal->vgId, pWal->version, pWal->fileId);
}
+
+ pthread_mutex_unlock(&pWal->mutex);
+
ASSERT(contLen == pHead->len + sizeof(SWalHead));
- return 0;
+ return code;
}
void walFsync(void *handle) {
@@ -255,167 +110,125 @@ void walFsync(void *handle) {
if (pWal->fsyncPeriod == 0) {
if (fsync(pWal->fd) < 0) {
- wError("wal:%s, fsync failed(%s)", pWal->name, strerror(errno));
+ wError("vgId:%d, file:%s, fsync failed since %s", pWal->vgId, pWal->name, strerror(errno));
}
}
}
-int walRestore(void *handle, void *pVnode, int (*writeFp)(void *, void *, int)) {
- SWal *pWal = handle;
- struct dirent *ent;
- int count = 0;
- uint32_t maxId = 0, minId = -1, index =0;
+// Replay the WAL files of one vnode through writeFp.
+//
+// Every file id returned by walGetNextFile() is restored with walRestoreWalFile(),
+// except the id equal to pWal->fileId, which is skipped. A file that fails to
+// restore is logged and skipped; the remaining files are still processed.
+//
+// When pWal->keep is not set, each successfully restored file is removed and the
+// function returns TSDB_CODE_SUCCESS. When pWal->keep is set, the function either
+// calls walRenew() (no file was restored) or reopens file id 0 in append mode.
+//
+// Returns -1 for a NULL handle, the result of walRenew(), TAOS_SYSTEM_ERROR(errno)
+// when the reopen fails, and TSDB_CODE_SUCCESS otherwise.
+int32_t walRestore(void *handle, void *pVnode, int32_t (*writeFp)(void *, void *, int32_t)) {
+ if (handle == NULL) return -1;
- terrno = 0;
- int plen = strlen(walPrefix);
- char opath[TSDB_FILENAME_LEN + 5];
+ SWal * pWal = handle;
+ int32_t count = 0;
+ int32_t code = 0;
+ int64_t fileId = -1;
- int slen = snprintf(opath, sizeof(opath), "%s", pWal->path);
- if (pWal->keep == 0) strcpy(opath + slen, "/old");
+ while ((code = walGetNextFile(pWal, &fileId)) >= 0) {
+ if (fileId == pWal->fileId) continue;
- DIR *dir = opendir(opath);
- if (dir == NULL && errno == ENOENT) return 0;
- if (dir == NULL) {
- terrno = TAOS_SYSTEM_ERROR(errno);
- return terrno;
- }
+ char walName[WAL_FILE_LEN];
+ // bound the write by the destination buffer itself, not by pWal->name
+ snprintf(walName, sizeof(walName), "%s/%s%" PRId64, pWal->path, WAL_PREFIX, fileId);
- while ((ent = readdir(dir)) != NULL) {
- if (strncmp(ent->d_name, walPrefix, plen) == 0) {
- index = atol(ent->d_name + plen);
- if (index > maxId) maxId = index;
- if (index < minId) minId = index;
- count++;
+ wDebug("vgId:%d, file:%s, will be restored", pWal->vgId, walName);
+ // reuse the outer 'code' (no shadowing); the loop condition reassigns it on the next iteration
+ code = walRestoreWalFile(pWal, pVnode, writeFp, walName);
+ if (code != TSDB_CODE_SUCCESS) {
+ wError("vgId:%d, file:%s, failed to restore since %s", pWal->vgId, walName, tstrerror(code));
+ continue;
}
- }
- closedir(dir);
+ if (!pWal->keep) {
+ wDebug("vgId:%d, file:%s, restore success, remove this file", pWal->vgId, walName);
+ // a failed removal is reported but does not abort the restore
+ if (remove(walName) < 0) {
+ wError("vgId:%d, file:%s, failed to remove since %s", pWal->vgId, walName, strerror(errno));
+ }
+ } else {
+ wDebug("vgId:%d, file:%s, restore success and keep it", pWal->vgId, walName);
+ }
- if (count == 0) {
- if (pWal->keep) terrno = walRenew(pWal);
- return terrno;
+ count++;
}
- if (count != (maxId - minId + 1)) {
- wError("wal:%s, messed up, count:%d max:%d min:%d", opath, count, maxId, minId);
- terrno = TSDB_CODE_WAL_APP_ERROR;
- } else {
- wDebug("wal:%s, %d files will be restored", opath, count);
+ if (!pWal->keep) return TSDB_CODE_SUCCESS;
- for (index = minId; index <= maxId; ++index) {
- snprintf(pWal->name, sizeof(pWal->name), "%s/%s%d", opath, walPrefix, index);
- terrno = walRestoreWalFile(pWal, pVnode, writeFp);
- if (terrno < 0) continue;
- }
- }
-
- if (terrno == 0) {
- if (pWal->keep == 0) {
- terrno = walRemoveWalFiles(opath);
- if (terrno == 0) {
- if (remove(opath) < 0) {
- wError("wal:%s, failed to remove directory(%s)", opath, strerror(errno));
- terrno = TAOS_SYSTEM_ERROR(errno);
- }
- }
- } else {
- // open the existing WAL file in append mode
- pWal->num = count;
- pWal->id = maxId;
- snprintf(pWal->name, sizeof(pWal->name), "%s/%s%d", opath, walPrefix, maxId);
- pWal->fd = open(pWal->name, O_WRONLY | O_CREAT | O_APPEND, S_IRWXU | S_IRWXG | S_IRWXO);
- if (pWal->fd < 0) {
- wError("wal:%s, failed to open file(%s)", pWal->name, strerror(errno));
- terrno = TAOS_SYSTEM_ERROR(errno);
- }
+ if (count == 0) {
+ wDebug("vgId:%d, file:%s not exist, renew it", pWal->vgId, pWal->name);
+ return walRenew(pWal);
+ } else {
+ // open the existing WAL file in append mode
+ pWal->fileId = 0;
+ snprintf(pWal->name, sizeof(pWal->name), "%s/%s%" PRId64, pWal->path, WAL_PREFIX, pWal->fileId);
+ pWal->fd = open(pWal->name, O_WRONLY | O_CREAT | O_APPEND, S_IRWXU | S_IRWXG | S_IRWXO);
+ if (pWal->fd < 0) {
+ // take the error code from errno before logging, as walWrite does
+ code = TAOS_SYSTEM_ERROR(errno);
+ wError("vgId:%d, file:%s, failed to open since %s", pWal->vgId, pWal->name, strerror(errno));
+ return code;
}
+ wDebug("vgId:%d, file:%s open success", pWal->vgId, pWal->name);
}
- return terrno;
+ return TSDB_CODE_SUCCESS;
}
-int walGetWalFile(void *handle, char *name, uint32_t *index) {
- SWal * pWal = handle;
- int code = 1;
- int32_t first = 0;
+int32_t walGetWalFile(void *handle, char *fileName, int64_t *fileId) { // look up the next wal file via walGetNextFile and format its relative name into fileName
+ if (handle == NULL) return -1; // NOTE(review): fileName is not cleared on this path; the removed code set name[0] = 0 - confirm callers initialise it
+ SWal *pWal = handle;
- name[0] = 0;
- if (pWal == NULL || pWal->num == 0) return 0;
+ // for keep
+ if (*fileId == 0) *fileId = -1; // 0 is rewritten to -1 before the lookup; presumably -1 means "start from the first file" - TODO confirm in walGetNextFile
pthread_mutex_lock(&(pWal->mutex));
- first = pWal->id + 1 - pWal->num;
- if (*index == 0) *index = first; // set to first one
-
- if (*index < first && *index > pWal->id) {
- code = -1; // index out of range
- } else {
- sprintf(name, "wal/%s%d", walPrefix, *index);
- code = (*index == pWal->id) ? 0 : 1;
+ int32_t code = walGetNextFile(pWal, fileId); // a negative result is returned to the caller unchanged
+ if (code >= 0) {
+ sprintf(fileName, "wal/%s%" PRId64, WAL_PREFIX, *fileId); // unbounded write: the caller's buffer must be large enough for the formatted name
+ code = (*fileId == pWal->fileId) ? 0 : 1; // 0: this is the file currently being written, 1: otherwise
}
+ wTrace("vgId:%d, get wal file, code:%d curId:%" PRId64 " outId:%" PRId64, pWal->vgId, code, pWal->fileId, *fileId);
pthread_mutex_unlock(&(pWal->mutex));
return code;
}
-static void walRelease(SWal *pWal) {
- pthread_mutex_destroy(&pWal->mutex);
- pWal->signature = NULL;
- free(pWal);
-
- if (atomic_sub_fetch_32(&tsWalNum, 1) == 0) {
- if (walTmrCtrl) taosTmrCleanUp(walTmrCtrl);
- walTmrCtrl = NULL;
- walModuleInit = PTHREAD_ONCE_INIT;
- wDebug("WAL module is cleaned up");
- }
-}
-
-static int walRestoreWalFile(SWal *pWal, void *pVnode, FWalWrite writeFp) {
- char *name = pWal->name;
- int size = 1024 * 1024; // default 1M buffer size
-
- terrno = 0;
- char *buffer = malloc(size);
+static int32_t walRestoreWalFile(SWal *pWal, void *pVnode, FWalWrite writeFp, char *name) {
+ int32_t size = WAL_MAX_SIZE;
+ void * buffer = tmalloc(size);
if (buffer == NULL) {
- terrno = TAOS_SYSTEM_ERROR(errno);
- return terrno;
+ wError("vgId:%d, file:%s, failed to open for restore since %s", pWal->vgId, name, strerror(errno));
+ return TAOS_SYSTEM_ERROR(errno);
}
- SWalHead *pHead = (SWalHead *)buffer;
-
- int fd = open(name, O_RDWR);
+ int32_t fd = open(name, O_RDWR);
if (fd < 0) {
- wError("wal:%s, failed to open for restore(%s)", name, strerror(errno));
- terrno = TAOS_SYSTEM_ERROR(errno);
- free(buffer);
- return terrno;
+ wError("vgId:%d, file:%s, failed to open for restore since %s", pWal->vgId, name, strerror(errno));
+ tfree(buffer);
+ return TAOS_SYSTEM_ERROR(errno);
}
- wDebug("wal:%s, start to restore", name);
+ wDebug("vgId:%d, file:%s, start to restore", pWal->vgId, name);
+
+ int32_t code = TSDB_CODE_SUCCESS;
+ size_t offset = 0;
+ SWalHead *pHead = buffer;
- size_t offset = 0;
while (1) {
- int ret = taosTRead(fd, pHead, sizeof(SWalHead));
+ int32_t ret = taosTRead(fd, pHead, sizeof(SWalHead));
if (ret == 0) break;
if (ret < 0) {
- wError("wal:%s, failed to read wal head part since %s", name, strerror(errno));
- terrno = TAOS_SYSTEM_ERROR(errno);
+ wError("vgId:%d, file:%s, failed to read wal head part since %s", pWal->vgId, name, strerror(errno));
+ code = TAOS_SYSTEM_ERROR(errno);
break;
}
if (ret < sizeof(SWalHead)) {
- wError("wal:%s, failed to read head, ret:%d, skip the rest of file", name, ret);
+ wError("vgId:%d, file:%s, failed to read wal head since %s, read size:%d, skip the rest of file", pWal->vgId,
+ name, strerror(errno), ret);
taosFtruncate(fd, offset);
fsync(fd);
break;
}
if (!taosCheckChecksumWhole((uint8_t *)pHead, sizeof(SWalHead))) {
- wWarn("wal:%s, cksum is messed up, skip the rest of file", name);
- terrno = TSDB_CODE_WAL_FILE_CORRUPTED;
+ wError("vgId:%d, file:%s, wal head cksum is messed up, skip the rest of file", pWal->vgId, name);
+ code = TSDB_CODE_WAL_FILE_CORRUPTED;
ASSERT(false);
break;
}
@@ -424,22 +237,24 @@ static int walRestoreWalFile(SWal *pWal, void *pVnode, FWalWrite writeFp) {
size = sizeof(SWalHead) + pHead->len;
buffer = realloc(buffer, size);
if (buffer == NULL) {
- terrno = TAOS_SYSTEM_ERROR(errno);
+ wError("vgId:%d, file:%s, failed to open for restore since %s", pWal->vgId, name, strerror(errno));
+ code = TAOS_SYSTEM_ERROR(errno);
break;
}
- pHead = (SWalHead *)buffer;
+ pHead = buffer;
}
ret = taosTRead(fd, pHead->cont, pHead->len);
if (ret < 0) {
- wError("wal:%s failed to read wal body part since %s", name, strerror(errno));
- terrno = TAOS_SYSTEM_ERROR(errno);
+ wError("vgId:%d, file:%s failed to read wal body part since %s", pWal->vgId, name, strerror(errno));
+ code = TAOS_SYSTEM_ERROR(errno);
break;
}
if (ret < pHead->len) {
- wError("wal:%s, failed to read body, len:%d ret:%d, skip the rest of file", name, pHead->len, ret);
+ wError("vgId:%d, file:%s, failed to read body since %s, read size:%d len:%d , skip the rest of file", pWal->vgId,
+ name, strerror(errno), ret, pHead->len);
taosFtruncate(fd, offset);
fsync(fd);
break;
@@ -448,102 +263,16 @@ static int walRestoreWalFile(SWal *pWal, void *pVnode, FWalWrite writeFp) {
offset = offset + sizeof(SWalHead) + pHead->len;
if (pWal->keep) pWal->version = pHead->version;
- (*writeFp)(pVnode, pHead, TAOS_QTYPE_WAL);
- }
-
- close(fd);
- free(buffer);
-
- return terrno;
-}
-
-int walHandleExistingFiles(const char *path) {
- char oname[TSDB_FILENAME_LEN * 3];
- char nname[TSDB_FILENAME_LEN * 3];
- char opath[TSDB_FILENAME_LEN];
-
- snprintf(opath, sizeof(opath), "%s/old", path);
-
- struct dirent *ent;
- DIR *dir = opendir(path);
- int plen = strlen(walPrefix);
- terrno = 0;
-
- int midx = walGetMaxOldFileId(opath);
- int count = 0;
- while ((ent = readdir(dir)) != NULL) {
- if (strncmp(ent->d_name, walPrefix, plen) == 0) {
- midx++;
- snprintf(oname, sizeof(oname), "%s/%s", path, ent->d_name);
- snprintf(nname, sizeof(nname), "%s/old/wal%d", path, midx);
- if (taosMkDir(opath, 0755) != 0) {
- wError("wal:%s, failed to create directory:%s(%s)", oname, opath, strerror(errno));
- terrno = TAOS_SYSTEM_ERROR(errno);
- break;
- }
-
- if (rename(oname, nname) < 0) {
- wError("wal:%s, failed to move to new:%s", oname, nname);
- terrno = TAOS_SYSTEM_ERROR(errno);
- break;
- }
-
- count++;
- }
-
- wDebug("wal:%s, %d files are moved for restoration", path, count);
- }
-
- closedir(dir);
- return terrno;
-}
-
-static int walRemoveWalFiles(const char *path) {
- int plen = strlen(walPrefix);
- char name[TSDB_FILENAME_LEN * 3];
- terrno = 0;
+ wTrace("vgId:%d, restore version:%" PRIu64 ", fileId:%" PRId64, pWal->vgId, pWal->version, pWal->fileId);
- struct dirent *ent;
- DIR *dir = opendir(path);
- if (dir == NULL && errno == ENOENT) return 0;
- if (dir == NULL) {
- terrno = TAOS_SYSTEM_ERROR(errno);
- return terrno;
- }
-
- while ((ent = readdir(dir)) != NULL) {
- if (strncmp(ent->d_name, walPrefix, plen) == 0) {
- snprintf(name, sizeof(name), "%s/%s", path, ent->d_name);
- if (remove(name) < 0) {
- wError("wal:%s, failed to remove(%s)", name, strerror(errno));
- terrno = TAOS_SYSTEM_ERROR(errno);
- }
- }
+ (*writeFp)(pVnode, pHead, TAOS_QTYPE_WAL);
}
- closedir(dir);
-
- return terrno;
-}
-
-static void walProcessFsyncTimer(void *param, void *tmrId) {
- SWal *pWal = param;
-
- if (pWal->signature != pWal) return;
- if (pWal->fd < 0) return;
-
- if (fsync(pWal->fd) < 0) {
- wError("wal:%s, fsync failed(%s)", pWal->name, strerror(errno));
- }
+ close(fd);
+ tfree(buffer);
- if (walNeedFsyncTimer(pWal)) {
- pWal->timer = taosTmrStart(walProcessFsyncTimer, pWal->fsyncPeriod, pWal, walTmrCtrl);
- } else {
- wInfo("wal:%s, stop fsync timer for walLevel:%d fsyncPeriod:%d", pWal->name, pWal->level, pWal->fsyncPeriod);
- taosTmrStop(pWal->timer);
- pWal->timer = NULL;
- }
+ return code;
}
int64_t walGetVersion(twalh param) {
@@ -552,29 +281,3 @@ int64_t walGetVersion(twalh param) {
return pWal->version;
}
-
-static int walGetMaxOldFileId(char *odir) {
- int midx = 0;
- DIR * dir = NULL;
- struct dirent *dp = NULL;
- int plen = strlen(walPrefix);
-
- if (access(odir, F_OK) != 0) return midx;
-
- dir = opendir(odir);
- if (dir == NULL) {
- wError("failed to open directory %s since %s", odir, strerror(errno));
- terrno = TAOS_SYSTEM_ERROR(errno);
- return -1;
- }
-
- while ((dp = readdir(dir)) != NULL) {
- if (strncmp(dp->d_name, walPrefix, plen) == 0) {
- int idx = atol(dp->d_name + plen);
- if (midx < idx) midx = idx;
- }
- }
-
- closedir(dir);
- return midx;
-}
\ No newline at end of file
diff --git a/src/wal/test/waltest.c b/src/wal/test/waltest.c
index bbee1347b8f92aa6cfad448fdfb369de8f5a6301..186f2ef5ffe9fed68f9c9205b251cbb3e10dfeab 100644
--- a/src/wal/test/waltest.c
+++ b/src/wal/test/waltest.c
@@ -115,17 +115,17 @@ int main(int argc, char *argv[]) {
printf("%d wal files are written\n", total);
- uint32_t index = 0;
- char name[256];
+ int64_t index = 0;
+ char name[256];
while (1) {
int code = walGetWalFile(pWal, name, &index);
if (code == -1) {
- printf("failed to get wal file, index:%d\n", index);
+ printf("failed to get wal file, index:%" PRId64 "\n", index);
break;
}
- printf("index:%d wal:%s\n", index, name);
+ printf("index:%" PRId64 " wal:%s\n", index, name);
if (code == 0) break;
index++;
diff --git a/tests/script/unique/dnode/offline1.sim b/tests/script/unique/dnode/offline1.sim
index 02d03dee97be0f2de62b3f8eb18194c595e7b050..beebbfda60c03d24a774347c8ecf8c2f9a4c6c9e 100644
--- a/tests/script/unique/dnode/offline1.sim
+++ b/tests/script/unique/dnode/offline1.sim
@@ -49,7 +49,7 @@ print dnode1 $data4_2
if $data4_1 != ready then
return -1
endi
-if $data4_2 != offline then
+if $data4_2 == ready then
return -1
endi