提交 dbe3b6f8 编写于 作者: S Shengliang Guan

TD-1895

上级 6a4481f4
...@@ -19,6 +19,7 @@ ...@@ -19,6 +19,7 @@
#include "tutil.h" #include "tutil.h"
#include "tconfig.h" #include "tconfig.h"
#include "tglobal.h" #include "tglobal.h"
#include "tfile.h"
#include "twal.h" #include "twal.h"
#include "trpc.h" #include "trpc.h"
#include "dnode.h" #include "dnode.h"
...@@ -55,6 +56,7 @@ typedef struct { ...@@ -55,6 +56,7 @@ typedef struct {
} SDnodeComponent; } SDnodeComponent;
static const SDnodeComponent tsDnodeComponents[] = { static const SDnodeComponent tsDnodeComponents[] = {
{"tfile", tfInit, tfCleanup},
{"rpc", rpcInit, rpcCleanup}, {"rpc", rpcInit, rpcCleanup},
{"storage", dnodeInitStorage, dnodeCleanupStorage}, {"storage", dnodeInitStorage, dnodeCleanupStorage},
{"dnodecfg", dnodeInitCfg, dnodeCleanupCfg}, {"dnodecfg", dnodeInitCfg, dnodeCleanupCfg},
......
...@@ -51,9 +51,8 @@ typedef struct { ...@@ -51,9 +51,8 @@ typedef struct {
typedef void * twalh; // WAL HANDLE typedef void * twalh; // WAL HANDLE
typedef int32_t FWalWrite(void *ahandle, void *pHead, int32_t qtype, void *pMsg); typedef int32_t FWalWrite(void *ahandle, void *pHead, int32_t qtype, void *pMsg);
int32_t walInit(); int32_t walInit();
void walCleanUp(); void walCleanUp();
twalh walOpen(char *path, SWalCfg *pCfg); twalh walOpen(char *path, SWalCfg *pCfg);
int32_t walAlter(twalh pWal, SWalCfg *pCfg); int32_t walAlter(twalh pWal, SWalCfg *pCfg);
void walStop(twalh); void walStop(twalh);
......
...@@ -20,17 +20,6 @@ ...@@ -20,17 +20,6 @@
extern "C" { extern "C" {
#endif #endif
#define tread(fd, buf, count) read(fd, buf, count)
#define twrite(fd, buf, count) write(fd, buf, count)
#define tlseek(fd, offset, whence) lseek(fd, offset, whence)
#define tclose(fd) \
{ \
if (FD_VALID(fd)) { \
close(fd); \
fd = FD_INITIALIZER; \
} \
}
int64_t taosReadImp(int32_t fd, void *buf, int64_t count); int64_t taosReadImp(int32_t fd, void *buf, int64_t count);
int64_t taosWriteImp(int32_t fd, void *buf, int64_t count); int64_t taosWriteImp(int32_t fd, void *buf, int64_t count);
int64_t taosLSeekImp(int32_t fd, int64_t offset, int32_t whence); int64_t taosLSeekImp(int32_t fd, int64_t offset, int32_t whence);
...@@ -39,7 +28,13 @@ int32_t taosRenameFile(char *fullPath, char *suffix, char delimiter, char **dstP ...@@ -39,7 +28,13 @@ int32_t taosRenameFile(char *fullPath, char *suffix, char delimiter, char **dstP
#define taosRead(fd, buf, count) taosReadImp(fd, buf, count) #define taosRead(fd, buf, count) taosReadImp(fd, buf, count)
#define taosWrite(fd, buf, count) taosWriteImp(fd, buf, count) #define taosWrite(fd, buf, count) taosWriteImp(fd, buf, count)
#define taosLSeek(fd, offset, whence) taosLSeekImp(fd, offset, whence) #define taosLSeek(fd, offset, whence) taosLSeekImp(fd, offset, whence)
#define taosClose(x) tclose(x) #define taosClose(fd) \
{ \
if (FD_VALID(fd)) { \
close(fd); \
fd = FD_INITIALIZER; \
} \
}
// TAOS_OS_FUNC_FILE_SENDIFLE // TAOS_OS_FUNC_FILE_SENDIFLE
int64_t taosSendFile(int32_t dfd, int32_t sfd, int64_t *offset, int64_t size); int64_t taosSendFile(int32_t dfd, int32_t sfd, int64_t *offset, int64_t size);
......
...@@ -116,7 +116,7 @@ int64_t taosWriteImp(int32_t fd, void *buf, int64_t n) { ...@@ -116,7 +116,7 @@ int64_t taosWriteImp(int32_t fd, void *buf, int64_t n) {
} }
int64_t taosLSeekImp(int32_t fd, int64_t offset, int32_t whence) { int64_t taosLSeekImp(int32_t fd, int64_t offset, int32_t whence) {
return (int64_t)tlseek(fd, (long)offset, whence); return (int64_t)lseek(fd, (long)offset, whence);
} }
#ifndef TAOS_OS_FUNC_FILE_SENDIFLE #ifndef TAOS_OS_FUNC_FILE_SENDIFLE
......
...@@ -20,23 +20,26 @@ ...@@ -20,23 +20,26 @@
extern "C" { extern "C" {
#endif #endif
#include <unistd.h>
// init taos file module // init taos file module
int32_t tfinit(); int32_t tfInit();
// clean up taos file module // clean up taos file module
void tfcleanup(); void tfCleanup();
// the same syntax as UNIX standard open/close/read/write // the same syntax as UNIX standard open/close/read/write
// but FD is int64_t and will never be reused // but FD is int64_t and will never be reused
int64_t tfopen(const char *pathname, int32_t flags); int64_t tfOpen(const char *pathname, int32_t flags);
int64_t tfclose(int64_t tfd); int64_t tfOpenM(const char *pathname, int32_t flags, mode_t mode);
int64_t tfwrite(int64_t tfd, void *buf, int64_t count); int64_t tfClose(int64_t tfd);
int64_t tfread(int64_t tfd, void *buf, int64_t count); int64_t tfWrite(int64_t tfd, void *buf, int64_t count);
int64_t tfRead(int64_t tfd, void *buf, int64_t count);
int32_t tfFsync(int64_t tfd);
bool tfValid(int64_t tfd);
int32_t tfLseek(int64_t tfd, int64_t offset, int32_t whence);
int32_t tfFtruncate(int64_t tfd, int64_t length);
#ifdef __cplusplus #ifdef __cplusplus
} }
#endif #endif
#endif // TDENGINE_TREF_H #endif // TDENGINE_TFILE_H
...@@ -13,6 +13,7 @@ ...@@ -13,6 +13,7 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>. * along with this program. If not, see <http://www.gnu.org/licenses/>.
*/ */
#define _DEFAULT_SOURCE
#include "os.h" #include "os.h"
#include "taoserror.h" #include "taoserror.h"
#include "tulog.h" #include "tulog.h"
...@@ -21,40 +22,52 @@ ...@@ -21,40 +22,52 @@
static int32_t tsFileRsetId = -1; static int32_t tsFileRsetId = -1;
static void taosCloseFile(void *p) { static void tfCloseFile(void *p) {
close((int32_t)(uintptr_t)p); close((int32_t)(uintptr_t)p);
} }
int32_t tfinit() { int32_t tfInit() {
tsFileRsetId = taosOpenRef(2000, taosCloseFile); tsFileRsetId = taosOpenRef(2000, tfCloseFile);
return tsFileRsetId; if (tsFileRsetId > 0) {
return 0;
} else {
return -1;
}
} }
void tfcleanup() { void tfCleanup() {
if (tsFileRsetId >= 0) taosCloseRef(tsFileRsetId); if (tsFileRsetId >= 0) taosCloseRef(tsFileRsetId);
tsFileRsetId = -1; tsFileRsetId = -1;
} }
int64_t tfopen(const char *pathname, int32_t flags) { static int64_t tfOpenImp(int32_t fd) {
int32_t fd = open(pathname, flags);
if (fd < 0) { if (fd < 0) {
terrno = TAOS_SYSTEM_ERROR(errno); terrno = TAOS_SYSTEM_ERROR(errno);
return -1; return -1;
} }
void *p = (void *)(int64_t)fd; void * p = (void *)(int64_t)fd;
int64_t rid = taosAddRef(tsFileRsetId, p); int64_t rid = taosAddRef(tsFileRsetId, p);
if (rid < 0) close(fd); if (rid < 0) close(fd);
return rid; return rid;
} }
int64_t tfclose(int64_t tfd) { int64_t tfOpen(const char *pathname, int32_t flags) {
int32_t fd = open(pathname, flags);
return tfOpenImp(fd);
}
int64_t tfOpenM(const char *pathname, int32_t flags, mode_t mode) {
int32_t fd = open(pathname, flags, mode);
return tfOpenImp(fd);
}
int64_t tfClose(int64_t tfd) {
return taosRemoveRef(tsFileRsetId, tfd); return taosRemoveRef(tsFileRsetId, tfd);
} }
int64_t tfwrite(int64_t tfd, void *buf, int64_t count) { int64_t tfWrite(int64_t tfd, void *buf, int64_t count) {
void *p = taosAcquireRef(tsFileRsetId, tfd); void *p = taosAcquireRef(tsFileRsetId, tfd);
if (p == NULL) return -1; if (p == NULL) return -1;
...@@ -67,7 +80,7 @@ int64_t tfwrite(int64_t tfd, void *buf, int64_t count) { ...@@ -67,7 +80,7 @@ int64_t tfwrite(int64_t tfd, void *buf, int64_t count) {
return ret; return ret;
} }
int64_t tfread(int64_t tfd, void *buf, int64_t count) { int64_t tfRead(int64_t tfd, void *buf, int64_t count) {
void *p = taosAcquireRef(tsFileRsetId, tfd); void *p = taosAcquireRef(tsFileRsetId, tfd);
if (p == NULL) return -1; if (p == NULL) return -1;
...@@ -79,3 +92,32 @@ int64_t tfread(int64_t tfd, void *buf, int64_t count) { ...@@ -79,3 +92,32 @@ int64_t tfread(int64_t tfd, void *buf, int64_t count) {
taosReleaseRef(tsFileRsetId, tfd); taosReleaseRef(tsFileRsetId, tfd);
return ret; return ret;
} }
int64_t tfFsync(int64_t tfd) {
void *p = taosAcquireRef(tsFileRsetId, tfd);
if (p == NULL) return -1;
int32_t fd = (int32_t)(uintptr_t)p;
return fsync(fd);
}
bool tfValid(int64_t tfd) {
void *p = taosAcquireRef(tsFileRsetId, tfd);
return p != NULL;
}
int32_t tfLseek(int64_t tfd, int64_t offset, int32_t whence) {
void *p = taosAcquireRef(tsFileRsetId, tfd);
if (p == NULL) return -1;
int32_t fd = (int32_t)(uintptr_t)p;
return taosLSeek(fd, offset, whence);
}
int32_t tfFtruncate(int64_t tfd, int64_t length) {
void *p = taosAcquireRef(tsFileRsetId, tfd);
if (p == NULL) return -1;
int32_t fd = (int32_t)(uintptr_t)p;
return taosFtruncate(fd, length);
}
...@@ -44,8 +44,8 @@ typedef struct { ...@@ -44,8 +44,8 @@ typedef struct {
uint64_t version; uint64_t version;
int64_t fileId; int64_t fileId;
int64_t rid; int64_t rid;
int64_t tfd;
int32_t vgId; int32_t vgId;
int32_t fd;
int32_t keep; int32_t keep;
int32_t level; int32_t level;
int32_t fsyncPeriod; int32_t fsyncPeriod;
......
...@@ -17,6 +17,7 @@ ...@@ -17,6 +17,7 @@
#include "os.h" #include "os.h"
#include "taoserror.h" #include "taoserror.h"
#include "tref.h" #include "tref.h"
#include "tfile.h"
#include "twal.h" #include "twal.h"
#include "walInt.h" #include "walInt.h"
...@@ -61,7 +62,7 @@ void *walOpen(char *path, SWalCfg *pCfg) { ...@@ -61,7 +62,7 @@ void *walOpen(char *path, SWalCfg *pCfg) {
} }
pWal->vgId = pCfg->vgId; pWal->vgId = pCfg->vgId;
pWal->fd = -1; pWal->tfd = -1;
pWal->fileId = -1; pWal->fileId = -1;
pWal->level = pCfg->walLevel; pWal->level = pCfg->walLevel;
pWal->keep = pCfg->keep; pWal->keep = pCfg->keep;
...@@ -124,7 +125,7 @@ void walClose(void *handle) { ...@@ -124,7 +125,7 @@ void walClose(void *handle) {
SWal *pWal = handle; SWal *pWal = handle;
pthread_mutex_lock(&pWal->mutex); pthread_mutex_lock(&pWal->mutex);
taosClose(pWal->fd); tfClose(pWal->tfd);
pthread_mutex_unlock(&pWal->mutex); pthread_mutex_unlock(&pWal->mutex);
taosRemoveRef(tsWal.refId, pWal->rid); taosRemoveRef(tsWal.refId, pWal->rid);
} }
...@@ -143,7 +144,7 @@ static void walFreeObj(void *wal) { ...@@ -143,7 +144,7 @@ static void walFreeObj(void *wal) {
SWal *pWal = wal; SWal *pWal = wal;
wDebug("vgId:%d, wal:%p is freed", pWal->vgId, pWal); wDebug("vgId:%d, wal:%p is freed", pWal->vgId, pWal);
taosClose(pWal->fd); tfClose(pWal->tfd);
pthread_mutex_destroy(&pWal->mutex); pthread_mutex_destroy(&pWal->mutex);
tfree(pWal); tfree(pWal);
} }
...@@ -172,7 +173,7 @@ static void walFsyncAll() { ...@@ -172,7 +173,7 @@ static void walFsyncAll() {
while (pWal) { while (pWal) {
if (walNeedFsync(pWal)) { if (walNeedFsync(pWal)) {
wTrace("vgId:%d, do fsync, level:%d seq:%d rseq:%d", pWal->vgId, pWal->level, pWal->fsyncSeq, tsWal.seq); wTrace("vgId:%d, do fsync, level:%d seq:%d rseq:%d", pWal->vgId, pWal->level, pWal->fsyncSeq, tsWal.seq);
int32_t code = fsync(pWal->fd); int32_t code = tfFsync(pWal->tfd);
if (code != 0) { if (code != 0) {
wError("vgId:%d, file:%s, failed to fsync since %s", pWal->vgId, pWal->name, strerror(code)); wError("vgId:%d, file:%s, failed to fsync since %s", pWal->vgId, pWal->name, strerror(code));
} }
......
...@@ -18,6 +18,7 @@ ...@@ -18,6 +18,7 @@
#include "os.h" #include "os.h"
#include "taoserror.h" #include "taoserror.h"
#include "tchecksum.h" #include "tchecksum.h"
#include "tfile.h"
#include "twal.h" #include "twal.h"
#include "walInt.h" #include "walInt.h"
...@@ -36,8 +37,8 @@ int32_t walRenew(void *handle) { ...@@ -36,8 +37,8 @@ int32_t walRenew(void *handle) {
pthread_mutex_lock(&pWal->mutex); pthread_mutex_lock(&pWal->mutex);
if (pWal->fd >= 0) { if (tfValid(pWal->tfd)) {
tclose(pWal->fd); tfClose(pWal->tfd);
wDebug("vgId:%d, file:%s, it is closed", pWal->vgId, pWal->name); wDebug("vgId:%d, file:%s, it is closed", pWal->vgId, pWal->name);
} }
...@@ -49,9 +50,9 @@ int32_t walRenew(void *handle) { ...@@ -49,9 +50,9 @@ int32_t walRenew(void *handle) {
} }
snprintf(pWal->name, sizeof(pWal->name), "%s/%s%" PRId64, pWal->path, WAL_PREFIX, pWal->fileId); snprintf(pWal->name, sizeof(pWal->name), "%s/%s%" PRId64, pWal->path, WAL_PREFIX, pWal->fileId);
pWal->fd = open(pWal->name, O_WRONLY | O_CREAT, S_IRWXU | S_IRWXG | S_IRWXO); pWal->tfd = tfOpenM(pWal->name, O_WRONLY | O_CREAT, S_IRWXU | S_IRWXG | S_IRWXO);
if (pWal->fd < 0) { if (!tfValid(pWal->tfd)) {
code = TAOS_SYSTEM_ERROR(errno); code = TAOS_SYSTEM_ERROR(errno);
wError("vgId:%d, file:%s, failed to open since %s", pWal->vgId, pWal->name, strerror(errno)); wError("vgId:%d, file:%s, failed to open since %s", pWal->vgId, pWal->name, strerror(errno));
} else { } else {
...@@ -67,7 +68,7 @@ void walRemoveOneOldFile(void *handle) { ...@@ -67,7 +68,7 @@ void walRemoveOneOldFile(void *handle) {
SWal *pWal = handle; SWal *pWal = handle;
if (pWal == NULL) return; if (pWal == NULL) return;
if (pWal->keep == TAOS_WAL_KEEP) return; if (pWal->keep == TAOS_WAL_KEEP) return;
if (pWal->fd <= 0) return; if (!tfValid(pWal->tfd)) return;
pthread_mutex_lock(&pWal->mutex); pthread_mutex_lock(&pWal->mutex);
...@@ -113,7 +114,7 @@ int32_t walWrite(void *handle, SWalHead *pHead) { ...@@ -113,7 +114,7 @@ int32_t walWrite(void *handle, SWalHead *pHead) {
int32_t code = 0; int32_t code = 0;
// no wal // no wal
if (pWal->fd <= 0) return 0; if (!tfValid(pWal->tfd)) return 0;
if (pWal->level == TAOS_WAL_NOLOG) return 0; if (pWal->level == TAOS_WAL_NOLOG) return 0;
if (pHead->version <= pWal->version) return 0; if (pHead->version <= pWal->version) return 0;
...@@ -123,12 +124,12 @@ int32_t walWrite(void *handle, SWalHead *pHead) { ...@@ -123,12 +124,12 @@ int32_t walWrite(void *handle, SWalHead *pHead) {
pthread_mutex_lock(&pWal->mutex); pthread_mutex_lock(&pWal->mutex);
if (taosWrite(pWal->fd, pHead, contLen) != contLen) { if (tfWrite(pWal->tfd, pHead, contLen) != contLen) {
code = TAOS_SYSTEM_ERROR(errno); code = TAOS_SYSTEM_ERROR(errno);
wError("vgId:%d, file:%s, failed to write since %s", pWal->vgId, pWal->name, strerror(errno)); wError("vgId:%d, file:%s, failed to write since %s", pWal->vgId, pWal->name, strerror(errno));
} else { } else {
wTrace("vgId:%d, write wal, fileId:%" PRId64 " fd:%d hver:%" PRId64 " wver:%" PRIu64 " len:%d", pWal->vgId, wTrace("vgId:%d, write wal, fileId:%" PRId64 " tfd:%" PRId64 " hver:%" PRId64 " wver:%" PRIu64 " len:%d", pWal->vgId,
pWal->fileId, pWal->fd, pHead->version, pWal->version, pHead->len); pWal->fileId, pWal->tfd, pHead->version, pWal->version, pHead->len);
pWal->version = pHead->version; pWal->version = pHead->version;
} }
...@@ -141,11 +142,11 @@ int32_t walWrite(void *handle, SWalHead *pHead) { ...@@ -141,11 +142,11 @@ int32_t walWrite(void *handle, SWalHead *pHead) {
void walFsync(void *handle, bool forceFsync) { void walFsync(void *handle, bool forceFsync) {
SWal *pWal = handle; SWal *pWal = handle;
if (pWal == NULL || pWal->fd < 0) return; if (pWal == NULL || !tfValid(pWal->tfd)) return;
if (forceFsync || (pWal->level == TAOS_WAL_FSYNC && pWal->fsyncPeriod == 0)) { if (forceFsync || (pWal->level == TAOS_WAL_FSYNC && pWal->fsyncPeriod == 0)) {
wTrace("vgId:%d, fileId:%" PRId64 ", do fsync", pWal->vgId, pWal->fileId); wTrace("vgId:%d, fileId:%" PRId64 ", do fsync", pWal->vgId, pWal->fileId);
if (fsync(pWal->fd) < 0) { if (tfFsync(pWal->tfd) < 0) {
wError("vgId:%d, fileId:%" PRId64 ", fsync failed since %s", pWal->vgId, pWal->fileId, strerror(errno)); wError("vgId:%d, fileId:%" PRId64 ", fsync failed since %s", pWal->vgId, pWal->fileId, strerror(errno));
} }
} }
...@@ -186,8 +187,8 @@ int32_t walRestore(void *handle, void *pVnode, FWalWrite writeFp) { ...@@ -186,8 +187,8 @@ int32_t walRestore(void *handle, void *pVnode, FWalWrite writeFp) {
// open the existing WAL file in append mode // open the existing WAL file in append mode
pWal->fileId = 0; pWal->fileId = 0;
snprintf(pWal->name, sizeof(pWal->name), "%s/%s%" PRId64, pWal->path, WAL_PREFIX, pWal->fileId); snprintf(pWal->name, sizeof(pWal->name), "%s/%s%" PRId64, pWal->path, WAL_PREFIX, pWal->fileId);
pWal->fd = open(pWal->name, O_WRONLY | O_CREAT | O_APPEND, S_IRWXU | S_IRWXG | S_IRWXO); pWal->tfd = tfOpenM(pWal->name, O_WRONLY | O_CREAT | O_APPEND, S_IRWXU | S_IRWXG | S_IRWXO);
if (pWal->fd < 0) { if (!tfValid(pWal->tfd)) {
wError("vgId:%d, file:%s, failed to open since %s", pWal->vgId, pWal->name, strerror(errno)); wError("vgId:%d, file:%s, failed to open since %s", pWal->vgId, pWal->name, strerror(errno));
return TAOS_SYSTEM_ERROR(errno); return TAOS_SYSTEM_ERROR(errno);
} }
...@@ -217,22 +218,22 @@ int32_t walGetWalFile(void *handle, char *fileName, int64_t *fileId) { ...@@ -217,22 +218,22 @@ int32_t walGetWalFile(void *handle, char *fileName, int64_t *fileId) {
return code; return code;
} }
static void walFtruncate(SWal *pWal, int32_t fd, int64_t offset) { static void walFtruncate(SWal *pWal, int64_t tfd, int64_t offset) {
taosFtruncate(fd, offset); tfFtruncate(tfd, offset);
fsync(fd); tfFsync(tfd);
} }
static int32_t walSkipCorruptedRecord(SWal *pWal, SWalHead *pHead, int32_t fd, int64_t *offset) { static int32_t walSkipCorruptedRecord(SWal *pWal, SWalHead *pHead, int64_t tfd, int64_t *offset) {
int64_t pos = *offset; int64_t pos = *offset;
while (1) { while (1) {
pos++; pos++;
if (lseek(fd, pos, SEEK_SET) < 0) { if (tfLseek(tfd, pos, SEEK_SET) < 0) {
wError("vgId:%d, failed to seek from corrupted wal file since %s", pWal->vgId, strerror(errno)); wError("vgId:%d, failed to seek from corrupted wal file since %s", pWal->vgId, strerror(errno));
return TSDB_CODE_WAL_FILE_CORRUPTED; return TSDB_CODE_WAL_FILE_CORRUPTED;
} }
if (taosRead(fd, pHead, sizeof(SWalHead)) <= 0) { if (tfRead(tfd, pHead, sizeof(SWalHead)) <= 0) {
wError("vgId:%d, read to end of corrupted wal file, offset:%" PRId64, pWal->vgId, pos); wError("vgId:%d, read to end of corrupted wal file, offset:%" PRId64, pWal->vgId, pos);
return TSDB_CODE_WAL_FILE_CORRUPTED; return TSDB_CODE_WAL_FILE_CORRUPTED;
} }
...@@ -259,8 +260,8 @@ static int32_t walRestoreWalFile(SWal *pWal, void *pVnode, FWalWrite writeFp, ch ...@@ -259,8 +260,8 @@ static int32_t walRestoreWalFile(SWal *pWal, void *pVnode, FWalWrite writeFp, ch
return TAOS_SYSTEM_ERROR(errno); return TAOS_SYSTEM_ERROR(errno);
} }
int32_t fd = open(name, O_RDWR); int64_t tfd = tfOpen(name, O_RDWR);
if (fd < 0) { if (!tfValid(tfd)) {
wError("vgId:%d, file:%s, failed to open for restore since %s", pWal->vgId, name, strerror(errno)); wError("vgId:%d, file:%s, failed to open for restore since %s", pWal->vgId, name, strerror(errno));
tfree(buffer); tfree(buffer);
return TAOS_SYSTEM_ERROR(errno); return TAOS_SYSTEM_ERROR(errno);
...@@ -273,7 +274,7 @@ static int32_t walRestoreWalFile(SWal *pWal, void *pVnode, FWalWrite writeFp, ch ...@@ -273,7 +274,7 @@ static int32_t walRestoreWalFile(SWal *pWal, void *pVnode, FWalWrite writeFp, ch
SWalHead *pHead = buffer; SWalHead *pHead = buffer;
while (1) { while (1) {
int32_t ret = taosRead(fd, pHead, sizeof(SWalHead)); int32_t ret = tfRead(tfd, pHead, sizeof(SWalHead));
if (ret == 0) break; if (ret == 0) break;
if (ret < 0) { if (ret < 0) {
...@@ -284,16 +285,16 @@ static int32_t walRestoreWalFile(SWal *pWal, void *pVnode, FWalWrite writeFp, ch ...@@ -284,16 +285,16 @@ static int32_t walRestoreWalFile(SWal *pWal, void *pVnode, FWalWrite writeFp, ch
if (ret < sizeof(SWalHead)) { if (ret < sizeof(SWalHead)) {
wError("vgId:%d, file:%s, failed to read wal head, ret is %d", pWal->vgId, name, ret); wError("vgId:%d, file:%s, failed to read wal head, ret is %d", pWal->vgId, name, ret);
walFtruncate(pWal, fd, offset); walFtruncate(pWal, tfd, offset);
break; break;
} }
if (!taosCheckChecksumWhole((uint8_t *)pHead, sizeof(SWalHead))) { if (!taosCheckChecksumWhole((uint8_t *)pHead, sizeof(SWalHead))) {
wError("vgId:%d, file:%s, wal head cksum is messed up, hver:%" PRIu64 " len:%d offset:%" PRId64, pWal->vgId, name, wError("vgId:%d, file:%s, wal head cksum is messed up, hver:%" PRIu64 " len:%d offset:%" PRId64, pWal->vgId, name,
pHead->version, pHead->len, offset); pHead->version, pHead->len, offset);
code = walSkipCorruptedRecord(pWal, pHead, fd, &offset); code = walSkipCorruptedRecord(pWal, pHead, tfd, &offset);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
walFtruncate(pWal, fd, offset); walFtruncate(pWal, tfd, offset);
break; break;
} }
} }
...@@ -310,7 +311,7 @@ static int32_t walRestoreWalFile(SWal *pWal, void *pVnode, FWalWrite writeFp, ch ...@@ -310,7 +311,7 @@ static int32_t walRestoreWalFile(SWal *pWal, void *pVnode, FWalWrite writeFp, ch
pHead = buffer; pHead = buffer;
} }
ret = taosRead(fd, pHead->cont, pHead->len); ret = tfRead(tfd, pHead->cont, pHead->len);
if (ret < 0) { if (ret < 0) {
wError("vgId:%d, file:%s, failed to read wal body since %s", pWal->vgId, name, strerror(errno)); wError("vgId:%d, file:%s, failed to read wal body since %s", pWal->vgId, name, strerror(errno));
code = TAOS_SYSTEM_ERROR(errno); code = TAOS_SYSTEM_ERROR(errno);
...@@ -332,7 +333,7 @@ static int32_t walRestoreWalFile(SWal *pWal, void *pVnode, FWalWrite writeFp, ch ...@@ -332,7 +333,7 @@ static int32_t walRestoreWalFile(SWal *pWal, void *pVnode, FWalWrite writeFp, ch
(*writeFp)(pVnode, pHead, TAOS_QTYPE_WAL, NULL); (*writeFp)(pVnode, pHead, TAOS_QTYPE_WAL, NULL);
} }
tclose(fd); tfClose(tfd);
tfree(buffer); tfree(buffer);
return code; return code;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册