未验证 提交 184f3976 编写于 作者: S Shengliang Guan 提交者: GitHub

Merge pull request #6252 from taosdata/fix/TD-4324

Fix/td 4324
...@@ -33,6 +33,7 @@ void tsdbGetRtnSnap(STsdbRepo *pRepo, SRtn *pRtn); ...@@ -33,6 +33,7 @@ void tsdbGetRtnSnap(STsdbRepo *pRepo, SRtn *pRtn);
int tsdbEncodeKVRecord(void **buf, SKVRecord *pRecord); int tsdbEncodeKVRecord(void **buf, SKVRecord *pRecord);
void *tsdbDecodeKVRecord(void *buf, SKVRecord *pRecord); void *tsdbDecodeKVRecord(void *buf, SKVRecord *pRecord);
void *tsdbCommitData(STsdbRepo *pRepo); void *tsdbCommitData(STsdbRepo *pRepo);
int tsdbApplyRtn(STsdbRepo *pRepo);
static FORCE_INLINE int tsdbGetFidLevel(int fid, SRtn *pRtn) { static FORCE_INLINE int tsdbGetFidLevel(int fid, SRtn *pRtn) {
if (fid >= pRtn->maxFid) { if (fid >= pRtn->maxFid) {
......
...@@ -86,6 +86,7 @@ struct STsdbRepo { ...@@ -86,6 +86,7 @@ struct STsdbRepo {
SMemTable* mem; SMemTable* mem;
SMemTable* imem; SMemTable* imem;
STsdbFS* fs; STsdbFS* fs;
SRtn rtn;
tsem_t readyToCommit; tsem_t readyToCommit;
pthread_mutex_t mutex; pthread_mutex_t mutex;
bool repoLocked; bool repoLocked;
......
...@@ -86,7 +86,6 @@ static void tsdbCloseCommitFile(SCommitH *pCommith, bool hasError); ...@@ -86,7 +86,6 @@ static void tsdbCloseCommitFile(SCommitH *pCommith, bool hasError);
static bool tsdbCanAddSubBlock(SCommitH *pCommith, SBlock *pBlock, SMergeInfo *pInfo); static bool tsdbCanAddSubBlock(SCommitH *pCommith, SBlock *pBlock, SMergeInfo *pInfo);
static void tsdbLoadAndMergeFromCache(SDataCols *pDataCols, int *iter, SCommitIter *pCommitIter, SDataCols *pTarget, static void tsdbLoadAndMergeFromCache(SDataCols *pDataCols, int *iter, SCommitIter *pCommitIter, SDataCols *pTarget,
TSKEY maxKey, int maxRows, int8_t update); TSKEY maxKey, int maxRows, int8_t update);
static int tsdbApplyRtn(STsdbRepo *pRepo);
static int tsdbApplyRtnOnFSet(STsdbRepo *pRepo, SDFileSet *pSet, SRtn *pRtn); static int tsdbApplyRtnOnFSet(STsdbRepo *pRepo, SDFileSet *pSet, SRtn *pRtn);
void *tsdbCommitData(STsdbRepo *pRepo) { void *tsdbCommitData(STsdbRepo *pRepo) {
...@@ -1431,7 +1430,7 @@ static bool tsdbCanAddSubBlock(SCommitH *pCommith, SBlock *pBlock, SMergeInfo *p ...@@ -1431,7 +1430,7 @@ static bool tsdbCanAddSubBlock(SCommitH *pCommith, SBlock *pBlock, SMergeInfo *p
return false; return false;
} }
static int tsdbApplyRtn(STsdbRepo *pRepo) { int tsdbApplyRtn(STsdbRepo *pRepo) {
SRtn rtn; SRtn rtn;
SFSIter fsiter; SFSIter fsiter;
STsdbFS * pfs = REPO_FS(pRepo); STsdbFS * pfs = REPO_FS(pRepo);
......
...@@ -33,7 +33,9 @@ static int tsdbScanDataDir(STsdbRepo *pRepo); ...@@ -33,7 +33,9 @@ static int tsdbScanDataDir(STsdbRepo *pRepo);
static bool tsdbIsTFileInFS(STsdbFS *pfs, const TFILE *pf); static bool tsdbIsTFileInFS(STsdbFS *pfs, const TFILE *pf);
static int tsdbRestoreCurrent(STsdbRepo *pRepo); static int tsdbRestoreCurrent(STsdbRepo *pRepo);
static int tsdbComparTFILE(const void *arg1, const void *arg2); static int tsdbComparTFILE(const void *arg1, const void *arg2);
static void tsdbScanAndTryFixDFilesHeader(STsdbRepo *pRepo); static void tsdbScanAndTryFixDFilesHeader(STsdbRepo *pRepo, int32_t *nExpired);
static int tsdbProcessExpiredFS(STsdbRepo *pRepo);
static int tsdbCreateMeta(STsdbRepo *pRepo);
// ================== CURRENT file header info // ================== CURRENT file header info
static int tsdbEncodeFSHeader(void **buf, SFSHeader *pHeader) { static int tsdbEncodeFSHeader(void **buf, SFSHeader *pHeader) {
...@@ -212,6 +214,8 @@ STsdbFS *tsdbNewFS(STsdbCfg *pCfg) { ...@@ -212,6 +214,8 @@ STsdbFS *tsdbNewFS(STsdbCfg *pCfg) {
return NULL; return NULL;
} }
pfs->intxn = false;
pfs->nstatus = tsdbNewFSStatus(maxFSet); pfs->nstatus = tsdbNewFSStatus(maxFSet);
if (pfs->nstatus == NULL) { if (pfs->nstatus == NULL) {
tsdbFreeFS(pfs); tsdbFreeFS(pfs);
...@@ -234,22 +238,84 @@ void *tsdbFreeFS(STsdbFS *pfs) { ...@@ -234,22 +238,84 @@ void *tsdbFreeFS(STsdbFS *pfs) {
return NULL; return NULL;
} }
static int tsdbProcessExpiredFS(STsdbRepo *pRepo) {
tsdbStartFSTxn(pRepo, 0, 0);
if (tsdbCreateMeta(pRepo) < 0) {
tsdbError("vgId:%d failed to create meta since %s", REPO_ID(pRepo), tstrerror(terrno));
return -1;
}
if (tsdbApplyRtn(pRepo) < 0) {
tsdbEndFSTxnWithError(REPO_FS(pRepo));
tsdbError("vgId:%d failed to apply rtn since %s", REPO_ID(pRepo), tstrerror(terrno));
return -1;
}
if (tsdbEndFSTxn(pRepo) < 0) {
tsdbError("vgId:%d failed to end fs txn since %s", REPO_ID(pRepo), tstrerror(terrno));
return -1;
}
return 0;
}
static int tsdbCreateMeta(STsdbRepo *pRepo) {
STsdbFS *pfs = REPO_FS(pRepo);
SMFile * pOMFile = pfs->cstatus->pmf;
SMFile mf;
SDiskID did;
if (pOMFile != NULL) {
// keep the old meta file
tsdbUpdateMFile(pfs, pOMFile);
return 0;
}
// Create a new meta file
did.level = TFS_PRIMARY_LEVEL;
did.id = TFS_PRIMARY_ID;
tsdbInitMFile(&mf, did, REPO_ID(pRepo), FS_TXN_VERSION(REPO_FS(pRepo)));
if (tsdbCreateMFile(&mf, true) < 0) {
tsdbError("vgId:%d failed to create META file since %s", REPO_ID(pRepo), tstrerror(terrno));
return -1;
}
tsdbInfo("vgId:%d meta file %s is created", REPO_ID(pRepo), TSDB_FILE_FULL_NAME(&mf));
if (tsdbUpdateMFileHeader(&mf) < 0) {
tsdbError("vgId:%d failed to update META file header since %s, revert it", REPO_ID(pRepo), tstrerror(terrno));
tsdbApplyMFileChange(&mf, pOMFile);
return -1;
}
TSDB_FILE_FSYNC(&mf);
tsdbCloseMFile(&mf);
tsdbUpdateMFile(pfs, &mf);
return 0;
}
int tsdbOpenFS(STsdbRepo *pRepo) { int tsdbOpenFS(STsdbRepo *pRepo) {
STsdbFS *pfs = REPO_FS(pRepo); STsdbFS *pfs = REPO_FS(pRepo);
char current[TSDB_FILENAME_LEN] = "\0"; char current[TSDB_FILENAME_LEN] = "\0";
int nExpired = 0;
ASSERT(pfs != NULL); ASSERT(pfs != NULL);
tsdbGetTxnFname(REPO_ID(pRepo), TSDB_TXN_CURR_FILE, current); tsdbGetTxnFname(REPO_ID(pRepo), TSDB_TXN_CURR_FILE, current);
tsdbGetRtnSnap(pRepo, &pRepo->rtn);
if (access(current, F_OK) == 0) { if (access(current, F_OK) == 0) {
if (tsdbOpenFSFromCurrent(pRepo) < 0) { if (tsdbOpenFSFromCurrent(pRepo) < 0) {
tsdbError("vgId:%d failed to open FS since %s", REPO_ID(pRepo), tstrerror(terrno)); tsdbError("vgId:%d failed to open FS since %s", REPO_ID(pRepo), tstrerror(terrno));
return -1; return -1;
} }
tsdbScanAndTryFixDFilesHeader(pRepo); tsdbScanAndTryFixDFilesHeader(pRepo, &nExpired);
if (nExpired > 0) {
tsdbProcessExpiredFS(pRepo);
}
} else { } else {
// should skip expired fileset inside of the function
if (tsdbRestoreCurrent(pRepo) < 0) { if (tsdbRestoreCurrent(pRepo) < 0) {
tsdbError("vgId:%d failed to restore current file since %s", REPO_ID(pRepo), tstrerror(terrno)); tsdbError("vgId:%d failed to restore current file since %s", REPO_ID(pRepo), tstrerror(terrno));
return -1; return -1;
...@@ -1110,6 +1176,11 @@ static int tsdbRestoreDFileSet(STsdbRepo *pRepo) { ...@@ -1110,6 +1176,11 @@ static int tsdbRestoreDFileSet(STsdbRepo *pRepo) {
ASSERT(tvid == REPO_ID(pRepo)); ASSERT(tvid == REPO_ID(pRepo));
if (tfid < pRepo->rtn.minFid) { // skip file expired
++index;
continue;
}
if (ftype == 0) { if (ftype == 0) {
fset.fid = tfid; fset.fid = tfid;
} else { } else {
...@@ -1206,7 +1277,7 @@ static int tsdbComparTFILE(const void *arg1, const void *arg2) { ...@@ -1206,7 +1277,7 @@ static int tsdbComparTFILE(const void *arg1, const void *arg2) {
} }
} }
static void tsdbScanAndTryFixDFilesHeader(STsdbRepo *pRepo) { static void tsdbScanAndTryFixDFilesHeader(STsdbRepo *pRepo, int32_t *nExpired) {
STsdbFS * pfs = REPO_FS(pRepo); STsdbFS * pfs = REPO_FS(pRepo);
SFSStatus *pStatus = pfs->cstatus; SFSStatus *pStatus = pfs->cstatus;
SDFInfo info; SDFInfo info;
...@@ -1214,7 +1285,9 @@ static void tsdbScanAndTryFixDFilesHeader(STsdbRepo *pRepo) { ...@@ -1214,7 +1285,9 @@ static void tsdbScanAndTryFixDFilesHeader(STsdbRepo *pRepo) {
for (size_t i = 0; i < taosArrayGetSize(pStatus->df); i++) { for (size_t i = 0; i < taosArrayGetSize(pStatus->df); i++) {
SDFileSet fset; SDFileSet fset;
tsdbInitDFileSetEx(&fset, (SDFileSet *)taosArrayGet(pStatus->df, i)); tsdbInitDFileSetEx(&fset, (SDFileSet *)taosArrayGet(pStatus->df, i));
if (fset.fid < pRepo->rtn.minFid) {
++*nExpired;
}
tsdbDebug("vgId:%d scan DFileSet %d header", REPO_ID(pRepo), fset.fid); tsdbDebug("vgId:%d scan DFileSet %d header", REPO_ID(pRepo), fset.fid);
if (tsdbOpenDFileSet(&fset, O_RDWR) < 0) { if (tsdbOpenDFileSet(&fset, O_RDWR) < 0) {
......
...@@ -424,24 +424,42 @@ static int32_t tsdbSyncRecvDFileSetArray(SSyncH *pSynch) { ...@@ -424,24 +424,42 @@ static int32_t tsdbSyncRecvDFileSetArray(SSyncH *pSynch) {
} }
if (tsdbSendDecision(pSynch, false) < 0) { if (tsdbSendDecision(pSynch, false) < 0) {
tsdbError("vgId:%d, filed to send decision since %s", REPO_ID(pRepo), tstrerror(terrno)); tsdbError("vgId:%d, failed to send decision since %s", REPO_ID(pRepo), tstrerror(terrno));
return -1; return -1;
} }
} else { } else {
// Need to copy from remote // Need to copy from remote
int fidLevel = tsdbGetFidLevel(pSynch->pdf->fid, &(pSynch->rtn));
if (fidLevel < 0) { // expired fileset
tsdbInfo("vgId:%d, fileset:%d will be skipped as expired", REPO_ID(pRepo), pSynch->pdf->fid);
if (tsdbSendDecision(pSynch, false) < 0) {
tsdbError("vgId:%d, failed to send decision since %s", REPO_ID(pRepo), tstrerror(terrno));
return -1;
}
// Move forward
if (tsdbRecvDFileSetInfo(pSynch) < 0) {
tsdbError("vgId:%d, failed to recv fileset since %s", REPO_ID(pRepo), tstrerror(terrno));
return -1;
}
if (pLSet) {
pLSet = tsdbFSIterNext(&fsiter);
}
// Next loop
continue;
} else {
tsdbInfo("vgId:%d, fileset:%d will be received", REPO_ID(pRepo), pSynch->pdf->fid); tsdbInfo("vgId:%d, fileset:%d will be received", REPO_ID(pRepo), pSynch->pdf->fid);
// Notify remote to send there file here // Notify remote to send there file here
if (tsdbSendDecision(pSynch, true) < 0) { if (tsdbSendDecision(pSynch, true) < 0) {
tsdbError("vgId:%d, failed to send decision since %s", REPO_ID(pRepo), tstrerror(terrno)); tsdbError("vgId:%d, failed to send decision since %s", REPO_ID(pRepo), tstrerror(terrno));
return -1; return -1;
} }
}
// Create local files and copy from remote // Create local files and copy from remote
SDiskID did; SDiskID did;
SDFileSet fset; SDFileSet fset;
tfsAllocDisk(tsdbGetFidLevel(pSynch->pdf->fid, &(pSynch->rtn)), &(did.level), &(did.id)); tfsAllocDisk(fidLevel, &(did.level), &(did.id));
if (did.level == TFS_UNDECIDED_LEVEL) { if (did.level == TFS_UNDECIDED_LEVEL) {
terrno = TSDB_CODE_TDB_NO_AVAIL_DISK; terrno = TSDB_CODE_TDB_NO_AVAIL_DISK;
tsdbError("vgId:%d, failed allc disk since %s", REPO_ID(pRepo), tstrerror(terrno)); tsdbError("vgId:%d, failed allc disk since %s", REPO_ID(pRepo), tstrerror(terrno));
...@@ -548,6 +566,13 @@ static int32_t tsdbSyncSendDFileSet(SSyncH *pSynch, SDFileSet *pSet) { ...@@ -548,6 +566,13 @@ static int32_t tsdbSyncSendDFileSet(SSyncH *pSynch, SDFileSet *pSet) {
STsdbRepo *pRepo = pSynch->pRepo; STsdbRepo *pRepo = pSynch->pRepo;
bool toSend = false; bool toSend = false;
// skip expired fileset
if (pSet && tsdbGetFidLevel(pSet->fid, &(pSynch->rtn)) < 0) {
tsdbInfo("vgId:%d, don't sync send since fileset:%d smaller than minFid:%d", REPO_ID(pRepo), pSet->fid,
pSynch->rtn.minFid);
return 0;
}
if (tsdbSendDFileSetInfo(pSynch, pSet) < 0) { if (tsdbSendDFileSetInfo(pSynch, pSet) < 0) {
tsdbError("vgId:%d, failed to send fileset:%d info since %s", REPO_ID(pRepo), pSet ? pSet->fid : -1, tstrerror(terrno)); tsdbError("vgId:%d, failed to send fileset:%d info since %s", REPO_ID(pRepo), pSet ? pSet->fid : -1, tstrerror(terrno));
return -1; return -1;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册