diff --git a/src/vnode/src/vnodeMain.c b/src/vnode/src/vnodeMain.c index 7308027817dd2a2d05ca747496f0f7ccc6687b9d..2d5ec98a807900adf36e17e8f299d85a1c0101bc 100644 --- a/src/vnode/src/vnodeMain.c +++ b/src/vnode/src/vnodeMain.c @@ -303,8 +303,8 @@ int32_t vnodeOpen(int32_t vnode, char *rootDir) { } sprintf(temp, "%s/wal", rootDir); - pVnode->wal = walOpen(temp, &pVnode->walCfg); pVnode->walCfg.vgId = pVnode->vgId; + pVnode->wal = walOpen(temp, &pVnode->walCfg); if (pVnode->wal == NULL) { vnodeCleanUp(pVnode); return terrno; diff --git a/src/wal/inc/walInt.h b/src/wal/inc/walInt.h index 3c34c68f83d77c771c777ce5ddf188081ff8c8b9..8b5784d1cee1e285d2361fe25260e38cdf4044bd 100644 --- a/src/wal/inc/walInt.h +++ b/src/wal/inc/walInt.h @@ -32,7 +32,7 @@ extern int32_t wDebugFlag; #define wTrace(...) { if (wDebugFlag & DEBUG_TRACE) { taosPrintLog("WAL ", wDebugFlag, __VA_ARGS__); }} #define WAL_PREFIX "wal" -#define WAL_PREFIX_LEN 4 +#define WAL_PREFIX_LEN 3 #define WAL_REFRESH_MS 1000 #define WAL_MAX_SIZE (1024 * 1024) #define WAL_SIGNATURE ((uint32_t)(0xFAFBFDFE)) diff --git a/src/wal/src/walMgmt.c b/src/wal/src/walMgmt.c index b1e32496cc8254f4d4496a3e1bed6d3facd1a620..72c0ef21a207b52029a489263b7c50ffe61bc187 100644 --- a/src/wal/src/walMgmt.c +++ b/src/wal/src/walMgmt.c @@ -43,7 +43,7 @@ int32_t walInit() { int32_t code = walCreateThread(); if (code != TSDB_CODE_SUCCESS) { - wError("failed to init wal module, reason:%s", tstrerror(code)); + wError("failed to init wal module since %s", tstrerror(code)); return code; } @@ -87,8 +87,7 @@ void *walOpen(char *path, SWalCfg *pCfg) { } atomic_add_fetch_32(&tsWal.num, 1); - wDebug("vgId:%d, wal:%p is opened, level:%d period:%d path:%s", pWal->vgId, pWal, pWal->level, pWal->fsyncPeriod, - pWal->path); + wDebug("vgId:%d, wal:%p is opened, level:%d fsyncPeriod:%d", pWal->vgId, pWal, pWal->level, pWal->fsyncPeriod); return pWal; } @@ -120,8 +119,8 @@ void walClose(void *handle) { SWal *pWal = handle; taosClose(pWal->fd); - if (pWal->keep == 0) { - snprintf(pWal->name, sizeof(pWal->name), "%s/%s%" PRIu64, pWal->path, WAL_PREFIX, pWal->fileId); + if (!pWal->keep) { + snprintf(pWal->name, sizeof(pWal->name), "%s/%s%" PRId64, pWal->path, WAL_PREFIX, pWal->fileId); if (remove(pWal->name) < 0) { wError("vgId:%d, wal:%p file:%s, failed to remove", pWal->vgId, pWal, pWal->name); } else { @@ -137,11 +136,11 @@ void walClose(void *handle) { static int32_t walInitObj(SWal *pWal) { if (taosMkDir(pWal->path, 0755) != 0) { terrno = TAOS_SYSTEM_ERROR(errno); - wError("vgId:%d, wal:%s, failed to create directory, reason:%s", pWal->vgId, pWal->path, strerror(errno)); + wError("vgId:%d, file:%s, failed to create directory since %s", pWal->vgId, pWal->path, strerror(errno)); return terrno; } - if (pWal->keep == 1) { + if (pWal->keep) { return TSDB_CODE_SUCCESS; } @@ -149,11 +148,11 @@ static int32_t walInitObj(SWal *pWal) { if (pWal && pWal->fd < 0) { terrno = TAOS_SYSTEM_ERROR(errno); - wError("vgId:%d, wal:%s, failed to open file, reason:%s", pWal->vgId, pWal->path, strerror(errno)); + wError("vgId:%d, file:%s, failed to open file since %s", pWal->vgId, pWal->path, strerror(errno)); return terrno; } - wDebug("vgId:%d, wal:%s, is initialized", pWal->vgId, pWal->name); + wDebug("vgId:%d, file is initialized", pWal->vgId); return TSDB_CODE_SUCCESS; } @@ -192,7 +191,7 @@ static void walFsyncAll() { wTrace("vgId:%d, do fsync, level:%d seq:%d rseq:%d", pWal->vgId, pWal->level, pWal->fsyncSeq, tsWal.seq); int32_t code = fsync(pWal->fd); if (code != 0) { - wError("vgId:%d, wal:%s, fsync failed, reason:%s", pWal->vgId, pWal->name, strerror(code)); + wError("vgId:%d, file:%s, fsync failed since %s", pWal->vgId, pWal->name, strerror(code)); } } pWal = taosIterateRef(tsWal.refId, pWal); @@ -215,7 +214,7 @@ static int32_t walCreateThread() { pthread_attr_setdetachstate(&thAttr, PTHREAD_CREATE_JOINABLE); if (pthread_create(&tsWal.thread, &thAttr, walThreadFunc, NULL) != 0) { - wError("failed to create wal thread, reason:%s", strerror(errno)); + wError("failed to create wal thread since %s", strerror(errno)); return TAOS_SYSTEM_ERROR(errno); } diff --git a/src/wal/src/walWrite.c b/src/wal/src/walWrite.c index 6c3c97a0e9b38c2f78f776e3b7d793dda03ef921..45f00def06a0846ad21987fb361372db4fe7c1d7 100644 --- a/src/wal/src/walWrite.c +++ b/src/wal/src/walWrite.c @@ -36,34 +36,34 @@ int32_t walRenew(void *handle) { if (pWal->fd >= 0) { close(pWal->fd); - wDebug("vgId:%d, wal:%s, it is closed", pWal->vgId, pWal->name); + wDebug("vgId:%d, file:%s, it is closed", pWal->vgId, pWal->name); } - uint64_t lastId = pWal->fileId; + int64_t lastId = pWal->fileId; if (pWal->keep) { pWal->fileId = 0; } else { pWal->fileId = taosGetTimestampUs(); } - snprintf(pWal->name, sizeof(pWal->name), "%s/%s%" PRIu64, pWal->path, WAL_PREFIX, pWal->fileId); + snprintf(pWal->name, sizeof(pWal->name), "%s/%s%" PRId64, pWal->path, WAL_PREFIX, pWal->fileId); pWal->fd = open(pWal->name, O_WRONLY | O_CREAT, S_IRWXU | S_IRWXG | S_IRWXO); if (pWal->fd < 0) { code = TAOS_SYSTEM_ERROR(errno); - wError("vgId:%d, wal:%s, failed to open since %s", pWal->vgId, pWal->name, strerror(errno)); + wError("vgId:%d, file:%s, failed to open since %s", pWal->vgId, pWal->name, strerror(errno)); } else { - wDebug("vgId:%d, wal:%s, it is created", pWal->vgId, pWal->name); + wDebug("vgId:%d, file:%s, it is created", pWal->vgId, pWal->name); } - if (pWal->keep != 1 && lastId != -1) { + if (!pWal->keep && lastId != -1) { // remove last wal file char name[TSDB_FILENAME_LEN + 20]; - snprintf(name, sizeof(name), "%s/%s%" PRIu64, pWal->path, WAL_PREFIX, lastId); + snprintf(name, sizeof(name), "%s/%s%" PRId64, pWal->path, WAL_PREFIX, lastId); if (remove(name) < 0) { - wError("vgId:%d, wal:%s, failed to remove since %s", pWal->vgId, pWal->name, strerror(errno)); + wError("vgId:%d, file:%s, failed to remove since %s", pWal->vgId, name, strerror(errno)); } else { - wDebug("vgId:%d, wal:%s, it is removed", pWal->vgId, pWal->name); + wDebug("vgId:%d, file:%s, it is removed", pWal->vgId, name); } } @@ -88,9 +88,10 @@ int32_t walWrite(void *handle, SWalHead *pHead) { if (taosTWrite(pWal->fd, pHead, contLen) != contLen) { code = TAOS_SYSTEM_ERROR(errno); - wError("vgId:%d, wal:%s, failed to write since %s", pWal->vgId, pWal->name, strerror(errno)); + wError("vgId:%d, file:%s, failed to write since %s", pWal->vgId, pWal->name, strerror(errno)); } else { pWal->version = pHead->version; + wTrace("vgId:%d, write version:%" PRId64 ", fileId:%" PRId64, pWal->vgId, pWal->version, pWal->fileId); } ASSERT(contLen == pHead->len + sizeof(SWalHead)); @@ -104,7 +105,7 @@ void walFsync(void *handle) { if (pWal->fsyncPeriod == 0) { if (fsync(pWal->fd) < 0) { - wError("vgId:%d, wal:%s, fsync failed since %s", pWal->vgId, pWal->name, strerror(errno)); + wError("vgId:%d, file:%s, fsync failed since %s", pWal->vgId, pWal->name, strerror(errno)); } } } @@ -124,16 +125,23 @@ int32_t walRestore(void *handle, void *pVnode, int32_t (*writeFp)(void *, void * char *fileName = ent->d_name; if (strncmp(fileName, WAL_PREFIX, WAL_PREFIX_LEN) == 0) { - uint64_t fileId = atoll(fileName + WAL_PREFIX_LEN); + int64_t fileId = atoll(fileName + WAL_PREFIX_LEN); if (fileId == pWal->fileId) continue; - wDebug("vgId:%d, wal:%s, will be restored", pWal->vgId, fileName); + char walName[WAL_FILE_LEN]; + snprintf(walName, sizeof(pWal->name), "%s/%s%" PRId64, pWal->path, WAL_PREFIX, fileId); - int32_t code = walRestoreWalFile(pWal, pVnode, writeFp, fileName); + wDebug("vgId:%d, file:%s, will be restored", pWal->vgId, walName); + int32_t code = walRestoreWalFile(pWal, pVnode, writeFp, walName); if (code != TSDB_CODE_SUCCESS) continue; - wInfo("vgId:%d, wal:%s, restore success, remove this file", pWal->vgId, fileName); - remove(fileName); + + if (!pWal->keep) { + wDebug("vgId:%d, file:%s, restore success, remove this file", pWal->vgId, walName); + remove(walName); + } else { + wDebug("vgId:%d, file:%s, restore success and keep it", pWal->vgId, walName); + } count++; } @@ -142,6 +150,7 @@ int32_t walRestore(void *handle, void *pVnode, int32_t (*writeFp)(void *, void * if (pWal->keep) { if (count == 0) { + wDebug("vgId:%d, file:%s not exist, renew it", pWal->vgId, pWal->name); return walRenew(pWal); } else { // open the existing WAL file in append mode @@ -149,9 +158,10 @@ int32_t walRestore(void *handle, void *pVnode, int32_t (*writeFp)(void *, void * snprintf(pWal->name, sizeof(pWal->name), "%s/%s%" PRId64, pWal->path, WAL_PREFIX, pWal->fileId); pWal->fd = open(pWal->name, O_WRONLY | O_CREAT | O_APPEND, S_IRWXU | S_IRWXG | S_IRWXO); if (pWal->fd < 0) { - wError("vgId:%d, wal:%s, failed to open file, reason:%s", pWal->vgId, pWal->name, strerror(errno)); + wError("vgId:%d, file:%s, failed to open since %s", pWal->vgId, pWal->name, strerror(errno)); return TAOS_SYSTEM_ERROR(errno); } + wDebug("vgId:%d, file:%s open success", pWal->vgId, pWal->name); } } @@ -176,18 +186,18 @@ static int32_t walRestoreWalFile(SWal *pWal, void *pVnode, FWalWrite writeFp, ch int32_t size = WAL_MAX_SIZE; void * buffer = tmalloc(size); if (buffer == NULL) { - wError("vgId:%d, wal:%s, failed to open for restore since %s", pWal->vgId, name, strerror(errno)); + wError("vgId:%d, file:%s, failed to open for restore since %s", pWal->vgId, name, strerror(errno)); return TAOS_SYSTEM_ERROR(errno); } int32_t fd = open(name, O_RDWR); if (fd < 0) { - wError("vgId:%d, wal:%s, failed to open for restore since %s", pWal->vgId, name, strerror(errno)); + wError("vgId:%d, file:%s, failed to open for restore since %s", pWal->vgId, name, strerror(errno)); tfree(buffer); return TAOS_SYSTEM_ERROR(errno); } - wDebug("vgId:%d, wal:%s, start to restore", pWal->vgId, name); + wDebug("vgId:%d, file:%s, start to restore", pWal->vgId, name); int32_t code = TSDB_CODE_SUCCESS; size_t offset = 0; @@ -198,13 +208,13 @@ static int32_t walRestoreWalFile(SWal *pWal, void *pVnode, FWalWrite writeFp, ch if (ret == 0) break; if (ret < 0) { - wError("vgId:%d, wal:%s, failed to read wal head part since %s", pWal->vgId, name, strerror(errno)); + wError("vgId:%d, file:%s, failed to read wal head part since %s", pWal->vgId, name, strerror(errno)); code = TAOS_SYSTEM_ERROR(errno); break; } if (ret < sizeof(SWalHead)) { - wError("vgId:%d, wal:%s, failed to read wal head since %s, read size:%d, skip the rest of file", pWal->vgId, + wError("vgId:%d, file:%s, failed to read wal head since %s, read size:%d, skip the rest of file", pWal->vgId, name, strerror(errno), ret); taosFtruncate(fd, offset); fsync(fd); @@ -212,7 +222,7 @@ static int32_t walRestoreWalFile(SWal *pWal, void *pVnode, FWalWrite writeFp, ch } if (!taosCheckChecksumWhole((uint8_t *)pHead, sizeof(SWalHead))) { - wError("vgId:%d, wal:%s, wal head cksum is messed up, skip the rest of file", pWal->vgId, name); + wError("vgId:%d, file:%s, wal head cksum is messed up, skip the rest of file", pWal->vgId, name); code = TSDB_CODE_WAL_FILE_CORRUPTED; ASSERT(false); break; @@ -222,7 +232,7 @@ static int32_t walRestoreWalFile(SWal *pWal, void *pVnode, FWalWrite writeFp, ch size = sizeof(SWalHead) + pHead->len; buffer = realloc(buffer, size); if (buffer == NULL) { - wError("vgId:%d, wal:%s, failed to open for restore since %s", pWal->vgId, name, strerror(errno)); + wError("vgId:%d, file:%s, failed to open for restore since %s", pWal->vgId, name, strerror(errno)); code = TAOS_SYSTEM_ERROR(errno); break; } @@ -232,13 +242,13 @@ static int32_t walRestoreWalFile(SWal *pWal, void *pVnode, FWalWrite writeFp, ch ret = taosTRead(fd, pHead->cont, pHead->len); if (ret < 0) { - wError("vgId:%d, wal:%s failed to read wal body part since %s", pWal->vgId, name, strerror(errno)); + wError("vgId:%d, file:%s failed to read wal body part since %s", pWal->vgId, name, strerror(errno)); code = TAOS_SYSTEM_ERROR(errno); break; } if (ret < pHead->len) { - wError("vgId:%d, wal:%s, failed to read body since %s, read size:%d len:%d , skip the rest of file", pWal->vgId, + wError("vgId:%d, file:%s, failed to read body since %s, read size:%d len:%d , skip the rest of file", pWal->vgId, name, strerror(errno), ret, pHead->len); taosFtruncate(fd, offset); fsync(fd); @@ -248,6 +258,9 @@ static int32_t walRestoreWalFile(SWal *pWal, void *pVnode, FWalWrite writeFp, ch offset = offset + sizeof(SWalHead) + pHead->len; if (pWal->keep) pWal->version = pHead->version; + + wTrace("vgId:%d, restore version:%" PRIu64 ", fileId:%" PRId64, pWal->vgId, pWal->version, pWal->fileId); + (*writeFp)(pVnode, pHead, TAOS_QTYPE_WAL); } @@ -279,7 +292,7 @@ static int32_t walGetNextFile(SWal *pWal, int64_t lastFileId, int64_t *nexFileId char *fileName = ent->d_name; if (strncmp(fileName, WAL_PREFIX, WAL_PREFIX_LEN) == 0) { - uint64_t fileId = atoll(fileName + WAL_PREFIX_LEN); + int64_t fileId = atoll(fileName + WAL_PREFIX_LEN); if (fileId <= lastFileId) continue; if (fileId < nearFileId) {