提交 2980d7b1 编写于 作者: H Hongze Cheng

fix more bugs

上级 822f1051
...@@ -40,7 +40,8 @@ extern "C" { ...@@ -40,7 +40,8 @@ extern "C" {
// TSDB STATE DEFINITION // TSDB STATE DEFINITION
#define TSDB_STATE_OK 0x0 #define TSDB_STATE_OK 0x0
#define TSDB_STATE_BAD_FILE 0x1 #define TSDB_STATE_BAD_META 0x1
#define TSDB_STATE_BAD_DATA 0x2
// --------- TSDB APPLICATION HANDLE DEFINITION // --------- TSDB APPLICATION HANDLE DEFINITION
typedef struct { typedef struct {
......
...@@ -224,6 +224,7 @@ static int32_t syncRestoreDataStepByStep(SSyncPeer *pPeer) { ...@@ -224,6 +224,7 @@ static int32_t syncRestoreDataStepByStep(SSyncPeer *pPeer) {
int32_t code = syncRestoreFile(pPeer, &fversion); int32_t code = syncRestoreFile(pPeer, &fversion);
if (code < 0) { if (code < 0) {
(*pNode->stopSyncFileFp)(pNode->vgId, fversion);
sError("%s, failed to restore files", pPeer->id); sError("%s, failed to restore files", pPeer->id);
return -1; return -1;
} }
......
...@@ -63,7 +63,7 @@ int tsdbApplyMFileChange(SMFile* from, SMFile* to); ...@@ -63,7 +63,7 @@ int tsdbApplyMFileChange(SMFile* from, SMFile* to);
int tsdbCreateMFile(SMFile* pMFile, bool updateHeader); int tsdbCreateMFile(SMFile* pMFile, bool updateHeader);
int tsdbUpdateMFileHeader(SMFile* pMFile); int tsdbUpdateMFileHeader(SMFile* pMFile);
int tsdbLoadMFileHeader(SMFile* pMFile, SMFInfo* pInfo); int tsdbLoadMFileHeader(SMFile* pMFile, SMFInfo* pInfo);
int tsdbScanAndTryFixMFile(SMFile* pMFile); int tsdbScanAndTryFixMFile(STsdbRepo* pRepo);
int tsdbEncodeMFInfo(void** buf, SMFInfo* pInfo); int tsdbEncodeMFInfo(void** buf, SMFInfo* pInfo);
void* tsdbDecodeMFInfo(void* buf, SMFInfo* pInfo); void* tsdbDecodeMFInfo(void* buf, SMFInfo* pInfo);
...@@ -310,7 +310,7 @@ void* tsdbDecodeDFileSetEx(void* buf, SDFileSet* pSet); ...@@ -310,7 +310,7 @@ void* tsdbDecodeDFileSetEx(void* buf, SDFileSet* pSet);
int tsdbApplyDFileSetChange(SDFileSet* from, SDFileSet* to); int tsdbApplyDFileSetChange(SDFileSet* from, SDFileSet* to);
int tsdbCreateDFileSet(SDFileSet* pSet, bool updateHeader); int tsdbCreateDFileSet(SDFileSet* pSet, bool updateHeader);
int tsdbUpdateDFileSetHeader(SDFileSet* pSet); int tsdbUpdateDFileSetHeader(SDFileSet* pSet);
int tsdbScanAndTryFixDFileSet(SDFileSet* pSet); int tsdbScanAndTryFixDFileSet(STsdbRepo *pRepo, SDFileSet* pSet);
static FORCE_INLINE void tsdbCloseDFileSet(SDFileSet* pSet) { static FORCE_INLINE void tsdbCloseDFileSet(SDFileSet* pSet) {
for (TSDB_FILE_T ftype = 0; ftype < TSDB_FILE_MAX; ftype++) { for (TSDB_FILE_T ftype = 0; ftype < TSDB_FILE_MAX; ftype++) {
......
...@@ -68,7 +68,7 @@ extern "C" { ...@@ -68,7 +68,7 @@ extern "C" {
#include "tsdbCommitQueue.h" #include "tsdbCommitQueue.h"
// Main definitions // Main definitions
struct STsdbRepo { struct STsdbRepo {
int8_t state; uint8_t state;
STsdbCfg config; STsdbCfg config;
STsdbAppH appH; STsdbAppH appH;
......
...@@ -255,7 +255,7 @@ int tsdbOpenFS(STsdbRepo *pRepo) { ...@@ -255,7 +255,7 @@ int tsdbOpenFS(STsdbRepo *pRepo) {
} }
// Load meta cache if has meta file // Load meta cache if has meta file
if (tsdbLoadMetaCache(pRepo, true) < 0) { if ((!(pRepo->state & TSDB_STATE_BAD_META)) && tsdbLoadMetaCache(pRepo, true) < 0) {
tsdbError("vgId:%d failed to open FS while loading meta cache since %s", REPO_ID(pRepo), tstrerror(terrno)); tsdbError("vgId:%d failed to open FS while loading meta cache since %s", REPO_ID(pRepo), tstrerror(terrno));
return -1; return -1;
} }
...@@ -670,11 +670,9 @@ static int tsdbScanAndTryFixFS(STsdbRepo *pRepo) { ...@@ -670,11 +670,9 @@ static int tsdbScanAndTryFixFS(STsdbRepo *pRepo) {
STsdbFS * pfs = REPO_FS(pRepo); STsdbFS * pfs = REPO_FS(pRepo);
SFSStatus *pStatus = pfs->cstatus; SFSStatus *pStatus = pfs->cstatus;
if (pStatus->pmf) { if (tsdbScanAndTryFixMFile(pRepo) < 0) {
if (tsdbScanAndTryFixMFile(pStatus->pmf) < 0) { tsdbError("vgId:%d failed to fix MFile since %s", REPO_ID(pRepo), tstrerror(terrno));
tsdbError("vgId:%d failed to fix MFile since %s", REPO_ID(pRepo), tstrerror(terrno)); return -1;
return -1;
}
} }
size_t size = taosArrayGetSize(pStatus->df); size_t size = taosArrayGetSize(pStatus->df);
...@@ -682,13 +680,13 @@ static int tsdbScanAndTryFixFS(STsdbRepo *pRepo) { ...@@ -682,13 +680,13 @@ static int tsdbScanAndTryFixFS(STsdbRepo *pRepo) {
for (size_t i = 0; i < size; i++) { for (size_t i = 0; i < size; i++) {
SDFileSet *pSet = (SDFileSet *)taosArrayGet(pStatus->df, i); SDFileSet *pSet = (SDFileSet *)taosArrayGet(pStatus->df, i);
if (tsdbScanAndTryFixDFileSet(pSet) < 0) { if (tsdbScanAndTryFixDFileSet(pRepo, pSet) < 0) {
tsdbError("vgId:%d failed to fix MFile since %s", REPO_ID(pRepo), tstrerror(terrno)); tsdbError("vgId:%d failed to fix MFile since %s", REPO_ID(pRepo), tstrerror(terrno));
return -1; return -1;
} }
} }
// : remove those unused files // remove those unused files
tsdbScanRootDir(pRepo); tsdbScanRootDir(pRepo);
tsdbScanDataDir(pRepo); tsdbScanDataDir(pRepo);
return 0; return 0;
......
...@@ -163,9 +163,23 @@ int tsdbLoadMFileHeader(SMFile *pMFile, SMFInfo *pInfo) { ...@@ -163,9 +163,23 @@ int tsdbLoadMFileHeader(SMFile *pMFile, SMFInfo *pInfo) {
return 0; return 0;
} }
int tsdbScanAndTryFixMFile(SMFile *pMFile) { int tsdbScanAndTryFixMFile(STsdbRepo *pRepo) {
SMFile * pMFile = pRepo->fs->cstatus->pmf;
struct stat mfstat; struct stat mfstat;
SMFile mf = *pMFile; SMFile mf;
if (pMFile == NULL) {
return 0;
}
mf = *pMFile;
if (access(TSDB_FILE_FULL_NAME(pMFile), F_OK) != 0) {
tsdbError("vgId:%d meta file %s not exit, report to upper layer to fix it", REPO_ID(pRepo),
TSDB_FILE_FULL_NAME(pMFile));
pRepo->state |= TSDB_STATE_BAD_META;
return 0;
}
if (stat(TSDB_FILE_FULL_NAME(&mf), &mfstat) < 0) { if (stat(TSDB_FILE_FULL_NAME(&mf), &mfstat) < 0) {
terrno = TAOS_SYSTEM_ERROR(errno); terrno = TAOS_SYSTEM_ERROR(errno);
...@@ -189,9 +203,14 @@ int tsdbScanAndTryFixMFile(SMFile *pMFile) { ...@@ -189,9 +203,14 @@ int tsdbScanAndTryFixMFile(SMFile *pMFile) {
} }
tsdbCloseMFile(&mf); tsdbCloseMFile(&mf);
tsdbInfo("vgId:%d file %s is truncated from %" PRId64 " to %" PRId64, REPO_ID(pRepo), TSDB_FILE_FULL_NAME(pMFile),
mfstat.st_size, pMFile->info.size);
} else if (pMFile->info.size < mfstat.st_size) { } else if (pMFile->info.size < mfstat.st_size) {
tsdbError("vgId:%d meta file %s has wrong size %" PRId64 " expected %" PRId64 ", report to upper layer to fix it",
REPO_ID(pRepo), TSDB_FILE_FULL_NAME(pMFile), mfstat.st_size, pMFile->info.size);
pRepo->state |= TSDB_STATE_BAD_META;
terrno = TSDB_CODE_TDB_FILE_CORRUPTED; terrno = TSDB_CODE_TDB_FILE_CORRUPTED;
return -1; return 0;
} }
return 0; return 0;
...@@ -361,10 +380,17 @@ int tsdbLoadDFileHeader(SDFile *pDFile, SDFInfo *pInfo) { ...@@ -361,10 +380,17 @@ int tsdbLoadDFileHeader(SDFile *pDFile, SDFInfo *pInfo) {
return 0; return 0;
} }
static int tsdbScanAndTryFixDFile(SDFile *pDFile) { static int tsdbScanAndTryFixDFile(STsdbRepo *pRepo, SDFile *pDFile) {
struct stat dfstat; struct stat dfstat;
SDFile df = *pDFile; SDFile df = *pDFile;
if (access(TSDB_FILE_FULL_NAME(pDFile), F_OK) != 0) {
tsdbError("vgId:%d data file %s not exit, report to upper layer to fix it", REPO_ID(pRepo),
TSDB_FILE_FULL_NAME(pDFile));
pRepo->state |= TSDB_STATE_BAD_DATA;
return 0;
}
if (stat(TSDB_FILE_FULL_NAME(&df), &dfstat) < 0) { if (stat(TSDB_FILE_FULL_NAME(&df), &dfstat) < 0) {
terrno = TAOS_SYSTEM_ERROR(errno); terrno = TAOS_SYSTEM_ERROR(errno);
return -1; return -1;
...@@ -387,9 +413,14 @@ static int tsdbScanAndTryFixDFile(SDFile *pDFile) { ...@@ -387,9 +413,14 @@ static int tsdbScanAndTryFixDFile(SDFile *pDFile) {
} }
tsdbCloseDFile(&df); tsdbCloseDFile(&df);
tsdbInfo("vgId:%d file %s is truncated from %" PRId64 " to %" PRId64, REPO_ID(pRepo), TSDB_FILE_FULL_NAME(pDFile),
dfstat.st_size, pDFile->info.size);
} else if (pDFile->info.size < dfstat.st_size) { } else if (pDFile->info.size < dfstat.st_size) {
tsdbError("vgId:%d data file %s has wrong size %" PRId64 " expected %" PRId64 ", report to upper layer to fix it",
REPO_ID(pRepo), TSDB_FILE_FULL_NAME(pDFile), dfstat.st_size, pDFile->info.size);
pRepo->state |= TSDB_STATE_BAD_DATA;
terrno = TSDB_CODE_TDB_FILE_CORRUPTED; terrno = TSDB_CODE_TDB_FILE_CORRUPTED;
return -1; return 0;
} }
return 0; return 0;
...@@ -559,9 +590,9 @@ int tsdbUpdateDFileSetHeader(SDFileSet *pSet) { ...@@ -559,9 +590,9 @@ int tsdbUpdateDFileSetHeader(SDFileSet *pSet) {
return 0; return 0;
} }
int tsdbScanAndTryFixDFileSet(SDFileSet *pSet) { int tsdbScanAndTryFixDFileSet(STsdbRepo *pRepo, SDFileSet *pSet) {
for (TSDB_FILE_T ftype = 0; ftype < TSDB_FILE_MAX; ftype++) { for (TSDB_FILE_T ftype = 0; ftype < TSDB_FILE_MAX; ftype++) {
if (tsdbScanAndTryFixDFile(TSDB_DFILE_IN_SET(pSet, ftype)) < 0) { if (tsdbScanAndTryFixDFile(pRepo, TSDB_DFILE_IN_SET(pSet, ftype)) < 0) {
return -1; return -1;
} }
} }
......
...@@ -98,7 +98,7 @@ STsdbRepo *tsdbOpenRepo(STsdbCfg *pCfg, STsdbAppH *pAppH) { ...@@ -98,7 +98,7 @@ STsdbRepo *tsdbOpenRepo(STsdbCfg *pCfg, STsdbAppH *pAppH) {
} }
// TODO: Restore information from data // TODO: Restore information from data
if (tsdbRestoreInfo(pRepo) < 0) { if ((!(pRepo->state & TSDB_STATE_BAD_DATA)) && tsdbRestoreInfo(pRepo) < 0) {
tsdbError("vgId:%d failed to open TSDB repository while restore info since %s", config.tsdbId, tstrerror(terrno)); tsdbError("vgId:%d failed to open TSDB repository while restore info since %s", config.tsdbId, tstrerror(terrno));
tsdbCloseRepo(pRepo, false); tsdbCloseRepo(pRepo, false);
return NULL; return NULL;
......
...@@ -82,6 +82,8 @@ int32_t tsdbSyncRecv(void *tsdb, int32_t socketFd) { ...@@ -82,6 +82,8 @@ int32_t tsdbSyncRecv(void *tsdb, int32_t socketFd) {
STsdbRepo *pRepo = (STsdbRepo *)tsdb; STsdbRepo *pRepo = (STsdbRepo *)tsdb;
SSyncH synch = {0}; SSyncH synch = {0};
pRepo->state = TSDB_STATE_OK;
tsdbInitSyncH(&synch, pRepo, socketFd); tsdbInitSyncH(&synch, pRepo, socketFd);
tsdbStartFSTxn(pRepo, 0, 0); tsdbStartFSTxn(pRepo, 0, 0);
......
...@@ -242,7 +242,7 @@ int32_t vnodeOpen(int32_t vgId) { ...@@ -242,7 +242,7 @@ int32_t vnodeOpen(int32_t vgId) {
if (pVnode->tsdb == NULL) { if (pVnode->tsdb == NULL) {
vnodeCleanUp(pVnode); vnodeCleanUp(pVnode);
return terrno; return terrno;
} else if (terrno != TSDB_CODE_SUCCESS) { } else if (tsdbGetState(pVnode->tsdb) != TSDB_STATE_OK) {
vError("vgId:%d, failed to open tsdb, replica:%d reason:%s", pVnode->vgId, pVnode->syncCfg.replica, vError("vgId:%d, failed to open tsdb, replica:%d reason:%s", pVnode->vgId, pVnode->syncCfg.replica,
tstrerror(terrno)); tstrerror(terrno));
if (pVnode->syncCfg.replica <= 1) { if (pVnode->syncCfg.replica <= 1) {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册