From 7d42764f4498fcce12ed3aefae8db25c5bd993ca Mon Sep 17 00:00:00 2001 From: Hongze Cheng Date: Thu, 31 Dec 2020 07:46:15 +0000 Subject: [PATCH] refact --- src/tsdb/inc/tsdbMain.h | 433 +++++++++++++++++++------------------- src/tsdb/src/tsdbCommit.c | 35 ++- 2 files changed, 240 insertions(+), 228 deletions(-) diff --git a/src/tsdb/inc/tsdbMain.h b/src/tsdb/inc/tsdbMain.h index 1057bcc22a..18cb1d569a 100644 --- a/src/tsdb/inc/tsdbMain.h +++ b/src/tsdb/inc/tsdbMain.h @@ -32,6 +32,9 @@ extern "C" { #endif +typedef struct STsdbRepo STsdbRepo; + +// ================= tsdbLog.h extern int32_t tsdbDebugFlag; #define tsdbFatal(...) do { if (tsdbDebugFlag & DEBUG_FATAL) { taosPrintLog("TDB FATAL ", 255, __VA_ARGS__); }} while(0) @@ -41,6 +44,7 @@ extern int32_t tsdbDebugFlag; #define tsdbDebug(...) do { if (tsdbDebugFlag & DEBUG_DEBUG) { taosPrintLog("TDB ", tsdbDebugFlag, __VA_ARGS__); }} while(0) #define tsdbTrace(...) do { if (tsdbDebugFlag & DEBUG_TRACE) { taosPrintLog("TDB ", tsdbDebugFlag, __VA_ARGS__); }} while(0) +// ================= OTHERS #define TSDB_MAX_TABLE_SCHEMAS 16 #define TSDB_FILE_HEAD_SIZE 512 #define TSDB_FILE_DELIMITER 0xF00AFA0F @@ -88,6 +92,94 @@ typedef struct { int maxCols; } STsdbMeta; +#define TSDB_INIT_NTABLES 1024 +#define TABLE_TYPE(t) (t)->type +#define TABLE_NAME(t) (t)->name +#define TABLE_CHAR_NAME(t) TABLE_NAME(t)->data +#define TABLE_UID(t) (t)->tableId.uid +#define TABLE_TID(t) (t)->tableId.tid +#define TABLE_SUID(t) (t)->suid +#define TSDB_META_FILE_MAGIC(m) KVSTORE_MAGIC((m)->pStore) +#define TSDB_RLOCK_TABLE(t) taosRLockLatch(&((t)->latch)) +#define TSDB_RUNLOCK_TABLE(t) taosRUnLockLatch(&((t)->latch)) +#define TSDB_WLOCK_TABLE(t) taosWLockLatch(&((t)->latch)) +#define TSDB_WUNLOCK_TABLE(t) taosWUnLockLatch(&((t)->latch)) + +STsdbMeta* tsdbNewMeta(STsdbCfg* pCfg); +void tsdbFreeMeta(STsdbMeta* pMeta); +int tsdbOpenMeta(STsdbRepo* pRepo); +int tsdbCloseMeta(STsdbRepo* pRepo); +STable* tsdbGetTableByUid(STsdbMeta* pMeta, uint64_t uid); +STSchema* tsdbGetTableSchemaByVersion(STable* pTable, int16_t version); +int tsdbWLockRepoMeta(STsdbRepo* pRepo); +int tsdbRLockRepoMeta(STsdbRepo* pRepo); +int tsdbUnlockRepoMeta(STsdbRepo* pRepo); +void tsdbRefTable(STable* pTable); +void tsdbUnRefTable(STable* pTable); +void tsdbUpdateTableSchema(STsdbRepo* pRepo, STable* pTable, STSchema* pSchema, bool insertAct); + +static FORCE_INLINE int tsdbCompareSchemaVersion(const void *key1, const void *key2) { + if (*(int16_t *)key1 < schemaVersion(*(STSchema **)key2)) { + return -1; + } else if (*(int16_t *)key1 > schemaVersion(*(STSchema **)key2)) { + return 1; + } else { + return 0; + } +} + +static FORCE_INLINE STSchema* tsdbGetTableSchemaImpl(STable* pTable, bool lock, bool copy, int16_t version) { + STable* pDTable = (TABLE_TYPE(pTable) == TSDB_CHILD_TABLE) ? pTable->pSuper : pTable; + STSchema* pSchema = NULL; + STSchema* pTSchema = NULL; + + if (lock) TSDB_RLOCK_TABLE(pDTable); + if (version < 0) { // get the latest version of schema + pTSchema = pDTable->schema[pDTable->numOfSchemas - 1]; + } else { // get the schema with version + void* ptr = taosbsearch(&version, pDTable->schema, pDTable->numOfSchemas, sizeof(STSchema*), + tsdbCompareSchemaVersion, TD_EQ); + if (ptr == NULL) { + terrno = TSDB_CODE_TDB_IVD_TB_SCHEMA_VERSION; + goto _exit; + } + pTSchema = *(STSchema**)ptr; + } + + ASSERT(pTSchema != NULL); + + if (copy) { + if ((pSchema = tdDupSchema(pTSchema)) == NULL) terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; + } else { + pSchema = pTSchema; + } + +_exit: + if (lock) TSDB_RUNLOCK_TABLE(pDTable); + return pSchema; +} + +static FORCE_INLINE STSchema* tsdbGetTableSchema(STable* pTable) { + return tsdbGetTableSchemaImpl(pTable, false, false, -1); +} + +static FORCE_INLINE STSchema *tsdbGetTableTagSchema(STable *pTable) { + if (pTable->type == TSDB_CHILD_TABLE) { // check child table first + STable *pSuper = pTable->pSuper; + if (pSuper == NULL) return NULL; + return pSuper->tagSchema; + } else if (pTable->type == TSDB_SUPER_TABLE) { + return pTable->tagSchema; + } else { + return NULL; + } +} + +static FORCE_INLINE TSKEY tsdbGetTableLastKeyImpl(STable* pTable) { + ASSERT(pTable->lastRow == NULL || pTable->lastKey == dataRowKey(pTable->lastRow)); + return pTable->lastKey; +} + // ------------------ tsdbBuffer.c typedef struct { int64_t blockId; @@ -105,7 +197,25 @@ typedef struct { SList* bufBlockList; } STsdbBufPool; +#define TSDB_BUFFER_RESERVE 1024 // Reseve 1K as commit threshold + +STsdbBufPool* tsdbNewBufPool(); +void tsdbFreeBufPool(STsdbBufPool* pBufPool); +int tsdbOpenBufPool(STsdbRepo* pRepo); +void tsdbCloseBufPool(STsdbRepo* pRepo); +SListNode* tsdbAllocBufBlockFromPool(STsdbRepo* pRepo); + // ------------------ tsdbMemTable.c +typedef struct { + int rowsInserted; + int rowsUpdated; + int rowsDeleteSucceed; + int rowsDeleteFailed; + int nOperations; + TSKEY keyFirst; + TSKEY keyLast; +} SMergeInfo; + typedef struct { STable * pTable; SSkipListIterator *pIter; @@ -152,6 +262,39 @@ typedef struct { char cont[]; } SActCont; +int tsdbRefMemTable(STsdbRepo* pRepo, SMemTable* pMemTable); +int tsdbUnRefMemTable(STsdbRepo* pRepo, SMemTable* pMemTable); +int tsdbTakeMemSnapshot(STsdbRepo* pRepo, SMemTable** pMem, SMemTable** pIMem); +void tsdbUnTakeMemSnapShot(STsdbRepo* pRepo, SMemTable* pMem, SMemTable* pIMem); +void* tsdbAllocBytes(STsdbRepo* pRepo, int bytes); +int tsdbAsyncCommit(STsdbRepo* pRepo); +int tsdbLoadDataFromCache(STable* pTable, SSkipListIterator* pIter, TSKEY maxKey, int maxRowsToRead, SDataCols* pCols, + TKEY* filterKeys, int nFilterKeys, bool keepDup, SMergeInfo* pMergeInfo); +void* tsdbCommitData(STsdbRepo* pRepo); + +static FORCE_INLINE SDataRow tsdbNextIterRow(SSkipListIterator* pIter) { + if (pIter == NULL) return NULL; + + SSkipListNode* node = tSkipListIterGet(pIter); + if (node == NULL) return NULL; + + return (SDataRow)SL_GET_NODE_DATA(node); +} + +static FORCE_INLINE TSKEY tsdbNextIterKey(SSkipListIterator* pIter) { + SDataRow row = tsdbNextIterRow(pIter); + if (row == NULL) return TSDB_DATA_TIMESTAMP_NULL; + + return dataRowKey(row); +} + +static FORCE_INLINE TKEY tsdbNextIterTKey(SSkipListIterator* pIter) { + SDataRow row = tsdbNextIterRow(pIter); + if (row == NULL) return TKEY_NULL; + + return dataRowTKey(row); +} + // ------------------ tsdbFile.c extern const char* tsdbFileSuffix[]; @@ -217,6 +360,37 @@ typedef struct { } SFileGroupIter; #define TSDB_FILE_NAME(pFile) ((pFile)->file.aname) +#define TSDB_KEY_FILEID(key, daysPerFile, precision) ((key) / tsMsPerDay[(precision)] / (daysPerFile)) +#define TSDB_MAX_FILE(keep, daysPerFile) ((keep) / (daysPerFile) + 3) +#define TSDB_MIN_FILE_ID(fh) (fh)->pFGroup[0].fileId +#define TSDB_MAX_FILE_ID(fh) (fh)->pFGroup[(fh)->nFGroups - 1].fileId +#define TSDB_IS_FILE_OPENED(f) ((f)->fd > 0) +#define TSDB_FGROUP_ITER_FORWARD TSDB_ORDER_ASC +#define TSDB_FGROUP_ITER_BACKWARD TSDB_ORDER_DESC + +STsdbFileH* tsdbNewFileH(STsdbCfg* pCfg); +void tsdbFreeFileH(STsdbFileH* pFileH); +int tsdbOpenFileH(STsdbRepo* pRepo); +void tsdbCloseFileH(STsdbRepo* pRepo, bool isRestart); +SFileGroup *tsdbCreateFGroup(STsdbRepo *pRepo, int fid, int level); +void tsdbInitFileGroupIter(STsdbFileH* pFileH, SFileGroupIter* pIter, int direction); +void tsdbSeekFileGroupIter(SFileGroupIter* pIter, int fid); +SFileGroup* tsdbGetFileGroupNext(SFileGroupIter* pIter); +int tsdbOpenFile(SFile* pFile, int oflag); +void tsdbCloseFile(SFile* pFile); +int tsdbCreateFile(SFile* pFile, STsdbRepo* pRepo, int fid, int type); +SFileGroup* tsdbSearchFGroup(STsdbFileH* pFileH, int fid, int flags); +int tsdbGetFidLevel(int fid, SFidGroup fidg); +void tsdbRemoveFilesBeyondRetention(STsdbRepo* pRepo, SFidGroup* pFidGroup); +int tsdbUpdateFileHeader(SFile* pFile); +int tsdbEncodeSFileInfo(void** buf, const STsdbFileInfo* pInfo); +void* tsdbDecodeSFileInfo(void* buf, STsdbFileInfo* pInfo); +void tsdbRemoveFileGroup(STsdbRepo* pRepo, SFileGroup* pFGroup); +int tsdbLoadFileHeader(SFile* pFile, uint32_t* version); +void tsdbGetFileInfoImpl(char* fname, uint32_t* magic, int64_t* size); +void tsdbGetFidGroup(STsdbCfg* pCfg, SFidGroup* pFidGroup); +void tsdbGetFidKeyRange(int daysPerFile, int8_t precision, int fileId, TSKEY *minKey, TSKEY *maxKey); +int tsdbApplyRetention(STsdbRepo* pRepo, SFidGroup *pFidGroup); // ------------------ tsdbMain.c typedef struct { @@ -231,7 +405,7 @@ typedef struct { void * pMsg; } SSubmitMsgIter; -typedef struct { +struct STsdbRepo { int8_t state; char* rootDir; @@ -247,7 +421,34 @@ typedef struct { pthread_mutex_t mutex; bool repoLocked; int32_t code; // Commit code -} STsdbRepo; +}; + +#define REPO_ID(r) (r)->config.tsdbId +#define IS_REPO_LOCKED(r) (r)->repoLocked +#define TSDB_SUBMIT_MSG_HEAD_SIZE sizeof(SSubmitMsg) + +char* tsdbGetMetaFileName(char* rootDir); +void tsdbGetDataFileName(char* rootDir, int vid, int fid, int type, char* fname); +int tsdbLockRepo(STsdbRepo* pRepo); +int tsdbUnlockRepo(STsdbRepo* pRepo); +char* tsdbGetDataDirName(char* rootDir); +int tsdbGetNextMaxTables(int tid); +STsdbMeta* tsdbGetMeta(TSDB_REPO_T* pRepo); +STsdbFileH* tsdbGetFile(TSDB_REPO_T* pRepo); +int tsdbCheckCommit(STsdbRepo* pRepo); + +static FORCE_INLINE STsdbBufBlock* tsdbGetCurrBufBlock(STsdbRepo* pRepo) { + ASSERT(pRepo != NULL); + if (pRepo->mem == NULL) return NULL; + + SListNode* pNode = listTail(pRepo->mem->bufBlockList); + if (pNode == NULL) return NULL; + + STsdbBufBlock* pBufBlock = NULL; + tdListNodeGetData(pRepo->mem->bufBlockList, pNode, (void*)(&pBufBlock)); + + return pBufBlock; +} // ------------------ tsdbRWHelper.c typedef struct { @@ -343,203 +544,24 @@ typedef struct { void* compBuffer; // Buffer for temperary compress/decompress purpose } SRWHelper; -typedef struct { - int rowsInserted; - int rowsUpdated; - int rowsDeleteSucceed; - int rowsDeleteFailed; - int nOperations; - TSKEY keyFirst; - TSKEY keyLast; -} SMergeInfo; // ------------------ tsdbScan.c typedef struct { - SFileGroup fGroup; - int numOfIdx; + SFileGroup fGroup; + int numOfIdx; SBlockIdx* pCompIdx; SBlockInfo* pCompInfo; - void* pBuf; - FILE* tLogStream; + void* pBuf; + FILE* tLogStream; } STsdbScanHandle; -// Operations -// ------------------ tsdbMeta.c -#define TSDB_INIT_NTABLES 1024 -#define TABLE_TYPE(t) (t)->type -#define TABLE_NAME(t) (t)->name -#define TABLE_CHAR_NAME(t) TABLE_NAME(t)->data -#define TABLE_UID(t) (t)->tableId.uid -#define TABLE_TID(t) (t)->tableId.tid -#define TABLE_SUID(t) (t)->suid -#define TSDB_META_FILE_MAGIC(m) KVSTORE_MAGIC((m)->pStore) -#define TSDB_RLOCK_TABLE(t) taosRLockLatch(&((t)->latch)) -#define TSDB_RUNLOCK_TABLE(t) taosRUnLockLatch(&((t)->latch)) -#define TSDB_WLOCK_TABLE(t) taosWLockLatch(&((t)->latch)) -#define TSDB_WUNLOCK_TABLE(t) taosWUnLockLatch(&((t)->latch)) - -STsdbMeta* tsdbNewMeta(STsdbCfg* pCfg); -void tsdbFreeMeta(STsdbMeta* pMeta); -int tsdbOpenMeta(STsdbRepo* pRepo); -int tsdbCloseMeta(STsdbRepo* pRepo); -STable* tsdbGetTableByUid(STsdbMeta* pMeta, uint64_t uid); -STSchema* tsdbGetTableSchemaByVersion(STable* pTable, int16_t version); -int tsdbWLockRepoMeta(STsdbRepo* pRepo); -int tsdbRLockRepoMeta(STsdbRepo* pRepo); -int tsdbUnlockRepoMeta(STsdbRepo* pRepo); -void tsdbRefTable(STable* pTable); -void tsdbUnRefTable(STable* pTable); -void tsdbUpdateTableSchema(STsdbRepo* pRepo, STable* pTable, STSchema* pSchema, bool insertAct); - -static FORCE_INLINE int tsdbCompareSchemaVersion(const void *key1, const void *key2) { - if (*(int16_t *)key1 < schemaVersion(*(STSchema **)key2)) { - return -1; - } else if (*(int16_t *)key1 > schemaVersion(*(STSchema **)key2)) { - return 1; - } else { - return 0; - } -} - -static FORCE_INLINE STSchema* tsdbGetTableSchemaImpl(STable* pTable, bool lock, bool copy, int16_t version) { - STable* pDTable = (TABLE_TYPE(pTable) == TSDB_CHILD_TABLE) ? pTable->pSuper : pTable; - STSchema* pSchema = NULL; - STSchema* pTSchema = NULL; - - if (lock) TSDB_RLOCK_TABLE(pDTable); - if (version < 0) { // get the latest version of schema - pTSchema = pDTable->schema[pDTable->numOfSchemas - 1]; - } else { // get the schema with version - void* ptr = taosbsearch(&version, pDTable->schema, pDTable->numOfSchemas, sizeof(STSchema*), - tsdbCompareSchemaVersion, TD_EQ); - if (ptr == NULL) { - terrno = TSDB_CODE_TDB_IVD_TB_SCHEMA_VERSION; - goto _exit; - } - pTSchema = *(STSchema**)ptr; - } - - ASSERT(pTSchema != NULL); - - if (copy) { - if ((pSchema = tdDupSchema(pTSchema)) == NULL) terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; - } else { - pSchema = pTSchema; - } - -_exit: - if (lock) TSDB_RUNLOCK_TABLE(pDTable); - return pSchema; -} - -static FORCE_INLINE STSchema* tsdbGetTableSchema(STable* pTable) { - return tsdbGetTableSchemaImpl(pTable, false, false, -1); -} - -static FORCE_INLINE STSchema *tsdbGetTableTagSchema(STable *pTable) { - if (pTable->type == TSDB_CHILD_TABLE) { // check child table first - STable *pSuper = pTable->pSuper; - if (pSuper == NULL) return NULL; - return pSuper->tagSchema; - } else if (pTable->type == TSDB_SUPER_TABLE) { - return pTable->tagSchema; - } else { - return NULL; - } -} - -static FORCE_INLINE TSKEY tsdbGetTableLastKeyImpl(STable* pTable) { - ASSERT(pTable->lastRow == NULL || pTable->lastKey == dataRowKey(pTable->lastRow)); - return pTable->lastKey; -} - -// ------------------ tsdbBuffer.c -#define TSDB_BUFFER_RESERVE 1024 // Reseve 1K as commit threshold - -STsdbBufPool* tsdbNewBufPool(); -void tsdbFreeBufPool(STsdbBufPool* pBufPool); -int tsdbOpenBufPool(STsdbRepo* pRepo); -void tsdbCloseBufPool(STsdbRepo* pRepo); -SListNode* tsdbAllocBufBlockFromPool(STsdbRepo* pRepo); - -// ------------------ tsdbMemTable.c -int tsdbRefMemTable(STsdbRepo* pRepo, SMemTable* pMemTable); -int tsdbUnRefMemTable(STsdbRepo* pRepo, SMemTable* pMemTable); -int tsdbTakeMemSnapshot(STsdbRepo* pRepo, SMemTable** pMem, SMemTable** pIMem); -void tsdbUnTakeMemSnapShot(STsdbRepo* pRepo, SMemTable* pMem, SMemTable* pIMem); -void* tsdbAllocBytes(STsdbRepo* pRepo, int bytes); -int tsdbAsyncCommit(STsdbRepo* pRepo); -int tsdbLoadDataFromCache(STable* pTable, SSkipListIterator* pIter, TSKEY maxKey, int maxRowsToRead, SDataCols* pCols, - TKEY* filterKeys, int nFilterKeys, bool keepDup, SMergeInfo* pMergeInfo); -void* tsdbCommitData(STsdbRepo* pRepo); - -static FORCE_INLINE SDataRow tsdbNextIterRow(SSkipListIterator* pIter) { - if (pIter == NULL) return NULL; - - SSkipListNode* node = tSkipListIterGet(pIter); - if (node == NULL) return NULL; - - return (SDataRow)SL_GET_NODE_DATA(node); -} - -static FORCE_INLINE TSKEY tsdbNextIterKey(SSkipListIterator* pIter) { - SDataRow row = tsdbNextIterRow(pIter); - if (row == NULL) return TSDB_DATA_TIMESTAMP_NULL; - - return dataRowKey(row); -} - -static FORCE_INLINE TKEY tsdbNextIterTKey(SSkipListIterator* pIter) { - SDataRow row = tsdbNextIterRow(pIter); - if (row == NULL) return TKEY_NULL; - - return dataRowTKey(row); -} - -static FORCE_INLINE STsdbBufBlock* tsdbGetCurrBufBlock(STsdbRepo* pRepo) { - ASSERT(pRepo != NULL); - if (pRepo->mem == NULL) return NULL; - - SListNode* pNode = listTail(pRepo->mem->bufBlockList); - if (pNode == NULL) return NULL; - - STsdbBufBlock* pBufBlock = NULL; - tdListNodeGetData(pRepo->mem->bufBlockList, pNode, (void*)(&pBufBlock)); - - return pBufBlock; -} - -// ------------------ tsdbFile.c -#define TSDB_KEY_FILEID(key, daysPerFile, precision) ((key) / tsMsPerDay[(precision)] / (daysPerFile)) -#define TSDB_MAX_FILE(keep, daysPerFile) ((keep) / (daysPerFile) + 3) -#define TSDB_MIN_FILE_ID(fh) (fh)->pFGroup[0].fileId -#define TSDB_MAX_FILE_ID(fh) (fh)->pFGroup[(fh)->nFGroups - 1].fileId -#define TSDB_IS_FILE_OPENED(f) ((f)->fd > 0) -#define TSDB_FGROUP_ITER_FORWARD TSDB_ORDER_ASC -#define TSDB_FGROUP_ITER_BACKWARD TSDB_ORDER_DESC - -STsdbFileH* tsdbNewFileH(STsdbCfg* pCfg); -void tsdbFreeFileH(STsdbFileH* pFileH); -int tsdbOpenFileH(STsdbRepo* pRepo); -void tsdbCloseFileH(STsdbRepo* pRepo, bool isRestart); -SFileGroup *tsdbCreateFGroup(STsdbRepo *pRepo, int fid, int level); -void tsdbInitFileGroupIter(STsdbFileH* pFileH, SFileGroupIter* pIter, int direction); -void tsdbSeekFileGroupIter(SFileGroupIter* pIter, int fid); -SFileGroup* tsdbGetFileGroupNext(SFileGroupIter* pIter); -int tsdbOpenFile(SFile* pFile, int oflag); -void tsdbCloseFile(SFile* pFile); -int tsdbCreateFile(SFile* pFile, STsdbRepo* pRepo, int fid, int type); -SFileGroup* tsdbSearchFGroup(STsdbFileH* pFileH, int fid, int flags); -int tsdbGetFidLevel(int fid, SFidGroup fidg); -void tsdbRemoveFilesBeyondRetention(STsdbRepo* pRepo, SFidGroup* pFidGroup); -int tsdbUpdateFileHeader(SFile* pFile); -int tsdbEncodeSFileInfo(void** buf, const STsdbFileInfo* pInfo); -void* tsdbDecodeSFileInfo(void* buf, STsdbFileInfo* pInfo); -void tsdbRemoveFileGroup(STsdbRepo* pRepo, SFileGroup* pFGroup); -int tsdbLoadFileHeader(SFile* pFile, uint32_t* version); -void tsdbGetFileInfoImpl(char* fname, uint32_t* magic, int64_t* size); -void tsdbGetFidGroup(STsdbCfg* pCfg, SFidGroup* pFidGroup); -void tsdbGetFidKeyRange(int daysPerFile, int8_t precision, int fileId, TSKEY *minKey, TSKEY *maxKey); -int tsdbApplyRetention(STsdbRepo* pRepo, SFidGroup *pFidGroup); +int tsdbScanFGroup(STsdbScanHandle* pScanHandle, char* rootDir, int fid); +STsdbScanHandle* tsdbNewScanHandle(); +void tsdbSetScanLogStream(STsdbScanHandle* pScanHandle, FILE* fLogStream); +int tsdbSetAndOpenScanFile(STsdbScanHandle* pScanHandle, char* rootDir, int fid); +int tsdbScanSBlockIdx(STsdbScanHandle* pScanHandle); +int tsdbScanSBlock(STsdbScanHandle* pScanHandle, int idx); +int tsdbCloseScanFile(STsdbScanHandle* pScanHandle); +void tsdbFreeScanHandle(STsdbScanHandle* pScanHandle); // ------------------ tsdbRWHelper.c #define TSDB_HELPER_CLEAR_STATE 0x0 // Clear state @@ -597,31 +619,6 @@ static FORCE_INLINE int compTSKEY(const void* key1, const void* key2) { } } -// ------------------ tsdbMain.c -#define REPO_ID(r) (r)->config.tsdbId -#define IS_REPO_LOCKED(r) (r)->repoLocked -#define TSDB_SUBMIT_MSG_HEAD_SIZE sizeof(SSubmitMsg) - -char* tsdbGetMetaFileName(char* rootDir); -void tsdbGetDataFileName(char* rootDir, int vid, int fid, int type, char* fname); -int tsdbLockRepo(STsdbRepo* pRepo); -int tsdbUnlockRepo(STsdbRepo* pRepo); -char* tsdbGetDataDirName(char* rootDir); -int tsdbGetNextMaxTables(int tid); -STsdbMeta* tsdbGetMeta(TSDB_REPO_T* pRepo); -STsdbFileH* tsdbGetFile(TSDB_REPO_T* pRepo); -int tsdbCheckCommit(STsdbRepo* pRepo); - -// ------------------ tsdbScan.c -int tsdbScanFGroup(STsdbScanHandle* pScanHandle, char* rootDir, int fid); -STsdbScanHandle* tsdbNewScanHandle(); -void tsdbSetScanLogStream(STsdbScanHandle* pScanHandle, FILE* fLogStream); -int tsdbSetAndOpenScanFile(STsdbScanHandle* pScanHandle, char* rootDir, int fid); -int tsdbScanSBlockIdx(STsdbScanHandle* pScanHandle); -int tsdbScanSBlock(STsdbScanHandle* pScanHandle, int idx); -int tsdbCloseScanFile(STsdbScanHandle* pScanHandle); -void tsdbFreeScanHandle(STsdbScanHandle* pScanHandle); - // ------------------ tsdbCommitQueue.c int tsdbScheduleCommit(STsdbRepo *pRepo); diff --git a/src/tsdb/src/tsdbCommit.c b/src/tsdb/src/tsdbCommit.c index 342b1f95e6..686e171cd4 100644 --- a/src/tsdb/src/tsdbCommit.c +++ b/src/tsdb/src/tsdbCommit.c @@ -23,6 +23,7 @@ typedef struct { static int tsdbCommitTSData(STsdbRepo *pRepo); static int tsdbCommitMeta(STsdbRepo *pRepo); +static int tsdbStartCommit(STsdbRepo *pRepo); static void tsdbEndCommit(STsdbRepo *pRepo, int eno); static bool tsdbHasDataToCommit(SCommitIter *iters, int nIters, TSKEY minKey, TSKEY maxKey); static int tsdbCommitToFile(STsdbRepo *pRepo, int fid, SCommitH *pch); @@ -33,12 +34,10 @@ static int tsdbInitCommitH(STsdbRepo *pRepo, SCommitH *pch); static void tsdbDestroyCommitH(SCommitH *pch, int niter); void *tsdbCommitData(STsdbRepo *pRepo) { - SMemTable * pMem = pRepo->imem; - - tsdbInfo("vgId:%d start to commit! keyFirst %" PRId64 " keyLast %" PRId64 " numOfRows %" PRId64 " meta rows: %d", - REPO_ID(pRepo), pMem->keyFirst, pMem->keyLast, pMem->numOfRows, listNEles(pMem->actList)); - - pRepo->code = TSDB_CODE_SUCCESS; + if (tsdbStartCommit(pRepo) < 0) { + tsdbError("vgId:%d failed to commit data while startting to commit since %s", REPO_ID(pRepo), tstrerror(terrno)); + goto _err; + } // Commit to update meta file if (tsdbCommitMeta(pRepo) < 0) { @@ -52,17 +51,14 @@ void *tsdbCommitData(STsdbRepo *pRepo) { goto _err; } - tsdbInfo("vgId:%d commit over, succeed", REPO_ID(pRepo)); tsdbEndCommit(pRepo, TSDB_CODE_SUCCESS); - return NULL; _err: ASSERT(terrno != TSDB_CODE_SUCCESS); pRepo->code = terrno; - tsdbInfo("vgId:%d commit over, failed", REPO_ID(pRepo)); - tsdbEndCommit(pRepo, terrno); + tsdbEndCommit(pRepo, terrno); return NULL; } @@ -151,19 +147,38 @@ static int tsdbCommitMeta(STsdbRepo *pRepo) { goto _err; } + // TODO + // tsdbUpdateMFile(pRepo, NULL) + return 0; _err: return -1; } +static int tsdbStartCommit(STsdbRepo *pRepo) { + SMemTable *pMem = pRepo->imem; + + tsdbInfo("vgId:%d start to commit! keyFirst %" PRId64 " keyLast %" PRId64 " numOfRows %" PRId64 " meta rows: %d", + REPO_ID(pRepo), pMem->keyFirst, pMem->keyLast, pMem->numOfRows, listNEles(pMem->actList)); + + // TODO + + pRepo->code = TSDB_CODE_SUCCESS; + return 0; +} + static void tsdbEndCommit(STsdbRepo *pRepo, int eno) { + tsdbInfo("vgId:%d commit over, %s", REPO_ID(pRepo), (eno == TSDB_CODE_SUCCESS) ? "succeed" : "failed"); + if (pRepo->appH.notifyStatus) pRepo->appH.notifyStatus(pRepo->appH.appH, TSDB_STATUS_COMMIT_OVER, eno); + SMemTable *pIMem = pRepo->imem; tsdbLockRepo(pRepo); pRepo->imem = NULL; tsdbUnlockRepo(pRepo); tsdbUnRefMemTable(pRepo, pIMem); + sem_post(&(pRepo->readyToCommit)); } -- GitLab