From 6b02bd2630339024f009e2d479c0396ecd8b01ed Mon Sep 17 00:00:00 2001 From: Jeff Tao Date: Mon, 30 Mar 2020 08:53:44 +0800 Subject: [PATCH] add API walGetWalFile for synchronization --- src/vnode/wal/inc/twal.h | 5 +- src/vnode/wal/src/walMain.c | 97 +++++++++++++++++++++++++----------- src/vnode/wal/test/waltest.c | 18 +++++++ 3 files changed, 90 insertions(+), 30 deletions(-) diff --git a/src/vnode/wal/inc/twal.h b/src/vnode/wal/inc/twal.h index 34491c993d..fffbb45db2 100644 --- a/src/vnode/wal/inc/twal.h +++ b/src/vnode/wal/inc/twal.h @@ -24,12 +24,12 @@ extern "C" { #define TAOS_WAL_FSYNC 2 typedef struct { - uint32_t signature; - uint32_t cksum; int8_t msgType; int8_t reserved[3]; int32_t len; uint64_t version; + uint32_t signature; + uint32_t cksum; char cont[]; } SWalHead; @@ -41,6 +41,7 @@ int walRenew(twal_h); int walWrite(twal_h, SWalHead *); void walFsync(twal_h); int walRestore(twal_h, void *pVnode, int (*writeFp)(void *ahandle, void *pWalHead)); +int walGetWalFile(twal_h, char *name, int32_t *index); extern int wDebugFlag; diff --git a/src/vnode/wal/src/walMain.c b/src/vnode/wal/src/walMain.c index b3d14eb179..6751940d1b 100644 --- a/src/vnode/wal/src/walMain.c +++ b/src/vnode/wal/src/walMain.c @@ -23,6 +23,7 @@ #include "os.h" #include "tlog.h" +#include "tchecksum.h" #include "tutil.h" #include "twal.h" @@ -36,10 +37,11 @@ typedef struct { int fd; int level; int max; // maximum number of wal files - uint32_t id; // increase continuously + uint64_t id; // increase continuously int num; // number of wal files char path[TSDB_FILENAME_LEN]; char name[TSDB_FILENAME_LEN]; + pthread_mutex_t mutex; } SWal; int wDebugFlag = 135; @@ -59,6 +61,7 @@ void *walOpen(char *path, int max, int level) { pWal->num = 0; pWal->level = level; strcpy(pWal->path, path); + pthread_mutex_init(&pWal->mutex, NULL); if (access(path, F_OK) != 0) mkdir(path, 0755); @@ -67,9 +70,10 @@ void *walOpen(char *path, int max, int level) { if (pWal->fd <0) { wError("wal:%s, failed to open", path); + pthread_mutex_destroy(&pWal->mutex); free(pWal); pWal = NULL; - } + } return pWal; } @@ -82,48 +86,59 @@ void walClose(void *handle) { // remove all files in the directory for (int i=0; inum; ++i) { - sprintf(pWal->name, "%s/%s%010d", pWal->path, walPrefix, pWal->id-i); + sprintf(pWal->name, "%s/%s%ld", pWal->path, walPrefix, pWal->id-i); if (remove(pWal->name) <0) { wError("wal:%s, failed to remove", pWal->name); } else { wTrace("wal:%s, it is removed", pWal->name); } } + + pthread_mutex_destroy(&pWal->mutex); + + free(pWal); } int walRenew(twal_h handle) { SWal *pWal = (SWal *)handle; + int code = 0; + pthread_mutex_lock(&pWal->mutex); + if (pWal->fd >=0) { close(pWal->fd); pWal->id++; wTrace("wal:%s, it is closed", pWal->name); } - sprintf(pWal->name, "%s/%s%010d", pWal->path, walPrefix, pWal->id); + pWal->num++; + + sprintf(pWal->name, "%s/%s%ld", pWal->path, walPrefix, pWal->id); pWal->fd = open(pWal->name, O_WRONLY | O_CREAT, S_IRWXU | S_IRWXG | S_IRWXO); + if (pWal->fd < 0) { wError("wal:%d, failed to open(%s)", pWal->name, strerror(errno)); - return -1; - } + code = -1; + } else { + wTrace("wal:%s, it is created", pWal->name); - wTrace("wal:%s, it is open", pWal->name); + if (pWal->num > pWal->max) { + // remove the oldest wal file + char name[TSDB_FILENAME_LEN]; + sprintf(name, "%s/%s%ld", pWal->path, walPrefix, pWal->id - pWal->max); + if (remove(name) <0) { + wError("wal:%s, failed to remove(%s)", name, strerror(errno)); + } else { + wTrace("wal:%s, it is removed", name); + } - pWal->num++; - if (pWal->num > pWal->max) { - // remove the oldest wal file - char name[TSDB_FILENAME_LEN]; - sprintf(name, "%s/%s%010d", pWal->path, walPrefix, pWal->id - pWal->max); - if (remove(name) <0) { - wError("wal:%s, failed to remove(%s)", name, strerror(errno)); - } else { - wTrace("wal:%s, it is removed", name); + pWal->num--; } - - pWal->num--; } - return 0; + pthread_mutex_unlock(&pWal->mutex); + + return code; } int walWrite(void *handle, SWalHead *pHead) { @@ -134,6 +149,7 @@ int walWrite(void *handle, SWalHead *pHead) { if (pWal->level == TAOS_WAL_NOLOG) return 0; pHead->signature = walSignature; + taosCalcChecksumAppend(0, (uint8_t *)pHead, sizeof(SWal)); int contLen = pHead->len + sizeof(SWalHead); if(write(pWal->fd, pHead, contLen) != contLen) { @@ -157,7 +173,7 @@ int walRestore(void *handle, void *pVnode, int (*writeFp)(void *, void *)) { int code = 0; struct dirent *ent; int count = 0; - uint32_t maxId = 0, minId = -1, index =0; + uint64_t maxId = 0, minId = -1, index =0; int plen = strlen(walPrefix); char opath[TSDB_FILENAME_LEN]; @@ -169,7 +185,7 @@ int walRestore(void *handle, void *pVnode, int (*writeFp)(void *, void *)) { DIR *dir = opendir(opath); while ((ent = readdir(dir))!= NULL) { if ( strncmp(ent->d_name, walPrefix, plen) == 0) { - index = atol(ent->d_name + plen); + index = atoll(ent->d_name + plen); if (index > maxId) maxId = index; if (index < minId) minId = index; count++; @@ -183,7 +199,7 @@ int walRestore(void *handle, void *pVnode, int (*writeFp)(void *, void *)) { wTrace("wal:%s, %d files will be restored", opath, count); for (index = minId; index<=maxId; ++index) { - sprintf(pWal->name, "%s/old/%s%010d", pWal->path, walPrefix, index); + sprintf(pWal->name, "%s/old/%s%ld", pWal->path, walPrefix, index); code = walRestoreWalFile(pWal->name, pVnode, writeFp); if (code < 0) break; } @@ -204,9 +220,34 @@ int walRestore(void *handle, void *pVnode, int (*writeFp)(void *, void *)) { return code; } +int walGetWalFile(void *handle, char *name, int32_t *index) { + SWal *pWal = (SWal *)handle; + int code = 1; + int32_t first = 0; + + name[0] = 0; + if (pWal == NULL || pWal->num == 0) return 0; + + pthread_mutex_lock(&(pWal->mutex)); + + first = pWal->id + 1 - pWal->num; + if (*index == 0) *index = first; // set to first one + + if (*index < first && *index > pWal->id) { + code = -1; // index out of range + } else { + sprintf(name, "%s/%s%ld", pWal->path, walPrefix, *index); + code = (*index == pWal->id) ? 0:1; + } + + pthread_mutex_unlock(&(pWal->mutex)); + + return code; +} + static int walRestoreWalFile(char *name, void *pVnode, int (*writeFp)(void *, void *)) { SWalHead walHead; - int code = -1; + int code = 0; int fd = open(name, O_RDONLY); if (fd < 0) { @@ -221,21 +262,21 @@ static int walRestoreWalFile(char *name, void *pVnode, int (*writeFp)(void *, vo if ( ret == 0) { code = 0; break;} if (ret != sizeof(walHead)) { - wError("wal:%s, failed to read(%s)", name, strerror(errno)); + wWarn("wal:%s, failed to read head, skip, ret:%d(%s)", name, ret, strerror(errno)); break; } - if (walHead.signature != walSignature) { - wError("wal:%s, file is messed up, signature:", name, walHead.signature); + if (taosCheckChecksumWhole((uint8_t *)&walHead, sizeof(walHead))) { + wWarn("wal:%s, cksum is messed up, skip the rest of file", name); break; - } + } char *buffer = malloc(sizeof(SWalHead) + walHead.len); memcpy(buffer, &walHead, sizeof(walHead)); ret = read(fd, buffer+sizeof(walHead), walHead.len); if ( ret != walHead.len) { - wError("wal:%s, failed to read(%s)", name, strerror(errno)); + wWarn("wal:%s, failed to read body, skip, len:%d ret:%d", name, walHead.len, ret); break; } diff --git a/src/vnode/wal/test/waltest.c b/src/vnode/wal/test/waltest.c index cdeed6b3d4..768c9d54d7 100644 --- a/src/vnode/wal/test/waltest.c +++ b/src/vnode/wal/test/waltest.c @@ -96,6 +96,7 @@ int main(int argc, char *argv[]) { for (int i=0; iversion = ++ver; + pHead->len = size; walWrite(pWal, pHead); } @@ -104,6 +105,23 @@ int main(int argc, char *argv[]) { } printf("%d wal files are written\n", total); + + int32_t index = 0; + char name[256]; + + while (1) { + int code = walGetWalFile(pWal, name, &index); + if (code == -1) { + printf("failed to get wal file, index:%d\n", index); + break; + } + + printf("index:%d wal:%s\n", index, name); + if (code == 0) break; + + index++; + } + getchar(); walClose(pWal); -- GitLab