From 18807c14de0d04e1cc3819b4b79d5df4cd6ccce5 Mon Sep 17 00:00:00 2001 From: Hongze Cheng Date: Thu, 14 Jan 2021 23:03:57 +0800 Subject: [PATCH] correct a lot of compile issue --- src/tsdb/inc/tsdbFS.h | 4 + src/tsdb/inc/tsdbFile.h | 9 +- src/tsdb/inc/tsdbKV.h | 52 ---- src/tsdb/inc/tsdbMeta.h | 3 +- src/tsdb/inc/tsdbReadImpl.h | 5 +- src/tsdb/inc/tsdbint.h | 4 +- src/tsdb/src/tsdbCommit.c | 130 ++++++--- src/tsdb/src/tsdbCommitQueue.c | 6 +- src/tsdb/src/tsdbFile.c | 32 ++- src/tsdb/src/tsdbMain.c | 11 +- src/tsdb/src/tsdbMemTable.c | 3 +- src/tsdb/src/tsdbMeta.c | 24 +- src/tsdb/src/tsdbRead.c | 76 ++--- src/tsdb/src/tsdbReadImpl.c | 35 ++- src/tsdb/src/tsdbScan.c | 4 +- src/tsdb/src/tsdbStore.c | 496 --------------------------------- 16 files changed, 195 insertions(+), 699 deletions(-) delete mode 100644 src/tsdb/inc/tsdbKV.h delete mode 100644 src/tsdb/src/tsdbStore.c diff --git a/src/tsdb/inc/tsdbFS.h b/src/tsdb/inc/tsdbFS.h index fb07f4695d..35f559e6a4 100644 --- a/src/tsdb/inc/tsdbFS.h +++ b/src/tsdb/inc/tsdbFS.h @@ -78,6 +78,10 @@ int tsdbEndTxnWithError(STsdbFS *pfs); void tsdbUpdateMFile(STsdbFS *pfs, const SMFile *pMFile); int tsdbUpdateDFileSet(STsdbFS *pfs, const SDFileSet *pSet); +void tsdbFSIterInit(SFSIter *pIter, STsdbFS *pfs, int direction); +void tsdbFSIterSeek(SFSIter *pIter, int fid); +SDFileSet *tsdbFSIterNext(SFSIter *pIter); + static FORCE_INLINE int tsdbRLockFS(STsdbFS* pFs) { int code = pthread_rwlock_rdlock(&(pFs->lock)); if (code != 0) { diff --git a/src/tsdb/inc/tsdbFile.h b/src/tsdb/inc/tsdbFile.h index 182fc9d443..c122b83b82 100644 --- a/src/tsdb/inc/tsdbFile.h +++ b/src/tsdb/inc/tsdbFile.h @@ -59,11 +59,12 @@ typedef struct { int fd; } SMFile; -void tsdbInitMFile(SMFile* pMFile, SDiskID did, int vid, int ver); +void tsdbInitMFile(SMFile* pMFile, SDiskID did, int vid, uint32_t ver); void tsdbInitMFileEx(SMFile* pMFile, SMFile* pOMFile); int tsdbEncodeSMFile(void** buf, SMFile* pMFile); void* tsdbDecodeSMFile(void* buf, SMFile* pMFile); -int tsdbApplyMFileChange(const SMFile* from, const SMFile* to); +int tsdbApplyMFileChange(SMFile* from, SMFile* to); +int tsdbApplyMFileChange(SMFile* from, SMFile* to); static FORCE_INLINE int tsdbOpenMFile(SMFile* pMFile, int flags) { ASSERT(!TSDB_FILE_OPENED(pMFile)); @@ -287,11 +288,11 @@ typedef struct { #define TSDB_FSET_LEVEL(s) TSDB_FILE_LEVEL(TSDB_DFILE_IN_SET(s, 0)) #define TSDB_FSET_ID(s) TSDB_FILE_ID(TSDB_DFILE_IN_SET(s, 0)) -void tsdbInitDFileSet(SDFileSet* pSet, SDiskID did, int vid, int fid, int ver); +void tsdbInitDFileSet(SDFileSet *pSet, SDiskID did, int vid, int fid, uint32_t ver); void tsdbInitDFileSetEx(SDFileSet* pSet, SDFileSet* pOSet); int tsdbEncodeDFileSet(void** buf, SDFileSet* pSet); void* tsdbDecodeDFileSet(void* buf, SDFileSet* pSet); -int tsdbApplyDFileSetChange(const SDFileSet* from, const SDFileSet* to); +int tsdbApplyDFileSetChange(SDFileSet* from, SDFileSet* to); static FORCE_INLINE void tsdbCloseDFileSet(SDFileSet* pSet) { for (TSDB_FILE_T ftype = 0; ftype < TSDB_FILE_MAX; ftype++) { diff --git a/src/tsdb/inc/tsdbKV.h b/src/tsdb/inc/tsdbKV.h deleted file mode 100644 index 7bd270bed2..0000000000 --- a/src/tsdb/inc/tsdbKV.h +++ /dev/null @@ -1,52 +0,0 @@ -/* - * Copyright (c) 2019 TAOS Data, Inc. - * - * This program is free software: you can use, redistribute, and/or modify - * it under the terms of the GNU Affero General Public License, version 3 - * or later ("AGPL"), as published by the Free Software Foundation. - * - * This program is distributed in the hope that it will be useful, but WITHOUT - * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or - * FITNESS FOR A PARTICULAR PURPOSE. - * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see . - */ - -#ifndef _TD_TSDB_KV_H_ -#define _TD_TSDB_KV_H_ - -#ifdef __cplusplus -extern "C" { -#endif - -#define KVSTORE_FILE_VERSION ((uint32_t)0) - -typedef int (*iterFunc)(void*, void* cont, int contLen); -typedef void (*afterFunc)(void*); - -typedef struct { - SMFile f; - SHashObj* map; - iterFunc iFunc; - afterFunc aFunc; - void* appH; -} SKVStore; - -#define KVSTORE_MAGIC(s) (s)->f.info.magic - -int tdCreateKVStore(char* fname); -int tdDestroyKVStore(char* fname); -SKVStore* tdOpenKVStore(char* fname, iterFunc iFunc, afterFunc aFunc, void* appH); -void tdCloseKVStore(SKVStore* pStore); -int tdKVStoreStartCommit(SKVStore* pStore); -int tdUpdateKVStoreRecord(SKVStore* pStore, uint64_t uid, void* cont, int contLen); -int tdDropKVStoreRecord(SKVStore* pStore, uint64_t uid); -int tdKVStoreEndCommit(SKVStore* pStore); -void tsdbGetStoreInfo(char* fname, uint32_t* magic, int64_t* size); - -#ifdef __cplusplus -} -#endif - -#endif /*_TD_TSDB_KV_H_*/ \ No newline at end of file diff --git a/src/tsdb/inc/tsdbMeta.h b/src/tsdb/inc/tsdbMeta.h index c9e37c73f2..13568d3611 100644 --- a/src/tsdb/inc/tsdbMeta.h +++ b/src/tsdb/inc/tsdbMeta.h @@ -51,7 +51,6 @@ typedef struct { STable** tables; SList* superList; SHashObj* uidMap; - SKVStore* pStore; int maxRowBytes; int maxCols; } STsdbMeta; @@ -63,7 +62,7 @@ typedef struct { #define TABLE_UID(t) (t)->tableId.uid #define TABLE_TID(t) (t)->tableId.tid #define TABLE_SUID(t) (t)->suid -#define TSDB_META_FILE_MAGIC(m) KVSTORE_MAGIC((m)->pStore) +// #define TSDB_META_FILE_MAGIC(m) KVSTORE_MAGIC((m)->pStore) #define TSDB_RLOCK_TABLE(t) taosRLockLatch(&((t)->latch)) #define TSDB_RUNLOCK_TABLE(t) taosRUnLockLatch(&((t)->latch)) #define TSDB_WLOCK_TABLE(t) taosWLockLatch(&((t)->latch)) diff --git a/src/tsdb/inc/tsdbReadImpl.h b/src/tsdb/inc/tsdbReadImpl.h index 63fdb1864a..0801d7a226 100644 --- a/src/tsdb/inc/tsdbReadImpl.h +++ b/src/tsdb/inc/tsdbReadImpl.h @@ -109,9 +109,8 @@ void tsdbCloseAndUnsetFSet(SReadH *pReadh); int tsdbLoadBlockIdx(SReadH *pReadh); int tsdbSetReadTable(SReadH *pReadh, STable *pTable); int tsdbLoadBlockInfo(SReadH *pReadh, void *pTarget); -int tsdbLoadBlockData(SReadH *pReadh, const SBlock *pBlock, const SBlockInfo *pBlockInfo); -int tsdbLoadBlockDataCols(SReadH *pReadh, const SBlock *pBlock, const SBlockInfo *pBlockInfo, const int16_t *colIds, - int numOfColsIds); +int tsdbLoadBlockData(SReadH *pReadh, SBlock *pBlock, SBlockInfo *pBlockInfo); +int tsdbLoadBlockDataCols(SReadH *pReadh, SBlock *pBlock, SBlockInfo *pBlkInfo, int16_t *colIds, int numOfColsIds); int tsdbLoadBlockStatis(SReadH *pReadh, SBlock *pBlock); int tsdbEncodeSBlockIdx(void **buf, SBlockIdx *pIdx); void *tsdbDecodeSBlockIdx(void *buf, SBlockIdx *pIdx); diff --git a/src/tsdb/inc/tsdbint.h b/src/tsdb/inc/tsdbint.h index 80360e89e1..80cf4cc515 100644 --- a/src/tsdb/inc/tsdbint.h +++ b/src/tsdb/inc/tsdbint.h @@ -23,6 +23,8 @@ #include #include #include +#include +#include #include "os.h" #include "tlog.h" @@ -98,7 +100,7 @@ int tsdbUnlockRepo(STsdbRepo* pRepo); char* tsdbGetDataDirName(char* rootDir); int tsdbGetNextMaxTables(int tid); STsdbMeta* tsdbGetMeta(TSDB_REPO_T* pRepo); -STsdbFileH* tsdbGetFile(TSDB_REPO_T* pRepo); +// STsdbFileH* tsdbGetFile(TSDB_REPO_T* pRepo); int tsdbCheckCommit(STsdbRepo* pRepo); static FORCE_INLINE STsdbBufBlock* tsdbGetCurrBufBlock(STsdbRepo* pRepo) { diff --git a/src/tsdb/src/tsdbCommit.c b/src/tsdb/src/tsdbCommit.c index 5e6c715aea..2f034873a4 100644 --- a/src/tsdb/src/tsdbCommit.c +++ b/src/tsdb/src/tsdbCommit.c @@ -54,6 +54,40 @@ typedef struct { #define TSDB_COMMIT_COMP_BUF(ch) TSDB_READ_COMP_BUF(&((ch)->readh)) #define TSDB_COMMIT_DEFAULT_ROWS(ch) (TSDB_COMMIT_REPO(ch)->config.maxRowsPerFileBlock * 4 / 5) +static int tsdbCommitMeta(STsdbRepo *pRepo); +static int tsdbUpdateMetaRecord(STsdbFS *pfs, SMFile *pMFile, uint64_t uid, void *cont, int contLen); +static int tsdbDropMetaRecord(STsdbFS *pfs, SMFile *pMFile, uint64_t uid); +static int tsdbCommitTSData(STsdbRepo *pRepo); +static int tsdbStartCommit(STsdbRepo *pRepo); +static void tsdbEndCommit(STsdbRepo *pRepo, int eno); +static int tsdbCommitToFile(SCommitH *pCommith, SDFileSet *pSet, int fid); +static int tsdbCreateCommitIters(SCommitH *pCommith); +static void tsdbDestroyCommitIters(SCommitH *pCommith); +static void tsdbSeekCommitIter(SCommitH *pCommith, TSKEY key); +static int tsdbInitCommitH(SCommitH *pCommith, STsdbRepo *pRepo); +static void tsdbDestroyCommitH(SCommitH *pCommith); +static void tsdbGetRtnSnap(STsdbRepo *pRepo, SRtn *pRtn); +static int tsdbGetFidLevel(int fid, SRtn *pRtn); +static int tsdbNextCommitFid(SCommitH *pCommith); +static int tsdbCommitToTable(SCommitH *pCommith, int tid); +static int tsdbSetCommitTable(SCommitH *pCommith, STable *pTable); +static int tsdbComparKeyBlock(const void *arg1, const void *arg2); +static int tsdbWriteBlockInfo(SCommitH *pCommih); +static int tsdbWriteBlockIdx(SCommitH *pCommih); +static int tsdbCommitMemData(SCommitH *pCommith, SCommitIter *pIter, TSKEY keyLimit, bool toData); +static int tsdbMergeMemData(SCommitH *pCommith, SCommitIter *pIter, int bidx); +static int tsdbMoveBlock(SCommitH *pCommith, int bidx); +static int tsdbCommitAddBlock(SCommitH *pCommith, const SBlock *pSupBlock, const SBlock *pSubBlocks, int nSubBlocks); +static int tsdbMergeBlockData(SCommitH *pCommith, SCommitIter *pIter, SDataCols *pDataCols, TSKEY keyLimit, + bool isLastOneBlock); +static void tsdbResetCommitFile(SCommitH *pCommith); +static void tsdbResetCommitTable(SCommitH *pCommith); +static int tsdbSetAndOpenCommitFile(SCommitH *pCommith, SDFileSet *pSet, int fid); +static void tsdbCloseCommitFile(SCommitH *pCommith, bool hasError); +static bool tsdbCanAddSubBlock(SCommitH *pCommith, SBlock *pBlock, SMergeInfo *pInfo); +static void tsdbLoadAndMergeFromCache(SDataCols *pDataCols, int *iter, SCommitIter *pCommitIter, SDataCols *pTarget, + TSKEY maxKey, int maxRows, int8_t update); + void *tsdbCommitData(STsdbRepo *pRepo) { if (tsdbStartCommit(pRepo) < 0) { tsdbError("vgId:%d failed to commit data while startting to commit since %s", REPO_ID(pRepo), tstrerror(terrno)); @@ -92,6 +126,7 @@ static int tsdbCommitMeta(STsdbRepo *pRepo) { SActObj * pAct = NULL; SActCont * pCont = NULL; SListNode *pNode = NULL; + SDiskID did; ASSERT(pOMFile != NULL || listNEles(pMem->actList) > 0); @@ -103,13 +138,15 @@ static int tsdbCommitMeta(STsdbRepo *pRepo) { // Create/Open a meta file or open the existing file if (pOMFile == NULL) { // Create a new meta file - tsdbInitMFile(&mf, {.level = TFS_PRIMARY_LEVEL, .id = TFS_PRIMARY_ID}, REPO_ID(pRepo), pfs->nstatus->meta.version); + did.level = TFS_PRIMARY_LEVEL; + did.id = TFS_PRIMARY_ID; + tsdbInitMFile(&mf, did, REPO_ID(pRepo), pfs->nstatus->meta.version); if (tsdbCreateMFile(&mf) < 0) { return -1; } } else { - tsdbInitMFile(&mf, pOMFile); + tsdbInitMFileEx(&mf, pOMFile); if (tsdbOpenMFile(&mf, O_WRONLY) < 0) { return -1; } @@ -178,12 +215,12 @@ static int tsdbUpdateMetaRecord(STsdbFS *pfs, SMFile *pMFile, uint64_t uid, void rInfo.uid = uid; rInfo.size = contLen; - tlen = tsdbEncodeKVRecord((void **)(&pBuf), pRInfo); - if (tsdbAppendMFile(pMFile, buf, tlen) < tlen) { + int tlen = tsdbEncodeKVRecord((void **)(&pBuf), &rInfo); + if (tsdbAppendMFile(pMFile, buf, tlen, NULL) < tlen) { return -1; } - if (tsdbAppendMFile(pMFile, cont, contLen) < contLen) { + if (tsdbAppendMFile(pMFile, cont, contLen, NULL) < contLen) { return -1; } @@ -214,16 +251,16 @@ static int tsdbDropMetaRecord(STsdbFS *pfs, SMFile *pMFile, uint64_t uid) { rInfo.size = pRecord->size; void *pBuf = buf; - tdEncodeKVRecord(&pBuf, &rInfo); + tsdbEncodeKVRecord(&pBuf, &rInfo); if (tsdbAppendMFile(pMFile, buf, POINTER_DISTANCE(pBuf, buf), NULL) < 0) { return -1; } - pMFile->meta.magic = taosCalcChecksum(pStore->info.magic, (uint8_t *)buf, (uint32_t)POINTER_DISTANCE(pBuf, buf)); - pMFile->meta.nDels++; - pMFile->meta.nRecords--; - pMFile->meta.tombSize += (rInfo.size + sizeof(SKVRecord) * 2); + pMFile->info.magic = taosCalcChecksum(pMFile->info.magic, (uint8_t *)buf, (uint32_t)POINTER_DISTANCE(pBuf, buf)); + pMFile->info.nDels++; + pMFile->info.nRecords--; + pMFile->info.tombSize += (rInfo.size + sizeof(SKVRecord) * 2); taosHashRemove(pfs->metaCache, (void *)(&uid), sizeof(uid)); return 0; @@ -233,8 +270,8 @@ static int tsdbDropMetaRecord(STsdbFS *pfs, SMFile *pMFile, uint64_t uid) { // =================== Commit Time-Series Data static int tsdbCommitTSData(STsdbRepo *pRepo) { SMemTable *pMem = pRepo->imem; - STsdbCfg * pCfg = REPO_CFG(pRepo); SCommitH commith = {0}; + STsdbFS * pfs = REPO_FS(pRepo); SDFileSet *pSet = NULL; SDFileSet nSet; int fid; @@ -273,17 +310,17 @@ static int tsdbCommitTSData(STsdbRepo *pRepo) { if (level > TSDB_FSET_LEVEL(pSet)) { // Need to move the FSET to higher level - if (tsdbCopyDFileSet(*pSet, level, id, &nSet) < 0) { + if (tsdbCopyDFileSet(pSet, &nSet) < 0) { tsdbDestroyCommitH(&commith); return -1; } - if (tsdbUpdateDFileSet(pRepo, &nSet) < 0) { + if (tsdbUpdateDFileSet(pfs, &nSet) < 0) { tsdbDestroyCommitH(&commith); return -1; } } else { - if (tsdbUpdateDFileSet(pRepo, pSet) < 0) { + if (tsdbUpdateDFileSet(pfs, pSet) < 0) { tsdbDestroyCommitH(&commith); return -1; } @@ -307,7 +344,7 @@ static int tsdbCommitTSData(STsdbRepo *pRepo) { } fid = tsdbNextCommitFid(&commith); - if (tsdbCommitToFile(pCSet, &commith, cfid) < 0) { + if (tsdbCommitToFile(&commith, pCSet, cfid) < 0) { tsdbDestroyCommitH(&commith); return -1; } @@ -324,15 +361,17 @@ static int tsdbStartCommit(STsdbRepo *pRepo) { tsdbInfo("vgId:%d start to commit! keyFirst %" PRId64 " keyLast %" PRId64 " numOfRows %" PRId64 " meta rows: %d", REPO_ID(pRepo), pMem->keyFirst, pMem->keyLast, pMem->numOfRows, listNEles(pMem->actList)); - if (tsdbFSNewTxn(pRepo) < 0) return -1; + if (tsdbStartTxn(REPO_FS(pRepo)) < 0) return -1; pRepo->code = TSDB_CODE_SUCCESS; return 0; } static void tsdbEndCommit(STsdbRepo *pRepo, int eno) { - if (tsdbFSEndTxn(pRepo, eno != TSDB_CODE_SUCCESS) < 0) { - eno = terrno; + if (eno != TSDB_CODE_SUCCESS) { + tsdbEndTxnWithError(REPO_FS(pRepo)); + } else { + tsdbEndTxn(REPO_FS(pRepo)); } tsdbInfo("vgId:%d commit over, %s", REPO_ID(pRepo), (eno == TSDB_CODE_SUCCESS) ? "succeed" : "failed"); @@ -348,6 +387,7 @@ static void tsdbEndCommit(STsdbRepo *pRepo, int eno) { sem_post(&(pRepo->readyToCommit)); } +#if 0 static bool tsdbHasDataToCommit(SCommitIter *iters, int nIters, TSKEY minKey, TSKEY maxKey) { for (int i = 0; i < nIters; i++) { TSKEY nextKey = tsdbNextIterKey((iters + i)->pIter); @@ -355,6 +395,7 @@ static bool tsdbHasDataToCommit(SCommitIter *iters, int nIters, TSKEY minKey, TS } return false; } +#endif static int tsdbCommitToFile(SCommitH *pCommith, SDFileSet *pSet, int fid) { STsdbRepo *pRepo = TSDB_COMMIT_REPO(pCommith); @@ -383,9 +424,14 @@ static int tsdbCommitToFile(SCommitH *pCommith, SDFileSet *pSet, int fid) { } } + if (tsdbWriteBlockIdx(pCommith) < 0) { + tsdbCloseCommitFile(pCommith, true); + return -1; + } + tsdbCloseCommitFile(pCommith, false); - if (tsdbUpdateDFileSet(pRepo, &(pCommith->wSet)) < 0) { + if (tsdbUpdateDFileSet(REPO_FS(pRepo), &(pCommith->wSet)) < 0) { // TODO return -1; } @@ -393,7 +439,7 @@ static int tsdbCommitToFile(SCommitH *pCommith, SDFileSet *pSet, int fid) { return 0; } -static SCommitIter *tsdbCreateCommitIters(SCommitH *pCommith) { +static int tsdbCreateCommitIters(SCommitH *pCommith) { STsdbRepo *pRepo = TSDB_COMMIT_REPO(pCommith); SMemTable *pMem = pRepo->imem; STsdbMeta *pMeta = pRepo->tsdbMeta; @@ -405,7 +451,7 @@ static SCommitIter *tsdbCreateCommitIters(SCommitH *pCommith) { return -1; } - if (tsdbRLockRepoMeta(pRepo) < 0) return -1 + if (tsdbRLockRepoMeta(pRepo) < 0) return -1; // reference all tables for (int i = 0; i < pMem->maxTables; i++) { @@ -438,7 +484,7 @@ static void tsdbDestroyCommitIters(SCommitH *pCommith) { for (int i = 1; i < pCommith->niters; i++) { if (pCommith->iters[i].pTable != NULL) { tsdbUnRefTable(pCommith->iters[i].pTable); - tSkipListDestroyIter(iters[i].pIter); + tSkipListDestroyIter(pCommith->iters[i].pIter); } } @@ -480,10 +526,7 @@ static int tsdbInitCommitH(SCommitH *pCommith, STsdbRepo *pRepo) { } // Init file iterator - if (tsdbInitFSIter(pRepo, &(pCommith->fsIter)) < 0) { - tsdbDestroyCommitH(pCommith); - return -1; - } + tsdbFSIterInit(&(pCommith->fsIter), REPO_FS(pRepo), TSDB_FS_ITER_FORWARD); if (tsdbCreateCommitIters(pCommith) < 0) { tsdbDestroyCommitH(pCommith); @@ -839,9 +882,9 @@ static int tsdbWriteBlock(SCommitH *pCommith, SDFile *pDFile, SDataCols *pDataCo pBlockCol->colId = pDataCol->colId; pBlockCol->type = pDataCol->type; if (tDataTypeDesc[pDataCol->type].getStatisFunc) { - (*tDataTypeDesc[pDataCol->type].getStatisFunc)( - (TSKEY *)(pDataCols->cols[0].pData), pDataCol->pData, rowsToWrite, &(pBlockCol->min), &(pBlockCol->max), - &(pBlockCol->sum), &(pBlockCol->minIndex), &(pBlockCol->maxIndex), &(pBlockCol->numOfNull)); + (*tDataTypeDesc[pDataCol->type].getStatisFunc)(pDataCol->pData, rowsToWrite, &(pBlockCol->min), &(pBlockCol->max), + &(pBlockCol->sum), &(pBlockCol->minIndex), &(pBlockCol->maxIndex), + &(pBlockCol->numOfNull)); } nColsNotAllNull++; } @@ -915,7 +958,7 @@ static int tsdbWriteBlock(SCommitH *pCommith, SDFile *pDFile, SDataCols *pDataCo tsdbUpdateDFileMagic(pDFile, POINTER_SHIFT(pBlockData, tsize - sizeof(TSCKSUM))); // Write the whole block to file - if (tsdbWriteDFile(pDFile, (void *)pBlockData, lsize < lsize)) { + if (tsdbWriteDFile(pDFile, (void *)pBlockData, lsize) < lsize) { return -1; } @@ -1164,7 +1207,7 @@ static int tsdbMergeMemData(SCommitH *pCommith, SCommitIter *pIter, int bidx) { static int tsdbMoveBlock(SCommitH *pCommith, int bidx) { SBlock *pBlock = pCommith->readh.pBlkInfo->blocks + bidx; SDFile *pCommitF = (pBlock->last) ? TSDB_COMMIT_LAST_FILE(pCommith) : TSDB_COMMIT_DATA_FILE(pCommith); - SDFile *pReadF = (pBlock->last) ? TSDB_READ_LAST_FILE(&(pCommith->readh)) : TSDB_READ_DATA_FILE(&(pCommith->readh)); + // SDFile *pReadF = (pBlock->last) ? TSDB_READ_LAST_FILE(&(pCommith->readh)) : TSDB_READ_DATA_FILE(&(pCommith->readh)); SBlock block; if ((pBlock->last && pCommith->isLFileSame) || ((!pBlock->last) && pCommith->isDFileSame)) { @@ -1172,7 +1215,7 @@ static int tsdbMoveBlock(SCommitH *pCommith, int bidx) { if (tsdbCommitAddBlock(pCommith, pBlock, NULL, 0) < 0) return -1; } else { block = *pBlock; - block.offset = sizeof(SBlock) * taosArrayGetSize(pCommith->aSupBlock); + block.offset = sizeof(SBlock) * taosArrayGetSize(pCommith->aSupBlk); if (tsdbCommitAddBlock(pCommith, &block, POINTER_SHIFT(pCommith->readh.pBlkInfo, pBlock->offset), pBlock->numOfSubBlocks) < 0) { @@ -1235,8 +1278,8 @@ static int tsdbMergeBlockData(SCommitH *pCommith, SCommitIter *pIter, SDataCols if (tsdbWriteBlock(pCommith, pDFile, pCommith->pDataCols, &block, isLast, true) < 0) return -1; if (tsdbCommitAddBlock(pCommith, &block, NULL, 0) < 0) return -1; } - + return 0; } static void tsdbLoadAndMergeFromCache(SDataCols *pDataCols, int *iter, SCommitIter *pCommitIter, SDataCols *pTarget, @@ -1323,6 +1366,7 @@ static void tsdbResetCommitTable(SCommitH *pCommith) { static int tsdbSetAndOpenCommitFile(SCommitH *pCommith, SDFileSet *pSet, int fid) { int level, id; + SDiskID did; SDFileSet *pWSet = TSDB_COMMIT_WRITE_FSET(pCommith); tfsAllocDisk(tsdbGetFidLevel(fid, &(pCommith->rtn)), &level, &id); @@ -1350,11 +1394,13 @@ static int tsdbSetAndOpenCommitFile(SCommitH *pCommith, SDFileSet *pSet, int fid // Set and open commit FSET if (pSet == NULL || level > TSDB_FSET_LEVEL(pSet)) { // Create new FSET - tsdbInitDFileSet(pWSet, TSDB_COMMIT_REPO_ID(pCommith), fid, pCommith->version, level, id); + did.level = level; + did.id = id; + tsdbInitDFileSet(pWSet, did, TSDB_COMMIT_REPO_ID(pCommith), fid, pCommith->version); if (tsdbOpenDFileSet(pWSet, O_WRONLY | O_CREAT) < 0) { for (TSDB_FILE_T ftype = 0; ftype < TSDB_FILE_MAX; ftype++) { - remove(TSDB_FILE_FULL_NAME(pWSet, ftype)); + remove(TSDB_FILE_FULL_NAME(TSDB_DFILE_IN_SET(pWSet, ftype))); } if (pCommith->isRFileSet) { @@ -1367,7 +1413,7 @@ static int tsdbSetAndOpenCommitFile(SCommitH *pCommith, SDFileSet *pSet, int fid tsdbCloseDFileSet(pWSet); for (TSDB_FILE_T ftype = 0; ftype < TSDB_FILE_MAX; ftype++) { - remove(TSDB_FILE_FULL_NAME(pWSet, ftype)); + remove(TSDB_FILE_FULL_NAME(TSDB_DFILE_IN_SET(pWSet, ftype))); } if (pCommith->isRFileSet) { @@ -1384,8 +1430,10 @@ static int tsdbSetAndOpenCommitFile(SCommitH *pCommith, SDFileSet *pSet, int fid // TSDB_FILE_HEAD SDFile *pWHeadf = TSDB_COMMIT_HEAD_FILE(pCommith); - tsdbInitDFile(pWHeadf, REPO_ID(pRepo), fid, pCommith->version, level, id, NULL, TSDB_FILE_HEAD); - if (tsdbCreateAndOpenDFile(pWHeadf) < 0) { + did.level = level; + did.id = id; + tsdbInitDFile(pWHeadf, did, TSDB_COMMIT_REPO_ID(pCommith), fid, pCommith->version, TSDB_FILE_HEAD); + if (tsdbCreateDFile(pWHeadf) < 0) { if (pCommith->isRFileSet) { tsdbCloseAndUnsetFSet(&(pCommith->readh)); return -1; @@ -1395,7 +1443,7 @@ static int tsdbSetAndOpenCommitFile(SCommitH *pCommith, SDFileSet *pSet, int fid // TSDB_FILE_DATA SDFile *pRDataf = TSDB_READ_DATA_FILE(&(pCommith->readh)); SDFile *pWDataf = TSDB_COMMIT_DATA_FILE(pCommith); - tsdbInitDFileWithOld(pWHeadf, pRDataf); + tsdbInitDFileEx(pWHeadf, pRDataf); if (tsdbOpenDFile(pWDataf, O_WRONLY) < 0) { tsdbCloseDFile(pWHeadf); remove(TSDB_FILE_FULL_NAME(pWHeadf)); @@ -1410,10 +1458,10 @@ static int tsdbSetAndOpenCommitFile(SCommitH *pCommith, SDFileSet *pSet, int fid SDFile *pRLastf = TSDB_READ_LAST_FILE(&(pCommith->readh)); SDFile *pWLastf = TSDB_COMMIT_LAST_FILE(pCommith); if (pRLastf->info.size < 32 * 1024) { - tsdbInitDFileWithOld(pWLastf, pRLastf); + tsdbInitDFileEx(pWLastf, pRLastf); pCommith->isLFileSame = true; } else { - tsdbInitDFile(pWLastf, REPO_ID(pRepo), fid, pCommith->version, level, id, NULL, TSDB_FILE_LAST); + tsdbInitDFile(pWLastf, did, TSDB_COMMIT_REPO_ID(pCommith), fid, pCommith->version, TSDB_FILE_LAST); pCommith->isLFileSame = false; } if (tsdbOpenDFile(pWLastf, O_WRONLY) < 0) { diff --git a/src/tsdb/src/tsdbCommitQueue.c b/src/tsdb/src/tsdbCommitQueue.c index 75a2cbcb8d..f388e2cee6 100644 --- a/src/tsdb/src/tsdbCommitQueue.c +++ b/src/tsdb/src/tsdbCommitQueue.c @@ -13,11 +13,7 @@ * along with this program. If not, see . */ -#include "os.h" -#include "tglobal.h" -#include "tlist.h" -#include "tref.h" -#include "tsdbMain.h" +#include "tsdbint.h" typedef struct { bool stop; diff --git a/src/tsdb/src/tsdbFile.c b/src/tsdb/src/tsdbFile.c index 53b36448c4..e03c52984b 100644 --- a/src/tsdb/src/tsdbFile.c +++ b/src/tsdb/src/tsdbFile.c @@ -24,6 +24,14 @@ static const char *TSDB_FNAME_SUFFIX[] = { "manifest" // TSDB_FILE_MANIFEST }; +static void tsdbGetFilename(int vid, int fid, uint32_t ver, TSDB_FILE_T ftype, char *fname); +static int tsdbEncodeMFInfo(void **buf, SMFInfo *pInfo); +static void *tsdbDecodeMFInfo(void *buf, SMFInfo *pInfo); +static int tsdbRollBackMFile(SMFile *pMFile); +static int tsdbEncodeDFInfo(void **buf, SDFInfo *pInfo); +static void *tsdbDecodeDFInfo(void *buf, SDFInfo *pInfo); +static int tsdbRollBackDFile(SDFile *pDFile); + // ============== SMFile void tsdbInitMFile(SMFile *pMFile, SDiskID did, int vid, uint32_t ver) { char fname[TSDB_FILENAME_LEN]; @@ -67,7 +75,7 @@ int tsdbApplyMFileChange(SMFile *from, SMFile *to) { } else { if (tfsIsSameFile(TSDB_FILE_F(from), TSDB_FILE_F(to))) { if (from->info.size > to->info.size) { - tsdbRollbackMFile(to); + tsdbRollBackMFile(to); } } else { tsdbRemoveMFile(from); @@ -78,7 +86,7 @@ int tsdbApplyMFileChange(SMFile *from, SMFile *to) { return 0; } -static int tsdbRollBackMFile(const SMFile *pMFile) { +static int tsdbRollBackMFile(SMFile *pMFile) { SMFile mf = *pMFile; if (tsdbOpenMFile(&mf, O_WRONLY) < 0) { @@ -172,7 +180,7 @@ void tsdbInitDFile(SDFile *pDFile, SDiskID did, int vid, int fid, uint32_t ver, pDFile->info.magic = TSDB_FILE_INIT_MAGIC; tsdbGetFilename(vid, 0, ver, ftype, fname); - tfsInitFile(&(pDFile->f), level, id, fname); + tfsInitFile(&(pDFile->f), did.level, did.id, fname); } void tsdbInitDFileEx(SDFile *pDFile, SDFile *pODFile) { @@ -271,7 +279,7 @@ static int tsdbApplyDFileChange(SDFile *from, SDFile *to) { } else { if (tfsIsSameFile(TSDB_FILE_F(from), TSDB_FILE_F(to))) { if (from->info.size > to->info.size) { - tsdbRollbackDFile(to); + tsdbRollBackDFile(to); } } else { tsdbRemoveDFile(from); @@ -282,7 +290,7 @@ static int tsdbApplyDFileChange(SDFile *from, SDFile *to) { return 0; } -static int tsdbRollBackDFile(const SDFile *pDFile) { +static int tsdbRollBackDFile(SDFile *pDFile) { SDFile df = *pDFile; if (tsdbOpenDFile(&df, O_WRONLY) < 0) { @@ -328,7 +336,7 @@ int tsdbEncodeDFileSet(void **buf, SDFileSet *pSet) { tlen += tsdbEncodeSDFile(buf, TSDB_DFILE_IN_SET(pSet, ftype)); } - return tlen + return tlen; } void *tsdbDecodeDFileSet(void *buf, SDFileSet *pSet) { @@ -338,7 +346,7 @@ void *tsdbDecodeDFileSet(void *buf, SDFileSet *pSet) { return buf; } -int tsdbApplyDFileSetChange(const SDFileSet *from, const SDFileSet *to) { +int tsdbApplyDFileSetChange(SDFileSet *from, SDFileSet *to) { for (TSDB_FILE_T ftype = 0; ftype < TSDB_FILE_MAX; ftype++) { if (tsdbApplyDFileChange(TSDB_DFILE_IN_SET(from, ftype), TSDB_DFILE_IN_SET(to, ftype)) < 0) { return -1; @@ -366,7 +374,7 @@ int tsdbUpdateDFileSetHeader(SDFileSet *pSet) { return -1; } } - return 0 + return 0; } static void tsdbGetFilename(int vid, int fid, uint32_t ver, TSDB_FILE_T ftype, char *fname) { @@ -374,15 +382,15 @@ static void tsdbGetFilename(int vid, int fid, uint32_t ver, TSDB_FILE_T ftype, c if (ftype < TSDB_FILE_MAX) { if (ver == 0) { - snprintf(fname, "vnode/vnode%d/tsdb/data/v%df%d.%s", vid, vid, fid, TSDB_FNAME_SUFFIX[ftype]); + snprintf(fname, TSDB_FILENAME_LEN, "vnode/vnode%d/tsdb/data/v%df%d.%s", vid, vid, fid, TSDB_FNAME_SUFFIX[ftype]); } else { - snprintf(fname, "vnode/vnode%d/tsdb/data/v%df%d.%s-%012" PRIu32, vid, vid, fid, TSDB_FNAME_SUFFIX[ftype], ver); + snprintf(fname, TSDB_FILENAME_LEN, "vnode/vnode%d/tsdb/data/v%df%d.%s-%012" PRIu32, vid, vid, fid, TSDB_FNAME_SUFFIX[ftype], ver); } } else { if (ver == 0) { - snprintf(fname, "vnode/vnode%d/tsdb/%s", vid, TSDB_FNAME_SUFFIX[ftype]); + snprintf(fname, TSDB_FILENAME_LEN, "vnode/vnode%d/tsdb/%s", vid, TSDB_FNAME_SUFFIX[ftype]); } else { - snprintf(fname, "vnode/vnode%d/tsdb/%s-%012" PRIu32, vid, TSDB_FNAME_SUFFIX[ftype], ver); + snprintf(fname, TSDB_FILENAME_LEN, "vnode/vnode%d/tsdb/%s-%012" PRIu32, vid, TSDB_FNAME_SUFFIX[ftype], ver); } } } \ No newline at end of file diff --git a/src/tsdb/src/tsdbMain.c b/src/tsdb/src/tsdbMain.c index c6c537c17c..b986dc89f8 100644 --- a/src/tsdb/src/tsdbMain.c +++ b/src/tsdb/src/tsdbMain.c @@ -14,14 +14,7 @@ */ // no test file errors here -#include "tsdbMain.h" -#include "os.h" -#include "talgo.h" -#include "taosdef.h" -#include "tchecksum.h" -#include "tscompression.h" -#include "tsdb.h" -#include "tulog.h" +#include "tsdbint.h" #define TSDB_CFG_FILE_NAME "config" #define TSDB_DATA_DIR_NAME "data" @@ -388,7 +381,7 @@ int tsdbCheckCommit(STsdbRepo *pRepo) { } STsdbMeta * tsdbGetMeta(TSDB_REPO_T *pRepo) { return ((STsdbRepo *)pRepo)->tsdbMeta; } -STsdbFileH * tsdbGetFile(TSDB_REPO_T *pRepo) { return ((STsdbRepo *)pRepo)->tsdbFileH; } +// STsdbFileH * tsdbGetFile(TSDB_REPO_T *pRepo) { return ((STsdbRepo *)pRepo)->tsdbFileH; } STsdbRepoInfo *tsdbGetStatus(TSDB_REPO_T *pRepo) { return NULL; } // ----------------- LOCAL FUNCTIONS ----------------- diff --git a/src/tsdb/src/tsdbMemTable.c b/src/tsdb/src/tsdbMemTable.c index 58c527f426..98ebd84bfa 100644 --- a/src/tsdb/src/tsdbMemTable.c +++ b/src/tsdb/src/tsdbMemTable.c @@ -13,8 +13,7 @@ * along with this program. If not, see . */ -#include "tsdb.h" -#include "tsdbMain.h" +#include "tsdbint.h" #define TSDB_DATA_SKIPLIST_LEVEL 5 #define TSDB_MAX_INSERT_BATCH 512 diff --git a/src/tsdb/src/tsdbMeta.c b/src/tsdb/src/tsdbMeta.c index 7b08178f49..abecc008d6 100644 --- a/src/tsdb/src/tsdbMeta.c +++ b/src/tsdb/src/tsdbMeta.c @@ -12,13 +12,7 @@ * You should have received a copy of the GNU Affero General Public License * along with this program. If not, see . */ -#include -#include "hash.h" -#include "taosdef.h" -#include "tchecksum.h" -#include "tsdb.h" -#include "tsdbMain.h" -#include "tskiplist.h" +#include "tsdbint.h" #define TSDB_SUPER_TABLE_SL_LEVEL 5 #define DEFAULT_TAG_INDEX_COLUMN 0 @@ -479,11 +473,11 @@ int tsdbOpenMeta(STsdbRepo *pRepo) { goto _err; } - pMeta->pStore = tdOpenKVStore(fname, tsdbRestoreTable, tsdbOrgMeta, (void *)pRepo); - if (pMeta->pStore == NULL) { - tsdbError("vgId:%d failed to open TSDB meta while open the kv store since %s", REPO_ID(pRepo), tstrerror(terrno)); - goto _err; - } + // pMeta->pStore = tdOpenKVStore(fname, tsdbRestoreTable, tsdbOrgMeta, (void *)pRepo); + // if (pMeta->pStore == NULL) { + // tsdbError("vgId:%d failed to open TSDB meta while open the kv store since %s", REPO_ID(pRepo), tstrerror(terrno)); + // goto _err; + // } tsdbDebug("vgId:%d open TSDB meta succeed", REPO_ID(pRepo)); tfree(fname); @@ -500,7 +494,7 @@ int tsdbCloseMeta(STsdbRepo *pRepo) { STable * pTable = NULL; if (pMeta == NULL) return 0; - tdCloseKVStore(pMeta->pStore); + // tdCloseKVStore(pMeta->pStore); for (int i = 1; i < pMeta->maxTables; i++) { tsdbFreeTable(pMeta->tables[i]); } @@ -610,7 +604,7 @@ void tsdbUpdateTableSchema(STsdbRepo *pRepo, STable *pTable, STSchema *pSchema, } // ------------------ LOCAL FUNCTIONS ------------------ -static int tsdbRestoreTable(void *pHandle, void *cont, int contLen) { +static UNUSED_FUNC int tsdbRestoreTable(void *pHandle, void *cont, int contLen) { STsdbRepo *pRepo = (STsdbRepo *)pHandle; STable * pTable = NULL; @@ -631,7 +625,7 @@ static int tsdbRestoreTable(void *pHandle, void *cont, int contLen) { return 0; } -static void tsdbOrgMeta(void *pHandle) { +static UNUSED_FUNC void tsdbOrgMeta(void *pHandle) { STsdbRepo *pRepo = (STsdbRepo *)pHandle; STsdbMeta *pMeta = pRepo->tsdbMeta; diff --git a/src/tsdb/src/tsdbRead.c b/src/tsdb/src/tsdbRead.c index e09735652a..aca3be90dc 100644 --- a/src/tsdb/src/tsdbRead.c +++ b/src/tsdb/src/tsdbRead.c @@ -22,7 +22,7 @@ #include "../../query/inc/qAst.h" // todo move to common module #include "tlosertree.h" #include "tsdb.h" -#include "tsdbMain.h" +#include "tsdbint.h" #define EXTRA_BYTES 2 #define ASCENDING_TRAVERSE(o) (o == TSDB_ORDER_ASC) @@ -54,7 +54,7 @@ typedef struct SQueryFilePos { } SQueryFilePos; typedef struct SDataBlockLoadInfo { - SFileGroup* fileGroup; + SDFileSet* fileGroup; int32_t slot; int32_t tid; SArray* pLoadedCols; @@ -113,9 +113,9 @@ typedef struct STsdbQueryHandle { bool cachelastrow; // check if last row cached void* qinfo; // query info handle, for debug purpose int32_t type; // query type: retrieve all data blocks, 2. retrieve only last row, 3. retrieve direct prev|next rows - SFileGroup* pFileGroup; - SFileGroupIter fileIter; - SRWHelper rhelper; + SDFileSet* pFileGroup; + SFSIter fileIter; + SReadH rhelper; STableBlockInfo* pDataBlockInfo; SDataCols *pDataCols; // in order to hold current file data block @@ -295,7 +295,7 @@ static STsdbQueryHandle* tsdbQueryTablesImpl(TSDB_REPO_T* tsdb, STsdbQueryCond* pQueryHandle->locateStart = false; pQueryHandle->pMemRef = pMemRef; - if (tsdbInitReadHelper(&pQueryHandle->rhelper, (STsdbRepo*) tsdb) != 0) { + if (tsdbInitReadH(&pQueryHandle->rhelper, (STsdbRepo*) tsdb) != 0) { goto out_of_memory; } @@ -716,12 +716,12 @@ static int32_t getFileCompInfo(STsdbQueryHandle* pQueryHandle, int32_t* numOfBlo STableCheckInfo* pCheckInfo = taosArrayGet(pQueryHandle->pTableCheckInfo, i); pCheckInfo->numOfBlocks = 0; - if (tsdbSetHelperTable(&pQueryHandle->rhelper, pCheckInfo->pTableObj, pQueryHandle->pTsdb) != TSDB_CODE_SUCCESS) { + if (tsdbSetReadTable(&pQueryHandle->rhelper, pCheckInfo->pTableObj) != TSDB_CODE_SUCCESS) { code = terrno; break; } - SBlockIdx* compIndex = &pQueryHandle->rhelper.curCompIdx; + SBlockIdx* compIndex = pQueryHandle->rhelper.pBlkIdx; // no data block in this file, try next file if (compIndex->len == 0 || compIndex->numOfBlocks == 0 || compIndex->uid != pCheckInfo->tableId.uid) { @@ -742,7 +742,7 @@ static int32_t getFileCompInfo(STsdbQueryHandle* pQueryHandle, int32_t* numOfBlo pCheckInfo->compSize = compIndex->len; } - tsdbLoadCompInfo(&(pQueryHandle->rhelper), (void *)(pCheckInfo->pCompInfo)); + tsdbLoadBlockInfo(&(pQueryHandle->rhelper), (void *)(pCheckInfo->pCompInfo)); SBlockInfo* pCompInfo = pCheckInfo->pCompInfo; TSKEY s = TSKEY_INITIAL_VAL, e = TSKEY_INITIAL_VAL; @@ -792,14 +792,14 @@ static int32_t doLoadFileDataBlock(STsdbQueryHandle* pQueryHandle, SBlock* pBloc goto _error; } - code = tdInitDataCols(pQueryHandle->rhelper.pDataCols[0], pSchema); + code = tdInitDataCols(pQueryHandle->rhelper.pDCols[0], pSchema); if (code != TSDB_CODE_SUCCESS) { tsdbError("%p failed to malloc buf for rhelper.pDataCols[0], %p", pQueryHandle, pQueryHandle->qinfo); terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; goto _error; } - code = tdInitDataCols(pQueryHandle->rhelper.pDataCols[1], pSchema); + code = tdInitDataCols(pQueryHandle->rhelper.pDCols[1], pSchema); if (code != TSDB_CODE_SUCCESS) { tsdbError("%p failed to malloc buf for rhelper.pDataCols[1], %p", pQueryHandle, pQueryHandle->qinfo); terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; @@ -821,7 +821,7 @@ static int32_t doLoadFileDataBlock(STsdbQueryHandle* pQueryHandle, SBlock* pBloc pBlockLoadInfo->slot = pQueryHandle->cur.slot; pBlockLoadInfo->tid = pCheckInfo->pTableObj->tableId.tid; - SDataCols* pCols = pQueryHandle->rhelper.pDataCols[0]; + SDataCols* pCols = pQueryHandle->rhelper.pDCols[0]; assert(pCols->numOfRows != 0 && pCols->numOfRows <= pBlock->numOfRows); pBlock->numOfRows = pCols->numOfRows; @@ -942,7 +942,7 @@ static int32_t loadFileDataBlock(STsdbQueryHandle* pQueryHandle, SBlock* pBlock, return code; } - SDataCols* pTSCol = pQueryHandle->rhelper.pDataCols[0]; + SDataCols* pTSCol = pQueryHandle->rhelper.pDCols[0]; assert(pTSCol->cols->type == TSDB_DATA_TYPE_TIMESTAMP && pTSCol->numOfRows == pBlock->numOfRows); if (pCheckInfo->lastKey > pBlock->keyFirst) { @@ -965,7 +965,7 @@ static int32_t loadFileDataBlock(STsdbQueryHandle* pQueryHandle, SBlock* pBlock, return code; } - SDataCols* pTsCol = pQueryHandle->rhelper.pDataCols[0]; + SDataCols* pTsCol = pQueryHandle->rhelper.pDCols[0]; if (pCheckInfo->lastKey < pBlock->keyLast) { cur->pos = binarySearchForKey(pTsCol->cols[0].pData, pBlock->numOfRows, pCheckInfo->lastKey, pQueryHandle->order); } else { @@ -1050,7 +1050,7 @@ int32_t doCopyRowsFromFileBlock(STsdbQueryHandle* pQueryHandle, int32_t capacity char* pData = NULL; int32_t step = ASCENDING_TRAVERSE(pQueryHandle->order)? 1 : -1; - SDataCols* pCols = pQueryHandle->rhelper.pDataCols[0]; + SDataCols* pCols = pQueryHandle->rhelper.pDCols[0]; TSKEY* tsArray = pCols->cols[0].pData; int32_t num = end - start + 1; @@ -1274,7 +1274,7 @@ static void doCheckGeneratedBlockRange(STsdbQueryHandle* pQueryHandle) { static void copyAllRemainRowsFromFileBlock(STsdbQueryHandle* pQueryHandle, STableCheckInfo* pCheckInfo, SDataBlockInfo* pBlockInfo, int32_t endPos) { SQueryFilePos* cur = &pQueryHandle->cur; - SDataCols* pCols = pQueryHandle->rhelper.pDataCols[0]; + SDataCols* pCols = pQueryHandle->rhelper.pDCols[0]; TSKEY* tsArray = pCols->cols[0].pData; int32_t step = ASCENDING_TRAVERSE(pQueryHandle->order)? 1:-1; @@ -1317,7 +1317,7 @@ int32_t getEndPosInDataBlock(STsdbQueryHandle* pQueryHandle, SDataBlockInfo* pBl int32_t order = ASCENDING_TRAVERSE(pQueryHandle->order)? TSDB_ORDER_DESC : TSDB_ORDER_ASC; SQueryFilePos* cur = &pQueryHandle->cur; - SDataCols* pCols = pQueryHandle->rhelper.pDataCols[0]; + SDataCols* pCols = pQueryHandle->rhelper.pDCols[0]; if (ASCENDING_TRAVERSE(pQueryHandle->order) && pQueryHandle->window.ekey >= pBlockInfo->window.ekey) { endPos = pBlockInfo->rows - 1; @@ -1343,7 +1343,7 @@ static void doMergeTwoLevelData(STsdbQueryHandle* pQueryHandle, STableCheckInfo* initTableMemIterator(pQueryHandle, pCheckInfo); - SDataCols* pCols = pQueryHandle->rhelper.pDataCols[0]; + SDataCols* pCols = pQueryHandle->rhelper.pDCols[0]; assert(pCols->cols[0].type == TSDB_DATA_TYPE_TIMESTAMP && pCols->cols[0].colId == PRIMARYKEY_TIMESTAMP_COL_INDEX && cur->pos >= 0 && cur->pos < pBlock->numOfRows); @@ -1751,19 +1751,19 @@ static int32_t getFirstFileDataBlock(STsdbQueryHandle* pQueryHandle, bool* exist STimeWindow win = TSWINDOW_INITIALIZER; while (true) { - pthread_rwlock_rdlock(&pQueryHandle->pTsdb->tsdbFileH->fhlock); + tsdbRLockFS(REPO_FS(pQueryHandle->pTsdb)); - if ((pQueryHandle->pFileGroup = tsdbGetFileGroupNext(&pQueryHandle->fileIter)) == NULL) { - pthread_rwlock_unlock(&pQueryHandle->pTsdb->tsdbFileH->fhlock); + if ((pQueryHandle->pFileGroup = tsdbFSIterNext(&pQueryHandle->fileIter)) == NULL) { + tsdbUnLockFS(REPO_FS(pQueryHandle->pTsdb)); break; } - tsdbGetFidKeyRange(pCfg->daysPerFile, pCfg->precision, pQueryHandle->pFileGroup->fileId, &win.skey, &win.ekey); + tsdbGetFidKeyRange(pCfg->daysPerFile, pCfg->precision, pQueryHandle->pFileGroup->fid, &win.skey, &win.ekey); // current file are not overlapped with query time window, ignore remain files if ((ASCENDING_TRAVERSE(pQueryHandle->order) && win.skey > pQueryHandle->window.ekey) || (!ASCENDING_TRAVERSE(pQueryHandle->order) && win.ekey < pQueryHandle->window.ekey)) { - pthread_rwlock_unlock(&pQueryHandle->pTsdb->tsdbFileH->fhlock); + tsdbUnLockFS(REPO_FS(pQueryHandle->pTsdb)); tsdbDebug("%p remain files are not qualified for qrange:%" PRId64 "-%" PRId64 ", ignore, %p", pQueryHandle, pQueryHandle->window.skey, pQueryHandle->window.ekey, pQueryHandle->qinfo); pQueryHandle->pFileGroup = NULL; @@ -1771,15 +1771,15 @@ static int32_t getFirstFileDataBlock(STsdbQueryHandle* pQueryHandle, bool* exist break; } - if (tsdbSetAndOpenHelperFile(&pQueryHandle->rhelper, pQueryHandle->pFileGroup) < 0) { - pthread_rwlock_unlock(&pQueryHandle->pTsdb->tsdbFileH->fhlock); + if (tsdbSetAndOpenReadFSet(&pQueryHandle->rhelper, pQueryHandle->pFileGroup) < 0) { + tsdbUnLockFS(REPO_FS(pQueryHandle->pTsdb)); code = terrno; break; } - pthread_rwlock_unlock(&pQueryHandle->pTsdb->tsdbFileH->fhlock); + tsdbUnLockFS(REPO_FS(pQueryHandle->pTsdb)); - if (tsdbLoadCompIdx(&pQueryHandle->rhelper, NULL) < 0) { + if (tsdbLoadBlockIdx(&pQueryHandle->rhelper) < 0) { code = terrno; break; } @@ -1789,7 +1789,7 @@ static int32_t getFirstFileDataBlock(STsdbQueryHandle* pQueryHandle, bool* exist } tsdbDebug("%p %d blocks found in file for %d table(s), fid:%d, %p", pQueryHandle, numOfBlocks, numOfTables, - pQueryHandle->pFileGroup->fileId, pQueryHandle->qinfo); + pQueryHandle->pFileGroup->fid, pQueryHandle->qinfo); assert(numOfBlocks >= 0); if (numOfBlocks == 0) { @@ -1820,7 +1820,7 @@ static int32_t getFirstFileDataBlock(STsdbQueryHandle* pQueryHandle, bool* exist assert(pQueryHandle->pFileGroup != NULL && pQueryHandle->numOfBlocks > 0); cur->slot = ASCENDING_TRAVERSE(pQueryHandle->order)? 0:pQueryHandle->numOfBlocks-1; - cur->fid = pQueryHandle->pFileGroup->fileId; + cur->fid = pQueryHandle->pFileGroup->fid; STableBlockInfo* pBlockInfo = &pQueryHandle->pDataBlockInfo[cur->slot]; return getDataBlockRv(pQueryHandle, pBlockInfo, exists); @@ -1843,7 +1843,7 @@ static void moveToNextDataBlockInCurrentFile(STsdbQueryHandle* pQueryHandle) { } static int32_t getDataBlocksInFiles(STsdbQueryHandle* pQueryHandle, bool* exists) { - STsdbFileH* pFileHandle = tsdbGetFile(pQueryHandle->pTsdb); + STsdbFS* pFileHandle = REPO_FS(pQueryHandle->pTsdb); SQueryFilePos* cur = &pQueryHandle->cur; // find the start data block in file @@ -1852,10 +1852,10 @@ static int32_t getDataBlocksInFiles(STsdbQueryHandle* pQueryHandle, bool* exists STsdbCfg* pCfg = &pQueryHandle->pTsdb->config; int32_t fid = getFileIdFromKey(pQueryHandle->window.skey, pCfg->daysPerFile, pCfg->precision); - pthread_rwlock_rdlock(&pQueryHandle->pTsdb->tsdbFileH->fhlock); - tsdbInitFileGroupIter(pFileHandle, &pQueryHandle->fileIter, pQueryHandle->order); - tsdbSeekFileGroupIter(&pQueryHandle->fileIter, fid); - pthread_rwlock_unlock(&pQueryHandle->pTsdb->tsdbFileH->fhlock); + tsdbRLockFS(pFileHandle); + tsdbFSIterInit(&pQueryHandle->fileIter, pFileHandle, pQueryHandle->order); + tsdbFSIterSeek(&pQueryHandle->fileIter, fid); + tsdbUnLockFS(pFileHandle); return getFirstFileDataBlock(pQueryHandle, exists); } else { @@ -2382,7 +2382,7 @@ int32_t tsdbRetrieveDataBlockStatisInfo(TsdbQueryHandleT* pQueryHandle, SDataSta } int64_t stime = taosGetTimestampUs(); - tsdbLoadCompData(&pHandle->rhelper, pBlockInfo->compBlock, NULL); + tsdbLoadBlockStatis(&pHandle->rhelper, pBlockInfo->compBlock); int16_t* colIds = pHandle->defaultLoadColumn->pData; @@ -2392,7 +2392,7 @@ int32_t tsdbRetrieveDataBlockStatisInfo(TsdbQueryHandleT* pQueryHandle, SDataSta pHandle->statis[i].colId = colIds[i]; } - tsdbGetDataStatis(&pHandle->rhelper, pHandle->statis, (int)numOfCols); + tsdbGetBlockStatis(&pHandle->rhelper, pHandle->statis, (int)numOfCols); // always load the first primary timestamp column data SDataStatis* pPrimaryColStatis = &pHandle->statis[0]; @@ -2444,7 +2444,7 @@ SArray* tsdbRetrieveDataBlock(TsdbQueryHandleT* pQueryHandle, SArray* pIdList) { // data block has been loaded, todo extract method SDataBlockLoadInfo* pBlockLoadInfo = &pHandle->dataBlockLoadInfo; - if (pBlockLoadInfo->slot == pHandle->cur.slot && pBlockLoadInfo->fileGroup->fileId == pHandle->cur.fid && + if (pBlockLoadInfo->slot == pHandle->cur.slot && pBlockLoadInfo->fileGroup->fid == pHandle->cur.fid && pBlockLoadInfo->tid == pCheckInfo->pTableObj->tableId.tid) { return pHandle->pColumns; } else { // only load the file block @@ -2923,7 +2923,7 @@ void tsdbCleanupQueryHandle(TsdbQueryHandleT queryHandle) { // todo check error tsdbMayUnTakeMemSnapshot(pQueryHandle); - tsdbDestroyHelper(&pQueryHandle->rhelper); + tsdbDestroyReadH(&pQueryHandle->rhelper); tdFreeDataCols(pQueryHandle->pDataCols); pQueryHandle->pDataCols = NULL; diff --git a/src/tsdb/src/tsdbReadImpl.c b/src/tsdb/src/tsdbReadImpl.c index 9948a9f3c5..3d0ed7549d 100644 --- a/src/tsdb/src/tsdbReadImpl.c +++ b/src/tsdb/src/tsdbReadImpl.c @@ -19,10 +19,10 @@ static void tsdbResetReadTable(SReadH *pReadh); static void tsdbResetReadFile(SReadH *pReadh); -static int tsdbLoadBlockDataImpl(SReadH *pReadh, const SBlock *pBlock, SDataCols *pDataCols); +static int tsdbLoadBlockDataImpl(SReadH *pReadh, SBlock *pBlock, SDataCols *pDataCols); static int tsdbCheckAndDecodeColumnData(SDataCol *pDataCol, void *content, int32_t len, int8_t comp, int numOfRows, int maxPoints, char *buffer, int bufferSize); -static int tsdbLoadBlockDataColsImpl(SReadH *pReadh, const SBlock *pBlock, SDataCols *pDataCols, int16_t *colIds, +static int tsdbLoadBlockDataColsImpl(SReadH *pReadh, SBlock *pBlock, SDataCols *pDataCols, int16_t *colIds, int numOfColIds); static int tsdbLoadColData(SReadH *pReadh, SDFile *pDFile, SBlock *pBlock, SBlockCol *pBlockCol, SDataCol *pDataCol); @@ -240,10 +240,10 @@ int tsdbLoadBlockInfo(SReadH *pReadh, void *pTarget) { return 0; } -int tsdbLoadBlockData(SReadH *pReadh, const SBlock *pBlock, const SBlockInfo *pBlkInfo) { +int tsdbLoadBlockData(SReadH *pReadh, SBlock *pBlock, SBlockInfo *pBlkInfo) { ASSERT(pBlock->numOfSubBlocks > 0); - const SBlock *iBlock = pBlock; + SBlock *iBlock = pBlock; if (pBlock->numOfSubBlocks > 1) { if (pBlkInfo) { iBlock = (SBlock *)POINTER_SHIFT(pBlkInfo, pBlock->offset); @@ -266,11 +266,10 @@ int tsdbLoadBlockData(SReadH *pReadh, const SBlock *pBlock, const SBlockInfo *pB return 0; } -int tsdbLoadBlockDataCols(SReadH *pReadh, const SBlock *pBlock, const SBlockInfo *pBlkInfo, const int16_t *colIds, - int numOfColsIds) { +int tsdbLoadBlockDataCols(SReadH *pReadh, SBlock *pBlock, SBlockInfo *pBlkInfo, int16_t *colIds, int numOfColsIds) { ASSERT(pBlock->numOfSubBlocks > 0); - const SBlock *iBlock = pBlock; + SBlock *iBlock = pBlock; if (pBlock->numOfSubBlocks > 1) { if (pBlkInfo) { iBlock = POINTER_SHIFT(pBlkInfo, pBlock->offset); @@ -300,7 +299,7 @@ int tsdbLoadBlockStatis(SReadH *pReadh, SBlock *pBlock) { if (tsdbSeekDFile(pDFile, pBlock->offset, SEEK_SET) < 0) { tsdbError("vgId:%d failed to load block statis part while seek file %s to offset %" PRId64 " since %s", - TSDB_READ_REPO_ID(pReadh), TSDB_FILE_FULL_NAME(pDFile), pBlock->offset, tstrerror(terrno)); + TSDB_READ_REPO_ID(pReadh), TSDB_FILE_FULL_NAME(pDFile), (int64_t)pBlock->offset, tstrerror(terrno)); return -1; } @@ -310,7 +309,7 @@ int tsdbLoadBlockStatis(SReadH *pReadh, SBlock *pBlock) { int64_t nread = tsdbReadDFile(pDFile, (void *)(pReadh->pBlkData), size); if (nread < 0) { tsdbError("vgId:%d failed to load block statis part while read file %s sinces %s, offset:%" PRId64 " len :%" PRIzu, - TSDB_READ_REPO_ID(pReadh), TSDB_FILE_FULL_NAME(pDFile), tstrerror(terrno), pBlock->offset, size); + TSDB_READ_REPO_ID(pReadh), TSDB_FILE_FULL_NAME(pDFile), tstrerror(terrno), (int64_t)pBlock->offset, size); return -1; } @@ -318,14 +317,14 @@ int tsdbLoadBlockStatis(SReadH *pReadh, SBlock *pBlock) { terrno = TSDB_CODE_TDB_FILE_CORRUPTED; tsdbError("vgId:%d block statis part in file %s is corrupted, offset:%" PRId64 " expected bytes:%" PRIzu " read bytes: %" PRId64, - TSDB_READ_REPO_ID(pReadh), TSDB_FILE_FULL_NAME(pDFile), pBlock->offset, size, nread); + TSDB_READ_REPO_ID(pReadh), TSDB_FILE_FULL_NAME(pDFile), (int64_t)pBlock->offset, size, nread); return -1; } if (!taosCheckChecksumWhole((uint8_t *)(pReadh->pBlkData), size)) { terrno = TSDB_CODE_TDB_FILE_CORRUPTED; tsdbError("vgId:%d block statis part in file %s is corrupted since wrong checksum, offset:%" PRId64 " len :%" PRIzu, - TSDB_READ_REPO_ID(pReadh), TSDB_FILE_FULL_NAME(pDFile), pBlock->offset, size); + TSDB_READ_REPO_ID(pReadh), TSDB_FILE_FULL_NAME(pDFile), (int64_t)pBlock->offset, size); return -1; } @@ -408,7 +407,7 @@ static void tsdbResetReadFile(SReadH *pReadh) { tsdbCloseDFileSet(TSDB_READ_FSET(pReadh)); } -static int tsdbLoadBlockDataImpl(SReadH *pReadh, const SBlock *pBlock, SDataCols *pDataCols) { +static int tsdbLoadBlockDataImpl(SReadH *pReadh, SBlock *pBlock, SDataCols *pDataCols) { ASSERT(pBlock->numOfSubBlocks >= 0 && pBlock->numOfSubBlocks <= 1); SDFile *pDFile = (pBlock->last) ? TSDB_READ_LAST_FILE(pReadh) : TSDB_READ_HEAD_FILE(pReadh); @@ -420,14 +419,14 @@ static int tsdbLoadBlockDataImpl(SReadH *pReadh, const SBlock *pBlock, SDataCols if (tsdbSeekDFile(pDFile, pBlock->offset, SEEK_SET) < 0) { tsdbError("vgId:%d failed to load block data part while seek file %s to offset %" PRId64 " since %s", - TSDB_READ_REPO_ID(pReadh), TSDB_FILE_FULL_NAME(pDFile), pBlock->offset, tstrerror(terrno)); + TSDB_READ_REPO_ID(pReadh), TSDB_FILE_FULL_NAME(pDFile), (int64_t)pBlock->offset, tstrerror(terrno)); return -1; } int64_t nread = tsdbReadDFile(pDFile, TSDB_READ_BUF(pReadh), pBlock->len); if (nread < 0) { tsdbError("vgId:%d failed to load block data part while read file %s sinces %s, offset:%" PRId64 " len :%d", - TSDB_READ_REPO_ID(pReadh), TSDB_FILE_FULL_NAME(pDFile), tstrerror(terrno), pBlock->offset, pBlock->len); + TSDB_READ_REPO_ID(pReadh), TSDB_FILE_FULL_NAME(pDFile), tstrerror(terrno), (int64_t)pBlock->offset, pBlock->len); return -1; } @@ -435,7 +434,7 @@ static int tsdbLoadBlockDataImpl(SReadH *pReadh, const SBlock *pBlock, SDataCols terrno = TSDB_CODE_TDB_FILE_CORRUPTED; tsdbError("vgId:%d block data part in file %s is corrupted, offset:%" PRId64 " expected bytes:%d read bytes: %" PRId64, - TSDB_READ_REPO_ID(pReadh), TSDB_FILE_FULL_NAME(pDFile), pBlock->offset, pBlock->len, nread); + TSDB_READ_REPO_ID(pReadh), TSDB_FILE_FULL_NAME(pDFile), (int64_t)pBlock->offset, pBlock->len, nread); return -1; } @@ -443,7 +442,7 @@ static int tsdbLoadBlockDataImpl(SReadH *pReadh, const SBlock *pBlock, SDataCols if (!taosCheckChecksumWhole((uint8_t *)TSDB_READ_BUF(pReadh), tsize)) { terrno = TSDB_CODE_TDB_FILE_CORRUPTED; tsdbError("vgId:%d block statis part in file %s is corrupted since wrong checksum, offset:%" PRId64 " len :%d", - TSDB_READ_REPO_ID(pReadh), TSDB_FILE_FULL_NAME(pDFile), pBlock->offset, tsize); + TSDB_READ_REPO_ID(pReadh), TSDB_FILE_FULL_NAME(pDFile), (int64_t)pBlock->offset, tsize); return -1; } @@ -542,7 +541,7 @@ static int tsdbCheckAndDecodeColumnData(SDataCol *pDataCol, void *content, int32 return 0; } -static int tsdbLoadBlockDataColsImpl(SReadH *pReadh, const SBlock *pBlock, SDataCols *pDataCols, int16_t *colIds, +static int tsdbLoadBlockDataColsImpl(SReadH *pReadh, SBlock *pBlock, SDataCols *pDataCols, int16_t *colIds, int numOfColIds) { ASSERT(pBlock->numOfSubBlocks <= 1); ASSERT(colIds[0] == 0); @@ -653,7 +652,7 @@ static int tsdbLoadColData(SReadH *pReadh, SDFile *pDFile, SBlock *pBlock, SBloc if (tsdbCheckAndDecodeColumnData(pDataCol, pReadh->pBuf, pBlockCol->len, pBlock->algorithm, pBlock->numOfRows, pCfg->maxRowsPerFileBlock, pReadh->pCBuf, (int32_t)taosTSizeof(pReadh->pCBuf)) < 0) { - tsdbError("vgId:%d file %s is broken at column %d offset %" PRId64, REPO_ID(pRepo), TSDB_FILE_NAME(pDFile), + tsdbError("vgId:%d file %s is broken at column %d offset %" PRId64, REPO_ID(pRepo), TSDB_FILE_FULL_NAME(pDFile), pBlockCol->colId, offset); return -1; } diff --git a/src/tsdb/src/tsdbScan.c b/src/tsdb/src/tsdbScan.c index 06a8cc6944..382f7b11ae 100644 --- a/src/tsdb/src/tsdbScan.c +++ b/src/tsdb/src/tsdbScan.c @@ -13,8 +13,9 @@ * along with this program. If not, see . */ -#include "tsdbMain.h" +#include "tsdbint.h" +#if 0 #ifndef _TSDB_PLUGINS int tsdbScanFGroup(STsdbScanHandle* pScanHandle, char* rootDir, int fid) { return 0; } @@ -33,4 +34,5 @@ int tsdbCloseScanFile(STsdbScanHandle* pScanHandle) { return 0; } void tsdbFreeScanHandle(STsdbScanHandle* pScanHandle) {} +#endif #endif \ No newline at end of file diff --git a/src/tsdb/src/tsdbStore.c b/src/tsdb/src/tsdbStore.c deleted file mode 100644 index 8c21c2e9db..0000000000 --- a/src/tsdb/src/tsdbStore.c +++ /dev/null @@ -1,496 +0,0 @@ -/* - * Copyright (c) 2019 TAOS Data, Inc. - * - * This program is free software: you can use, redistribute, and/or modify - * it under the terms of the GNU Affero General Public License, version 3 - * or later ("AGPL"), as published by the Free Software Foundation. - * - * This program is distributed in the hope that it will be useful, but WITHOUT - * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or - * FITNESS FOR A PARTICULAR PURPOSE. - * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see . - */ - -#define _DEFAULT_SOURCE -#define TAOS_RANDOM_FILE_FAIL_TEST - -#include "tsdbint.h" - -static int tdInitKVStoreHeader(int fd, char *fname); -static int tdEncodeStoreInfo(void **buf, SStoreInfo *pInfo); -static void * tdDecodeStoreInfo(void *buf, SStoreInfo *pInfo); -static SKVStore *tdNewKVStore(char *fname, iterFunc iFunc, afterFunc aFunc, void *appH); -static void tdFreeKVStore(SKVStore *pStore); -static int tdUpdateKVStoreHeader(int fd, char *fname, SStoreInfo *pInfo); -static int tdLoadKVStoreHeader(int fd, char *fname, SStoreInfo *pInfo, uint32_t *version); -static int tdEncodeKVRecord(void **buf, SKVRecord *pRecord); -static void * tdDecodeKVRecord(void *buf, SKVRecord *pRecord); -static int tdRestoreKVStore(SKVStore *pStore); - -int tdCreateKVStore(char *fname) { - int fd = open(fname, O_RDWR | O_CREAT, 0755); - if (fd < 0) { - uError("failed to open file %s since %s", fname, strerror(errno)); - terrno = TAOS_SYSTEM_ERROR(errno); - return -1; - } - - if (tdInitKVStoreHeader(fd, fname) < 0) goto _err; - - if (fsync(fd) < 0) { - uError("failed to fsync file %s since %s", fname, strerror(errno)); - terrno = TAOS_SYSTEM_ERROR(errno); - goto _err; - } - - if (close(fd) < 0) { - uError("failed to close file %s since %s", fname, strerror(errno)); - terrno = TAOS_SYSTEM_ERROR(errno); - goto _err; - } - - return 0; - -_err: - if (fd >= 0) close(fd); - (void)remove(fname); - return -1; -} - -int tdDestroyKVStore(char *fname) { - if (remove(fname) < 0) { - uError("failed to remove file %s since %s", fname, strerror(errno)); - terrno = TAOS_SYSTEM_ERROR(errno); - return -1; - } - - return 0; -} - -SKVStore *tdOpenKVStore(char *fname, iterFunc iFunc, afterFunc aFunc, void *appH) { - SStoreInfo info = {0}; - uint32_t version = 0; - - SKVStore *pStore = tdNewKVStore(fname, iFunc, aFunc, appH); - if (pStore == NULL) return NULL; - - pStore->fd = open(pStore->fname, O_RDWR); - if (pStore->fd < 0) { - uError("failed to open file %s since %s", pStore->fname, strerror(errno)); - terrno = TAOS_SYSTEM_ERROR(errno); - goto _err; - } - - if (tdLoadKVStoreHeader(pStore->fd, pStore->fname, &info, &version) < 0) goto _err; - if (version != KVSTORE_FILE_VERSION) { - uError("file %s version %u is not the same as program version %u, this may cause problem", pStore->fname, version, - KVSTORE_FILE_VERSION); - } - - pStore->info.size = TD_KVSTORE_HEADER_SIZE; - pStore->info.magic = info.magic; - - if (tdRestoreKVStore(pStore) < 0) goto _err; - - close(pStore->fd); - pStore->fd = -1; - - return pStore; - -_err: - if (pStore->fd > 0) { - close(pStore->fd); - pStore->fd = -1; - } - tdFreeKVStore(pStore); - return NULL; -} - -void tdCloseKVStore(SKVStore *pStore) { tdFreeKVStore(pStore); } - -int tdKVStoreStartCommit(SKVStore *pStore) { - ASSERT(pStore->fd < 0); - - pStore->fd = open(pStore->fname, O_RDWR); - if (pStore->fd < 0) { - uError("failed to open file %s since %s", pStore->fname, strerror(errno)); - terrno = TAOS_SYSTEM_ERROR(errno); - goto _err; - } - - if (lseek(pStore->fd, 0, SEEK_END) < 0) { - uError("failed to lseek file %s since %s", pStore->fname, strerror(errno)); - terrno = TAOS_SYSTEM_ERROR(errno); - goto _err; - } - - ASSERT(pStore->info.size == lseek(pStore->fd, 0, SEEK_CUR)); - - return 0; - -_err: - if (pStore->fd > 0) { - close(pStore->fd); - pStore->fd = -1; - } - return -1; -} - -int tdUpdateKVStoreRecord(SKVStore *pStore, uint64_t uid, void *cont, int contLen) { - SKVRecord rInfo = {0}; - char buf[64] = "\0"; - char * pBuf = buf; - - rInfo.offset = lseek(pStore->fd, 0, SEEK_CUR); - if (rInfo.offset < 0) { - uError("failed to lseek file %s since %s", pStore->fname, strerror(errno)); - terrno = TAOS_SYSTEM_ERROR(errno); - return -1; - } - - rInfo.uid = uid; - rInfo.size = contLen; - - int tlen = tdEncodeKVRecord((void *)(&pBuf), &rInfo); - ASSERT(tlen == POINTER_DISTANCE(pBuf, buf)); - ASSERT(tlen == sizeof(SKVRecord)); - - if (taosWrite(pStore->fd, buf, tlen) < tlen) { - uError("failed to write %d bytes to file %s since %s", tlen, pStore->fname, strerror(errno)); - terrno = TAOS_SYSTEM_ERROR(errno); - return -1; - } - - if (taosWrite(pStore->fd, cont, contLen) < contLen) { - uError("failed to write %d bytes to file %s since %s", contLen, pStore->fname, strerror(errno)); - terrno = TAOS_SYSTEM_ERROR(errno); - return -1; - } - - pStore->info.magic = - taosCalcChecksum(pStore->info.magic, (uint8_t *)POINTER_SHIFT(cont, contLen - sizeof(TSCKSUM)), sizeof(TSCKSUM)); - pStore->info.size += (sizeof(SKVRecord) + contLen); - SKVRecord *pRecord = taosHashGet(pStore->map, (void *)&uid, sizeof(uid)); - if (pRecord != NULL) { // just to insert - pStore->info.tombSize += pRecord->size; - } else { - pStore->info.nRecords++; - } - - taosHashPut(pStore->map, (void *)(&uid), sizeof(uid), (void *)(&rInfo), sizeof(rInfo)); - uTrace("put uid %" PRIu64 " into kvStore %s", uid, pStore->fname); - - return 0; -} - -int tdDropKVStoreRecord(SKVStore *pStore, uint64_t uid) { - SKVRecord rInfo = {0}; - char buf[128] = "\0"; - - SKVRecord *pRecord = taosHashGet(pStore->map, (void *)(&uid), sizeof(uid)); - if (pRecord == NULL) { - uError("failed to drop KV store record with key %" PRIu64 " since not find", uid); - return -1; - } - - rInfo.offset = -pRecord->offset; - rInfo.uid = pRecord->uid; - rInfo.size = pRecord->size; - - void *pBuf = buf; - tdEncodeKVRecord(&pBuf, &rInfo); - - if (taosWrite(pStore->fd, buf, POINTER_DISTANCE(pBuf, buf)) < POINTER_DISTANCE(pBuf, buf)) { - uError("failed to write %" PRId64 " bytes to file %s since %s", (int64_t)(POINTER_DISTANCE(pBuf, buf)), pStore->fname, strerror(errno)); - terrno = TAOS_SYSTEM_ERROR(errno); - return -1; - } - - pStore->info.magic = taosCalcChecksum(pStore->info.magic, (uint8_t *)buf, (uint32_t)POINTER_DISTANCE(pBuf, buf)); - pStore->info.size += POINTER_DISTANCE(pBuf, buf); - pStore->info.nDels++; - pStore->info.nRecords--; - pStore->info.tombSize += (rInfo.size + sizeof(SKVRecord) * 2); - - taosHashRemove(pStore->map, (void *)(&uid), sizeof(uid)); - uDebug("drop uid %" PRIu64 " from KV store %s", uid, pStore->fname); - - return 0; -} - -int tdKVStoreEndCommit(SKVStore *pStore) { - ASSERT(pStore->fd > 0); - - if (tdUpdateKVStoreHeader(pStore->fd, pStore->fname, &(pStore->info)) < 0) return -1; - - if (fsync(pStore->fd) < 0) { - uError("failed to fsync file %s since %s", pStore->fname, strerror(errno)); - terrno = TAOS_SYSTEM_ERROR(errno); - return -1; - } - - if (close(pStore->fd) < 0) { - uError("failed to close file %s since %s", pStore->fname, strerror(errno)); - terrno = TAOS_SYSTEM_ERROR(errno); - return -1; - } - pStore->fd = -1; - - return 0; -} - -void tsdbGetStoreInfo(char *fname, uint32_t *magic, int64_t *size) { - char buf[TD_KVSTORE_HEADER_SIZE] = "\0"; - SStoreInfo info = {0}; - - int fd = open(fname, O_RDONLY); - if (fd < 0) goto _err; - - if (taosRead(fd, buf, TD_KVSTORE_HEADER_SIZE) < TD_KVSTORE_HEADER_SIZE) goto _err; - if (!taosCheckChecksumWhole((uint8_t *)buf, TD_KVSTORE_HEADER_SIZE)) goto _err; - - void *pBuf = (void *)buf; - pBuf = tdDecodeStoreInfo(pBuf, &info); - off_t offset = lseek(fd, 0, SEEK_END); - if (offset < 0) goto _err; - close(fd); - - *magic = info.magic; - *size = offset; - - return; - -_err: - if (fd >= 0) close(fd); - *magic = TD_KVSTORE_INIT_MAGIC; - *size = 0; -} - -static int tdLoadKVStoreHeader(int fd, char *fname, SStoreInfo *pInfo, uint32_t *version) { - char buf[TD_KVSTORE_HEADER_SIZE] = "\0"; - - if (lseek(fd, 0, SEEK_SET) < 0) { - uError("failed to lseek file %s since %s", fname, strerror(errno)); - terrno = TAOS_SYSTEM_ERROR(errno); - return -1; - } - - if (taosRead(fd, buf, TD_KVSTORE_HEADER_SIZE) < TD_KVSTORE_HEADER_SIZE) { - uError("failed to read %d bytes from file %s since %s", TD_KVSTORE_HEADER_SIZE, fname, strerror(errno)); - terrno = TAOS_SYSTEM_ERROR(errno); - return -1; - } - - if (!taosCheckChecksumWhole((uint8_t *)buf, TD_KVSTORE_HEADER_SIZE)) { - uError("file %s is broken", fname); - terrno = TSDB_CODE_COM_FILE_CORRUPTED; - return -1; - } - - void *pBuf = (void *)buf; - pBuf = tdDecodeStoreInfo(pBuf, pInfo); - pBuf = taosDecodeFixedU32(pBuf, version); - - return 0; -} - -static int tdUpdateKVStoreHeader(int fd, char *fname, SStoreInfo *pInfo) { - char buf[TD_KVSTORE_HEADER_SIZE] = "\0"; - - if (lseek(fd, 0, SEEK_SET) < 0) { - uError("failed to lseek file %s since %s", fname, strerror(errno)); - terrno = TAOS_SYSTEM_ERROR(errno); - return -1; - } - - void *pBuf = buf; - tdEncodeStoreInfo(&pBuf, pInfo); - taosEncodeFixedU32(&pBuf, KVSTORE_FILE_VERSION); - ASSERT(POINTER_DISTANCE(pBuf, buf) + sizeof(TSCKSUM) <= TD_KVSTORE_HEADER_SIZE); - - taosCalcChecksumAppend(0, (uint8_t *)buf, TD_KVSTORE_HEADER_SIZE); - if (taosWrite(fd, buf, TD_KVSTORE_HEADER_SIZE) < TD_KVSTORE_HEADER_SIZE) { - uError("failed to write %d bytes to file %s since %s", TD_KVSTORE_HEADER_SIZE, fname, strerror(errno)); - terrno = TAOS_SYSTEM_ERROR(errno); - return -1; - } - - return 0; -} - -static int tdInitKVStoreHeader(int fd, char *fname) { - SStoreInfo info = {TD_KVSTORE_HEADER_SIZE, 0, 0, 0, TD_KVSTORE_INIT_MAGIC}; - - return tdUpdateKVStoreHeader(fd, fname, &info); -} - -static int tdEncodeStoreInfo(void **buf, SStoreInfo *pInfo) { - int tlen = 0; - tlen += taosEncodeVariantI64(buf, pInfo->size); - tlen += taosEncodeVariantI64(buf, pInfo->tombSize); - tlen += taosEncodeVariantI64(buf, pInfo->nRecords); - tlen += taosEncodeVariantI64(buf, pInfo->nDels); - tlen += taosEncodeFixedU32(buf, pInfo->magic); - - return tlen; -} - -static void *tdDecodeStoreInfo(void *buf, SStoreInfo *pInfo) { - buf = taosDecodeVariantI64(buf, &(pInfo->size)); - buf = taosDecodeVariantI64(buf, &(pInfo->tombSize)); - buf = taosDecodeVariantI64(buf, &(pInfo->nRecords)); - buf = taosDecodeVariantI64(buf, &(pInfo->nDels)); - buf = taosDecodeFixedU32(buf, &(pInfo->magic)); - - return buf; -} - -static SKVStore *tdNewKVStore(char *fname, iterFunc iFunc, afterFunc aFunc, void *appH) { - SKVStore *pStore = (SKVStore *)calloc(1, sizeof(SKVStore)); - if (pStore == NULL) goto _err; - - pStore->fname = strdup(fname); - if (pStore->fname == NULL) { - terrno = TSDB_CODE_COM_OUT_OF_MEMORY; - goto _err; - } - - pStore->fd = -1; - pStore->iFunc = iFunc; - pStore->aFunc = aFunc; - pStore->appH = appH; - pStore->map = taosHashInit(4096, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, false); - if (pStore->map == NULL) { - terrno = TSDB_CODE_COM_OUT_OF_MEMORY; - goto _err; - } - - return pStore; - -_err: - tdFreeKVStore(pStore); - return NULL; -} - -static void tdFreeKVStore(SKVStore *pStore) { - if (pStore) { - tfree(pStore->fname); - taosHashCleanup(pStore->map); - free(pStore); - } -} - -static int tdEncodeKVRecord(void **buf, SKVRecord *pRecord) { - int tlen = 0; - tlen += taosEncodeFixedU64(buf, pRecord->uid); - tlen += taosEncodeFixedI64(buf, pRecord->offset); - tlen += taosEncodeFixedI64(buf, pRecord->size); - - return tlen; -} - -static void *tdDecodeKVRecord(void *buf, SKVRecord *pRecord) { - buf = taosDecodeFixedU64(buf, &(pRecord->uid)); - buf = taosDecodeFixedI64(buf, &(pRecord->offset)); - buf = taosDecodeFixedI64(buf, &(pRecord->size)); - - return buf; -} - -static int tdRestoreKVStore(SKVStore *pStore) { - char tbuf[128] = "\0"; - void * buf = NULL; - int64_t maxBufSize = 0; - SKVRecord rInfo = {0}; - SKVRecord *pRecord = NULL; - - ASSERT(TD_KVSTORE_HEADER_SIZE == lseek(pStore->fd, 0, SEEK_CUR)); - ASSERT(pStore->info.size == TD_KVSTORE_HEADER_SIZE); - - while (true) { - int64_t tsize = taosRead(pStore->fd, tbuf, sizeof(SKVRecord)); - if (tsize == 0) break; - if (tsize < sizeof(SKVRecord)) { - uError("failed to read %" PRIzu " bytes from file %s at offset %" PRId64 "since %s", sizeof(SKVRecord), pStore->fname, - pStore->info.size, strerror(errno)); - terrno = TAOS_SYSTEM_ERROR(errno); - goto _err; - } - - char *pBuf = tdDecodeKVRecord(tbuf, &rInfo); - ASSERT(POINTER_DISTANCE(pBuf, tbuf) == sizeof(SKVRecord)); - ASSERT((rInfo.offset > 0) ? (pStore->info.size == rInfo.offset) : true); - - if (rInfo.offset < 0) { - taosHashRemove(pStore->map, (void *)(&rInfo.uid), sizeof(rInfo.uid)); - pStore->info.size += sizeof(SKVRecord); - pStore->info.nRecords--; - pStore->info.nDels++; - pStore->info.tombSize += (rInfo.size + sizeof(SKVRecord) * 2); - } else { - ASSERT(rInfo.offset > 0 && rInfo.size > 0); - if (taosHashPut(pStore->map, (void *)(&rInfo.uid), sizeof(rInfo.uid), &rInfo, sizeof(rInfo)) < 0) { - uError("failed to put record in KV store %s", pStore->fname); - terrno = TSDB_CODE_COM_OUT_OF_MEMORY; - goto _err; - } - - maxBufSize = MAX(maxBufSize, rInfo.size); - - if (lseek(pStore->fd, (off_t)rInfo.size, SEEK_CUR) < 0) { - uError("failed to lseek file %s since %s", pStore->fname, strerror(errno)); - terrno = TAOS_SYSTEM_ERROR(errno); - goto _err; - } - - pStore->info.size += (sizeof(SKVRecord) + rInfo.size); - pStore->info.nRecords++; - } - } - - buf = malloc((size_t)maxBufSize); - if (buf == NULL) { - uError("failed to allocate %" PRId64 " bytes in KV store %s", maxBufSize, pStore->fname); - terrno = TAOS_SYSTEM_ERROR(errno); - goto _err; - } - - pRecord = taosHashIterate(pStore->map, NULL); - while (pRecord) { - if (lseek(pStore->fd, (off_t)(pRecord->offset + sizeof(SKVRecord)), SEEK_SET) < 0) { - uError("failed to lseek file %s since %s, offset %" PRId64, pStore->fname, strerror(errno), pRecord->offset); - terrno = TAOS_SYSTEM_ERROR(errno); - goto _err; - } - - if (taosRead(pStore->fd, buf, (size_t)pRecord->size) < pRecord->size) { - uError("failed to read %" PRId64 " bytes from file %s since %s, offset %" PRId64, pRecord->size, pStore->fname, - strerror(errno), pRecord->offset); - terrno = TAOS_SYSTEM_ERROR(errno); - goto _err; - } - - if (pStore->iFunc) { - if ((*pStore->iFunc)(pStore->appH, buf, (int)pRecord->size) < 0) { - uError("failed to restore record uid %" PRIu64 " in kv store %s at offset %" PRId64 " size %" PRId64 - " since %s", - pRecord->uid, pStore->fname, pRecord->offset, pRecord->size, tstrerror(terrno)); - goto _err; - } - } - - pRecord = taosHashIterate(pStore->map, pRecord); - } - - if (pStore->aFunc) (*pStore->aFunc)(pStore->appH); - - tfree(buf); - return 0; - -_err: - taosHashCancelIterate(pStore->map, pRecord); - tfree(buf); - return -1; -} -- GitLab