提交 5037b495 编写于 作者: H Hongze Cheng

reload data from file

上级 c081aeeb
......@@ -330,8 +330,8 @@ void tsdbIncCommitRef(int vgId);
void tsdbDecCommitRef(int vgId);
// For TSDB file sync
int tsdbSyncSend(STsdbRepo *pRepo, int socketFd);
int tsdbSyncRecv(STsdbRepo *pRepo, int socketFd);
int tsdbSyncSend(void *pRepo, int socketFd);
int tsdbSyncRecv(void *pRepo, int socketFd);
#ifdef __cplusplus
}
......
......@@ -83,6 +83,7 @@ int tsdbUpdateDFileSet(STsdbFS *pfs, const SDFileSet *pSet);
void tsdbFSIterInit(SFSIter *pIter, STsdbFS *pfs, int direction);
void tsdbFSIterSeek(SFSIter *pIter, int fid);
SDFileSet *tsdbFSIterNext(SFSIter *pIter);
int tsdbLoadMetaCache(STsdbRepo *pRepo, bool recoverMeta);
static FORCE_INLINE int tsdbRLockFS(STsdbFS* pFs) {
int code = pthread_rwlock_rdlock(&(pFs->lock));
......
......@@ -94,6 +94,7 @@ int tsdbLockRepo(STsdbRepo* pRepo);
int tsdbUnlockRepo(STsdbRepo* pRepo);
STsdbMeta* tsdbGetMeta(STsdbRepo* pRepo);
int tsdbCheckCommit(STsdbRepo* pRepo);
int tsdbRestoreInfo(STsdbRepo* pRepo);
static FORCE_INLINE STsdbBufBlock* tsdbGetCurrBufBlock(STsdbRepo* pRepo) {
ASSERT(pRepo != NULL);
......
......@@ -26,7 +26,6 @@ static void tsdbApplyFSTxnOnDisk(SFSStatus *pFrom, SFSStatus *pTo);
static void tsdbGetTxnFname(int repoid, TSDB_TXN_FILE_T ftype, char fname[]);
static int tsdbOpenFSFromCurrent(STsdbRepo *pRepo);
static int tsdbScanAndTryFixFS(STsdbRepo *pRepo);
static int tsdbLoadMetaCache(STsdbRepo *pRepo, bool recoverMeta);
// ================== CURRENT file header info
static int tsdbEncodeFSHeader(void **buf, SFSHeader *pHeader) {
......@@ -690,7 +689,7 @@ static int tsdbScanAndTryFixFS(STsdbRepo *pRepo) {
return 0;
}
static int tsdbLoadMetaCache(STsdbRepo *pRepo, bool recoverMeta) {
int tsdbLoadMetaCache(STsdbRepo *pRepo, bool recoverMeta) {
char tbuf[128];
STsdbFS * pfs = REPO_FS(pRepo);
SMFile mf;
......@@ -700,6 +699,8 @@ static int tsdbLoadMetaCache(STsdbRepo *pRepo, bool recoverMeta) {
int64_t maxBufSize = 0;
SMFInfo minfo;
// TODO: clear meta at first
// No meta file, just return
if (pfs->cstatus->pmf == NULL) return 0;
......
......@@ -28,7 +28,6 @@ static STsdbRepo *tsdbNewRepo(STsdbCfg *pCfg, STsdbAppH *pAppH);
static void tsdbFreeRepo(STsdbRepo *pRepo);
static void tsdbStartStream(STsdbRepo *pRepo);
static void tsdbStopStream(STsdbRepo *pRepo);
static int tsdbRestoreInfo(STsdbRepo *pRepo);
// Function declaration
int32_t tsdbCreateRepo(int repoid) {
......@@ -539,7 +538,7 @@ static void tsdbStopStream(STsdbRepo *pRepo) {
}
}
static int tsdbRestoreInfo(STsdbRepo *pRepo) {
int tsdbRestoreInfo(STsdbRepo *pRepo) {
SFSIter fsiter;
SReadH readh;
SDFileSet *pSet;
......
......@@ -24,6 +24,7 @@ typedef struct {
SRtn rtn;
int32_t socketFd;
void * pBuf;
bool mfChanged;
SMFile * pmf;
SMFile mf;
SDFileSet df;
......@@ -46,9 +47,11 @@ static bool tsdbIsTowFSetSame(SDFileSet *pSet1, SDFileSet *pSet2);
static int32_t tsdbSyncSendDFileSet(SSyncH *pSynch, SDFileSet *pSet);
static int32_t tsdbSendDFileSetInfo(SSyncH *pSynch, SDFileSet *pSet);
static int32_t tsdbRecvDFileSetInfo(SSyncH *pSynch);
static int tsdbReload(STsdbRepo *pRepo, bool isMfChanged);
int32_t tsdbSyncSend(STsdbRepo *pRepo, int32_t socketFd) {
SSyncH synch = {0};
int32_t tsdbSyncSend(void *tsdb, int32_t socketFd) {
STsdbRepo *pRepo = (STsdbRepo *)tsdb;
SSyncH synch = {0};
tsdbInitSyncH(&synch, pRepo, socketFd);
// Disable TSDB commit
......@@ -75,7 +78,8 @@ _err:
return -1;
}
int32_t tsdbSyncRecv(STsdbRepo *pRepo, int32_t socketFd) {
int32_t tsdbSyncRecv(void *tsdb, int32_t socketFd) {
STsdbRepo *pRepo = (STsdbRepo *)tsdb;
SSyncH synch;
tsdbInitSyncH(&synch, pRepo, socketFd);
......@@ -91,10 +95,12 @@ int32_t tsdbSyncRecv(STsdbRepo *pRepo, int32_t socketFd) {
goto _err;
}
// TODO: need to restart TSDB or reload TSDB here
tsdbEndFSTxn(pRepo);
tsdbDestroySyncH(&synch);
// Reload file change
tsdbReload(pRepo, synch.mfChanged);
return 0;
_err:
......@@ -179,6 +185,8 @@ static int32_t tsdbSyncRecvMeta(SSyncH *pSynch) {
if (pLMFile == NULL || memcmp(&(pSynch->pmf->info), &(pLMFile->info), sizeof(SMFInfo)) != 0) {
// Local has no meta file or has a different meta file, need to copy from remote
pSynch->mfChanged = true;
if (tsdbSendDecision(pSynch, true) < 0) {
tsdbError("vgId:%d, failed to send decision while recv metafile since %s", REPO_ID(pRepo), tstrerror(terrno));
return -1;
......@@ -643,5 +651,27 @@ static int32_t tsdbRecvDFileSetInfo(SSyncH *pSynch) {
pSynch->pdf = &(pSynch->df);
tsdbDecodeDFileSetEx(SYNC_BUFFER(pSynch), pSynch->pdf);
return 0;
}
static int tsdbReload(STsdbRepo *pRepo, bool isMfChanged) {
if (isMfChanged) {
tsdbCloseMeta(pRepo);
tsdbFreeMeta(pRepo->tsdbMeta);
pRepo->tsdbMeta = tsdbNewMeta(REPO_CFG(pRepo));
tsdbOpenMeta(pRepo);
tsdbLoadMetaCache(pRepo, true);
}
tsdbUnRefMemTable(pRepo, pRepo->mem);
tsdbUnRefMemTable(pRepo, pRepo->imem);
pRepo->mem = NULL;
pRepo->imem = NULL;
if (tsdbRestoreInfo(pRepo) < 0) {
tsdbError("vgId:%d failed to restore info from file since %s", REPO_ID(pRepo), tstrerror(terrno));
return -1;
}
return 0;
}
\ No newline at end of file
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册