提交 7c7644b5 编写于 作者: H Hongze Cheng

more progress

上级 7820ba78
......@@ -60,6 +60,7 @@ void* tsdbDecodeSMFile(void* buf, SMFile* pMFile);
int tsdbApplyMFileChange(SMFile* from, SMFile* to);
int tsdbCreateMFile(SMFile* pMFile);
int tsdbUpdateMFileHeader(SMFile* pMFile);
int tsdbScanAndTryFixMFile(SMFile* pMFile);
static FORCE_INLINE void tsdbSetMFileInfo(SMFile* pMFile, SMFInfo* pInfo) { pMFile->info = *pInfo; }
......@@ -300,6 +301,7 @@ void* tsdbDecodeDFileSet(void* buf, SDFileSet* pSet);
int tsdbApplyDFileSetChange(SDFileSet* from, SDFileSet* to);
int tsdbCreateDFileSet(SDFileSet* pSet);
int tsdbUpdateDFileSetHeader(SDFileSet* pSet);
int tsdbScanAndTryFixDFileSet(SDFileSet* pSet);
static FORCE_INLINE void tsdbCloseDFileSet(SDFileSet* pSet) {
for (TSDB_FILE_T ftype = 0; ftype < TSDB_FILE_MAX; ftype++) {
......
......@@ -15,14 +15,17 @@
#include "tsdbint.h"
#define TSDB_FS_CURRENT_FNAME "current"
#define TSDB_FS_TEMP_FNAME "current.t"
typedef enum { TSDB_TXN_TEMP_FILE = 0, TSDB_TXN_CURR_FILE } TSDB_TXN_FILE_T;
static const char *tsdbTxnFname[] = {"current.t", "current"};
#define TSDB_MAX_FSETS(keep, days) ((keep) / (days) + 3)
static int tsdbComparFidFSet(const void *arg1, const void *arg2);
static void tsdbResetFSStatus(SFSStatus *pStatus);
static int tsdbApplyFSTxn(STsdbFS *pfs, int vid);
static void tsdbApplyFSTxnOnDisk(SFSStatus *pFrom, SFSStatus *pTo);
static void tsdbGetTxnFname(int repoid, TSDB_TXN_FILE_T ftype, char fname[]);
static int tsdbOpenFSFromCurrent(STsdbRepo *pRepo);
static int tsdbScanAndTryFixFS(STsdbRepo *pRepo);
// ================== CURRENT file header info
static int tsdbEncodeFSHeader(void **buf, SFSHeader *pHeader) {
......@@ -100,7 +103,7 @@ static int tsdbEncodeFSStatus(void **buf, SFSStatus *pStatus) {
return tlen;
}
static UNUSED_FUNC void *tsdbDecodeFSStatus(void *buf, SFSStatus *pStatus) {
static void *tsdbDecodeFSStatus(void *buf, SFSStatus *pStatus) {
tsdbResetFSStatus(pStatus);
pStatus->pmf = &(pStatus->mf);
......@@ -228,7 +231,23 @@ void *tsdbFreeFS(STsdbFS *pfs) {
// TODO
int tsdbOpenFS(STsdbRepo *pRepo) {
// TODO
STsdbFS * pfs = REPO_FS(pRepo);
char current[TSDB_FILENAME_LEN] = "\0";
ASSERT(pfs != NULL);
tsdbGetTxnFname(REPO_ID(pRepo), TSDB_TXN_CURR_FILE, current);
if (access(current, F_OK) == 0) {
if (tsdbOpenFSFromCurrent(pRepo) < 0) {
tsdbError("vgId:%d failed to open FS since %s", REPO_ID(pRepo), tstrerror(terrno));
return -1;
}
} else {
// TODO: current file not exists, try to recover it
}
return 0;
}
......@@ -245,9 +264,13 @@ void tsdbStartFSTxn(STsdbRepo *pRepo, int64_t pointsAdd, int64_t storageAdd) {
pfs->intxn = true;
tsdbResetFSStatus(pfs->nstatus);
pfs->nstatus->meta = pfs->cstatus->meta;
if (pfs->cstatus->pmf == NULL) {
pfs->nstatus->meta.version = 0;
} else {
pfs->nstatus->meta.version = pfs->cstatus->meta.version + 1;
}
pfs->nstatus->meta.totalPoints = pfs->cstatus->meta.totalPoints + pointsAdd;
pfs->nstatus->meta.version = pfs->cstatus->meta.totalStorage += storageAdd;
pfs->nstatus->meta.totalStorage = pfs->cstatus->meta.totalStorage += storageAdd;
}
void tsdbUpdateFSTxnMeta(STsdbFS *pfs, STsdbFSMeta *pMeta) { pfs->nstatus->meta = *pMeta; }
......@@ -297,8 +320,8 @@ static int tsdbApplyFSTxn(STsdbFS *pfs, int vid) {
char tfname[TSDB_FILENAME_LEN] = "\0";
char cfname[TSDB_FILENAME_LEN] = "\0";
snprintf(tfname, TSDB_FILENAME_LEN, "%s/vnode/vnode%d/tsdb/%s", TFS_PRIMARY_PATH(), vid, TSDB_FS_TEMP_FNAME);
snprintf(cfname, TSDB_FILENAME_LEN, "%s/vnode/vnode%d/tsdb/%s", TFS_PRIMARY_PATH(), vid, TSDB_FS_CURRENT_FNAME);
tsdbGetTxnFname(vid, TSDB_TXN_TEMP_FILE, tfname);
tsdbGetTxnFname(vid, TSDB_TXN_CURR_FILE, cfname);
int fd = open(tfname, O_WRONLY | O_CREAT | O_TRUNC, 0755);
if (fd < 0) {
......@@ -528,3 +551,135 @@ static int tsdbComparFidFSet(const void *arg1, const void *arg2) {
return 1;
}
}
static void tsdbGetTxnFname(int repoid, TSDB_TXN_FILE_T ftype, char fname[]) {
snprintf(fname, TSDB_FILENAME_LEN, "%s/vnode/vnode%d/tsdb/%s", TFS_PRIMARY_PATH(), repoid, tsdbTxnFname[ftype]);
}
static int tsdbOpenFSFromCurrent(STsdbRepo *pRepo) {
STsdbFS * pfs = REPO_FS(pRepo);
int fd = -1;
void * buffer = NULL;
SFSHeader fsheader;
char current[TSDB_FILENAME_LEN] = "\0";
void * ptr;
tsdbGetTxnFname(REPO_ID(pRepo), TSDB_TXN_CURR_FILE, current);
// current file exists, try to recover
fd = open(current, O_RDONLY);
if (fd < 0) {
tsdbError("vgId:%d failed to open file %s since %s", REPO_ID(pRepo), current, strerror(errno));
terrno = TAOS_SYSTEM_ERROR(errno);
goto _err;
}
if (tsdbMakeRoom(&buffer, TSDB_FILE_HEAD_SIZE) < 0) {
goto _err;
}
int nread = taosRead(fd, buffer, TSDB_FILE_HEAD_SIZE);
if (nread < 0) {
tsdbError("vgId:%d failed to read %d bytes from file %s since %s", REPO_ID(pRepo), TSDB_FILENAME_LEN, current,
strerror(errno));
terrno = TAOS_SYSTEM_ERROR(errno);
goto _err;
}
if (nread < TSDB_FILE_HEAD_SIZE) {
tsdbError("vgId:%d failed to read header of file %s, read bytes:%d", REPO_ID(pRepo), current, nread);
terrno = TSDB_CODE_TDB_FILE_CORRUPTED;
goto _err;
}
if (!taosCheckChecksumWhole((uint8_t *)buffer, TSDB_FILE_HEAD_SIZE)) {
tsdbError("vgId:%d header of file %s failed checksum check", REPO_ID(pRepo), current);
terrno = TSDB_CODE_TDB_FILE_CORRUPTED;
goto _err;
}
ptr = buffer;
ptr = tsdbDecodeFSHeader(ptr, &fsheader);
if (fsheader.version != TSDB_FS_VERSION) {
// TODO: handle file version change
}
SFSStatus *pStatus = pfs->cstatus;
if (fsheader.len > 0) {
if (tsdbMakeRoom(&buffer, fsheader.len) < 0) {
goto _err;
}
nread = taosRead(fd, buffer, fsheader.len);
if (nread < 0) {
tsdbError("vgId:%d failed to read file %s since %s", REPO_ID(pRepo), current, strerror(errno));
terrno = TAOS_SYSTEM_ERROR(errno);
goto _err;
}
if (nread < fsheader.len) {
tsdbError("vgId:%d failed to read %d bytes from file %s", REPO_ID(pRepo), fsheader.len, current);
terrno = TSDB_CODE_TDB_FILE_CORRUPTED;
goto _err;
}
if (!taosCheckChecksumWhole((uint8_t *)buffer, fsheader.len)) {
tsdbError("vgId:%d file %s is corrupted since wrong checksum", REPO_ID(pRepo), current);
terrno = TSDB_CODE_TDB_FILE_CORRUPTED;
goto _err;
}
ptr = buffer;
ptr = tsdbDecodeFSMeta(ptr, &(pStatus->meta));
ptr = tsdbDecodeFSStatus(ptr, pStatus);
} else {
tsdbResetFSStatus(pStatus);
}
taosTZfree(buffer);
close(fd);
if (tsdbScanAndTryFixFS(pRepo) < 0) {
return -1;
}
return 0;
_err:
if (fd >= 0) {
close(fd);
}
taosTZfree(buffer);
return -1;
}
// Scan and try to fix incorrect files
static int tsdbScanAndTryFixFS(STsdbRepo *pRepo) {
STsdbFS * pfs = REPO_FS(pRepo);
SFSStatus *pStatus = pfs->cstatus;
if (pStatus->pmf) {
if (tsdbScanAndTryFixMFile(pStatus->pmf) < 0) {
tsdbError("vgId:%d failed to fix MFile since %s", REPO_ID(pRepo), tstrerror(terrno));
return -1;
}
}
size_t size = taosArrayGetSize(pStatus->df);
for (size_t i = 0; i < size; i++) {
SDFileSet *pSet = (SDFileSet *)taosArrayGet(pStatus->df, i);
if (tsdbScanAndTryFixDFileSet(pSet) < 0) {
tsdbError("vgId:%d failed to fix MFile since %s", REPO_ID(pRepo), tstrerror(terrno));
return -1;
}
}
// TODO: remove those unused files
{}
return 0;
}
\ No newline at end of file
......@@ -128,6 +128,40 @@ int tsdbUpdateMFileHeader(SMFile *pMFile) {
return 0;
}
int tsdbScanAndTryFixMFile(SMFile *pMFile) {
struct stat mfstat;
SMFile mf = *pMFile;
if (stat(TSDB_FILE_FULL_NAME(&mf), &mfstat) < 0) {
terrno = TAOS_SYSTEM_ERROR(errno);
return -1;
}
if (pMFile->info.size > mfstat.st_size) {
if (tsdbOpenMFile(&mf, O_WRONLY) < 0) {
return -1;
}
if (taosFtruncate(mf.fd, mf.info.size) < 0) {
terrno = TAOS_SYSTEM_ERROR(errno);
tsdbCloseMFile(&mf);
return -1;
}
if (tsdbUpdateMFileHeader(&mf) < 0) {
tsdbCloseMFile(&mf);
return -1;
}
tsdbCloseMFile(&mf);
} else if (pMFile->info.size < mfstat.st_size) {
terrno = TSDB_CODE_TDB_FILE_CORRUPTED;
return -1;
}
return 0;
}
static int tsdbEncodeMFInfo(void **buf, SMFInfo *pInfo) {
int tlen = 0;
......@@ -251,6 +285,40 @@ int tsdbUpdateDFileHeader(SDFile *pDFile) {
return 0;
}
static int tsdbScanAndTryFixDFile(SDFile *pDFile) {
struct stat dfstat;
SDFile df = *pDFile;
if (stat(TSDB_FILE_FULL_NAME(&df), &dfstat) < 0) {
terrno = TAOS_SYSTEM_ERROR(errno);
return -1;
}
if (pDFile->info.size > dfstat.st_size) {
if (tsdbOpenDFile(&df, O_WRONLY) < 0) {
return -1;
}
if (taosFtruncate(df.fd, df.info.size) < 0) {
terrno = TAOS_SYSTEM_ERROR(errno);
tsdbCloseDFile(&df);
return -1;
}
if (tsdbUpdateDFileHeader(&df) < 0) {
tsdbCloseDFile(&df);
return -1;
}
tsdbCloseDFile(&df);
} else if (pDFile->info.size < dfstat.st_size) {
terrno = TSDB_CODE_TDB_FILE_CORRUPTED;
return -1;
}
return 0;
}
static int tsdbEncodeDFInfo(void **buf, SDFInfo *pInfo) {
int tlen = 0;
......@@ -388,6 +456,15 @@ int tsdbUpdateDFileSetHeader(SDFileSet *pSet) {
return 0;
}
int tsdbScanAndTryFixDFileSet(SDFileSet *pSet) {
for (TSDB_FILE_T ftype = 0; ftype < TSDB_FILE_MAX; ftype++) {
if (tsdbScanAndTryFixDFile(TSDB_DFILE_IN_SET(pSet, ftype)) < 0) {
return -1;
}
}
return 0;
}
static void tsdbGetFilename(int vid, int fid, uint32_t ver, TSDB_FILE_T ftype, char *fname) {
ASSERT(ftype != TSDB_FILE_MAX);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册