diff --git a/src/tsdb/inc/tsdbBuffer.h b/src/tsdb/inc/tsdbBuffer.h new file mode 100644 index 0000000000000000000000000000000000000000..8083b4418246a7cc39ee7e620f56d49a687f23b8 --- /dev/null +++ b/src/tsdb/inc/tsdbBuffer.h @@ -0,0 +1,51 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#ifndef _TD_TSDB_BUFFER_H_ +#define _TD_TSDB_BUFFER_H_ + +#ifdef __cplusplus +extern "C" { +#endif + +typedef struct { + int64_t blockId; + int offset; + int remain; + char data[]; +} STsdbBufBlock; + +typedef struct { + pthread_cond_t poolNotEmpty; + int bufBlockSize; + int tBufBlocks; + int nBufBlocks; + int64_t index; + SList* bufBlockList; +} STsdbBufPool; + +#define TSDB_BUFFER_RESERVE 1024 // Reseve 1K as commit threshold + +STsdbBufPool* tsdbNewBufPool(); +void tsdbFreeBufPool(STsdbBufPool* pBufPool); +int tsdbOpenBufPool(STsdbRepo* pRepo); +void tsdbCloseBufPool(STsdbRepo* pRepo); +SListNode* tsdbAllocBufBlockFromPool(STsdbRepo* pRepo); + +#ifdef __cplusplus +} +#endif + +#endif /* _TD_TSDB_BUFFER_H_ */ \ No newline at end of file diff --git a/src/tsdb/inc/tsdbCommit.h b/src/tsdb/inc/tsdbCommit.h new file mode 100644 index 0000000000000000000000000000000000000000..277aa0eb9bef2c6d5a78855acf98259d0b720fc7 --- /dev/null +++ b/src/tsdb/inc/tsdbCommit.h @@ -0,0 +1,29 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#ifndef _TD_TSDB_COMMIT_H_ +#define _TD_TSDB_COMMIT_H_ + +#ifdef __cplusplus +extern "C" { +#endif + +void *tsdbCommitData(STsdbRepo *pRepo); + +#ifdef __cplusplus +} +#endif + +#endif /* _TD_TSDB_COMMIT_H_ */ \ No newline at end of file diff --git a/src/tsdb/inc/tsdbCommitQueue.h b/src/tsdb/inc/tsdbCommitQueue.h new file mode 100644 index 0000000000000000000000000000000000000000..f1fd35986d8dbc0e66420ca55f3bc5ffd48c5c7b --- /dev/null +++ b/src/tsdb/inc/tsdbCommitQueue.h @@ -0,0 +1,29 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#ifndef _TD_TSDB_COMMIT_QUEUE_H_ +#define _TD_TSDB_COMMIT_QUEUE_H_ + +#ifdef __cplusplus +extern "C" { +#endif + +int tsdbScheduleCommit(STsdbRepo *pRepo); + +#ifdef __cplusplus +} +#endif + +#endif /* _TD_TSDB_COMMIT_QUEUE_H_ */ \ No newline at end of file diff --git a/src/tsdb/inc/tsdbFS.h b/src/tsdb/inc/tsdbFS.h new file mode 100644 index 0000000000000000000000000000000000000000..0003d83c3f5beaea3efc7ca79595044c38bf3e1a --- /dev/null +++ b/src/tsdb/inc/tsdbFS.h @@ -0,0 +1,100 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#ifndef _TD_TSDB_FS_H_ +#define _TD_TSDB_FS_H_ + +#ifdef __cplusplus +extern "C" { +#endif + +typedef struct { + int64_t fsversion; // file system version, related to program + int64_t version; + int64_t totalPoints; + int64_t totalStorage; +} STsdbFSMeta; + +typedef struct { + int64_t version; + STsdbFSMeta meta; + SMFile mf; // meta file + SArray* df; // data file array +} SFSVer; + +typedef struct { + pthread_rwlock_t lock; + + SFSVer fsv; +} STsdbFS; + +typedef struct { + int version; // current FS version + int index; + int fid; + SDFileSet* pSet; +} SFSIter; + +#define TSDB_FILE_INFO(tf) (&((tf)->info)) +#define TSDB_FILE_F(tf) (&((tf)->f))) +#define TSDB_FILE_FD(tf) ((tf)->fd) + +int tsdbOpenFS(STsdbRepo* pRepo); +void tsdbCloseFS(STsdbRepo* pRepo); +int tsdbFSNewTxn(STsdbRepo* pRepo); +int tsdbFSEndTxn(STsdbRepo* pRepo, bool hasError); +int tsdbUpdateMFile(STsdbRepo* pRepo, SMFile* pMFile); +int tsdbUpdateDFileSet(STsdbRepo* pRepo, SDFileSet* pSet); +void tsdbRemoveExpiredDFileSet(STsdbRepo* pRepo, int mfid); +int tsdbRemoveDFileSet(SDFileSet* pSet); +int tsdbEncodeMFInfo(void** buf, SMFInfo* pInfo); +void* tsdbDecodeMFInfo(void* buf, SMFInfo* pInfo); +SDFileSet tsdbMoveDFileSet(SDFileSet* pOldSet, int to); +int tsdbInitFSIter(STsdbRepo* pRepo, SFSIter* pIter); +SDFileSet* tsdbFSIterNext(SFSIter* pIter); +int tsdbCreateDFileSet(int fid, int level, SDFileSet* pSet); + +static FORCE_INLINE int tsdbRLockFS(STsdbFS *pFs) { + int code = pthread_rwlock_rdlock(&(pFs->lock)); + if (code != 0) { + terrno = TAOS_SYSTEM_ERROR(code); + return -1; + } + return 0; +} + +static FORCE_INLINE int tsdbWLockFS(STsdbFS *pFs) { + int code = pthread_rwlock_wrlock(&(pFs->lock)); + if (code != 0) { + terrno = TAOS_SYSTEM_ERROR(code); + return -1; + } + return 0; +} + +static FORCE_INLINE int tsdbUnLockFS(STsdbFS *pFs) { + int code = pthread_rwlock_unlock(&(pFs->lock)); + if (code != 0) { + terrno = TAOS_SYSTEM_ERROR(code); + return -1; + } + return 0; +} + +#ifdef __cplusplus +} +#endif + +#endif /* _TD_TSDB_FS_H_ */ \ No newline at end of file diff --git a/src/tsdb/inc/tsdbFile.h b/src/tsdb/inc/tsdbFile.h new file mode 100644 index 0000000000000000000000000000000000000000..ffe2ba557a27cec2f12d7c2780db8e704db5d7ba --- /dev/null +++ b/src/tsdb/inc/tsdbFile.h @@ -0,0 +1,111 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#ifndef _TS_TSDB_FILE_H_ +#define _TS_TSDB_FILE_H_ + +#ifdef __cplusplus +extern "C" { +#endif + +#define TSDB_FILE_HEAD_SIZE 512 +#define TSDB_FILE_DELIMITER 0xF00AFA0F +#define TSDB_FILE_INIT_MAGIC 0xFFFFFFFF + +typedef enum { + TSDB_FILE_HEAD = 0, + TSDB_FILE_DATA, + TSDB_FILE_LAST, + TSDB_FILE_MAX, + TSDB_FILE_META, + TSDB_FILE_MANIFEST +} TSDB_FILE_T; + +// For meta file +typedef struct { + int64_t size; + int64_t tombSize; + int64_t nRecords; + int64_t nDels; + uint32_t magic; +} SMFInfo; + +typedef struct { + SMFInfo info; + TFILE f; + int fd; +} SMFile; + +void tsdbInitMFile(SMFile* pMFile, int vid, int ver, SMFInfo* pInfo); +int tsdbOpenMFile(SMFile* pMFile, int flags); +void tsdbCloseMFile(SMFile* pMFile); +int64_t tsdbSeekMFile(SMFile* pMFile, int64_t offset, int whence); +int64_t tsdbWriteMFile(SMFile* pMFile, void* buf, int64_t nbyte); +int64_t tsdbTellMFile(SMFile *pMFile); +int tsdbEncodeMFile(void** buf, SMFile* pMFile); +void* tsdbDecodeMFile(void* buf, SMFile* pMFile); + +// For .head/.data/.last file +typedef struct { + uint32_t magic; + uint32_t len; + uint32_t totalBlocks; + uint32_t totalSubBlocks; + uint32_t offset; + uint64_t size; + uint64_t tombSize; +} SDFInfo; + +typedef struct { + SDFInfo info; + TFILE f; + int fd; +} SDFile; + +void tsdbInitDFile(SDFile* pDFile, int vid, int fid, int ver, int level, int id, const SDFInfo* pInfo, + TSDB_FILE_T ftype); +void tsdbInitDFileWithOld(SDFile* pDFile, SDFile* pOldDFile); +int tsdbOpenDFile(SDFile* pDFile, int flags); +void tsdbCloseDFile(SDFile* pDFile); +int64_t tsdbSeekDFile(SDFile* pDFile, int64_t offset, int whence); +int64_t tsdbWriteDFile(SDFile* pDFile, void* buf, int64_t nbyte); +int64_t tsdbAppendDFile(SDFile* pDFile, void* buf, int64_t nbyte, int64_t* offset); +int64_t tsdbTellDFile(SDFile* pDFile); +int tsdbEncodeDFile(void** buf, SDFile* pDFile); +void* tsdbDecodeDFile(void* buf, SDFile* pDFile); +void tsdbUpdateDFileMagic(SDFile* pDFile, void* pCksm); + +typedef struct { + int fid; + int state; + SDFile files[TSDB_FILE_MAX]; +} SDFileSet; + +#define TSDB_FILE_FULL_NAME(f) TFILE_NAME(&((f)->f)) +#define TSDB_DFILE_IN_SET(s, t) ((s)->files + (t)) + +void tsdbInitDFileSet(SDFileSet* pSet, int vid, int fid, int ver, int level, int id); +void tsdbInitDFileSetWithOld(SDFileSet *pSet, SDFileSet *pOldSet); +int tsdbOpenDFileSet(SDFileSet* pSet, int flags); +void tsdbCloseDFileSet(SDFileSet* pSet); +int tsdbUpdateDFileSetHeader(SDFileSet* pSet); +int tsdbCopyDFileSet(SDFileSet* pFromSet, SDFileSet* pToSet); + + +#ifdef __cplusplus +} +#endif + +#endif /* _TS_TSDB_FILE_H_ */ \ No newline at end of file diff --git a/src/tsdb/inc/tsdbLog.h b/src/tsdb/inc/tsdbLog.h new file mode 100644 index 0000000000000000000000000000000000000000..185474b205df05bb7c336bd48e782e0b1215f3f4 --- /dev/null +++ b/src/tsdb/inc/tsdbLog.h @@ -0,0 +1,36 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#ifndef _TD_TSDB_LOG_H_ +#define _TD_TSDB_LOG_H_ + +#ifdef __cplusplus +extern "C" { +#endif + +extern int32_t tsdbDebugFlag; + +#define tsdbFatal(...) do { if (tsdbDebugFlag & DEBUG_FATAL) { taosPrintLog("TDB FATAL ", 255, __VA_ARGS__); }} while(0) +#define tsdbError(...) do { if (tsdbDebugFlag & DEBUG_ERROR) { taosPrintLog("TDB ERROR ", 255, __VA_ARGS__); }} while(0) +#define tsdbWarn(...) do { if (tsdbDebugFlag & DEBUG_WARN) { taosPrintLog("TDB WARN ", 255, __VA_ARGS__); }} while(0) +#define tsdbInfo(...) do { if (tsdbDebugFlag & DEBUG_INFO) { taosPrintLog("TDB ", 255, __VA_ARGS__); }} while(0) +#define tsdbDebug(...) do { if (tsdbDebugFlag & DEBUG_DEBUG) { taosPrintLog("TDB ", tsdbDebugFlag, __VA_ARGS__); }} while(0) +#define tsdbTrace(...) do { if (tsdbDebugFlag & DEBUG_TRACE) { taosPrintLog("TDB ", tsdbDebugFlag, __VA_ARGS__); }} while(0) + +#ifdef __cplusplus +} +#endif + +#endif /* _TD_TSDB_LOG_H_ */ \ No newline at end of file diff --git a/src/tsdb/inc/tsdbMain.h b/src/tsdb/inc/tsdbMain.h index 866141614baf68a969367b258e58c4aa77163edf..73efbbad9225fed637b0831ca7a3002c01500d09 100644 --- a/src/tsdb/inc/tsdbMain.h +++ b/src/tsdb/inc/tsdbMain.h @@ -36,14 +36,6 @@ extern "C" { typedef struct STsdbRepo STsdbRepo; // ================= tsdbLog.h -extern int32_t tsdbDebugFlag; - -#define tsdbFatal(...) do { if (tsdbDebugFlag & DEBUG_FATAL) { taosPrintLog("TDB FATAL ", 255, __VA_ARGS__); }} while(0) -#define tsdbError(...) do { if (tsdbDebugFlag & DEBUG_ERROR) { taosPrintLog("TDB ERROR ", 255, __VA_ARGS__); }} while(0) -#define tsdbWarn(...) do { if (tsdbDebugFlag & DEBUG_WARN) { taosPrintLog("TDB WARN ", 255, __VA_ARGS__); }} while(0) -#define tsdbInfo(...) do { if (tsdbDebugFlag & DEBUG_INFO) { taosPrintLog("TDB ", 255, __VA_ARGS__); }} while(0) -#define tsdbDebug(...) do { if (tsdbDebugFlag & DEBUG_DEBUG) { taosPrintLog("TDB ", tsdbDebugFlag, __VA_ARGS__); }} while(0) -#define tsdbTrace(...) do { if (tsdbDebugFlag & DEBUG_TRACE) { taosPrintLog("TDB ", tsdbDebugFlag, __VA_ARGS__); }} while(0) // ================= OTHERS @@ -55,404 +47,13 @@ extern int32_t tsdbDebugFlag; // Definitions // ================= tsdbMeta.c -#define TSDB_MAX_TABLE_SCHEMAS 16 - -typedef struct STable { - STableId tableId; - ETableType type; - tstr* name; // NOTE: there a flexible string here - uint64_t suid; - struct STable* pSuper; // super table pointer - uint8_t numOfSchemas; - STSchema* schema[TSDB_MAX_TABLE_SCHEMAS]; - STSchema* tagSchema; - SKVRow tagVal; - SSkipList* pIndex; // For TSDB_SUPER_TABLE, it is the skiplist index - void* eventHandler; // TODO - void* streamHandler; // TODO - TSKEY lastKey; - SDataRow lastRow; - char* sql; - void* cqhandle; - SRWLatch latch; // TODO: implementa latch functions - T_REF_DECLARE() -} STable; - -typedef struct { - pthread_rwlock_t rwLock; - - int32_t nTables; - int32_t maxTables; - STable** tables; - SList* superList; - SHashObj* uidMap; - SKVStore* pStore; - int maxRowBytes; - int maxCols; -} STsdbMeta; - -#define TSDB_INIT_NTABLES 1024 -#define TABLE_TYPE(t) (t)->type -#define TABLE_NAME(t) (t)->name -#define TABLE_CHAR_NAME(t) TABLE_NAME(t)->data -#define TABLE_UID(t) (t)->tableId.uid -#define TABLE_TID(t) (t)->tableId.tid -#define TABLE_SUID(t) (t)->suid -#define TSDB_META_FILE_MAGIC(m) KVSTORE_MAGIC((m)->pStore) -#define TSDB_RLOCK_TABLE(t) taosRLockLatch(&((t)->latch)) -#define TSDB_RUNLOCK_TABLE(t) taosRUnLockLatch(&((t)->latch)) -#define TSDB_WLOCK_TABLE(t) taosWLockLatch(&((t)->latch)) -#define TSDB_WUNLOCK_TABLE(t) taosWUnLockLatch(&((t)->latch)) - -STsdbMeta* tsdbNewMeta(STsdbCfg* pCfg); -void tsdbFreeMeta(STsdbMeta* pMeta); -int tsdbOpenMeta(STsdbRepo* pRepo); -int tsdbCloseMeta(STsdbRepo* pRepo); -STable* tsdbGetTableByUid(STsdbMeta* pMeta, uint64_t uid); -STSchema* tsdbGetTableSchemaByVersion(STable* pTable, int16_t version); -int tsdbWLockRepoMeta(STsdbRepo* pRepo); -int tsdbRLockRepoMeta(STsdbRepo* pRepo); -int tsdbUnlockRepoMeta(STsdbRepo* pRepo); -void tsdbRefTable(STable* pTable); -void tsdbUnRefTable(STable* pTable); -void tsdbUpdateTableSchema(STsdbRepo* pRepo, STable* pTable, STSchema* pSchema, bool insertAct); - -static FORCE_INLINE int tsdbCompareSchemaVersion(const void *key1, const void *key2) { - if (*(int16_t *)key1 < schemaVersion(*(STSchema **)key2)) { - return -1; - } else if (*(int16_t *)key1 > schemaVersion(*(STSchema **)key2)) { - return 1; - } else { - return 0; - } -} - -static FORCE_INLINE STSchema* tsdbGetTableSchemaImpl(STable* pTable, bool lock, bool copy, int16_t version) { - STable* pDTable = (TABLE_TYPE(pTable) == TSDB_CHILD_TABLE) ? pTable->pSuper : pTable; - STSchema* pSchema = NULL; - STSchema* pTSchema = NULL; - - if (lock) TSDB_RLOCK_TABLE(pDTable); - if (version < 0) { // get the latest version of schema - pTSchema = pDTable->schema[pDTable->numOfSchemas - 1]; - } else { // get the schema with version - void* ptr = taosbsearch(&version, pDTable->schema, pDTable->numOfSchemas, sizeof(STSchema*), - tsdbCompareSchemaVersion, TD_EQ); - if (ptr == NULL) { - terrno = TSDB_CODE_TDB_IVD_TB_SCHEMA_VERSION; - goto _exit; - } - pTSchema = *(STSchema**)ptr; - } - - ASSERT(pTSchema != NULL); - - if (copy) { - if ((pSchema = tdDupSchema(pTSchema)) == NULL) terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; - } else { - pSchema = pTSchema; - } - -_exit: - if (lock) TSDB_RUNLOCK_TABLE(pDTable); - return pSchema; -} - -static FORCE_INLINE STSchema* tsdbGetTableSchema(STable* pTable) { - return tsdbGetTableSchemaImpl(pTable, false, false, -1); -} - -static FORCE_INLINE STSchema *tsdbGetTableTagSchema(STable *pTable) { - if (pTable->type == TSDB_CHILD_TABLE) { // check child table first - STable *pSuper = pTable->pSuper; - if (pSuper == NULL) return NULL; - return pSuper->tagSchema; - } else if (pTable->type == TSDB_SUPER_TABLE) { - return pTable->tagSchema; - } else { - return NULL; - } -} - -static FORCE_INLINE TSKEY tsdbGetTableLastKeyImpl(STable* pTable) { - ASSERT(pTable->lastRow == NULL || pTable->lastKey == dataRowKey(pTable->lastRow)); - return pTable->lastKey; -} // ================= tsdbBuffer.c -typedef struct { - int64_t blockId; - int offset; - int remain; - char data[]; -} STsdbBufBlock; - -typedef struct { - pthread_cond_t poolNotEmpty; - int bufBlockSize; - int tBufBlocks; - int nBufBlocks; - int64_t index; - SList* bufBlockList; -} STsdbBufPool; - -#define TSDB_BUFFER_RESERVE 1024 // Reseve 1K as commit threshold - -STsdbBufPool* tsdbNewBufPool(); -void tsdbFreeBufPool(STsdbBufPool* pBufPool); -int tsdbOpenBufPool(STsdbRepo* pRepo); -void tsdbCloseBufPool(STsdbRepo* pRepo); -SListNode* tsdbAllocBufBlockFromPool(STsdbRepo* pRepo); // ------------------ tsdbMemTable.c -typedef struct { - int rowsInserted; - int rowsUpdated; - int rowsDeleteSucceed; - int rowsDeleteFailed; - int nOperations; - TSKEY keyFirst; - TSKEY keyLast; -} SMergeInfo; - -typedef struct { - STable * pTable; - SSkipListIterator *pIter; -} SCommitIter; - -typedef struct { - uint64_t uid; - TSKEY keyFirst; - TSKEY keyLast; - int64_t numOfRows; - SSkipList* pData; -} STableData; - -typedef struct { - T_REF_DECLARE() - SRWLatch latch; - TSKEY keyFirst; - TSKEY keyLast; - int64_t numOfRows; - int32_t maxTables; - STableData** tData; - SList* actList; - SList* extraBuffList; - SList* bufBlockList; -} SMemTable; - -enum { TSDB_UPDATE_META, TSDB_DROP_META }; - -#ifdef WINDOWS -#pragma pack(push ,1) -typedef struct { -#else -typedef struct __attribute__((packed)){ -#endif - char act; - uint64_t uid; -} SActObj; -#ifdef WINDOWS -#pragma pack(pop) -#endif - -typedef struct { - int len; - char cont[]; -} SActCont; - -int tsdbRefMemTable(STsdbRepo* pRepo, SMemTable* pMemTable); -int tsdbUnRefMemTable(STsdbRepo* pRepo, SMemTable* pMemTable); -int tsdbTakeMemSnapshot(STsdbRepo* pRepo, SMemTable** pMem, SMemTable** pIMem); -void tsdbUnTakeMemSnapShot(STsdbRepo* pRepo, SMemTable* pMem, SMemTable* pIMem); -void* tsdbAllocBytes(STsdbRepo* pRepo, int bytes); -int tsdbAsyncCommit(STsdbRepo* pRepo); -int tsdbLoadDataFromCache(STable* pTable, SSkipListIterator* pIter, TSKEY maxKey, int maxRowsToRead, SDataCols* pCols, - TKEY* filterKeys, int nFilterKeys, bool keepDup, SMergeInfo* pMergeInfo); -void* tsdbCommitData(STsdbRepo* pRepo); - -static FORCE_INLINE SDataRow tsdbNextIterRow(SSkipListIterator* pIter) { - if (pIter == NULL) return NULL; - - SSkipListNode* node = tSkipListIterGet(pIter); - if (node == NULL) return NULL; - - return (SDataRow)SL_GET_NODE_DATA(node); -} - -static FORCE_INLINE TSKEY tsdbNextIterKey(SSkipListIterator* pIter) { - SDataRow row = tsdbNextIterRow(pIter); - if (row == NULL) return TSDB_DATA_TIMESTAMP_NULL; - - return dataRowKey(row); -} - -static FORCE_INLINE TKEY tsdbNextIterTKey(SSkipListIterator* pIter) { - SDataRow row = tsdbNextIterRow(pIter); - if (row == NULL) return TKEY_NULL; - - return dataRowTKey(row); -} - // ================= tsdbFile.c -#define TSDB_FILE_HEAD_SIZE 512 -#define TSDB_FILE_DELIMITER 0xF00AFA0F -#define TSDB_FILE_INIT_MAGIC 0xFFFFFFFF - -typedef enum { - TSDB_FILE_HEAD = 0, - TSDB_FILE_DATA, - TSDB_FILE_LAST, - TSDB_FILE_MAX, - TSDB_FILE_META, - TSDB_FILE_MANIFEST -} TSDB_FILE_T; - -// For meta file -typedef struct { - int64_t size; - int64_t tombSize; - int64_t nRecords; - int64_t nDels; - uint32_t magic; -} SMFInfo; - -typedef struct { - SMFInfo info; - TFILE f; - int fd; -} SMFile; - -void tsdbInitMFile(SMFile* pMFile, int vid, int ver, SMFInfo* pInfo); -int tsdbOpenMFile(SMFile* pMFile, int flags); -void tsdbCloseMFile(SMFile* pMFile); -int64_t tsdbSeekMFile(SMFile* pMFile, int64_t offset, int whence); -int64_t tsdbWriteMFile(SMFile* pMFile, void* buf, int64_t nbyte); -int64_t tsdbTellMFile(SMFile *pMFile); -int tsdbEncodeMFile(void** buf, SMFile* pMFile); -void* tsdbDecodeMFile(void* buf, SMFile* pMFile); - -// For .head/.data/.last file -typedef struct { - uint32_t magic; - uint32_t len; - uint32_t totalBlocks; - uint32_t totalSubBlocks; - uint32_t offset; - uint64_t size; - uint64_t tombSize; -} SDFInfo; - -typedef struct { - SDFInfo info; - TFILE f; - int fd; -} SDFile; - -void tsdbInitDFile(SDFile* pDFile, int vid, int fid, int ver, int level, int id, const SDFInfo* pInfo, - TSDB_FILE_T ftype); -void tsdbInitDFileWithOld(SDFile* pDFile, SDFile* pOldDFile); -int tsdbOpenDFile(SDFile* pDFile, int flags); -void tsdbCloseDFile(SDFile* pDFile); -int64_t tsdbSeekDFile(SDFile* pDFile, int64_t offset, int whence); -int64_t tsdbWriteDFile(SDFile* pDFile, void* buf, int64_t nbyte); -int64_t tsdbAppendDFile(SDFile* pDFile, void* buf, int64_t nbyte, int64_t* offset); -int64_t tsdbTellDFile(SDFile* pDFile); -int tsdbEncodeDFile(void** buf, SDFile* pDFile); -void* tsdbDecodeDFile(void* buf, SDFile* pDFile); -void tsdbUpdateDFileMagic(SDFile* pDFile, void* pCksm); - -typedef struct { - int fid; - int state; - SDFile files[TSDB_FILE_MAX]; -} SDFileSet; - -#define TSDB_FILE_FULL_NAME(f) TFILE_NAME(&((f)->f)) -#define TSDB_DFILE_IN_SET(s, t) ((s)->files + (t)) - -void tsdbInitDFileSet(SDFileSet* pSet, int vid, int fid, int ver, int level, int id); -void tsdbInitDFileSetWithOld(SDFileSet *pSet, SDFileSet *pOldSet); -int tsdbOpenDFileSet(SDFileSet* pSet, int flags); -void tsdbCloseDFileSet(SDFileSet* pSet); -int tsdbUpdateDFileSetHeader(SDFileSet* pSet); -int tsdbCopyDFileSet(SDFileSet* pFromSet, SDFileSet* pToSet); - /* Statistic information of the TSDB file system. */ -typedef struct { - int64_t fsversion; // file system version, related to program - int64_t version; - int64_t totalPoints; - int64_t totalStorage; -} STsdbFSMeta; - -typedef struct { - int64_t version; - STsdbFSMeta meta; - SMFile mf; // meta file - SArray* df; // data file array -} SFSVer; - -typedef struct { - pthread_rwlock_t lock; - - SFSVer fsv; -} STsdbFS; - -typedef struct { - int version; // current FS version - int index; - int fid; - SDFileSet* pSet; -} SFSIter; - -#define TSDB_FILE_INFO(tf) (&((tf)->info)) -#define TSDB_FILE_F(tf) (&((tf)->f))) -#define TSDB_FILE_FD(tf) ((tf)->fd) - -int tsdbOpenFS(STsdbRepo* pRepo); -void tsdbCloseFS(STsdbRepo* pRepo); -int tsdbFSNewTxn(STsdbRepo* pRepo); -int tsdbFSEndTxn(STsdbRepo* pRepo, bool hasError); -int tsdbUpdateMFile(STsdbRepo* pRepo, SMFile* pMFile); -int tsdbUpdateDFileSet(STsdbRepo* pRepo, SDFileSet* pSet); -void tsdbRemoveExpiredDFileSet(STsdbRepo* pRepo, int mfid); -int tsdbRemoveDFileSet(SDFileSet* pSet); -int tsdbEncodeMFInfo(void** buf, SMFInfo* pInfo); -void* tsdbDecodeMFInfo(void* buf, SMFInfo* pInfo); -SDFileSet tsdbMoveDFileSet(SDFileSet* pOldSet, int to); -int tsdbInitFSIter(STsdbRepo* pRepo, SFSIter* pIter); -SDFileSet* tsdbFSIterNext(SFSIter* pIter); -int tsdbCreateDFileSet(int fid, int level, SDFileSet* pSet); - -static FORCE_INLINE int tsdbRLockFS(STsdbFS *pFs) { - int code = pthread_rwlock_rdlock(&(pFs->lock)); - if (code != 0) { - terrno = TAOS_SYSTEM_ERROR(code); - return -1; - } - return 0; -} - -static FORCE_INLINE int tsdbWLockFS(STsdbFS *pFs) { - int code = pthread_rwlock_wrlock(&(pFs->lock)); - if (code != 0) { - terrno = TAOS_SYSTEM_ERROR(code); - return -1; - } - return 0; -} - -static FORCE_INLINE int tsdbUnLockFS(STsdbFS *pFs) { - int code = pthread_rwlock_unlock(&(pFs->lock)); - if (code != 0) { - terrno = TAOS_SYSTEM_ERROR(code); - return -1; - } - return 0; -} - // ================= tsdbStore.c #define KVSTORE_FILE_VERSION ((uint32_t)0) @@ -577,63 +178,7 @@ void tsdbGetFidKeyRange(int daysPerFile, int8_t precision, int fileId, TS // int tsdbApplyRetention(STsdbRepo* pRepo, SFidGroup *pFidGroup); // ================= tsdbMain.c -typedef struct { - int32_t totalLen; - int32_t len; - SDataRow row; -} SSubmitBlkIter; -typedef struct { - int32_t totalLen; - int32_t len; - void * pMsg; -} SSubmitMsgIter; - -struct STsdbRepo { - int8_t state; - - char* rootDir; - STsdbCfg config; - STsdbAppH appH; - STsdbStat stat; - STsdbMeta* tsdbMeta; - STsdbBufPool* pPool; - SMemTable* mem; - SMemTable* imem; - STsdbFS* fs; - sem_t readyToCommit; - pthread_mutex_t mutex; - bool repoLocked; - int32_t code; // Commit code -}; - -#define REPO_ID(r) (r)->config.tsdbId -#define REPO_CFG(r) (&((r)->config)) -#define IS_REPO_LOCKED(r) (r)->repoLocked -#define TSDB_SUBMIT_MSG_HEAD_SIZE sizeof(SSubmitMsg) - -char* tsdbGetMetaFileName(char* rootDir); -void tsdbGetDataFileName(char* rootDir, int vid, int fid, int type, char* fname); -int tsdbLockRepo(STsdbRepo* pRepo); -int tsdbUnlockRepo(STsdbRepo* pRepo); -char* tsdbGetDataDirName(char* rootDir); -int tsdbGetNextMaxTables(int tid); -STsdbMeta* tsdbGetMeta(TSDB_REPO_T* pRepo); -STsdbFileH* tsdbGetFile(TSDB_REPO_T* pRepo); -int tsdbCheckCommit(STsdbRepo* pRepo); - -static FORCE_INLINE STsdbBufBlock* tsdbGetCurrBufBlock(STsdbRepo* pRepo) { - ASSERT(pRepo != NULL); - if (pRepo->mem == NULL) return NULL; - - SListNode* pNode = listTail(pRepo->mem->bufBlockList); - if (pNode == NULL) return NULL; - - STsdbBufBlock* pBufBlock = NULL; - tdListNodeGetData(pRepo->mem->bufBlockList, pNode, (void*)(&pBufBlock)); - - return pBufBlock; -} #include "tsdbReadImpl.h" diff --git a/src/tsdb/inc/tsdbMemTable.h b/src/tsdb/inc/tsdbMemTable.h new file mode 100644 index 0000000000000000000000000000000000000000..3d341bb798ebc410319205995d70b6812493f8e9 --- /dev/null +++ b/src/tsdb/inc/tsdbMemTable.h @@ -0,0 +1,117 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#ifndef _TD_TSDB_MEMTABLE_H_ +#define _TD_TSDB_MEMTABLE_H_ + +#ifdef __cplusplus +extern "C" { +#endif + +typedef struct { + int rowsInserted; + int rowsUpdated; + int rowsDeleteSucceed; + int rowsDeleteFailed; + int nOperations; + TSKEY keyFirst; + TSKEY keyLast; +} SMergeInfo; + +typedef struct { + STable * pTable; + SSkipListIterator *pIter; +} SCommitIter; + +typedef struct { + uint64_t uid; + TSKEY keyFirst; + TSKEY keyLast; + int64_t numOfRows; + SSkipList* pData; +} STableData; + +typedef struct { + T_REF_DECLARE() + SRWLatch latch; + TSKEY keyFirst; + TSKEY keyLast; + int64_t numOfRows; + int32_t maxTables; + STableData** tData; + SList* actList; + SList* extraBuffList; + SList* bufBlockList; +} SMemTable; + +enum { TSDB_UPDATE_META, TSDB_DROP_META }; + +#ifdef WINDOWS +#pragma pack(push ,1) +typedef struct { +#else +typedef struct __attribute__((packed)){ +#endif + char act; + uint64_t uid; +} SActObj; +#ifdef WINDOWS +#pragma pack(pop) +#endif + +typedef struct { + int len; + char cont[]; +} SActCont; + +int tsdbRefMemTable(STsdbRepo* pRepo, SMemTable* pMemTable); +int tsdbUnRefMemTable(STsdbRepo* pRepo, SMemTable* pMemTable); +int tsdbTakeMemSnapshot(STsdbRepo* pRepo, SMemTable** pMem, SMemTable** pIMem); +void tsdbUnTakeMemSnapShot(STsdbRepo* pRepo, SMemTable* pMem, SMemTable* pIMem); +void* tsdbAllocBytes(STsdbRepo* pRepo, int bytes); +int tsdbAsyncCommit(STsdbRepo* pRepo); +int tsdbLoadDataFromCache(STable* pTable, SSkipListIterator* pIter, TSKEY maxKey, int maxRowsToRead, SDataCols* pCols, + TKEY* filterKeys, int nFilterKeys, bool keepDup, SMergeInfo* pMergeInfo); +void* tsdbCommitData(STsdbRepo* pRepo); + +static FORCE_INLINE SDataRow tsdbNextIterRow(SSkipListIterator* pIter) { + if (pIter == NULL) return NULL; + + SSkipListNode* node = tSkipListIterGet(pIter); + if (node == NULL) return NULL; + + return (SDataRow)SL_GET_NODE_DATA(node); +} + +static FORCE_INLINE TSKEY tsdbNextIterKey(SSkipListIterator* pIter) { + SDataRow row = tsdbNextIterRow(pIter); + if (row == NULL) return TSDB_DATA_TIMESTAMP_NULL; + + return dataRowKey(row); +} + +static FORCE_INLINE TKEY tsdbNextIterTKey(SSkipListIterator* pIter) { + SDataRow row = tsdbNextIterRow(pIter); + if (row == NULL) return TKEY_NULL; + + return dataRowTKey(row); +} + + +#ifdef __cplusplus +} +#endif + +#endif /* _TD_TSDB_MEMTABLE_H_ */ \ No newline at end of file diff --git a/src/tsdb/inc/tsdbMeta.h b/src/tsdb/inc/tsdbMeta.h new file mode 100644 index 0000000000000000000000000000000000000000..c9e37c73f2fcfe66dfd20938b7b91872ce6ea106 --- /dev/null +++ b/src/tsdb/inc/tsdbMeta.h @@ -0,0 +1,151 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#ifndef _TD_TSDB_META_H_ +#define _TD_TSDB_META_H_ + +#ifdef __cplusplus +extern "C" { +#endif + +#define TSDB_MAX_TABLE_SCHEMAS 16 + +typedef struct STable { + STableId tableId; + ETableType type; + tstr* name; // NOTE: there a flexible string here + uint64_t suid; + struct STable* pSuper; // super table pointer + uint8_t numOfSchemas; + STSchema* schema[TSDB_MAX_TABLE_SCHEMAS]; + STSchema* tagSchema; + SKVRow tagVal; + SSkipList* pIndex; // For TSDB_SUPER_TABLE, it is the skiplist index + void* eventHandler; // TODO + void* streamHandler; // TODO + TSKEY lastKey; + SDataRow lastRow; + char* sql; + void* cqhandle; + SRWLatch latch; // TODO: implementa latch functions + T_REF_DECLARE() +} STable; + +typedef struct { + pthread_rwlock_t rwLock; + + int32_t nTables; + int32_t maxTables; + STable** tables; + SList* superList; + SHashObj* uidMap; + SKVStore* pStore; + int maxRowBytes; + int maxCols; +} STsdbMeta; + +#define TSDB_INIT_NTABLES 1024 +#define TABLE_TYPE(t) (t)->type +#define TABLE_NAME(t) (t)->name +#define TABLE_CHAR_NAME(t) TABLE_NAME(t)->data +#define TABLE_UID(t) (t)->tableId.uid +#define TABLE_TID(t) (t)->tableId.tid +#define TABLE_SUID(t) (t)->suid +#define TSDB_META_FILE_MAGIC(m) KVSTORE_MAGIC((m)->pStore) +#define TSDB_RLOCK_TABLE(t) taosRLockLatch(&((t)->latch)) +#define TSDB_RUNLOCK_TABLE(t) taosRUnLockLatch(&((t)->latch)) +#define TSDB_WLOCK_TABLE(t) taosWLockLatch(&((t)->latch)) +#define TSDB_WUNLOCK_TABLE(t) taosWUnLockLatch(&((t)->latch)) + +STsdbMeta* tsdbNewMeta(STsdbCfg* pCfg); +void tsdbFreeMeta(STsdbMeta* pMeta); +int tsdbOpenMeta(STsdbRepo* pRepo); +int tsdbCloseMeta(STsdbRepo* pRepo); +STable* tsdbGetTableByUid(STsdbMeta* pMeta, uint64_t uid); +STSchema* tsdbGetTableSchemaByVersion(STable* pTable, int16_t version); +int tsdbWLockRepoMeta(STsdbRepo* pRepo); +int tsdbRLockRepoMeta(STsdbRepo* pRepo); +int tsdbUnlockRepoMeta(STsdbRepo* pRepo); +void tsdbRefTable(STable* pTable); +void tsdbUnRefTable(STable* pTable); +void tsdbUpdateTableSchema(STsdbRepo* pRepo, STable* pTable, STSchema* pSchema, bool insertAct); + +static FORCE_INLINE int tsdbCompareSchemaVersion(const void *key1, const void *key2) { + if (*(int16_t *)key1 < schemaVersion(*(STSchema **)key2)) { + return -1; + } else if (*(int16_t *)key1 > schemaVersion(*(STSchema **)key2)) { + return 1; + } else { + return 0; + } +} + +static FORCE_INLINE STSchema* tsdbGetTableSchemaImpl(STable* pTable, bool lock, bool copy, int16_t version) { + STable* pDTable = (TABLE_TYPE(pTable) == TSDB_CHILD_TABLE) ? pTable->pSuper : pTable; + STSchema* pSchema = NULL; + STSchema* pTSchema = NULL; + + if (lock) TSDB_RLOCK_TABLE(pDTable); + if (version < 0) { // get the latest version of schema + pTSchema = pDTable->schema[pDTable->numOfSchemas - 1]; + } else { // get the schema with version + void* ptr = taosbsearch(&version, pDTable->schema, pDTable->numOfSchemas, sizeof(STSchema*), + tsdbCompareSchemaVersion, TD_EQ); + if (ptr == NULL) { + terrno = TSDB_CODE_TDB_IVD_TB_SCHEMA_VERSION; + goto _exit; + } + pTSchema = *(STSchema**)ptr; + } + + ASSERT(pTSchema != NULL); + + if (copy) { + if ((pSchema = tdDupSchema(pTSchema)) == NULL) terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; + } else { + pSchema = pTSchema; + } + +_exit: + if (lock) TSDB_RUNLOCK_TABLE(pDTable); + return pSchema; +} + +static FORCE_INLINE STSchema* tsdbGetTableSchema(STable* pTable) { + return tsdbGetTableSchemaImpl(pTable, false, false, -1); +} + +static FORCE_INLINE STSchema *tsdbGetTableTagSchema(STable *pTable) { + if (pTable->type == TSDB_CHILD_TABLE) { // check child table first + STable *pSuper = pTable->pSuper; + if (pSuper == NULL) return NULL; + return pSuper->tagSchema; + } else if (pTable->type == TSDB_SUPER_TABLE) { + return pTable->tagSchema; + } else { + return NULL; + } +} + +static FORCE_INLINE TSKEY tsdbGetTableLastKeyImpl(STable* pTable) { + ASSERT(pTable->lastRow == NULL || pTable->lastKey == dataRowKey(pTable->lastRow)); + return pTable->lastKey; +} + +#ifdef __cplusplus +} +#endif + +#endif /* _TD_TSDB_META_H_ */ \ No newline at end of file diff --git a/src/tsdb/inc/tsdbint.h b/src/tsdb/inc/tsdbint.h new file mode 100644 index 0000000000000000000000000000000000000000..ad948bcbd01735b370252c7e6793b7bb922150c9 --- /dev/null +++ b/src/tsdb/inc/tsdbint.h @@ -0,0 +1,106 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#ifndef _TD_TSDB_INT_H_ +#define _TD_TSDB_INT_H_ + +#include "os.h" +#include "tlog.h" +#include "taosdef.h" +#include "tskiplist.h" +#include "tdataformat.h" +#include "tlockfree.h" +#include "tlist.h" +#include "hash.h" +#include "tarray.h" + +#include "tsdb.h" + +#ifdef __cplusplus +extern "C" { +#endif + +typedef struct STsdbRepo STsdbRepo; + +// Log +#include "tsdbLog.h" +// Meta +#include "tsdbMeta.h" +// Buffer +#include "tsdbBuffer.h" +// MemTable +#include "tsdbMemTable.h" +// File +#include "tsdbFile.h" +// FS +#include "tsdbFS.h" +// ReadImpl +#include "tsdbReadImpl.h" +// Commit +#include "tsdbCommit.h" +// Commit Queue +#include "tsdbCommitQueue.h" +// Main definitions +struct STsdbRepo { + int8_t state; + + char* rootDir; + STsdbCfg config; + STsdbAppH appH; + STsdbStat stat; + STsdbMeta* tsdbMeta; + STsdbBufPool* pPool; + SMemTable* mem; + SMemTable* imem; + STsdbFS* fs; + sem_t readyToCommit; + pthread_mutex_t mutex; + bool repoLocked; + int32_t code; // Commit code +}; + +#define REPO_ID(r) (r)->config.tsdbId +#define REPO_CFG(r) (&((r)->config)) +#define IS_REPO_LOCKED(r) (r)->repoLocked +#define TSDB_SUBMIT_MSG_HEAD_SIZE sizeof(SSubmitMsg) + +char* tsdbGetMetaFileName(char* rootDir); +void tsdbGetDataFileName(char* rootDir, int vid, int fid, int type, char* fname); +int tsdbLockRepo(STsdbRepo* pRepo); +int tsdbUnlockRepo(STsdbRepo* pRepo); +char* tsdbGetDataDirName(char* rootDir); +int tsdbGetNextMaxTables(int tid); +STsdbMeta* tsdbGetMeta(TSDB_REPO_T* pRepo); +STsdbFileH* tsdbGetFile(TSDB_REPO_T* pRepo); +int tsdbCheckCommit(STsdbRepo* pRepo); + +static FORCE_INLINE STsdbBufBlock* tsdbGetCurrBufBlock(STsdbRepo* pRepo) { + ASSERT(pRepo != NULL); + if (pRepo->mem == NULL) return NULL; + + SListNode* pNode = listTail(pRepo->mem->bufBlockList); + if (pNode == NULL) return NULL; + + STsdbBufBlock* pBufBlock = NULL; + tdListNodeGetData(pRepo->mem->bufBlockList, pNode, (void*)(&pBufBlock)); + + return pBufBlock; +} + +#ifdef __cplusplus +} +#endif + +#endif /* _TD_TSDB_INT_H_ */ \ No newline at end of file diff --git a/src/tsdb/src/tsdbBuffer.c b/src/tsdb/src/tsdbBuffer.c index 7cea27658c80d689972e3cb0f5dda3269a34b720..1798a21b9963c7641dd99dc7fa11a5dd977e0e3c 100644 --- a/src/tsdb/src/tsdbBuffer.c +++ b/src/tsdb/src/tsdbBuffer.c @@ -13,8 +13,7 @@ * along with this program. If not, see . */ -#include "tsdb.h" -#include "tsdbMain.h" +#include "tsdbint.h" #define POOL_IS_EMPTY(b) (listNEles((b)->bufBlockList) == 0) diff --git a/src/tsdb/src/tsdbMemTable.c b/src/tsdb/src/tsdbMemTable.c index e49c020d19217d7da76aa3ebfb90ab873181f282..58c527f426f43db0ae23674bf92ca5fec121aded 100644 --- a/src/tsdb/src/tsdbMemTable.c +++ b/src/tsdb/src/tsdbMemTable.c @@ -19,6 +19,18 @@ #define TSDB_DATA_SKIPLIST_LEVEL 5 #define TSDB_MAX_INSERT_BATCH 512 +typedef struct { + int32_t totalLen; + int32_t len; + SDataRow row; +} SSubmitBlkIter; + +typedef struct { + int32_t totalLen; + int32_t len; + void * pMsg; +} SSubmitMsgIter; + static SMemTable * tsdbNewMemTable(STsdbRepo *pRepo); static void tsdbFreeMemTable(SMemTable *pMemTable); static STableData *tsdbNewTableData(STsdbCfg *pCfg, STable *pTable);