diff --git a/src/tsdb/inc/tsdbCommit.h b/src/tsdb/inc/tsdbCommit.h index 277aa0eb9bef2c6d5a78855acf98259d0b720fc7..928ddb353e6914eb43527ad906e997ad02aee7fb 100644 --- a/src/tsdb/inc/tsdbCommit.h +++ b/src/tsdb/inc/tsdbCommit.h @@ -19,7 +19,14 @@ #ifdef __cplusplus extern "C" { #endif +typedef struct { + uint64_t uid; + int64_t offset; + int64_t size; +} SKVRecord; +int tsdbEncodeKVRecord(void **buf, SKVRecord *pRecord); +void *tsdbDecodeKVRecord(void *buf, SKVRecord *pRecord); void *tsdbCommitData(STsdbRepo *pRepo); #ifdef __cplusplus diff --git a/src/tsdb/inc/tsdbFS.h b/src/tsdb/inc/tsdbFS.h index 9b7ad04b7cc06fb6e8b2a6b8ba866df6103ce44f..fb07f4695d480a6a8547b6d4a5ba6bf516dc3c32 100644 --- a/src/tsdb/inc/tsdbFS.h +++ b/src/tsdb/inc/tsdbFS.h @@ -68,16 +68,15 @@ typedef struct { #define TSDB_FS_ITER_FORWARD TSDB_ORDER_ASC #define TSDB_FS_ITER_BACKWARD TSDB_ORDER_DESC -#if 0 -int tsdbOpenFS(STsdbRepo* pRepo); -void tsdbCloseFS(STsdbRepo* pRepo); -int tsdbFSNewTxn(STsdbRepo* pRepo); -int tsdbFSEndTxn(STsdbRepo* pRepo, bool hasError); -int tsdbUpdateMFile(STsdbRepo* pRepo, SMFile* pMFile); -int tsdbUpdateDFileSet(STsdbRepo* pRepo, SDFileSet* pSet); -int tsdbInitFSIter(STsdbRepo* pRepo, SFSIter* pIter); -SDFileSet* tsdbFSIterNext(SFSIter* pIter); -#endif +STsdbFS *tsdbNewFS(int keep, int days); +void * tsdbFreeFS(STsdbFS *pfs); +int tdbOpenFS(STsdbFS *pFs, int keep, int days); +void tsdbCloseFS(STsdbFS *pFs); +int tsdbStartTxn(STsdbFS *pfs); +int tsdbEndTxn(STsdbFS *pfs); +int tsdbEndTxnWithError(STsdbFS *pfs); +void tsdbUpdateMFile(STsdbFS *pfs, const SMFile *pMFile); +int tsdbUpdateDFileSet(STsdbFS *pfs, const SDFileSet *pSet); static FORCE_INLINE int tsdbRLockFS(STsdbFS* pFs) { int code = pthread_rwlock_rdlock(&(pFs->lock)); diff --git a/src/tsdb/src/tsdbCommit.c b/src/tsdb/src/tsdbCommit.c index 7901c4fb8e2067fe16b03ef77f59f38397326107..5e6c715aeaef89ecfb2918eafbdd00b1164bce81 100644 --- a/src/tsdb/src/tsdbCommit.c +++ b/src/tsdb/src/tsdbCommit.c @@ -85,56 +85,151 @@ _err: // =================== Commit Meta Data static int tsdbCommitMeta(STsdbRepo *pRepo) { + STsdbFS * pfs = REPO_FS(pRepo); SMemTable *pMem = pRepo->imem; - STsdbMeta *pMeta = pRepo->tsdbMeta; + SMFile * pOMFile = pfs->cstatus->pmf; + SMFile mf; SActObj * pAct = NULL; SActCont * pCont = NULL; + SListNode *pNode = NULL; - if (listNEles(pMem->actList) <= 0) return 0; + ASSERT(pOMFile != NULL || listNEles(pMem->actList) > 0); - if (tdKVStoreStartCommit(pMeta->pStore) < 0) { - tsdbError("vgId:%d failed to commit data while start commit meta since %s", REPO_ID(pRepo), tstrerror(terrno)); - goto _err; - } + if (listNEles(pMem->actList) <= 0) { + // no + tsdbUpdateMFile(pfs, pOMFile); + return 0; + } else { + // Create/Open a meta file or open the existing file + if (pOMFile == NULL) { + // Create a new meta file + tsdbInitMFile(&mf, {.level = TFS_PRIMARY_LEVEL, .id = TFS_PRIMARY_ID}, REPO_ID(pRepo), pfs->nstatus->meta.version); - SListNode *pNode = NULL; + if (tsdbCreateMFile(&mf) < 0) { + return -1; + } + } else { + tsdbInitMFile(&mf, pOMFile); + if (tsdbOpenMFile(&mf, O_WRONLY) < 0) { + return -1; + } + } + } + // Loop to write while ((pNode = tdListPopHead(pMem->actList)) != NULL) { pAct = (SActObj *)pNode->data; if (pAct->act == TSDB_UPDATE_META) { pCont = (SActCont *)POINTER_SHIFT(pAct, sizeof(SActObj)); - if (tdUpdateKVStoreRecord(pMeta->pStore, pAct->uid, (void *)(pCont->cont), pCont->len) < 0) { - tsdbError("vgId:%d failed to update meta with uid %" PRIu64 " since %s", REPO_ID(pRepo), pAct->uid, - tstrerror(terrno)); - tdKVStoreEndCommit(pMeta->pStore); - goto _err; + if (tsdbUpdateMetaRecord(pfs, &mf, pAct->uid, (void *)(pCont->cont), pCont->len) < 0) { + tsdbCloseMFile(&mf); + return -1; } } else if (pAct->act == TSDB_DROP_META) { - if (tdDropKVStoreRecord(pMeta->pStore, pAct->uid) < 0) { - tsdbError("vgId:%d failed to drop meta with uid %" PRIu64 " since %s", REPO_ID(pRepo), pAct->uid, - tstrerror(terrno)); - tdKVStoreEndCommit(pMeta->pStore); - goto _err; + if (tsdbDropMetaRecord(pfs, &mf, pAct->uid) < 0) { + tsdbCloseMFile(&mf); + return -1; } } else { ASSERT(false); } } - if (tdKVStoreEndCommit(pMeta->pStore) < 0) { - tsdbError("vgId:%d failed to commit data while end commit meta since %s", REPO_ID(pRepo), tstrerror(terrno)); - goto _err; + if (tsdbUpdateMFileHeader(&mf) < 0) { + return -1; } - // TODO: update meta file - tsdbUpdateMFile(pRepo, &(pMeta->pStore.f)); + tsdbCloseMFile(&mf); + tsdbUpdateMFile(pfs, &mf); return 0; +} -_err: - return -1; +int tsdbEncodeKVRecord(void **buf, SKVRecord *pRecord) { + int tlen = 0; + tlen += taosEncodeFixedU64(buf, pRecord->uid); + tlen += taosEncodeFixedI64(buf, pRecord->offset); + tlen += taosEncodeFixedI64(buf, pRecord->size); + + return tlen; +} + +void *tsdbDecodeKVRecord(void *buf, SKVRecord *pRecord) { + buf = taosDecodeFixedU64(buf, &(pRecord->uid)); + buf = taosDecodeFixedI64(buf, &(pRecord->offset)); + buf = taosDecodeFixedI64(buf, &(pRecord->size)); + + return buf; +} + +static int tsdbUpdateMetaRecord(STsdbFS *pfs, SMFile *pMFile, uint64_t uid, void *cont, int contLen) { + char buf[64] = "\0"; + void * pBuf = buf; + SKVRecord rInfo; + int64_t offset; + + // Seek to end of meta file + offset = tsdbSeekMFile(pMFile, 0, SEEK_END); + if (offset < 0) { + return -1; + } + + rInfo.offset = offset; + rInfo.uid = uid; + rInfo.size = contLen; + + tlen = tsdbEncodeKVRecord((void **)(&pBuf), pRInfo); + if (tsdbAppendMFile(pMFile, buf, tlen) < tlen) { + return -1; + } + + if (tsdbAppendMFile(pMFile, cont, contLen) < contLen) { + return -1; + } + + tsdbUpdateMFileMagic(pMFile, POINTER_SHIFT(cont, contLen - sizeof(TSCKSUM))); + SKVRecord *pRecord = taosHashGet(pfs->metaCache, (void *)&uid, sizeof(uid)); + if (pRecord != NULL) { + pMFile->info.tombSize += pRecord->size; + } else { + pMFile->info.nRecords++; + } + taosHashPut(pfs->metaCache, (void *)(&uid), sizeof(uid), (void *)(&rInfo), sizeof(rInfo)); + + return 0; } +static int tsdbDropMetaRecord(STsdbFS *pfs, SMFile *pMFile, uint64_t uid) { + SKVRecord rInfo = {0}; + char buf[128] = "\0"; + + SKVRecord *pRecord = taosHashGet(pfs->metaCache, (void *)(&uid), sizeof(uid)); + if (pRecord == NULL) { + tsdbError("failed to drop KV store record with key %" PRIu64 " since not find", uid); + return -1; + } + + rInfo.offset = -pRecord->offset; + rInfo.uid = pRecord->uid; + rInfo.size = pRecord->size; + + void *pBuf = buf; + tdEncodeKVRecord(&pBuf, &rInfo); + + if (tsdbAppendMFile(pMFile, buf, POINTER_DISTANCE(pBuf, buf), NULL) < 0) { + return -1; + } + + pMFile->meta.magic = taosCalcChecksum(pStore->info.magic, (uint8_t *)buf, (uint32_t)POINTER_DISTANCE(pBuf, buf)); + pMFile->meta.nDels++; + pMFile->meta.nRecords--; + pMFile->meta.tombSize += (rInfo.size + sizeof(SKVRecord) * 2); + + taosHashRemove(pfs->metaCache, (void *)(&uid), sizeof(uid)); + return 0; +} + + // =================== Commit Time-Series Data static int tsdbCommitTSData(STsdbRepo *pRepo) { SMemTable *pMem = pRepo->imem; diff --git a/src/tsdb/src/tsdbStore.c b/src/tsdb/src/tsdbStore.c index 2fb1e06221a3407e128935f25ea300450ef5df2c..8c21c2e9dbd56a94881d1c228c646056a457a2cd 100644 --- a/src/tsdb/src/tsdbStore.c +++ b/src/tsdb/src/tsdbStore.c @@ -18,12 +18,6 @@ #include "tsdbint.h" -typedef struct { - uint64_t uid; - int64_t offset; - int64_t size; -} SKVRecord; - static int tdInitKVStoreHeader(int fd, char *fname); static int tdEncodeStoreInfo(void **buf, SStoreInfo *pInfo); static void * tdDecodeStoreInfo(void *buf, SStoreInfo *pInfo);