未验证 提交 4029cda3 编写于 作者: S Shengliang Guan 提交者: GitHub

Merge pull request #4373 from taosdata/feature/wal

TD-1895
......@@ -19,6 +19,7 @@
#include "tutil.h"
#include "tconfig.h"
#include "tglobal.h"
#include "tfile.h"
#include "twal.h"
#include "trpc.h"
#include "dnode.h"
......@@ -55,6 +56,7 @@ typedef struct {
} SDnodeComponent;
static const SDnodeComponent tsDnodeComponents[] = {
{"tfile", tfInit, tfCleanup},
{"rpc", rpcInit, rpcCleanup},
{"storage", dnodeInitStorage, dnodeCleanupStorage},
{"dnodecfg", dnodeInitCfg, dnodeCleanupCfg},
......
......@@ -51,9 +51,8 @@ typedef struct {
typedef void * twalh; // WAL HANDLE
typedef int32_t FWalWrite(void *ahandle, void *pHead, int32_t qtype, void *pMsg);
int32_t walInit();
void walCleanUp();
int32_t walInit();
void walCleanUp();
twalh walOpen(char *path, SWalCfg *pCfg);
int32_t walAlter(twalh pWal, SWalCfg *pCfg);
void walStop(twalh);
......
......@@ -20,17 +20,6 @@
extern "C" {
#endif
#define tread(fd, buf, count) read(fd, buf, count)
#define twrite(fd, buf, count) write(fd, buf, count)
#define tlseek(fd, offset, whence) lseek(fd, offset, whence)
#define tclose(fd) \
{ \
if (FD_VALID(fd)) { \
close(fd); \
fd = FD_INITIALIZER; \
} \
}
int64_t taosReadImp(int32_t fd, void *buf, int64_t count);
int64_t taosWriteImp(int32_t fd, void *buf, int64_t count);
int64_t taosLSeekImp(int32_t fd, int64_t offset, int32_t whence);
......@@ -39,7 +28,13 @@ int32_t taosRenameFile(char *fullPath, char *suffix, char delimiter, char **dstP
#define taosRead(fd, buf, count) taosReadImp(fd, buf, count)
#define taosWrite(fd, buf, count) taosWriteImp(fd, buf, count)
#define taosLSeek(fd, offset, whence) taosLSeekImp(fd, offset, whence)
#define taosClose(x) tclose(x)
#define taosClose(fd) \
{ \
if (FD_VALID(fd)) { \
close(fd); \
fd = FD_INITIALIZER; \
} \
}
// TAOS_OS_FUNC_FILE_SENDIFLE
int64_t taosSendFile(int32_t dfd, int32_t sfd, int64_t *offset, int64_t size);
......
......@@ -116,7 +116,7 @@ int64_t taosWriteImp(int32_t fd, void *buf, int64_t n) {
}
int64_t taosLSeekImp(int32_t fd, int64_t offset, int32_t whence) {
return (int64_t)tlseek(fd, (long)offset, whence);
return (int64_t)lseek(fd, (long)offset, whence);
}
#ifndef TAOS_OS_FUNC_FILE_SENDIFLE
......
......@@ -20,23 +20,26 @@
extern "C" {
#endif
#include <unistd.h>
// init taos file module
int32_t tfinit();
int32_t tfInit();
// clean up taos file module
void tfcleanup();
void tfCleanup();
// the same syntax as UNIX standard open/close/read/write
// but FD is int64_t and will never be reused
int64_t tfopen(const char *pathname, int32_t flags);
int64_t tfclose(int64_t tfd);
int64_t tfwrite(int64_t tfd, void *buf, int64_t count);
int64_t tfread(int64_t tfd, void *buf, int64_t count);
int64_t tfOpen(const char *pathname, int32_t flags);
int64_t tfOpenM(const char *pathname, int32_t flags, mode_t mode);
int64_t tfClose(int64_t tfd);
int64_t tfWrite(int64_t tfd, void *buf, int64_t count);
int64_t tfRead(int64_t tfd, void *buf, int64_t count);
int32_t tfFsync(int64_t tfd);
bool tfValid(int64_t tfd);
int64_t tfLseek(int64_t tfd, int64_t offset, int32_t whence);
int32_t tfFtruncate(int64_t tfd, int64_t length);
#ifdef __cplusplus
}
#endif
#endif // TDENGINE_TREF_H
#endif // TDENGINE_TFILE_H
......@@ -13,6 +13,7 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#define _DEFAULT_SOURCE
#include "os.h"
#include "taoserror.h"
#include "tulog.h"
......@@ -21,40 +22,52 @@
static int32_t tsFileRsetId = -1;
static void taosCloseFile(void *p) {
static void tfCloseFile(void *p) {
close((int32_t)(uintptr_t)p);
}
int32_t tfinit() {
tsFileRsetId = taosOpenRef(2000, taosCloseFile);
return tsFileRsetId;
int32_t tfInit() {
tsFileRsetId = taosOpenRef(2000, tfCloseFile);
if (tsFileRsetId > 0) {
return 0;
} else {
return -1;
}
}
void tfcleanup() {
void tfCleanup() {
if (tsFileRsetId >= 0) taosCloseRef(tsFileRsetId);
tsFileRsetId = -1;
}
int64_t tfopen(const char *pathname, int32_t flags) {
int32_t fd = open(pathname, flags);
static int64_t tfOpenImp(int32_t fd) {
if (fd < 0) {
terrno = TAOS_SYSTEM_ERROR(errno);
return -1;
}
}
void *p = (void *)(int64_t)fd;
void * p = (void *)(int64_t)fd;
int64_t rid = taosAddRef(tsFileRsetId, p);
if (rid < 0) close(fd);
return rid;
}
int64_t tfclose(int64_t tfd) {
int64_t tfOpen(const char *pathname, int32_t flags) {
int32_t fd = open(pathname, flags);
return tfOpenImp(fd);
}
int64_t tfOpenM(const char *pathname, int32_t flags, mode_t mode) {
int32_t fd = open(pathname, flags, mode);
return tfOpenImp(fd);
}
int64_t tfClose(int64_t tfd) {
return taosRemoveRef(tsFileRsetId, tfd);
}
int64_t tfwrite(int64_t tfd, void *buf, int64_t count) {
int64_t tfWrite(int64_t tfd, void *buf, int64_t count) {
void *p = taosAcquireRef(tsFileRsetId, tfd);
if (p == NULL) return -1;
......@@ -67,7 +80,7 @@ int64_t tfwrite(int64_t tfd, void *buf, int64_t count) {
return ret;
}
int64_t tfread(int64_t tfd, void *buf, int64_t count) {
int64_t tfRead(int64_t tfd, void *buf, int64_t count) {
void *p = taosAcquireRef(tsFileRsetId, tfd);
if (p == NULL) return -1;
......@@ -79,3 +92,32 @@ int64_t tfread(int64_t tfd, void *buf, int64_t count) {
taosReleaseRef(tsFileRsetId, tfd);
return ret;
}
int64_t tfFsync(int64_t tfd) {
void *p = taosAcquireRef(tsFileRsetId, tfd);
if (p == NULL) return -1;
int32_t fd = (int32_t)(uintptr_t)p;
return fsync(fd);
}
bool tfValid(int64_t tfd) {
void *p = taosAcquireRef(tsFileRsetId, tfd);
return p != NULL;
}
int64_t tfLseek(int64_t tfd, int64_t offset, int32_t whence) {
void *p = taosAcquireRef(tsFileRsetId, tfd);
if (p == NULL) return -1;
int32_t fd = (int32_t)(uintptr_t)p;
return taosLSeek(fd, offset, whence);
}
int32_t tfFtruncate(int64_t tfd, int64_t length) {
void *p = taosAcquireRef(tsFileRsetId, tfd);
if (p == NULL) return -1;
int32_t fd = (int32_t)(uintptr_t)p;
return taosFtruncate(fd, length);
}
......@@ -44,8 +44,8 @@ typedef struct {
uint64_t version;
int64_t fileId;
int64_t rid;
int64_t tfd;
int32_t vgId;
int32_t fd;
int32_t keep;
int32_t level;
int32_t fsyncPeriod;
......
......@@ -17,6 +17,7 @@
#include "os.h"
#include "taoserror.h"
#include "tref.h"
#include "tfile.h"
#include "twal.h"
#include "walInt.h"
......@@ -61,7 +62,7 @@ void *walOpen(char *path, SWalCfg *pCfg) {
}
pWal->vgId = pCfg->vgId;
pWal->fd = -1;
pWal->tfd = -1;
pWal->fileId = -1;
pWal->level = pCfg->walLevel;
pWal->keep = pCfg->keep;
......@@ -124,7 +125,7 @@ void walClose(void *handle) {
SWal *pWal = handle;
pthread_mutex_lock(&pWal->mutex);
taosClose(pWal->fd);
tfClose(pWal->tfd);
pthread_mutex_unlock(&pWal->mutex);
taosRemoveRef(tsWal.refId, pWal->rid);
}
......@@ -143,7 +144,7 @@ static void walFreeObj(void *wal) {
SWal *pWal = wal;
wDebug("vgId:%d, wal:%p is freed", pWal->vgId, pWal);
taosClose(pWal->fd);
tfClose(pWal->tfd);
pthread_mutex_destroy(&pWal->mutex);
tfree(pWal);
}
......@@ -172,7 +173,7 @@ static void walFsyncAll() {
while (pWal) {
if (walNeedFsync(pWal)) {
wTrace("vgId:%d, do fsync, level:%d seq:%d rseq:%d", pWal->vgId, pWal->level, pWal->fsyncSeq, tsWal.seq);
int32_t code = fsync(pWal->fd);
int32_t code = tfFsync(pWal->tfd);
if (code != 0) {
wError("vgId:%d, file:%s, failed to fsync since %s", pWal->vgId, pWal->name, strerror(code));
}
......
......@@ -18,6 +18,7 @@
#include "os.h"
#include "taoserror.h"
#include "tchecksum.h"
#include "tfile.h"
#include "twal.h"
#include "walInt.h"
......@@ -36,8 +37,8 @@ int32_t walRenew(void *handle) {
pthread_mutex_lock(&pWal->mutex);
if (pWal->fd >= 0) {
tclose(pWal->fd);
if (tfValid(pWal->tfd)) {
tfClose(pWal->tfd);
wDebug("vgId:%d, file:%s, it is closed", pWal->vgId, pWal->name);
}
......@@ -49,9 +50,9 @@ int32_t walRenew(void *handle) {
}
snprintf(pWal->name, sizeof(pWal->name), "%s/%s%" PRId64, pWal->path, WAL_PREFIX, pWal->fileId);
pWal->fd = open(pWal->name, O_WRONLY | O_CREAT, S_IRWXU | S_IRWXG | S_IRWXO);
pWal->tfd = tfOpenM(pWal->name, O_WRONLY | O_CREAT, S_IRWXU | S_IRWXG | S_IRWXO);
if (pWal->fd < 0) {
if (!tfValid(pWal->tfd)) {
code = TAOS_SYSTEM_ERROR(errno);
wError("vgId:%d, file:%s, failed to open since %s", pWal->vgId, pWal->name, strerror(errno));
} else {
......@@ -67,7 +68,7 @@ void walRemoveOneOldFile(void *handle) {
SWal *pWal = handle;
if (pWal == NULL) return;
if (pWal->keep == TAOS_WAL_KEEP) return;
if (pWal->fd <= 0) return;
if (!tfValid(pWal->tfd)) return;
pthread_mutex_lock(&pWal->mutex);
......@@ -113,7 +114,7 @@ int32_t walWrite(void *handle, SWalHead *pHead) {
int32_t code = 0;
// no wal
if (pWal->fd <= 0) return 0;
if (!tfValid(pWal->tfd)) return 0;
if (pWal->level == TAOS_WAL_NOLOG) return 0;
if (pHead->version <= pWal->version) return 0;
......@@ -123,12 +124,12 @@ int32_t walWrite(void *handle, SWalHead *pHead) {
pthread_mutex_lock(&pWal->mutex);
if (taosWrite(pWal->fd, pHead, contLen) != contLen) {
if (tfWrite(pWal->tfd, pHead, contLen) != contLen) {
code = TAOS_SYSTEM_ERROR(errno);
wError("vgId:%d, file:%s, failed to write since %s", pWal->vgId, pWal->name, strerror(errno));
} else {
wTrace("vgId:%d, write wal, fileId:%" PRId64 " fd:%d hver:%" PRId64 " wver:%" PRIu64 " len:%d", pWal->vgId,
pWal->fileId, pWal->fd, pHead->version, pWal->version, pHead->len);
wTrace("vgId:%d, write wal, fileId:%" PRId64 " tfd:%" PRId64 " hver:%" PRId64 " wver:%" PRIu64 " len:%d", pWal->vgId,
pWal->fileId, pWal->tfd, pHead->version, pWal->version, pHead->len);
pWal->version = pHead->version;
}
......@@ -141,11 +142,11 @@ int32_t walWrite(void *handle, SWalHead *pHead) {
void walFsync(void *handle, bool forceFsync) {
SWal *pWal = handle;
if (pWal == NULL || pWal->fd < 0) return;
if (pWal == NULL || !tfValid(pWal->tfd)) return;
if (forceFsync || (pWal->level == TAOS_WAL_FSYNC && pWal->fsyncPeriod == 0)) {
wTrace("vgId:%d, fileId:%" PRId64 ", do fsync", pWal->vgId, pWal->fileId);
if (fsync(pWal->fd) < 0) {
if (tfFsync(pWal->tfd) < 0) {
wError("vgId:%d, fileId:%" PRId64 ", fsync failed since %s", pWal->vgId, pWal->fileId, strerror(errno));
}
}
......@@ -186,8 +187,8 @@ int32_t walRestore(void *handle, void *pVnode, FWalWrite writeFp) {
// open the existing WAL file in append mode
pWal->fileId = 0;
snprintf(pWal->name, sizeof(pWal->name), "%s/%s%" PRId64, pWal->path, WAL_PREFIX, pWal->fileId);
pWal->fd = open(pWal->name, O_WRONLY | O_CREAT | O_APPEND, S_IRWXU | S_IRWXG | S_IRWXO);
if (pWal->fd < 0) {
pWal->tfd = tfOpenM(pWal->name, O_WRONLY | O_CREAT | O_APPEND, S_IRWXU | S_IRWXG | S_IRWXO);
if (!tfValid(pWal->tfd)) {
wError("vgId:%d, file:%s, failed to open since %s", pWal->vgId, pWal->name, strerror(errno));
return TAOS_SYSTEM_ERROR(errno);
}
......@@ -217,22 +218,22 @@ int32_t walGetWalFile(void *handle, char *fileName, int64_t *fileId) {
return code;
}
static void walFtruncate(SWal *pWal, int32_t fd, int64_t offset) {
taosFtruncate(fd, offset);
fsync(fd);
static void walFtruncate(SWal *pWal, int64_t tfd, int64_t offset) {
tfFtruncate(tfd, offset);
tfFsync(tfd);
}
static int32_t walSkipCorruptedRecord(SWal *pWal, SWalHead *pHead, int32_t fd, int64_t *offset) {
static int32_t walSkipCorruptedRecord(SWal *pWal, SWalHead *pHead, int64_t tfd, int64_t *offset) {
int64_t pos = *offset;
while (1) {
pos++;
if (lseek(fd, pos, SEEK_SET) < 0) {
if (tfLseek(tfd, pos, SEEK_SET) < 0) {
wError("vgId:%d, failed to seek from corrupted wal file since %s", pWal->vgId, strerror(errno));
return TSDB_CODE_WAL_FILE_CORRUPTED;
}
if (taosRead(fd, pHead, sizeof(SWalHead)) <= 0) {
if (tfRead(tfd, pHead, sizeof(SWalHead)) <= 0) {
wError("vgId:%d, read to end of corrupted wal file, offset:%" PRId64, pWal->vgId, pos);
return TSDB_CODE_WAL_FILE_CORRUPTED;
}
......@@ -259,8 +260,8 @@ static int32_t walRestoreWalFile(SWal *pWal, void *pVnode, FWalWrite writeFp, ch
return TAOS_SYSTEM_ERROR(errno);
}
int32_t fd = open(name, O_RDWR);
if (fd < 0) {
int64_t tfd = tfOpen(name, O_RDWR);
if (!tfValid(tfd)) {
wError("vgId:%d, file:%s, failed to open for restore since %s", pWal->vgId, name, strerror(errno));
tfree(buffer);
return TAOS_SYSTEM_ERROR(errno);
......@@ -273,7 +274,7 @@ static int32_t walRestoreWalFile(SWal *pWal, void *pVnode, FWalWrite writeFp, ch
SWalHead *pHead = buffer;
while (1) {
int32_t ret = taosRead(fd, pHead, sizeof(SWalHead));
int32_t ret = tfRead(tfd, pHead, sizeof(SWalHead));
if (ret == 0) break;
if (ret < 0) {
......@@ -284,16 +285,16 @@ static int32_t walRestoreWalFile(SWal *pWal, void *pVnode, FWalWrite writeFp, ch
if (ret < sizeof(SWalHead)) {
wError("vgId:%d, file:%s, failed to read wal head, ret is %d", pWal->vgId, name, ret);
walFtruncate(pWal, fd, offset);
walFtruncate(pWal, tfd, offset);
break;
}
if (!taosCheckChecksumWhole((uint8_t *)pHead, sizeof(SWalHead))) {
wError("vgId:%d, file:%s, wal head cksum is messed up, hver:%" PRIu64 " len:%d offset:%" PRId64, pWal->vgId, name,
pHead->version, pHead->len, offset);
code = walSkipCorruptedRecord(pWal, pHead, fd, &offset);
code = walSkipCorruptedRecord(pWal, pHead, tfd, &offset);
if (code != TSDB_CODE_SUCCESS) {
walFtruncate(pWal, fd, offset);
walFtruncate(pWal, tfd, offset);
break;
}
}
......@@ -310,7 +311,7 @@ static int32_t walRestoreWalFile(SWal *pWal, void *pVnode, FWalWrite writeFp, ch
pHead = buffer;
}
ret = taosRead(fd, pHead->cont, pHead->len);
ret = tfRead(tfd, pHead->cont, pHead->len);
if (ret < 0) {
wError("vgId:%d, file:%s, failed to read wal body since %s", pWal->vgId, name, strerror(errno));
code = TAOS_SYSTEM_ERROR(errno);
......@@ -332,7 +333,7 @@ static int32_t walRestoreWalFile(SWal *pWal, void *pVnode, FWalWrite writeFp, ch
(*writeFp)(pVnode, pHead, TAOS_QTYPE_WAL, NULL);
}
tclose(fd);
tfClose(tfd);
tfree(buffer);
return code;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册