diff --git a/src/tsdb/inc/tsdbFile.h b/src/tsdb/inc/tsdbFile.h index 2719e7aeeca3ac45d9ea0774d5ca4e7a3de81303..6ab439518e5109d5686993c06719bcc792404717 100644 --- a/src/tsdb/inc/tsdbFile.h +++ b/src/tsdb/inc/tsdbFile.h @@ -60,6 +60,7 @@ void* tsdbDecodeSMFile(void* buf, SMFile* pMFile); int tsdbApplyMFileChange(SMFile* from, SMFile* to); int tsdbCreateMFile(SMFile* pMFile); int tsdbUpdateMFileHeader(SMFile* pMFile); +int tsdbLoadMFileHeader(SMFile* pMFile, SMFInfo* pInfo); int tsdbScanAndTryFixMFile(SMFile* pMFile); static FORCE_INLINE void tsdbSetMFileInfo(SMFile* pMFile, SMFInfo* pInfo) { pMFile->info = *pInfo; } diff --git a/src/tsdb/inc/tsdbMeta.h b/src/tsdb/inc/tsdbMeta.h index 13568d3611c6a7753cf1f1d543e869295c83f7f7..9efb2ba36a09022bf410d7f2e0e45ce0dd13ff23 100644 --- a/src/tsdb/inc/tsdbMeta.h +++ b/src/tsdb/inc/tsdbMeta.h @@ -80,6 +80,8 @@ int tsdbUnlockRepoMeta(STsdbRepo* pRepo); void tsdbRefTable(STable* pTable); void tsdbUnRefTable(STable* pTable); void tsdbUpdateTableSchema(STsdbRepo* pRepo, STable* pTable, STSchema* pSchema, bool insertAct); +int tsdbRestoreTable(STsdbRepo* pRepo, void* cont, int contLen); +void tsdbOrgMeta(STsdbRepo* pRepo); static FORCE_INLINE int tsdbCompareSchemaVersion(const void *key1, const void *key2) { if (*(int16_t *)key1 < schemaVersion(*(STSchema **)key2)) { diff --git a/src/tsdb/src/tsdbFS.c b/src/tsdb/src/tsdbFS.c index c1a028133086d79e0b6c2506380cea0ced84ea5e..e6750454b3e741f97c516396736f67cd6a9cad8b 100644 --- a/src/tsdb/src/tsdbFS.c +++ b/src/tsdb/src/tsdbFS.c @@ -26,6 +26,7 @@ static void tsdbApplyFSTxnOnDisk(SFSStatus *pFrom, SFSStatus *pTo); static void tsdbGetTxnFname(int repoid, TSDB_TXN_FILE_T ftype, char fname[]); static int tsdbOpenFSFromCurrent(STsdbRepo *pRepo); static int tsdbScanAndTryFixFS(STsdbRepo *pRepo); +static int tsdbLoadMetaCache(STsdbRepo *pRepo, bool recoverMeta); // ================== CURRENT file header info static int tsdbEncodeFSHeader(void **buf, SFSHeader *pHeader) { @@ -245,7 +246,12 @@ int tsdbOpenFS(STsdbRepo *pRepo) { } } else { // TODO: current file not exists, try to recover it + } + // Load meta cache if has meta file + if (tsdbLoadMetaCache(pRepo, true) < 0) { + tsdbError("vgId:%d failed to open FS while loading meta cache since %s", REPO_ID(pRepo), tstrerror(terrno)); + return -1; } return 0; @@ -680,5 +686,133 @@ static int tsdbScanAndTryFixFS(STsdbRepo *pRepo) { // TODO: remove those unused files {} + return 0; +} + +static int tsdbLoadMetaCache(STsdbRepo *pRepo, bool recoverMeta) { + char tbuf[128]; + STsdbFS * pfs = REPO_FS(pRepo); + SMFile mf; + SMFile * pMFile = &mf; + void * pBuf = NULL; + SKVRecord rInfo; + int64_t maxBufSize = 0; + SMFInfo minfo; + + // No meta file, just return + if (pfs->cstatus->pmf == NULL) return 0; + + mf = pfs->cstatus->mf; + // Load cache first + if (tsdbOpenMFile(pMFile, O_RDONLY) < 0) { + return -1; + } + + if (tsdbLoadMFileHeader(pMFile, &minfo) < 0) { + tsdbCloseMFile(pMFile); + return -1; + } + + while (true) { + int64_t tsize = tsdbReadMFile(pMFile, tbuf, sizeof(SKVRecord)); + if (tsize == 0) break; + if (tsize < sizeof(SKVRecord)) { + tsdbError("vgId:%d failed to read %" PRIzu " bytes from file %s", REPO_ID(pRepo), sizeof(SKVRecord), + TSDB_FILE_FULL_NAME(pMFile)); + terrno = TSDB_CODE_TDB_FILE_CORRUPTED; + tsdbCloseMFile(pMFile); + return -1; + } + + void *ptr = tsdbDecodeKVRecord(tbuf, &rInfo); + ASSERT(POINTER_DISTANCE(ptr, tbuf) == sizeof(SKVRecord)); + // ASSERT((rInfo.offset > 0) ? (pStore->info.size == rInfo.offset) : true); + + if (rInfo.offset < 0) { + taosHashRemove(pfs->metaCache, (void *)(&rInfo.uid), sizeof(rInfo.uid)); +#if 0 + pStore->info.size += sizeof(SKVRecord); + pStore->info.nRecords--; + pStore->info.nDels++; + pStore->info.tombSize += (rInfo.size + sizeof(SKVRecord) * 2); +#endif + } else { + ASSERT(rInfo.offset > 0 && rInfo.size > 0); + if (taosHashPut(pfs->metaCache, (void *)(&rInfo.uid), sizeof(rInfo.uid), &rInfo, sizeof(rInfo)) < 0) { + tsdbError("vgId:%d failed to load meta cache from file %s since OOM", REPO_ID(pRepo), + TSDB_FILE_FULL_NAME(pMFile)); + terrno = TSDB_CODE_COM_OUT_OF_MEMORY; + tsdbCloseMFile(pMFile); + return -1; + } + + maxBufSize = MAX(maxBufSize, rInfo.size); + + if (tsdbSeekMFile(pMFile, rInfo.size, SEEK_CUR) < 0) { + tsdbError("vgId:%d failed to lseek file %s since %s", REPO_ID(pRepo), TSDB_FILE_FULL_NAME(pMFile), + tstrerror(terrno)); + tsdbCloseMFile(pMFile); + return -1; + } + +#if 0 + pStore->info.size += (sizeof(SKVRecord) + rInfo.size); + pStore->info.nRecords++; +#endif + } + } + + if (recoverMeta) { + pBuf = malloc(maxBufSize); + if (pBuf == NULL) { + terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; + tsdbCloseMFile(pMFile); + return -1; + } + + SKVRecord *pRecord = taosHashIterate(pfs->metaCache, NULL); + while (pRecord) { + if (tsdbSeekMFile(pMFile, pRecord->offset + sizeof(SKVRecord), SEEK_SET) < 0) { + tsdbError("vgId:%d failed to seek file %s since %s", REPO_ID(pRepo), TSDB_FILE_FULL_NAME(pMFile), + tstrerror(terrno)); + tfree(pBuf); + tsdbCloseMFile(pMFile); + return -1; + } + + int nread = tsdbReadMFile(pMFile, pBuf, pRecord->size); + if (nread < 0) { + tsdbError("vgId:%d failed to read file %s since %s", REPO_ID(pRepo), TSDB_FILE_FULL_NAME(pMFile), + tstrerror(terrno)); + tfree(pBuf); + tsdbCloseMFile(pMFile); + return -1; + } + + if (nread < pRecord->size) { + tsdbError("vgId:%d failed to read file %s since file corrupted, expected read:%" PRId64 " actual read:%d", + REPO_ID(pRepo), TSDB_FILE_FULL_NAME(pMFile), pRecord->size, nread); + terrno = TSDB_CODE_TDB_FILE_CORRUPTED; + tfree(pBuf); + tsdbCloseMFile(pMFile); + return -1; + } + + if (tsdbRestoreTable(pRepo, pBuf, pRecord->size) < 0) { + tsdbError("vgId:%d failed to restore table, uid %" PRId64 ", since %s" PRIu64, REPO_ID(pRepo), pRecord->uid, + tstrerror(terrno)); + tfree(pBuf); + tsdbCloseMFile(pMFile); + return -1; + } + + pRecord = taosHashIterate(pfs->metaCache, pRecord); + } + + tsdbOrgMeta(pRepo); + } + + tsdbCloseMFile(pMFile); + tfree(pBuf); return 0; } \ No newline at end of file diff --git a/src/tsdb/src/tsdbFile.c b/src/tsdb/src/tsdbFile.c index 18c36e35ebe5e75b0126125388149d72739bcc21..8ed93b2015483c1640f0efd374b6b56d873765a3 100644 --- a/src/tsdb/src/tsdbFile.c +++ b/src/tsdb/src/tsdbFile.c @@ -128,6 +128,23 @@ int tsdbUpdateMFileHeader(SMFile *pMFile) { return 0; } +int tsdbLoadMFileHeader(SMFile *pMFile, SMFInfo *pInfo) { + char buf[TSDB_FILE_HEAD_SIZE] = "\0"; + + ASSERT(TSDB_FILE_OPENED(pMFile)); + + if (tsdbSeekMFile(pMFile, 0, SEEK_SET) < 0) { + return -1; + } + + if (tsdbReadMFile(pMFile, buf, TSDB_FILE_HEAD_SIZE) < 0) { + return -1; + } + + tsdbDecodeMFInfo(buf, pInfo); + return 0; +} + int tsdbScanAndTryFixMFile(SMFile *pMFile) { struct stat mfstat; SMFile mf = *pMFile; diff --git a/src/tsdb/src/tsdbMeta.c b/src/tsdb/src/tsdbMeta.c index 2dfae9b3f0b1618170d661db3020d2b4a6a13b98..f44f61fc70f393b9ee9ce7568adbeaeabd1c8d62 100644 --- a/src/tsdb/src/tsdbMeta.c +++ b/src/tsdb/src/tsdbMeta.c @@ -18,8 +18,6 @@ #define DEFAULT_TAG_INDEX_COLUMN 0 static int tsdbCompareSchemaVersion(const void *key1, const void *key2); -static int tsdbRestoreTable(void *pHandle, void *cont, int contLen); -static void tsdbOrgMeta(void *pHandle); static char * getTagIndexKey(const void *pData); static STable *tsdbNewTable(); static STable *tsdbCreateTableFromCfg(STableCfg *pCfg, bool isSuper); @@ -606,10 +604,8 @@ void tsdbUpdateTableSchema(STsdbRepo *pRepo, STable *pTable, STSchema *pSchema, } } -// ------------------ LOCAL FUNCTIONS ------------------ -static UNUSED_FUNC int tsdbRestoreTable(void *pHandle, void *cont, int contLen) { - STsdbRepo *pRepo = (STsdbRepo *)pHandle; - STable * pTable = NULL; +int tsdbRestoreTable(STsdbRepo *pRepo, void *cont, int contLen) { + STable *pTable = NULL; if (!taosCheckChecksumWhole((uint8_t *)cont, contLen)) { terrno = TSDB_CODE_TDB_FILE_CORRUPTED; @@ -628,8 +624,7 @@ static UNUSED_FUNC int tsdbRestoreTable(void *pHandle, void *cont, int contLen) return 0; } -static UNUSED_FUNC void tsdbOrgMeta(void *pHandle) { - STsdbRepo *pRepo = (STsdbRepo *)pHandle; +void tsdbOrgMeta(STsdbRepo *pRepo) { STsdbMeta *pMeta = pRepo->tsdbMeta; for (int i = 1; i < pMeta->maxTables; i++) { @@ -640,6 +635,7 @@ static UNUSED_FUNC void tsdbOrgMeta(void *pHandle) { } } +// ------------------ LOCAL FUNCTIONS ------------------ static char *getTagIndexKey(const void *pData) { STable *pTable = (STable *)pData;