提交 01b90fcb 编写于 作者: H Hongze Cheng

more progress

上级 e687b4c7
......@@ -60,6 +60,7 @@ void* tsdbDecodeSMFile(void* buf, SMFile* pMFile);
int tsdbApplyMFileChange(SMFile* from, SMFile* to);
int tsdbCreateMFile(SMFile* pMFile);
int tsdbUpdateMFileHeader(SMFile* pMFile);
int tsdbLoadMFileHeader(SMFile* pMFile, SMFInfo* pInfo);
int tsdbScanAndTryFixMFile(SMFile* pMFile);
static FORCE_INLINE void tsdbSetMFileInfo(SMFile* pMFile, SMFInfo* pInfo) { pMFile->info = *pInfo; }
......
......@@ -80,6 +80,8 @@ int tsdbUnlockRepoMeta(STsdbRepo* pRepo);
void tsdbRefTable(STable* pTable);
void tsdbUnRefTable(STable* pTable);
void tsdbUpdateTableSchema(STsdbRepo* pRepo, STable* pTable, STSchema* pSchema, bool insertAct);
int tsdbRestoreTable(STsdbRepo* pRepo, void* cont, int contLen);
void tsdbOrgMeta(STsdbRepo* pRepo);
static FORCE_INLINE int tsdbCompareSchemaVersion(const void *key1, const void *key2) {
if (*(int16_t *)key1 < schemaVersion(*(STSchema **)key2)) {
......
......@@ -26,6 +26,7 @@ static void tsdbApplyFSTxnOnDisk(SFSStatus *pFrom, SFSStatus *pTo);
static void tsdbGetTxnFname(int repoid, TSDB_TXN_FILE_T ftype, char fname[]);
static int tsdbOpenFSFromCurrent(STsdbRepo *pRepo);
static int tsdbScanAndTryFixFS(STsdbRepo *pRepo);
static int tsdbLoadMetaCache(STsdbRepo *pRepo, bool recoverMeta);
// ================== CURRENT file header info
static int tsdbEncodeFSHeader(void **buf, SFSHeader *pHeader) {
......@@ -245,7 +246,12 @@ int tsdbOpenFS(STsdbRepo *pRepo) {
}
} else {
// TODO: current file not exists, try to recover it
}
// Load meta cache if has meta file
if (tsdbLoadMetaCache(pRepo, true) < 0) {
tsdbError("vgId:%d failed to open FS while loading meta cache since %s", REPO_ID(pRepo), tstrerror(terrno));
return -1;
}
return 0;
......@@ -680,5 +686,133 @@ static int tsdbScanAndTryFixFS(STsdbRepo *pRepo) {
// TODO: remove those unused files
{}
return 0;
}
static int tsdbLoadMetaCache(STsdbRepo *pRepo, bool recoverMeta) {
char tbuf[128];
STsdbFS * pfs = REPO_FS(pRepo);
SMFile mf;
SMFile * pMFile = &mf;
void * pBuf = NULL;
SKVRecord rInfo;
int64_t maxBufSize = 0;
SMFInfo minfo;
// No meta file, just return
if (pfs->cstatus->pmf == NULL) return 0;
mf = pfs->cstatus->mf;
// Load cache first
if (tsdbOpenMFile(pMFile, O_RDONLY) < 0) {
return -1;
}
if (tsdbLoadMFileHeader(pMFile, &minfo) < 0) {
tsdbCloseMFile(pMFile);
return -1;
}
while (true) {
int64_t tsize = tsdbReadMFile(pMFile, tbuf, sizeof(SKVRecord));
if (tsize == 0) break;
if (tsize < sizeof(SKVRecord)) {
tsdbError("vgId:%d failed to read %" PRIzu " bytes from file %s", REPO_ID(pRepo), sizeof(SKVRecord),
TSDB_FILE_FULL_NAME(pMFile));
terrno = TSDB_CODE_TDB_FILE_CORRUPTED;
tsdbCloseMFile(pMFile);
return -1;
}
void *ptr = tsdbDecodeKVRecord(tbuf, &rInfo);
ASSERT(POINTER_DISTANCE(ptr, tbuf) == sizeof(SKVRecord));
// ASSERT((rInfo.offset > 0) ? (pStore->info.size == rInfo.offset) : true);
if (rInfo.offset < 0) {
taosHashRemove(pfs->metaCache, (void *)(&rInfo.uid), sizeof(rInfo.uid));
#if 0
pStore->info.size += sizeof(SKVRecord);
pStore->info.nRecords--;
pStore->info.nDels++;
pStore->info.tombSize += (rInfo.size + sizeof(SKVRecord) * 2);
#endif
} else {
ASSERT(rInfo.offset > 0 && rInfo.size > 0);
if (taosHashPut(pfs->metaCache, (void *)(&rInfo.uid), sizeof(rInfo.uid), &rInfo, sizeof(rInfo)) < 0) {
tsdbError("vgId:%d failed to load meta cache from file %s since OOM", REPO_ID(pRepo),
TSDB_FILE_FULL_NAME(pMFile));
terrno = TSDB_CODE_COM_OUT_OF_MEMORY;
tsdbCloseMFile(pMFile);
return -1;
}
maxBufSize = MAX(maxBufSize, rInfo.size);
if (tsdbSeekMFile(pMFile, rInfo.size, SEEK_CUR) < 0) {
tsdbError("vgId:%d failed to lseek file %s since %s", REPO_ID(pRepo), TSDB_FILE_FULL_NAME(pMFile),
tstrerror(terrno));
tsdbCloseMFile(pMFile);
return -1;
}
#if 0
pStore->info.size += (sizeof(SKVRecord) + rInfo.size);
pStore->info.nRecords++;
#endif
}
}
if (recoverMeta) {
pBuf = malloc(maxBufSize);
if (pBuf == NULL) {
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
tsdbCloseMFile(pMFile);
return -1;
}
SKVRecord *pRecord = taosHashIterate(pfs->metaCache, NULL);
while (pRecord) {
if (tsdbSeekMFile(pMFile, pRecord->offset + sizeof(SKVRecord), SEEK_SET) < 0) {
tsdbError("vgId:%d failed to seek file %s since %s", REPO_ID(pRepo), TSDB_FILE_FULL_NAME(pMFile),
tstrerror(terrno));
tfree(pBuf);
tsdbCloseMFile(pMFile);
return -1;
}
int nread = tsdbReadMFile(pMFile, pBuf, pRecord->size);
if (nread < 0) {
tsdbError("vgId:%d failed to read file %s since %s", REPO_ID(pRepo), TSDB_FILE_FULL_NAME(pMFile),
tstrerror(terrno));
tfree(pBuf);
tsdbCloseMFile(pMFile);
return -1;
}
if (nread < pRecord->size) {
tsdbError("vgId:%d failed to read file %s since file corrupted, expected read:%" PRId64 " actual read:%d",
REPO_ID(pRepo), TSDB_FILE_FULL_NAME(pMFile), pRecord->size, nread);
terrno = TSDB_CODE_TDB_FILE_CORRUPTED;
tfree(pBuf);
tsdbCloseMFile(pMFile);
return -1;
}
if (tsdbRestoreTable(pRepo, pBuf, pRecord->size) < 0) {
tsdbError("vgId:%d failed to restore table, uid %" PRId64 ", since %s" PRIu64, REPO_ID(pRepo), pRecord->uid,
tstrerror(terrno));
tfree(pBuf);
tsdbCloseMFile(pMFile);
return -1;
}
pRecord = taosHashIterate(pfs->metaCache, pRecord);
}
tsdbOrgMeta(pRepo);
}
tsdbCloseMFile(pMFile);
tfree(pBuf);
return 0;
}
\ No newline at end of file
......@@ -128,6 +128,23 @@ int tsdbUpdateMFileHeader(SMFile *pMFile) {
return 0;
}
int tsdbLoadMFileHeader(SMFile *pMFile, SMFInfo *pInfo) {
char buf[TSDB_FILE_HEAD_SIZE] = "\0";
ASSERT(TSDB_FILE_OPENED(pMFile));
if (tsdbSeekMFile(pMFile, 0, SEEK_SET) < 0) {
return -1;
}
if (tsdbReadMFile(pMFile, buf, TSDB_FILE_HEAD_SIZE) < 0) {
return -1;
}
tsdbDecodeMFInfo(buf, pInfo);
return 0;
}
int tsdbScanAndTryFixMFile(SMFile *pMFile) {
struct stat mfstat;
SMFile mf = *pMFile;
......
......@@ -18,8 +18,6 @@
#define DEFAULT_TAG_INDEX_COLUMN 0
static int tsdbCompareSchemaVersion(const void *key1, const void *key2);
static int tsdbRestoreTable(void *pHandle, void *cont, int contLen);
static void tsdbOrgMeta(void *pHandle);
static char * getTagIndexKey(const void *pData);
static STable *tsdbNewTable();
static STable *tsdbCreateTableFromCfg(STableCfg *pCfg, bool isSuper);
......@@ -606,10 +604,8 @@ void tsdbUpdateTableSchema(STsdbRepo *pRepo, STable *pTable, STSchema *pSchema,
}
}
// ------------------ LOCAL FUNCTIONS ------------------
static UNUSED_FUNC int tsdbRestoreTable(void *pHandle, void *cont, int contLen) {
STsdbRepo *pRepo = (STsdbRepo *)pHandle;
STable * pTable = NULL;
int tsdbRestoreTable(STsdbRepo *pRepo, void *cont, int contLen) {
STable *pTable = NULL;
if (!taosCheckChecksumWhole((uint8_t *)cont, contLen)) {
terrno = TSDB_CODE_TDB_FILE_CORRUPTED;
......@@ -628,8 +624,7 @@ static UNUSED_FUNC int tsdbRestoreTable(void *pHandle, void *cont, int contLen)
return 0;
}
static UNUSED_FUNC void tsdbOrgMeta(void *pHandle) {
STsdbRepo *pRepo = (STsdbRepo *)pHandle;
void tsdbOrgMeta(STsdbRepo *pRepo) {
STsdbMeta *pMeta = pRepo->tsdbMeta;
for (int i = 1; i < pMeta->maxTables; i++) {
......@@ -640,6 +635,7 @@ static UNUSED_FUNC void tsdbOrgMeta(void *pHandle) {
}
}
// ------------------ LOCAL FUNCTIONS ------------------
static char *getTagIndexKey(const void *pData) {
STable *pTable = (STable *)pData;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册