提交 8e1ddeea 编写于 作者: S Shengliang Guan

TD-1846

上级 d4fbd7bb
...@@ -303,8 +303,8 @@ int32_t vnodeOpen(int32_t vnode, char *rootDir) { ...@@ -303,8 +303,8 @@ int32_t vnodeOpen(int32_t vnode, char *rootDir) {
} }
sprintf(temp, "%s/wal", rootDir); sprintf(temp, "%s/wal", rootDir);
pVnode->wal = walOpen(temp, &pVnode->walCfg);
pVnode->walCfg.vgId = pVnode->vgId; pVnode->walCfg.vgId = pVnode->vgId;
pVnode->wal = walOpen(temp, &pVnode->walCfg);
if (pVnode->wal == NULL) { if (pVnode->wal == NULL) {
vnodeCleanUp(pVnode); vnodeCleanUp(pVnode);
return terrno; return terrno;
......
...@@ -32,7 +32,7 @@ extern int32_t wDebugFlag; ...@@ -32,7 +32,7 @@ extern int32_t wDebugFlag;
#define wTrace(...) { if (wDebugFlag & DEBUG_TRACE) { taosPrintLog("WAL ", wDebugFlag, __VA_ARGS__); }} #define wTrace(...) { if (wDebugFlag & DEBUG_TRACE) { taosPrintLog("WAL ", wDebugFlag, __VA_ARGS__); }}
#define WAL_PREFIX "wal" #define WAL_PREFIX "wal"
#define WAL_PREFIX_LEN 4 #define WAL_PREFIX_LEN 3
#define WAL_REFRESH_MS 1000 #define WAL_REFRESH_MS 1000
#define WAL_MAX_SIZE (1024 * 1024) #define WAL_MAX_SIZE (1024 * 1024)
#define WAL_SIGNATURE ((uint32_t)(0xFAFBFDFE)) #define WAL_SIGNATURE ((uint32_t)(0xFAFBFDFE))
......
...@@ -43,7 +43,7 @@ int32_t walInit() { ...@@ -43,7 +43,7 @@ int32_t walInit() {
int32_t code = walCreateThread(); int32_t code = walCreateThread();
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
wError("failed to init wal module, reason:%s", tstrerror(code)); wError("failed to init wal module since %s", tstrerror(code));
return code; return code;
} }
...@@ -87,8 +87,7 @@ void *walOpen(char *path, SWalCfg *pCfg) { ...@@ -87,8 +87,7 @@ void *walOpen(char *path, SWalCfg *pCfg) {
} }
atomic_add_fetch_32(&tsWal.num, 1); atomic_add_fetch_32(&tsWal.num, 1);
wDebug("vgId:%d, wal:%p is opened, level:%d period:%d path:%s", pWal->vgId, pWal, pWal->level, pWal->fsyncPeriod, wDebug("vgId:%d, wal:%p is opened, level:%d fsyncPeriod:%d", pWal->vgId, pWal, pWal->level, pWal->fsyncPeriod);
pWal->path);
return pWal; return pWal;
} }
...@@ -120,8 +119,8 @@ void walClose(void *handle) { ...@@ -120,8 +119,8 @@ void walClose(void *handle) {
SWal *pWal = handle; SWal *pWal = handle;
taosClose(pWal->fd); taosClose(pWal->fd);
if (pWal->keep == 0) { if (!pWal->keep) {
snprintf(pWal->name, sizeof(pWal->name), "%s/%s%" PRIu64, pWal->path, WAL_PREFIX, pWal->fileId); snprintf(pWal->name, sizeof(pWal->name), "%s/%s%" PRId64, pWal->path, WAL_PREFIX, pWal->fileId);
if (remove(pWal->name) < 0) { if (remove(pWal->name) < 0) {
wError("vgId:%d, wal:%p file:%s, failed to remove", pWal->vgId, pWal, pWal->name); wError("vgId:%d, wal:%p file:%s, failed to remove", pWal->vgId, pWal, pWal->name);
} else { } else {
...@@ -137,11 +136,11 @@ void walClose(void *handle) { ...@@ -137,11 +136,11 @@ void walClose(void *handle) {
static int32_t walInitObj(SWal *pWal) { static int32_t walInitObj(SWal *pWal) {
if (taosMkDir(pWal->path, 0755) != 0) { if (taosMkDir(pWal->path, 0755) != 0) {
terrno = TAOS_SYSTEM_ERROR(errno); terrno = TAOS_SYSTEM_ERROR(errno);
wError("vgId:%d, wal:%s, failed to create directory, reason:%s", pWal->vgId, pWal->path, strerror(errno)); wError("vgId:%d, file:%s, failed to create directory since %s", pWal->vgId, pWal->path, strerror(errno));
return terrno; return terrno;
} }
if (pWal->keep == 1) { if (pWal->keep) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
...@@ -149,11 +148,11 @@ static int32_t walInitObj(SWal *pWal) { ...@@ -149,11 +148,11 @@ static int32_t walInitObj(SWal *pWal) {
if (pWal && pWal->fd < 0) { if (pWal && pWal->fd < 0) {
terrno = TAOS_SYSTEM_ERROR(errno); terrno = TAOS_SYSTEM_ERROR(errno);
wError("vgId:%d, wal:%s, failed to open file, reason:%s", pWal->vgId, pWal->path, strerror(errno)); wError("vgId:%d, file:%s, failed to open file since %s", pWal->vgId, pWal->path, strerror(errno));
return terrno; return terrno;
} }
wDebug("vgId:%d, wal:%s, is initialized", pWal->vgId, pWal->name); wDebug("vgId:%d, file is initialized", pWal->vgId);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
...@@ -192,7 +191,7 @@ static void walFsyncAll() { ...@@ -192,7 +191,7 @@ static void walFsyncAll() {
wTrace("vgId:%d, do fsync, level:%d seq:%d rseq:%d", pWal->vgId, pWal->level, pWal->fsyncSeq, tsWal.seq); wTrace("vgId:%d, do fsync, level:%d seq:%d rseq:%d", pWal->vgId, pWal->level, pWal->fsyncSeq, tsWal.seq);
int32_t code = fsync(pWal->fd); int32_t code = fsync(pWal->fd);
if (code != 0) { if (code != 0) {
wError("vgId:%d, wal:%s, fsync failed, reason:%s", pWal->vgId, pWal->name, strerror(code)); wError("vgId:%d, file:%s, fsync failed since %s", pWal->vgId, pWal->name, strerror(code));
} }
} }
pWal = taosIterateRef(tsWal.refId, pWal); pWal = taosIterateRef(tsWal.refId, pWal);
...@@ -215,7 +214,7 @@ static int32_t walCreateThread() { ...@@ -215,7 +214,7 @@ static int32_t walCreateThread() {
pthread_attr_setdetachstate(&thAttr, PTHREAD_CREATE_JOINABLE); pthread_attr_setdetachstate(&thAttr, PTHREAD_CREATE_JOINABLE);
if (pthread_create(&tsWal.thread, &thAttr, walThreadFunc, NULL) != 0) { if (pthread_create(&tsWal.thread, &thAttr, walThreadFunc, NULL) != 0) {
wError("failed to create wal thread, reason:%s", strerror(errno)); wError("failed to create wal thread since %s", strerror(errno));
return TAOS_SYSTEM_ERROR(errno); return TAOS_SYSTEM_ERROR(errno);
} }
......
...@@ -36,34 +36,34 @@ int32_t walRenew(void *handle) { ...@@ -36,34 +36,34 @@ int32_t walRenew(void *handle) {
if (pWal->fd >= 0) { if (pWal->fd >= 0) {
close(pWal->fd); close(pWal->fd);
wDebug("vgId:%d, wal:%s, it is closed", pWal->vgId, pWal->name); wDebug("vgId:%d, file:%s, it is closed", pWal->vgId, pWal->name);
} }
uint64_t lastId = pWal->fileId; int64_t lastId = pWal->fileId;
if (pWal->keep) { if (pWal->keep) {
pWal->fileId = 0; pWal->fileId = 0;
} else { } else {
pWal->fileId = taosGetTimestampUs(); pWal->fileId = taosGetTimestampUs();
} }
snprintf(pWal->name, sizeof(pWal->name), "%s/%s%" PRIu64, pWal->path, WAL_PREFIX, pWal->fileId); snprintf(pWal->name, sizeof(pWal->name), "%s/%s%" PRId64, pWal->path, WAL_PREFIX, pWal->fileId);
pWal->fd = open(pWal->name, O_WRONLY | O_CREAT, S_IRWXU | S_IRWXG | S_IRWXO); pWal->fd = open(pWal->name, O_WRONLY | O_CREAT, S_IRWXU | S_IRWXG | S_IRWXO);
if (pWal->fd < 0) { if (pWal->fd < 0) {
code = TAOS_SYSTEM_ERROR(errno); code = TAOS_SYSTEM_ERROR(errno);
wError("vgId:%d, wal:%s, failed to open since %s", pWal->vgId, pWal->name, strerror(errno)); wError("vgId:%d, file:%s, failed to open since %s", pWal->vgId, pWal->name, strerror(errno));
} else { } else {
wDebug("vgId:%d, wal:%s, it is created", pWal->vgId, pWal->name); wDebug("vgId:%d, file:%s, it is created", pWal->vgId, pWal->name);
} }
if (pWal->keep != 1 && lastId != -1) { if (!pWal->keep && lastId != -1) {
// remove last wal file // remove last wal file
char name[TSDB_FILENAME_LEN + 20]; char name[TSDB_FILENAME_LEN + 20];
snprintf(name, sizeof(name), "%s/%s%" PRIu64, pWal->path, WAL_PREFIX, lastId); snprintf(name, sizeof(name), "%s/%s%" PRId64, pWal->path, WAL_PREFIX, lastId);
if (remove(name) < 0) { if (remove(name) < 0) {
wError("vgId:%d, wal:%s, failed to remove since %s", pWal->vgId, pWal->name, strerror(errno)); wError("vgId:%d, file:%s, failed to remove since %s", pWal->vgId, name, strerror(errno));
} else { } else {
wDebug("vgId:%d, wal:%s, it is removed", pWal->vgId, pWal->name); wDebug("vgId:%d, file:%s, it is removed", pWal->vgId, name);
} }
} }
...@@ -88,9 +88,10 @@ int32_t walWrite(void *handle, SWalHead *pHead) { ...@@ -88,9 +88,10 @@ int32_t walWrite(void *handle, SWalHead *pHead) {
if (taosTWrite(pWal->fd, pHead, contLen) != contLen) { if (taosTWrite(pWal->fd, pHead, contLen) != contLen) {
code = TAOS_SYSTEM_ERROR(errno); code = TAOS_SYSTEM_ERROR(errno);
wError("vgId:%d, wal:%s, failed to write since %s", pWal->vgId, pWal->name, strerror(errno)); wError("vgId:%d, file:%s, failed to write since %s", pWal->vgId, pWal->name, strerror(errno));
} else { } else {
pWal->version = pHead->version; pWal->version = pHead->version;
wTrace("vgId:%d, write version:%" PRId64 ", fileId:%" PRId64, pWal->vgId, pWal->version, pWal->fileId);
} }
ASSERT(contLen == pHead->len + sizeof(SWalHead)); ASSERT(contLen == pHead->len + sizeof(SWalHead));
...@@ -104,7 +105,7 @@ void walFsync(void *handle) { ...@@ -104,7 +105,7 @@ void walFsync(void *handle) {
if (pWal->fsyncPeriod == 0) { if (pWal->fsyncPeriod == 0) {
if (fsync(pWal->fd) < 0) { if (fsync(pWal->fd) < 0) {
wError("vgId:%d, wal:%s, fsync failed since %s", pWal->vgId, pWal->name, strerror(errno)); wError("vgId:%d, file:%s, fsync failed since %s", pWal->vgId, pWal->name, strerror(errno));
} }
} }
} }
...@@ -124,16 +125,23 @@ int32_t walRestore(void *handle, void *pVnode, int32_t (*writeFp)(void *, void * ...@@ -124,16 +125,23 @@ int32_t walRestore(void *handle, void *pVnode, int32_t (*writeFp)(void *, void *
char *fileName = ent->d_name; char *fileName = ent->d_name;
if (strncmp(fileName, WAL_PREFIX, WAL_PREFIX_LEN) == 0) { if (strncmp(fileName, WAL_PREFIX, WAL_PREFIX_LEN) == 0) {
uint64_t fileId = atoll(fileName + WAL_PREFIX_LEN); int64_t fileId = atoll(fileName + WAL_PREFIX_LEN);
if (fileId == pWal->fileId) continue; if (fileId == pWal->fileId) continue;
wDebug("vgId:%d, wal:%s, will be restored", pWal->vgId, fileName); char walName[WAL_FILE_LEN];
snprintf(walName, sizeof(pWal->name), "%s/%s%" PRId64, pWal->path, WAL_PREFIX, fileId);
int32_t code = walRestoreWalFile(pWal, pVnode, writeFp, fileName); wDebug("vgId:%d, file:%s, will be restored", pWal->vgId, walName);
int32_t code = walRestoreWalFile(pWal, pVnode, writeFp, walName);
if (code != TSDB_CODE_SUCCESS) continue; if (code != TSDB_CODE_SUCCESS) continue;
wInfo("vgId:%d, wal:%s, restore success, remove this file", pWal->vgId, fileName);
remove(fileName); if (!pWal->keep) {
wDebug("vgId:%d, file:%s, restore success, remove this file", pWal->vgId, walName);
remove(walName);
} else {
wDebug("vgId:%d, file:%s, restore success and keep it", pWal->vgId, walName);
}
count++; count++;
} }
...@@ -142,6 +150,7 @@ int32_t walRestore(void *handle, void *pVnode, int32_t (*writeFp)(void *, void * ...@@ -142,6 +150,7 @@ int32_t walRestore(void *handle, void *pVnode, int32_t (*writeFp)(void *, void *
if (pWal->keep) { if (pWal->keep) {
if (count == 0) { if (count == 0) {
wDebug("vgId:%d, file:%s not exist, renew it", pWal->vgId, pWal->name);
return walRenew(pWal); return walRenew(pWal);
} else { } else {
// open the existing WAL file in append mode // open the existing WAL file in append mode
...@@ -149,9 +158,10 @@ int32_t walRestore(void *handle, void *pVnode, int32_t (*writeFp)(void *, void * ...@@ -149,9 +158,10 @@ int32_t walRestore(void *handle, void *pVnode, int32_t (*writeFp)(void *, void *
snprintf(pWal->name, sizeof(pWal->name), "%s/%s%" PRId64, pWal->path, WAL_PREFIX, pWal->fileId); snprintf(pWal->name, sizeof(pWal->name), "%s/%s%" PRId64, pWal->path, WAL_PREFIX, pWal->fileId);
pWal->fd = open(pWal->name, O_WRONLY | O_CREAT | O_APPEND, S_IRWXU | S_IRWXG | S_IRWXO); pWal->fd = open(pWal->name, O_WRONLY | O_CREAT | O_APPEND, S_IRWXU | S_IRWXG | S_IRWXO);
if (pWal->fd < 0) { if (pWal->fd < 0) {
wError("vgId:%d, wal:%s, failed to open file, reason:%s", pWal->vgId, pWal->name, strerror(errno)); wError("vgId:%d, file:%s, failed to open since %s", pWal->vgId, pWal->name, strerror(errno));
return TAOS_SYSTEM_ERROR(errno); return TAOS_SYSTEM_ERROR(errno);
} }
wDebug("vgId:%d, file:%s open success", pWal->vgId, pWal->name);
} }
} }
...@@ -176,18 +186,18 @@ static int32_t walRestoreWalFile(SWal *pWal, void *pVnode, FWalWrite writeFp, ch ...@@ -176,18 +186,18 @@ static int32_t walRestoreWalFile(SWal *pWal, void *pVnode, FWalWrite writeFp, ch
int32_t size = WAL_MAX_SIZE; int32_t size = WAL_MAX_SIZE;
void * buffer = tmalloc(size); void * buffer = tmalloc(size);
if (buffer == NULL) { if (buffer == NULL) {
wError("vgId:%d, wal:%s, failed to open for restore since %s", pWal->vgId, name, strerror(errno)); wError("vgId:%d, file:%s, failed to open for restore since %s", pWal->vgId, name, strerror(errno));
return TAOS_SYSTEM_ERROR(errno); return TAOS_SYSTEM_ERROR(errno);
} }
int32_t fd = open(name, O_RDWR); int32_t fd = open(name, O_RDWR);
if (fd < 0) { if (fd < 0) {
wError("vgId:%d, wal:%s, failed to open for restore since %s", pWal->vgId, name, strerror(errno)); wError("vgId:%d, file:%s, failed to open for restore since %s", pWal->vgId, name, strerror(errno));
tfree(buffer); tfree(buffer);
return TAOS_SYSTEM_ERROR(errno); return TAOS_SYSTEM_ERROR(errno);
} }
wDebug("vgId:%d, wal:%s, start to restore", pWal->vgId, name); wDebug("vgId:%d, file:%s, start to restore", pWal->vgId, name);
int32_t code = TSDB_CODE_SUCCESS; int32_t code = TSDB_CODE_SUCCESS;
size_t offset = 0; size_t offset = 0;
...@@ -198,13 +208,13 @@ static int32_t walRestoreWalFile(SWal *pWal, void *pVnode, FWalWrite writeFp, ch ...@@ -198,13 +208,13 @@ static int32_t walRestoreWalFile(SWal *pWal, void *pVnode, FWalWrite writeFp, ch
if (ret == 0) break; if (ret == 0) break;
if (ret < 0) { if (ret < 0) {
wError("vgId:%d, wal:%s, failed to read wal head part since %s", pWal->vgId, name, strerror(errno)); wError("vgId:%d, file:%s, failed to read wal head part since %s", pWal->vgId, name, strerror(errno));
code = TAOS_SYSTEM_ERROR(errno); code = TAOS_SYSTEM_ERROR(errno);
break; break;
} }
if (ret < sizeof(SWalHead)) { if (ret < sizeof(SWalHead)) {
wError("vgId:%d, wal:%s, failed to read wal head since %s, read size:%d, skip the rest of file", pWal->vgId, wError("vgId:%d, file:%s, failed to read wal head since %s, read size:%d, skip the rest of file", pWal->vgId,
name, strerror(errno), ret); name, strerror(errno), ret);
taosFtruncate(fd, offset); taosFtruncate(fd, offset);
fsync(fd); fsync(fd);
...@@ -212,7 +222,7 @@ static int32_t walRestoreWalFile(SWal *pWal, void *pVnode, FWalWrite writeFp, ch ...@@ -212,7 +222,7 @@ static int32_t walRestoreWalFile(SWal *pWal, void *pVnode, FWalWrite writeFp, ch
} }
if (!taosCheckChecksumWhole((uint8_t *)pHead, sizeof(SWalHead))) { if (!taosCheckChecksumWhole((uint8_t *)pHead, sizeof(SWalHead))) {
wError("vgId:%d, wal:%s, wal head cksum is messed up, skip the rest of file", pWal->vgId, name); wError("vgId:%d, file:%s, wal head cksum is messed up, skip the rest of file", pWal->vgId, name);
code = TSDB_CODE_WAL_FILE_CORRUPTED; code = TSDB_CODE_WAL_FILE_CORRUPTED;
ASSERT(false); ASSERT(false);
break; break;
...@@ -222,7 +232,7 @@ static int32_t walRestoreWalFile(SWal *pWal, void *pVnode, FWalWrite writeFp, ch ...@@ -222,7 +232,7 @@ static int32_t walRestoreWalFile(SWal *pWal, void *pVnode, FWalWrite writeFp, ch
size = sizeof(SWalHead) + pHead->len; size = sizeof(SWalHead) + pHead->len;
buffer = realloc(buffer, size); buffer = realloc(buffer, size);
if (buffer == NULL) { if (buffer == NULL) {
wError("vgId:%d, wal:%s, failed to open for restore since %s", pWal->vgId, name, strerror(errno)); wError("vgId:%d, file:%s, failed to open for restore since %s", pWal->vgId, name, strerror(errno));
code = TAOS_SYSTEM_ERROR(errno); code = TAOS_SYSTEM_ERROR(errno);
break; break;
} }
...@@ -232,13 +242,13 @@ static int32_t walRestoreWalFile(SWal *pWal, void *pVnode, FWalWrite writeFp, ch ...@@ -232,13 +242,13 @@ static int32_t walRestoreWalFile(SWal *pWal, void *pVnode, FWalWrite writeFp, ch
ret = taosTRead(fd, pHead->cont, pHead->len); ret = taosTRead(fd, pHead->cont, pHead->len);
if (ret < 0) { if (ret < 0) {
wError("vgId:%d, wal:%s failed to read wal body part since %s", pWal->vgId, name, strerror(errno)); wError("vgId:%d, file:%s failed to read wal body part since %s", pWal->vgId, name, strerror(errno));
code = TAOS_SYSTEM_ERROR(errno); code = TAOS_SYSTEM_ERROR(errno);
break; break;
} }
if (ret < pHead->len) { if (ret < pHead->len) {
wError("vgId:%d, wal:%s, failed to read body since %s, read size:%d len:%d , skip the rest of file", pWal->vgId, wError("vgId:%d, file:%s, failed to read body since %s, read size:%d len:%d , skip the rest of file", pWal->vgId,
name, strerror(errno), ret, pHead->len); name, strerror(errno), ret, pHead->len);
taosFtruncate(fd, offset); taosFtruncate(fd, offset);
fsync(fd); fsync(fd);
...@@ -248,6 +258,9 @@ static int32_t walRestoreWalFile(SWal *pWal, void *pVnode, FWalWrite writeFp, ch ...@@ -248,6 +258,9 @@ static int32_t walRestoreWalFile(SWal *pWal, void *pVnode, FWalWrite writeFp, ch
offset = offset + sizeof(SWalHead) + pHead->len; offset = offset + sizeof(SWalHead) + pHead->len;
if (pWal->keep) pWal->version = pHead->version; if (pWal->keep) pWal->version = pHead->version;
wTrace("vgId:%d, restore version:%" PRIu64 ", fileId:%" PRId64, pWal->vgId, pWal->version, pWal->fileId);
(*writeFp)(pVnode, pHead, TAOS_QTYPE_WAL); (*writeFp)(pVnode, pHead, TAOS_QTYPE_WAL);
} }
...@@ -279,7 +292,7 @@ static int32_t walGetNextFile(SWal *pWal, int64_t lastFileId, int64_t *nexFileId ...@@ -279,7 +292,7 @@ static int32_t walGetNextFile(SWal *pWal, int64_t lastFileId, int64_t *nexFileId
char *fileName = ent->d_name; char *fileName = ent->d_name;
if (strncmp(fileName, WAL_PREFIX, WAL_PREFIX_LEN) == 0) { if (strncmp(fileName, WAL_PREFIX, WAL_PREFIX_LEN) == 0) {
uint64_t fileId = atoll(fileName + WAL_PREFIX_LEN); int64_t fileId = atoll(fileName + WAL_PREFIX_LEN);
if (fileId <= lastFileId) continue; if (fileId <= lastFileId) continue;
if (fileId < nearFileId) { if (fileId < nearFileId) {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册