diff --git a/src/wal/src/walMain.c b/src/wal/src/walMain.c index 9a9c658f1be4b3e403566af3ed7036f89b695a70..7ec55bbf1f1e44cf319d60e75f5da7753ef762d3 100644 --- a/src/wal/src/walMain.c +++ b/src/wal/src/walMain.c @@ -35,6 +35,7 @@ #define wPrint(...) {tprintf("WAL ", 255, __VA_ARGS__);} typedef struct { + uint64_t version; int fd; int keep; int level; @@ -50,9 +51,8 @@ int wDebugFlag = 135; static uint32_t walSignature = 0xFAFBFDFE; static int walHandleExistingFiles(const char *path); -static int walRestoreWalFile(const char *name, void *pVnode, FWalWrite writeFp); +static int walRestoreWalFile(SWal *pWal, void *pVnode, FWalWrite writeFp); static int walRemoveWalFiles(const char *path); -static int walMoveOldWalFilesBack(const char *path); void *walOpen(const char *path, const SWalCfg *pCfg) { SWal *pWal = calloc(sizeof(SWal), 1); @@ -69,6 +69,8 @@ void *walOpen(const char *path, const SWalCfg *pCfg) { if (access(path, F_OK) != 0) mkdir(path, 0755); + if (pCfg->keep == 1) return pWal; + if (walHandleExistingFiles(path) == 0) walRenew(pWal); @@ -155,6 +157,7 @@ int walWrite(void *handle, SWalHead *pHead) { // no wal if (pWal->level == TAOS_WAL_NOLOG) return 0; + if (pHead->version <= pWal->version) return 0; pHead->signature = walSignature; taosCalcChecksumAppend(0, (uint8_t *)pHead, sizeof(SWalHead)); @@ -163,7 +166,9 @@ int walWrite(void *handle, SWalHead *pHead) { if(write(pWal->fd, pHead, contLen) != contLen) { wError("wal:%s, failed to write(%s)", pWal->name, strerror(errno)); code = -1; - } + } else { + pWal->version = pHead->version; + } return code; } @@ -185,7 +190,10 @@ int walRestore(void *handle, void *pVnode, int (*writeFp)(void *, void *, int)) int plen = strlen(walPrefix); char opath[TSDB_FILENAME_LEN+5]; - sprintf(opath, "%s/old", pWal->path); + + int slen = sprintf(opath, "%s", pWal->path); + if ( pWal->keep == 0) + strcpy(opath+slen, "/old"); // is there old directory? if (access(opath, F_OK)) return 0; @@ -200,6 +208,8 @@ int walRestore(void *handle, void *pVnode, int (*writeFp)(void *, void *, int)) } } + if (count == 0) return 0; + if ( count != (maxId-minId+1) ) { wError("wal:%s, messed up, count:%d max:%d min:%d", opath, count, maxId, minId); code = -1; @@ -207,21 +217,29 @@ int walRestore(void *handle, void *pVnode, int (*writeFp)(void *, void *, int)) wTrace("wal:%s, %d files will be restored", opath, count); for (index = minId; index<=maxId; ++index) { - sprintf(pWal->name, "%s/old/%s%d", pWal->path, walPrefix, index); - code = walRestoreWalFile(pWal->name, pVnode, writeFp); + sprintf(pWal->name, "%s/%s%d", opath, walPrefix, index); + code = walRestoreWalFile(pWal, pVnode, writeFp); if (code < 0) break; } } if (code == 0) { - if (pWal->keep) { - code = walMoveOldWalFilesBack(pWal->path); - } else { + if (pWal->keep == 0) { code = walRemoveWalFiles(opath); - } - if (code == 0) { - if (remove(opath) < 0) { - wError("wal:%s, failed to remove directory(%s)", opath, strerror(errno)); + if (code == 0) { + if (remove(opath) < 0) { + wError("wal:%s, failed to remove directory(%s)", opath, strerror(errno)); + code = -1; + } + } + } else { + // open the existing WAL file in append mode + pWal->num = count; + pWal->id = maxId; + sprintf(pWal->name, "%s/%s%d", opath, walPrefix, maxId); + pWal->fd = open(pWal->name, O_WRONLY | O_CREAT | O_APPEND, S_IRWXU | S_IRWXG | S_IRWXO); + if (pWal->fd < 0) { + wError("wal:%s, failed to open file(%s)", pWal->name, strerror(errno)); code = -1; } } @@ -257,8 +275,9 @@ int walGetWalFile(void *handle, char *name, uint32_t *index) { return code; } -static int walRestoreWalFile(const char *name, void *pVnode, FWalWrite writeFp) { - int code = 0; +static int walRestoreWalFile(SWal *pWal, void *pVnode, FWalWrite writeFp) { + int code = 0; + char *name = pWal->name; char *buffer = malloc(1024000); // size for one record if (buffer == NULL) return -1; @@ -294,10 +313,11 @@ static int walRestoreWalFile(const char *name, void *pVnode, FWalWrite writeFp) break; } - // write into queue + if (pWal->keep) pWal->version = pHead->version; (*writeFp)(pVnode, pHead, TAOS_QTYPE_WAL); } + close(fd); free(buffer); return code; @@ -370,40 +390,3 @@ static int walRemoveWalFiles(const char *path) { return code; } -int walMoveOldWalFilesBack(const char *path) { - char oname[TSDB_FILENAME_LEN * 3]; - char nname[TSDB_FILENAME_LEN * 3]; - char opath[TSDB_FILENAME_LEN]; - struct dirent *ent; - int plen = strlen(walPrefix); - int code = 0; - - sprintf(opath, "%s/old", path); - - if (access(opath, F_OK) == 0) { - // move all old files to wal directory - int count = 0; - - DIR *dir = opendir(opath); - while ((ent = readdir(dir))!= NULL) { - if ( strncmp(ent->d_name, walPrefix, plen) == 0) { - sprintf(oname, "%s/%s", opath, ent->d_name); - sprintf(nname, "%s/%s", path, ent->d_name); - if (rename(oname, nname) < 0) { - wError("wal:%s, failed to move to new:%s", oname, nname); - code = -1; - break; - } - - count++; - } - } - - wTrace("wal:%s, %d old files are move back for keep option is set", path, count); - closedir(dir); - } - - return code; -} - - diff --git a/src/wal/test/waltest.c b/src/wal/test/waltest.c index 8e10bc11e54ba5e8e39c8116d5a1f5c86206a362..53f3477c7bf786f1399e503c24981b4c32a34618 100644 --- a/src/wal/test/waltest.c +++ b/src/wal/test/waltest.c @@ -29,8 +29,6 @@ int writeToQueue(void *pVnode, void *data, int type) { ver = pHead->version; walWrite(pWal, pHead); - - free(pHead); return 0; } @@ -42,6 +40,7 @@ int main(int argc, char *argv[]) { int total = 5; int rows = 10000; int size = 128; + int keep = 0; for (int i=1; i