From 253c7e64d6ed284e8f68bde04378f6b753f7f2c2 Mon Sep 17 00:00:00 2001 From: hzcheng Date: Tue, 7 Apr 2020 18:46:09 +0800 Subject: [PATCH] first version of committing data --- src/common/inc/dataformat.h | 1 + src/common/src/dataformat.c | 3 +- src/vnode/tsdb/inc/tsdbMain.h | 160 ++++- src/vnode/tsdb/src/tsdbFile.c | 126 ++-- src/vnode/tsdb/src/tsdbMain.c | 502 +++++-------- src/vnode/tsdb/src/tsdbRWHelper.c | 1057 ++++++++++++++++++++++++++++ src/vnode/tsdb/src/tsdbRead.c | 22 +- src/vnode/tsdb/tests/tsdbTests.cpp | 47 +- 8 files changed, 1489 insertions(+), 429 deletions(-) create mode 100644 src/vnode/tsdb/src/tsdbRWHelper.c diff --git a/src/common/inc/dataformat.h b/src/common/inc/dataformat.h index 04fa7dcc7d..231786ff73 100644 --- a/src/common/inc/dataformat.h +++ b/src/common/inc/dataformat.h @@ -136,6 +136,7 @@ void tdInitDataCols(SDataCols *pCols, STSchema *pSchema); void tdFreeDataCols(SDataCols *pCols); void tdAppendDataRowToDataCol(SDataRow row, SDataCols *pCols); void tdPopDataColsPoints(SDataCols *pCols, int pointsToPop); +int tdMergeDataCols(SDataCols *target, SDataCols *src, int rowsToMerge); #ifdef __cplusplus } diff --git a/src/common/src/dataformat.c b/src/common/src/dataformat.c index bff041df1b..1f5d83d9af 100644 --- a/src/common/src/dataformat.c +++ b/src/common/src/dataformat.c @@ -382,6 +382,7 @@ static int tdFLenFromSchema(STSchema *pSchema) { return ret; } -int tdMergeDataCols(SDataCols *target, SDataCols *source) { +int tdMergeDataCols(SDataCols *target, SDataCols *source, int rowsToMerge) { + // TODO return 0; } \ No newline at end of file diff --git a/src/vnode/tsdb/inc/tsdbMain.h b/src/vnode/tsdb/inc/tsdbMain.h index 06f62ea6f7..54f5ab0cce 100644 --- a/src/vnode/tsdb/inc/tsdbMain.h +++ b/src/vnode/tsdb/inc/tsdbMain.h @@ -90,9 +90,9 @@ typedef struct { STable *superList; // super table list TODO: change it to list container - void *map; // table map of (uid ===> table) + void *map; // table map of (uid ===> table) - SMetaFile *mfh; // meta file handle + SMetaFile *mfh; // meta file handle int maxRowBytes; int maxCols; } STsdbMeta; @@ -118,14 +118,14 @@ STSchema * tsdbGetTableSchema(STsdbMeta *pMeta, STable *pTable); #define TSDB_TABLE_OF_ID(pHandle, id) ((pHandle)->pTables)[id] #define TSDB_GET_TABLE_OF_NAME(pHandle, name) /* TODO */ -STsdbMeta* tsdbGetMeta(tsdb_repo_t* pRepo); +STsdbMeta *tsdbGetMeta(tsdb_repo_t *pRepo); int32_t tsdbCreateTableImpl(STsdbMeta *pMeta, STableCfg *pCfg); int32_t tsdbDropTableImpl(STsdbMeta *pMeta, STableId tableId); STable *tsdbIsValidTableToInsert(STsdbMeta *pMeta, STableId tableId); // int32_t tsdbInsertRowToTableImpl(SSkipListNode *pNode, STable *pTable); STable *tsdbGetTableByUid(STsdbMeta *pMeta, int64_t uid); -char *getTupleKey(const void * data); +char * getTupleKey(const void *data); // ------------------------------ TSDB CACHE INTERFACES ------------------------------ #define TSDB_DEFAULT_CACHE_BLOCK_SIZE 16 * 1024 * 1024 /* 16M */ @@ -191,8 +191,8 @@ typedef struct { } SFileInfo; typedef struct { - int fd; - char fname[128]; + int fd; + char fname[128]; SFileInfo info; } SFile; @@ -216,11 +216,14 @@ typedef struct { STsdbFileH *tsdbInitFileH(char *dataDir, int maxFiles); void tsdbCloseFileH(STsdbFileH *pFileH); -int tsdbCreateFile(char *dataDir, int fileId, const char *suffix, int maxTables, SFile *pFile, int writeHeader, int toClose); -int tsdbCreateFGroup(STsdbFileH *pFileH, char *dataDir, int fid, int maxTables); +int tsdbCreateFile(char *dataDir, int fileId, const char *suffix, int maxTables, SFile *pFile, int writeHeader, + int toClose); +SFileGroup *tsdbCreateFGroup(STsdbFileH *pFileH, char *dataDir, int fid, int maxTables); int tsdbOpenFile(SFile *pFile, int oflag); -int tsdbCloseFile(SFile *pFile); SFileGroup *tsdbOpenFilesForCommit(STsdbFileH *pFileH, int fid); +int tsdbCloseFile(SFile *pFile); +SFileGroup *tsdbOpenFilesForCommit(STsdbFileH *pFileH, int fid); int tsdbRemoveFileGroup(STsdbFileH *pFile, int fid); +int tsdbGetFileName(char *dataDir, int fileId, const char *suffix, char *fname); #define TSDB_FGROUP_ITER_FORWARD 0 #define TSDB_FGROUP_ITER_BACKWARD 1 @@ -265,6 +268,8 @@ typedef struct { TSKEY keyLast; } SCompBlock; +// Maximum number of sub-blocks a super-block can have +#define TSDB_MAX_SUBBLOCKS 8 #define IS_SUPER_BLOCK(pBlock) ((pBlock)->numOfSubBlocks >= 1) #define IS_SUB_BLOCK(pBlock) ((pBlock)->numOfSubBlocks == 0) @@ -276,15 +281,15 @@ typedef struct { } SCompInfo; #define TSDB_COMPBLOCK_AT(pCompInfo, idx) ((pCompInfo)->blocks + (idx)) -#define TSDB_COMPBLOCK_GET_START_AND_SIZE(pCompInfo, pCompBlock, size)\ -do {\ - if (pCompBlock->numOfSubBlocks > 1) {\ - pCompBlock = pCompInfo->blocks + pCompBlock->offset;\ - size = pCompBlock->numOfSubBlocks;\ - } else {\ - size = 1;\ - }\ -} while (0) +#define TSDB_COMPBLOCK_GET_START_AND_SIZE(pCompInfo, pCompBlock, size) \ + do { \ + if (pCompBlock->numOfSubBlocks > 1) { \ + pCompBlock = pCompInfo->blocks + pCompBlock->offset; \ + size = pCompBlock->numOfSubBlocks; \ + } else { \ + size = 1; \ + } \ + } while (0) // TODO: take pre-calculation into account typedef struct { @@ -302,18 +307,11 @@ typedef struct { SCompCol cols[]; } SCompData; -STsdbFileH* tsdbGetFile(tsdb_repo_t* pRepo); - -int tsdbCopyBlockDataInFile(SFile *pOutFile, SFile *pInFile, SCompInfo *pCompInfo, int idx, int isLast, SDataCols *pCols); - -int tsdbLoadCompIdx(SFileGroup *pGroup, void *buf, int maxTables); -int tsdbLoadCompBlocks(SFileGroup *pGroup, SCompIdx *pIdx, void *buf); -int tsdbLoadCompCols(SFile *pFile, SCompBlock *pBlock, void *buf); -int tsdbLoadColData(SFile *pFile, SCompCol *pCol, int64_t blockBaseOffset, void *buf); -int tsdbLoadDataBlock(SFile *pFile, SCompBlock *pStartBlock, int numOfBlocks, SDataCols *pCols, SCompData *pCompData); +STsdbFileH *tsdbGetFile(tsdb_repo_t *pRepo); +int tsdbCopyBlockDataInFile(SFile *pOutFile, SFile *pInFile, SCompInfo *pCompInfo, int idx, int isLast, + SDataCols *pCols); SFileGroup *tsdbSearchFGroup(STsdbFileH *pFileH, int fid); - void tsdbGetKeyRangeOfFileId(int32_t daysPerFile, int8_t precision, int32_t fileId, TSKEY *minKey, TSKEY *maxKey); // TSDB repository definition @@ -348,6 +346,112 @@ typedef struct _tsdb_repo { } STsdbRepo; +typedef enum { TSDB_WRITE_HELPER, TSDB_READ_HELPER } tsdb_rw_helper_t; + +typedef struct { + tsdb_rw_helper_t type; // helper type + int maxTables; + int maxRowSize; + int maxRows; + int maxCols; + int minRowsPerFileBlock; + int maxRowsPerFileBlock; + int8_t compress; +} SHelperCfg; + +typedef struct { + int fid; + TSKEY minKey; + TSKEY maxKey; + // For read/write purpose + SFile headF; + SFile dataF; + SFile lastF; + // For write purpose only + SFile nHeadF; + SFile nLastF; +} SHelperFile; + +typedef struct { + int64_t uid; + int32_t tid; + int32_t sversion; +} SHelperTable; + +typedef struct { + // Global configuration + SHelperCfg config; + + SHelperFile files; + + SHelperTable tableInfo; + + // ---------- For read purpose + int8_t state; // current loading state + + SCompIdx *pCompIdx; + size_t compIdxSize; + + SCompInfo *pCompInfo; + size_t compInfoSize; + int blockIter; // For write purpose + + SCompData *pCompData; + size_t compDataSize; + + SDataCols *pDataCols[2]; + + // ---------- For read purpose + bool hasLast; + + int newBlocks; + SCompIdx *pWCompIdx; + size_t wCompIdxSize; + + SCompInfo *pWCompInfo; + size_t wCompInfoSize; + + SCompData *pWCompData; + size_t wCompDataSize; +} SRWHelper; + +// --------- Helper state +#define TSDB_HELPER_CLEAR_STATE 0x0 // Clear state +#define TSDB_HELPER_FILE_SET 0x1 // File is set +#define TSDB_HELPER_FILE_OPEN 0x2 // File is opened + +#define TSDB_HELPER_IDX_LOAD 0x4 // SCompIdx part is loaded +#define TSDB_HELPER_INFO_LOAD 0x8 // SCompInfo part is loaded +#define TSDB_HELPER_FILE_DATA_LOAD 0x10 // SCompData part is loaded + +#define TSDB_HELPER_TYPE(h) ((h)->config.type) + +#define helperSetState(h, s) (((h)->state) |= (s)) +#define helperClearState(h, s) ((h)->state &= (~(s))) +#define helperHasState(h, s) ((((h)->state) & (s)) == (s)) + +int tsdbInitHelper(SRWHelper *pHelper, SHelperCfg *pCfg); +void tsdbDestroyHelper(SRWHelper *pHelper); +void tsdbClearHelper(SRWHelper *pHelper); + +// --------- For set operations +int tsdbSetHelperFile(SRWHelper *pHelper, SFileGroup *pGroup); +int tsdbOpenHelperFile(SRWHelper *pHelper); +void tsdbSetHelperTable(SRWHelper *pHelper, SHelperTable *pHelperTable, STSchema *pSchema); +int tsdbCloseHelperFile(SRWHelper *pHelper, bool hasError); + +// --------- For read operations +int tsdbLoadCompIdx(SRWHelper *pHelper, void *target); +int tsdbLoadCompInfo(SRWHelper *pHelper, void *target); +int tsdbLoadCompData(SRWHelper *pHelper, int blkIdx, void *target); +int tsdbLoadBlockDataCols(SRWHelper *pHelper, SDataCols *pDataCols, int32_t *colIds, int numOfColIds); +int tsdbLoadBlockData(SRWHelper *pHelper, int blkIdx, SDataCols *pDataCols); + +// --------- For write operations +int tsdbWriteDataBlock(SRWHelper *pHelper, SDataCols *pDataCols); +int tsdbMoveLastBlockIfNeccessary(SRWHelper *pHelper); +int tsdbWriteCompInfo(SRWHelper *pHelper); +int tsdbWriteCompIdx(SRWHelper *pHelper); #ifdef __cplusplus } diff --git a/src/vnode/tsdb/src/tsdbFile.c b/src/vnode/tsdb/src/tsdbFile.c index bd6699eb84..b6da572cfb 100644 --- a/src/vnode/tsdb/src/tsdbFile.c +++ b/src/vnode/tsdb/src/tsdbFile.c @@ -21,6 +21,7 @@ #include #include #include +#include #include "tutil.h" #include "tsdbMain.h" @@ -33,7 +34,6 @@ const char *tsdbFileSuffix[] = { static int compFGroupKey(const void *key, const void *fgroup); static int compFGroup(const void *arg1, const void *arg2); -static int tsdbGetFileName(char *dataDir, int fileId, const char *suffix, char *fname); static int tsdbWriteFileHead(SFile *pFile); static int tsdbWriteHeadFileIdx(SFile *pFile, int maxTables); static int tsdbOpenFGroup(STsdbFileH *pFileH, char *dataDir, int fid); @@ -91,24 +91,36 @@ static int tsdbOpenFGroup(STsdbFileH *pFileH, char *dataDir, int fid) { return 0; } -int tsdbCreateFGroup(STsdbFileH *pFileH, char *dataDir, int fid, int maxTables) { - if (pFileH->numOfFGroups >= pFileH->maxFGroups) return -1; +/** + * Create the file group if the file group not exists. + * + * @return A pointer to + */ +SFileGroup *tsdbCreateFGroup(STsdbFileH *pFileH, char *dataDir, int fid, int maxTables) { + if (pFileH->numOfFGroups >= pFileH->maxFGroups) return NULL; SFileGroup fGroup; SFileGroup *pFGroup = &fGroup; - if (tsdbSearchFGroup(pFileH, fid) == NULL) { // if not exists, create one + + SFileGroup *pGroup = tsdbSearchFGroup(pFileH, fid); + if (pGroup == NULL) { // if not exists, create one pFGroup->fileId = fid; for (int type = TSDB_FILE_TYPE_HEAD; type < TSDB_FILE_TYPE_MAX; type++) { - if (tsdbCreateFile(dataDir, fid, tsdbFileSuffix[type], maxTables, &(pFGroup->files[type]), type == TSDB_FILE_TYPE_HEAD ? 1 : 0, 1) < 0) { - // TODO: deal with the ERROR here, remove those creaed file - return -1; - } + if (tsdbCreateFile(dataDir, fid, tsdbFileSuffix[type], maxTables, &(pFGroup->files[type]), + type == TSDB_FILE_TYPE_HEAD ? 1 : 0, 1) < 0) + goto _err; } pFileH->fGroup[pFileH->numOfFGroups++] = fGroup; qsort((void *)(pFileH->fGroup), pFileH->numOfFGroups, sizeof(SFileGroup), compFGroup); + return tsdbSearchFGroup(pFileH, fid); } - return 0; + + return pGroup; + +_err: + // TODO: deal with the err here + return NULL; } int tsdbRemoveFileGroup(STsdbFileH *pFileH, int fid) { @@ -181,27 +193,27 @@ SFileGroup *tsdbGetFileGroupNext(SFileGroupIter *pIter) { return ret; } -int tsdbLoadDataBlock(SFile *pFile, SCompBlock *pStartBlock, int numOfBlocks, SDataCols *pCols, SCompData *pCompData) { - SCompBlock *pBlock = pStartBlock; - for (int i = 0; i < numOfBlocks; i++) { - if (tsdbLoadCompCols(pFile, pBlock, (void *)pCompData) < 0) return -1; - pCols->numOfPoints += (pCompData->cols[0].len / 8); - for (int iCol = 0; iCol < pBlock->numOfCols; iCol++) { - SCompCol *pCompCol = &(pCompData->cols[iCol]); - // pCols->numOfPoints += pBlock->numOfPoints; - int k = 0; - for (; k < pCols->numOfCols; k++) { - if (pCompCol->colId == pCols->cols[k].colId) break; - } - - if (tsdbLoadColData(pFile, pCompCol, pBlock->offset, - (void *)((char *)(pCols->cols[k].pData) + pCols->cols[k].len)) < 0) - return -1; - } - pStartBlock++; - } - return 0; -} +// int tsdbLoadDataBlock(SFile *pFile, SCompBlock *pStartBlock, int numOfBlocks, SDataCols *pCols, SCompData *pCompData) { +// SCompBlock *pBlock = pStartBlock; +// for (int i = 0; i < numOfBlocks; i++) { +// if (tsdbLoadCompCols(pFile, pBlock, (void *)pCompData) < 0) return -1; +// pCols->numOfPoints += (pCompData->cols[0].len / 8); +// for (int iCol = 0; iCol < pBlock->numOfCols; iCol++) { +// SCompCol *pCompCol = &(pCompData->cols[iCol]); +// // pCols->numOfPoints += pBlock->numOfPoints; +// int k = 0; +// for (; k < pCols->numOfCols; k++) { +// if (pCompCol->colId == pCols->cols[k].colId) break; +// } + +// if (tsdbLoadColData(pFile, pCompCol, pBlock->offset, +// (void *)((char *)(pCols->cols[k].pData) + pCols->cols[k].len)) < 0) +// return -1; +// } +// pStartBlock++; +// } +// return 0; +// } int tsdbCopyBlockDataInFile(SFile *pOutFile, SFile *pInFile, SCompInfo *pCompInfo, int idx, int isLast, SDataCols *pCols) { SCompBlock *pSuperBlock = TSDB_COMPBLOCK_AT(pCompInfo, idx); @@ -237,42 +249,42 @@ int tsdbCopyBlockDataInFile(SFile *pOutFile, SFile *pInFile, SCompInfo *pCompInf return 0; } -int tsdbLoadCompIdx(SFileGroup *pGroup, void *buf, int maxTables) { - SFile *pFile = &(pGroup->files[TSDB_FILE_TYPE_HEAD]); - if (lseek(pFile->fd, TSDB_FILE_HEAD_SIZE, SEEK_SET) < 0) return -1; +// int tsdbLoadCompIdx(SFileGroup *pGroup, void *buf, int maxTables) { +// SFile *pFile = &(pGroup->files[TSDB_FILE_TYPE_HEAD]); +// if (lseek(pFile->fd, TSDB_FILE_HEAD_SIZE, SEEK_SET) < 0) return -1; - if (read(pFile->fd, buf, sizeof(SCompIdx) * maxTables) < 0) return -1; - // TODO: need to check the correctness - return 0; -} +// if (read(pFile->fd, buf, sizeof(SCompIdx) * maxTables) < 0) return -1; +// // TODO: need to check the correctness +// return 0; +// } -int tsdbLoadCompBlocks(SFileGroup *pGroup, SCompIdx *pIdx, void *buf) { - SFile *pFile = &(pGroup->files[TSDB_FILE_TYPE_HEAD]); +// int tsdbLoadCompBlocks(SFileGroup *pGroup, SCompIdx *pIdx, void *buf) { +// SFile *pFile = &(pGroup->files[TSDB_FILE_TYPE_HEAD]); - if (lseek(pFile->fd, pIdx->offset, SEEK_SET) < 0) return -1; +// if (lseek(pFile->fd, pIdx->offset, SEEK_SET) < 0) return -1; - if (read(pFile->fd, buf, pIdx->len) < 0) return -1; +// if (read(pFile->fd, buf, pIdx->len) < 0) return -1; - // TODO: need to check the correctness +// // TODO: need to check the correctness - return 0; -} +// return 0; +// } -int tsdbLoadCompCols(SFile *pFile, SCompBlock *pBlock, void *buf) { - // assert(pBlock->numOfSubBlocks == 0 || pBlock->numOfSubBlocks == 1); +// int tsdbLoadCompCols(SFile *pFile, SCompBlock *pBlock, void *buf) { +// // assert(pBlock->numOfSubBlocks == 0 || pBlock->numOfSubBlocks == 1); - if (lseek(pFile->fd, pBlock->offset, SEEK_SET) < 0) return -1; - size_t size = sizeof(SCompData) + sizeof(SCompCol) * pBlock->numOfCols; - if (read(pFile->fd, buf, size) < 0) return -1; +// if (lseek(pFile->fd, pBlock->offset, SEEK_SET) < 0) return -1; +// size_t size = sizeof(SCompData) + sizeof(SCompCol) * pBlock->numOfCols; +// if (read(pFile->fd, buf, size) < 0) return -1; - return 0; -} +// return 0; +// } -int tsdbLoadColData(SFile *pFile, SCompCol *pCol, int64_t blockBaseOffset, void *buf) { - if (lseek(pFile->fd, blockBaseOffset + pCol->offset, SEEK_SET) < 0) return -1; - if (read(pFile->fd, buf, pCol->len) < 0) return -1; - return 0; -} +// int tsdbLoadColData(SFile *pFile, SCompCol *pCol, int64_t blockBaseOffset, void *buf) { +// if (lseek(pFile->fd, blockBaseOffset + pCol->offset, SEEK_SET) < 0) return -1; +// if (read(pFile->fd, buf, pCol->len) < 0) return -1; +// return 0; +// } static int compFGroupKey(const void *key, const void *fgroup) { int fid = *(int *)key; @@ -317,7 +329,7 @@ static int tsdbWriteHeadFileIdx(SFile *pFile, int maxTables) { return 0; } -static int tsdbGetFileName(char *dataDir, int fileId, const char *suffix, char *fname) { +int tsdbGetFileName(char *dataDir, int fileId, const char *suffix, char *fname) { if (dataDir == NULL || fname == NULL) return -1; sprintf(fname, "%s/f%d%s", dataDir, fileId, suffix); diff --git a/src/vnode/tsdb/src/tsdbMain.c b/src/vnode/tsdb/src/tsdbMain.c index 1a8e50d0ee..b5d573764e 100644 --- a/src/vnode/tsdb/src/tsdbMain.c +++ b/src/vnode/tsdb/src/tsdbMain.c @@ -55,11 +55,11 @@ static int32_t tsdbInsertDataToTable(tsdb_repo_t *repo, SSubmitBlk *pBlock); static int32_t tsdbRestoreCfg(STsdbRepo *pRepo, STsdbCfg *pCfg); static int32_t tsdbGetDataDirName(STsdbRepo *pRepo, char *fname); static void * tsdbCommitData(void *arg); -static int tsdbCommitToFile(STsdbRepo *pRepo, int fid, SSkipListIterator **iters, SDataCols *pCols); -static int tsdbHasDataInRange(SSkipListIterator *pIter, TSKEY minKey, TSKEY maxKey); +static int tsdbCommitToFile(STsdbRepo *pRepo, int fid, SSkipListIterator **iters, SRWHelper *pHelper, SDataCols *pDataCols); +static TSKEY tsdbNextIterKey(SSkipListIterator *pIter); static int tsdbHasDataToCommit(SSkipListIterator **iters, int nIters, TSKEY minKey, TSKEY maxKey); -static int tsdbWriteBlockToFileImpl(SFile *pFile, SDataCols *pCols, int pointsToWrite, int64_t *offset, int32_t *len, - int64_t uid); +// static int tsdbWriteBlockToFileImpl(SFile *pFile, SDataCols *pCols, int pointsToWrite, int64_t *offset, int32_t *len, +// int64_t uid); #define TSDB_GET_TABLE_BY_ID(pRepo, sid) (((STSDBRepo *)pRepo)->pTableList)[sid] #define TSDB_GET_TABLE_BY_NAME(pRepo, name) @@ -751,6 +751,8 @@ static int32_t tsdbInsertDataToTable(tsdb_repo_t *repo, SSubmitBlk *pBlock) { } static int tsdbReadRowsFromCache(SSkipListIterator *pIter, TSKEY maxKey, int maxRowsToRead, SDataCols *pCols) { + if (pIter == NULL) return 0; + int numOfRows = 0; do { @@ -811,7 +813,8 @@ static void *tsdbCommitData(void *arg) { STsdbRepo * pRepo = (STsdbRepo *)arg; STsdbMeta * pMeta = pRepo->tsdbMeta; STsdbCache *pCache = pRepo->tsdbCache; - STsdbCfg * pCfg = &(pRepo->config); + STsdbCfg * pCfg = &(pRepo->config); + SDataCols * pDataCols = NULL; if (pCache->imem == NULL) return NULL; // Create the iterator to read from cache @@ -821,24 +824,34 @@ static void *tsdbCommitData(void *arg) { return NULL; } - // Create a data column buffer for commit - SDataCols *pDataCols = tdNewDataCols(pMeta->maxRowBytes, pMeta->maxCols, pCfg->maxRowsPerFileBlock); - if (pDataCols == NULL) { - // TODO: deal with the error - return NULL; - } + // Create a write helper for commit data + SRWHelper whelper; + SHelperCfg hcfg = { + .type = TSDB_WRITE_HELPER, + .maxTables = pCfg->maxTables, + .maxRowSize = pMeta->maxRowBytes, + .maxRows = pCfg->maxRowsPerFileBlock, + .maxCols = pMeta->maxCols, + .minRowsPerFileBlock = pCfg->minRowsPerFileBlock, + .compress = 2 // TODO make it a configuration + }; + if (tsdbInitHelper(&whelper, &hcfg) < 0) goto _exit; + if ((pDataCols = tdNewDataCols(pMeta->maxRowBytes, pMeta->maxCols, pCfg->maxRowsPerFileBlock)) == NULL) goto _exit; int sfid = tsdbGetKeyFileId(pCache->imem->keyFirst, pCfg->daysPerFile, pCfg->precision); int efid = tsdbGetKeyFileId(pCache->imem->keyLast, pCfg->daysPerFile, pCfg->precision); + // Loop to commit to each file for (int fid = sfid; fid <= efid; fid++) { - if (tsdbCommitToFile(pRepo, fid, iters, pDataCols) < 0) { - // TODO: deal with the error here - // assert(0); + if (tsdbCommitToFile(pRepo, fid, iters, &whelper, pDataCols) < 0) { + ASSERT(false); + goto _exit; } } +_exit: tdFreeDataCols(pDataCols); + tsdbDestroyHelper(&whelper); tsdbDestroyTableIters(iters, pCfg->maxTables); tsdbLockRepo(arg); @@ -849,7 +862,7 @@ static void *tsdbCommitData(void *arg) { // TODO: free the skiplist for (int i = 0; i < pCfg->maxTables; i++) { STable *pTable = pMeta->tables[i]; - if (pTable && pTable->imem) { // Here has memory leak + if (pTable && pTable->imem) { // Here has memory leak pTable->imem = NULL; } } @@ -858,19 +871,12 @@ static void *tsdbCommitData(void *arg) { return NULL; } -static int tsdbCommitToFile(STsdbRepo *pRepo, int fid, SSkipListIterator **iters, SDataCols *pCols) { - int isNewLastFile = 0; +static int tsdbCommitToFile(STsdbRepo *pRepo, int fid, SSkipListIterator **iters, SRWHelper *pHelper, SDataCols *pDataCols) { STsdbMeta * pMeta = pRepo->tsdbMeta; STsdbFileH *pFileH = pRepo->tsdbFileH; STsdbCfg * pCfg = &pRepo->config; - SFile hFile, lFile; SFileGroup *pGroup = NULL; - SCompIdx * pIndices = NULL; - SCompInfo * pCompInfo = NULL; - // size_t compInfoSize = 0; - // SCompBlock compBlock; - // SCompBlock *pBlock = &compBlock; TSKEY minKey = 0, maxKey = 0; tsdbGetKeyRangeOfFileId(pCfg->daysPerFile, pCfg->precision, fid, &minKey, &maxKey); @@ -879,334 +885,212 @@ static int tsdbCommitToFile(STsdbRepo *pRepo, int fid, SSkipListIterator **iters int hasDataToCommit = tsdbHasDataToCommit(iters, pCfg->maxTables, minKey, maxKey); if (!hasDataToCommit) return 0; // No data to commit, just return - // TODO: make it more flexible - pCompInfo = (SCompInfo *)malloc(sizeof(SCompInfo) + sizeof(SCompBlock) * 1000); - // Create and open files for commit tsdbGetDataDirName(pRepo, dataDir); - if (tsdbCreateFGroup(pFileH, dataDir, fid, pCfg->maxTables) < 0) { /* TODO */ - } - pGroup = tsdbOpenFilesForCommit(pFileH, fid); - if (pGroup == NULL) { /* TODO */ - } - tsdbCreateFile(dataDir, fid, ".h", pCfg->maxTables, &hFile, 1, 1); - tsdbOpenFile(&hFile, O_RDWR); - if (0 /*pGroup->files[TSDB_FILE_TYPE_LAST].size > TSDB_MAX_LAST_FILE_SIZE*/) { - // TODO: make it not to write the last file every time - tsdbCreateFile(dataDir, fid, ".l", pCfg->maxTables, &lFile, 0, 0); - isNewLastFile = 1; - } + if ((pGroup = tsdbCreateFGroup(pFileH, dataDir, fid, pCfg->maxTables)) == NULL) goto _err; - // Load the SCompIdx - pIndices = (SCompIdx *)malloc(sizeof(SCompIdx) * pCfg->maxTables); - if (pIndices == NULL) { /* TODO*/ - } - if (tsdbLoadCompIdx(pGroup, (void *)pIndices, pCfg->maxTables) < 0) { /* TODO */ - } + // Set the file to write/read + tsdbSetHelperFile(pHelper, pGroup); - lseek(hFile.fd, TSDB_FILE_HEAD_SIZE + sizeof(SCompIdx) * pCfg->maxTables, SEEK_SET); + // Open files for write/read + if (tsdbOpenHelperFile(pHelper) < 0) goto _err; // Loop to commit data in each table for (int tid = 0; tid < pCfg->maxTables; tid++) { STable * pTable = pMeta->tables[tid]; SSkipListIterator *pIter = iters[tid]; - SCompIdx * pIdx = &pIndices[tid]; - - int nNewBlocks = 0; - - if (pTable == NULL || pIter == NULL) continue; - - /* If no new data to write for this table, just write the old data to new file - * if there are. - */ - if (!tsdbHasDataInRange(pIter, minKey, maxKey)) { - // has old data - if (pIdx->len > 0) { - goto _table_over; - // if (isNewLastFile && pIdx->hasLast) { - if (0) { - // need to move the last block to new file - if ((pCompInfo = (SCompInfo *)realloc((void *)pCompInfo, pIdx->len)) == NULL) { /* TODO */ - } - if (tsdbLoadCompBlocks(pGroup, pIdx, (void *)pCompInfo) < 0) { /* TODO */ - } - - tdInitDataCols(pCols, tsdbGetTableSchema(pMeta, pTable)); - - SCompBlock *pTBlock = TSDB_COMPBLOCK_AT(pCompInfo, pIdx->numOfSuperBlocks); - int nBlocks = 0; - - TSDB_COMPBLOCK_GET_START_AND_SIZE(pCompInfo, pTBlock, nBlocks); - - SCompData tBlock; - int64_t toffset; - int32_t tlen; - tsdbLoadDataBlock(&pGroup->files[TSDB_FILE_TYPE_LAST], pTBlock, nBlocks, pCols, &tBlock); - - tsdbWriteBlockToFileImpl(&lFile, pCols, pCols->numOfPoints, &toffset, &tlen, pTable->tableId.uid); - pTBlock = TSDB_COMPBLOCK_AT(pCompInfo, pIdx->numOfSuperBlocks); - pTBlock->offset = toffset; - pTBlock->len = tlen; - pTBlock->numOfPoints = pCols->numOfPoints; - pTBlock->numOfSubBlocks = 1; - - pIdx->offset = lseek(hFile.fd, 0, SEEK_CUR); - if (nBlocks > 1) { - pIdx->len -= (sizeof(SCompBlock) * nBlocks); - } - write(hFile.fd, (void *)pCompInfo, pIdx->len); - } else { - pIdx->offset = lseek(hFile.fd, 0, SEEK_CUR); - sendfile(pGroup->files[TSDB_FILE_TYPE_HEAD].fd, hFile.fd, NULL, pIdx->len); - hFile.info.size += pIdx->len; - } - } - continue; - } - pCompInfo->delimiter = TSDB_FILE_DELIMITER; - pCompInfo->checksum = 0; - pCompInfo->uid = pTable->tableId.uid; - - // Load SCompBlock part if neccessary - // int isCompBlockLoaded = 0; - if (0) { - // if (pIdx->offset > 0) { - if (pIdx->hasLast || tsdbHasDataInRange(pIter, minKey, pIdx->maxKey)) { - // has last block || cache key overlap with commit key - pCompInfo = (SCompInfo *)realloc((void *)pCompInfo, pIdx->len + sizeof(SCompBlock) * 100); - if (tsdbLoadCompBlocks(pGroup, pIdx, (void *)pCompInfo) < 0) { /* TODO */ - } - // if (pCompInfo->uid == pTable->tableId.uid) isCompBlockLoaded = 1; - } else { - // TODO: No need to load the SCompBlock part, just sendfile the SCompBlock part - // and write those new blocks to it - } - } - - tdInitDataCols(pCols, tsdbGetTableSchema(pMeta, pTable)); + SHelperTable hTable = {.uid = pTable->tableId.uid, .tid = pTable->tableId.tid, .sversion = pTable->sversion}; + tsdbSetHelperTable(pHelper, &hTable, tsdbGetTableSchema(pMeta, pTable)); + tdInitDataCols(pDataCols, tsdbGetTableSchema(pMeta, pTable)); + // Loop to write the data in the cache to files, if no data to write, just break + // the loop int maxRowsToRead = pCfg->maxRowsPerFileBlock * 4 / 5; - while (1) { - tsdbReadRowsFromCache(pIter, maxKey, maxRowsToRead, pCols); - if (pCols->numOfPoints == 0) break; - - int pointsWritten = pCols->numOfPoints; - // TODO: all write to the end of .data file - int64_t toffset = 0; - int32_t tlen = 0; - tsdbWriteBlockToFileImpl(&pGroup->files[TSDB_FILE_TYPE_DATA], pCols, pCols->numOfPoints, &toffset, &tlen, pTable->tableId.uid); - - // Make the compBlock - SCompBlock *pTBlock = pCompInfo->blocks + nNewBlocks++; - pTBlock->offset = toffset; - pTBlock->len = tlen; - pTBlock->keyFirst = dataColsKeyFirst(pCols); - pTBlock->keyLast = dataColsKeyLast(pCols); - pTBlock->last = 0; - pTBlock->algorithm = 0; - pTBlock->numOfPoints = pCols->numOfPoints; - pTBlock->sversion = pTable->sversion; - pTBlock->numOfSubBlocks = 1; - pTBlock->numOfCols = pCols->numOfCols; - - if (dataColsKeyLast(pCols) > pIdx->maxKey) pIdx->maxKey = dataColsKeyLast(pCols); - - tdPopDataColsPoints(pCols, pointsWritten); - maxRowsToRead = pCfg->maxRowsPerFileBlock * 4 / 5 - pCols->numOfPoints; - } + while (true) { + int rowsRead = tsdbReadRowsFromCache(pIter, maxKey, maxRowsToRead, pDataCols); + ASSERT(rowsRead >= 0); + if (pDataCols->numOfPoints == 0) break; + int rowsWritten = tsdbWriteDataBlock(pHelper, pDataCols); + if (rowsWritten < 0) goto _err; + assert(rowsWritten <= pDataCols->numOfPoints); -_table_over: - // Write the SCompBlock part - pIdx->offset = lseek(hFile.fd, 0, SEEK_END); - if (pIdx->len > 0) { - int bytes = tsendfile(hFile.fd, pGroup->files[TSDB_FILE_TYPE_HEAD].fd, NULL, pIdx->len); - if (bytes < pIdx->len) { - printf("Failed to send file, reason: %s\n", strerror(errno)); - } - if (nNewBlocks > 0) { - write(hFile.fd, (void *)(pCompInfo->blocks), sizeof(SCompBlock) * nNewBlocks); - pIdx->len += (sizeof(SCompBlock) * nNewBlocks); - } - } else { - if (nNewBlocks > 0) { - write(hFile.fd, (void *)pCompInfo, sizeof(SCompInfo) + sizeof(SCompBlock) * nNewBlocks); - pIdx->len += sizeof(SCompInfo) + sizeof(SCompBlock) * nNewBlocks; - } + tdPopDataColsPoints(pDataCols, rowsWritten); + maxRowsToRead = pCfg->maxRowsPerFileBlock * 4 / 5 - pDataCols->numOfPoints; } - pIdx->checksum = 0; - pIdx->numOfSuperBlocks += nNewBlocks; - pIdx->hasLast = 0; - } - - // Write the SCompIdx part - if (lseek(hFile.fd, TSDB_FILE_HEAD_SIZE, SEEK_SET) < 0) {/* TODO */} - if (write(hFile.fd, (void *)pIndices, sizeof(SCompIdx) * pCfg->maxTables) < 0) {/* TODO */} + // Move the last block to the new .l file if neccessary + if (tsdbMoveLastBlockIfNeccessary(pHelper) < 0) goto _err; - // close the files - for (int type = TSDB_FILE_TYPE_HEAD; type < TSDB_FILE_TYPE_MAX; type++) { - tsdbCloseFile(&pGroup->files[type]); - } - tsdbCloseFile(&hFile); - if (isNewLastFile) tsdbCloseFile(&lFile); - // TODO: replace the .head and .last file - rename(hFile.fname, pGroup->files[TSDB_FILE_TYPE_HEAD].fname); - pGroup->files[TSDB_FILE_TYPE_HEAD].info = hFile.info; - if (isNewLastFile) { - rename(lFile.fname, pGroup->files[TSDB_FILE_TYPE_LAST].fname); - pGroup->files[TSDB_FILE_TYPE_LAST].info = lFile.info; + // Write the SCompBlock part + if (tsdbWriteCompInfo(pHelper) < 0) goto _err; + } - if (pIndices) free(pIndices); - if (pCompInfo) free(pCompInfo); + if (tsdbWriteCompIdx(pHelper) < 0) goto _err; + + tsdbCloseHelperFile(pHelper, 0); + // TODO: make it atomic with some methods + pGroup->files[TSDB_FILE_TYPE_HEAD] = pHelper->files.headF; + pGroup->files[TSDB_FILE_TYPE_DATA] = pHelper->files.dataF; + pGroup->files[TSDB_FILE_TYPE_LAST] = pHelper->files.lastF; return 0; + + _err: + tsdbCloseHelperFile(pHelper, 1); + return -1; } -static int tsdbHasDataInRange(SSkipListIterator *pIter, TSKEY minKey, TSKEY maxKey) { - if (pIter == NULL) return 0; +/** + * Return the next iterator key. + * + * @return the next key if iter has + * -1 if iter not + */ +static TSKEY tsdbNextIterKey(SSkipListIterator *pIter) { + if (pIter == NULL) return -1; SSkipListNode *node = tSkipListIterGet(pIter); - if (node == NULL) return 0; + if (node == NULL) return -1; SDataRow row = SL_GET_NODE_DATA(node); - if (dataRowKey(row) >= minKey && dataRowKey(row) <= maxKey) return 1; - - return 0; + return dataRowKey(row); } static int tsdbHasDataToCommit(SSkipListIterator **iters, int nIters, TSKEY minKey, TSKEY maxKey) { + TSKEY nextKey; for (int i = 0; i < nIters; i++) { SSkipListIterator *pIter = iters[i]; - if (tsdbHasDataInRange(pIter, minKey, maxKey)) return 1; + nextKey = tsdbNextIterKey(pIter); + if (nextKey > 0 && (nextKey >= minKey && nextKey <= maxKey)) return 1; } return 0; } -static int tsdbWriteBlockToFileImpl(SFile *pFile, SDataCols *pCols, int pointsToWrite, int64_t *offset, int32_t *len, int64_t uid) { - size_t size = sizeof(SCompData) + sizeof(SCompCol) * pCols->numOfCols; - SCompData *pCompData = (SCompData *)malloc(size); - if (pCompData == NULL) return -1; +// static int tsdbWriteBlockToFileImpl(SFile *pFile, SDataCols *pCols, int pointsToWrite, int64_t *offset, int32_t *len, int64_t uid) { +// size_t size = sizeof(SCompData) + sizeof(SCompCol) * pCols->numOfCols; +// SCompData *pCompData = (SCompData *)malloc(size); +// if (pCompData == NULL) return -1; - pCompData->delimiter = TSDB_FILE_DELIMITER; - pCompData->uid = uid; - pCompData->numOfCols = pCols->numOfCols; +// pCompData->delimiter = TSDB_FILE_DELIMITER; +// pCompData->uid = uid; +// pCompData->numOfCols = pCols->numOfCols; - *offset = lseek(pFile->fd, 0, SEEK_END); - *len = size; +// *offset = lseek(pFile->fd, 0, SEEK_END); +// *len = size; - int toffset = size; - for (int iCol = 0; iCol < pCols->numOfCols; iCol++) { - SCompCol *pCompCol = pCompData->cols + iCol; - SDataCol *pDataCol = pCols->cols + iCol; +// int toffset = size; +// for (int iCol = 0; iCol < pCols->numOfCols; iCol++) { +// SCompCol *pCompCol = pCompData->cols + iCol; +// SDataCol *pDataCol = pCols->cols + iCol; - pCompCol->colId = pDataCol->colId; - pCompCol->type = pDataCol->type; - pCompCol->offset = toffset; - - // TODO: add compression - pCompCol->len = TYPE_BYTES[pCompCol->type] * pointsToWrite; - toffset += pCompCol->len; - } - - // Write the block - if (write(pFile->fd, (void *)pCompData, size) < 0) goto _err; - *len += size; - for (int iCol = 0; iCol < pCols->numOfCols; iCol++) { - SDataCol *pDataCol = pCols->cols + iCol; - SCompCol *pCompCol = pCompData->cols + iCol; - if (write(pFile->fd, pDataCol->pData, pCompCol->len) < 0) goto _err; - *len += pCompCol->len; - } - - if (pCompData == NULL) free((void *)pCompData); - return 0; - -_err: - if (pCompData == NULL) free((void *)pCompData); - return -1; -} - -static int compareKeyBlock(const void *arg1, const void *arg2) { - TSKEY key = *(TSKEY *)arg1; - SCompBlock *pBlock = (SCompBlock *)arg2; - - if (key < pBlock->keyFirst) { - return -1; - } else if (key > pBlock->keyLast) { - return 1; - } - - return 0; -} - -int tsdbWriteBlockToFile(STsdbRepo *pRepo, SFileGroup *pGroup, SCompIdx *pIdx, SCompInfo *pCompInfo, SDataCols *pCols, SCompBlock *pCompBlock, SFile *lFile, int64_t uid) { - STsdbCfg * pCfg = &(pRepo->config); - SFile * pFile = NULL; - int numOfPointsToWrite = 0; - int64_t offset = 0; - int32_t len = 0; - - memset((void *)pCompBlock, 0, sizeof(SCompBlock)); - - if (pCompInfo == NULL) { - // Just append the data block to .data or .l or .last file - numOfPointsToWrite = pCols->numOfPoints; - if (pCols->numOfPoints > pCfg->minRowsPerFileBlock) { // Write to .data file - pFile = &(pGroup->files[TSDB_FILE_TYPE_DATA]); - } else { // Write to .last or .l file - pCompBlock->last = 1; - if (lFile) { - pFile = lFile; - } else { - pFile = &(pGroup->files[TSDB_FILE_TYPE_LAST]); - } - } - tsdbWriteBlockToFileImpl(pFile, pCols, numOfPointsToWrite, &offset, &len, uid); - pCompBlock->offset = offset; - pCompBlock->len = len; - pCompBlock->algorithm = 2; // TODO : add to configuration - pCompBlock->sversion = pCols->sversion; - pCompBlock->numOfPoints = pCols->numOfPoints; - pCompBlock->numOfSubBlocks = 1; - pCompBlock->numOfCols = pCols->numOfCols; - pCompBlock->keyFirst = dataColsKeyFirst(pCols); - pCompBlock->keyLast = dataColsKeyLast(pCols); - } else { - // Need to merge the block to either the last block or the other block - TSKEY keyFirst = dataColsKeyFirst(pCols); - SCompBlock *pMergeBlock = NULL; - - // Search the block to merge in - void *ptr = taosbsearch((void *)&keyFirst, (void *)(pCompInfo->blocks), sizeof(SCompBlock), pIdx->numOfSuperBlocks, - compareKeyBlock, TD_GE); - if (ptr == NULL) { - // No block greater or equal than the key, but there are data in the .last file, need to merge the last file block - // and merge the data - pMergeBlock = TSDB_COMPBLOCK_AT(pCompInfo, pIdx->numOfSuperBlocks - 1); - } else { - pMergeBlock = (SCompBlock *)ptr; - } - - if (pMergeBlock->last) { - if (pMergeBlock->last + pCols->numOfPoints > pCfg->minRowsPerFileBlock) { - // Need to load the data from .last and combine data in pCols to write to .data file +// pCompCol->colId = pDataCol->colId; +// pCompCol->type = pDataCol->type; +// pCompCol->offset = toffset; + +// // TODO: add compression +// pCompCol->len = TYPE_BYTES[pCompCol->type] * pointsToWrite; +// toffset += pCompCol->len; +// } + +// // Write the block +// if (write(pFile->fd, (void *)pCompData, size) < 0) goto _err; +// *len += size; +// for (int iCol = 0; iCol < pCols->numOfCols; iCol++) { +// SDataCol *pDataCol = pCols->cols + iCol; +// SCompCol *pCompCol = pCompData->cols + iCol; +// if (write(pFile->fd, pDataCol->pData, pCompCol->len) < 0) goto _err; +// *len += pCompCol->len; +// } + +// if (pCompData == NULL) free((void *)pCompData); +// return 0; - } else { // Just append the block to .last or .l file - if (lFile) { - // read the block from .last file and merge with pCols, write to .l file +// _err: +// if (pCompData == NULL) free((void *)pCompData); +// return -1; +// } - } else { - // tsdbWriteBlockToFileImpl(); - } - } - } else { // The block need to merge in .data file +// static int compareKeyBlock(const void *arg1, const void *arg2) { +// TSKEY key = *(TSKEY *)arg1; +// SCompBlock *pBlock = (SCompBlock *)arg2; - } +// if (key < pBlock->keyFirst) { +// return -1; +// } else if (key > pBlock->keyLast) { +// return 1; +// } - } +// return 0; +// } - return numOfPointsToWrite; -} +// int tsdbWriteBlockToFile(STsdbRepo *pRepo, SFileGroup *pGroup, SCompIdx *pIdx, SCompInfo *pCompInfo, SDataCols *pCols, SCompBlock *pCompBlock, SFile *lFile, int64_t uid) { +// STsdbCfg * pCfg = &(pRepo->config); +// SFile * pFile = NULL; +// int numOfPointsToWrite = 0; +// int64_t offset = 0; +// int32_t len = 0; + +// memset((void *)pCompBlock, 0, sizeof(SCompBlock)); + +// if (pCompInfo == NULL) { +// // Just append the data block to .data or .l or .last file +// numOfPointsToWrite = pCols->numOfPoints; +// if (pCols->numOfPoints > pCfg->minRowsPerFileBlock) { // Write to .data file +// pFile = &(pGroup->files[TSDB_FILE_TYPE_DATA]); +// } else { // Write to .last or .l file +// pCompBlock->last = 1; +// if (lFile) { +// pFile = lFile; +// } else { +// pFile = &(pGroup->files[TSDB_FILE_TYPE_LAST]); +// } +// } +// tsdbWriteBlockToFileImpl(pFile, pCols, numOfPointsToWrite, &offset, &len, uid); +// pCompBlock->offset = offset; +// pCompBlock->len = len; +// pCompBlock->algorithm = 2; // TODO : add to configuration +// pCompBlock->sversion = pCols->sversion; +// pCompBlock->numOfPoints = pCols->numOfPoints; +// pCompBlock->numOfSubBlocks = 1; +// pCompBlock->numOfCols = pCols->numOfCols; +// pCompBlock->keyFirst = dataColsKeyFirst(pCols); +// pCompBlock->keyLast = dataColsKeyLast(pCols); +// } else { +// // Need to merge the block to either the last block or the other block +// TSKEY keyFirst = dataColsKeyFirst(pCols); +// SCompBlock *pMergeBlock = NULL; + +// // Search the block to merge in +// void *ptr = taosbsearch((void *)&keyFirst, (void *)(pCompInfo->blocks), sizeof(SCompBlock), pIdx->numOfSuperBlocks, +// compareKeyBlock, TD_GE); +// if (ptr == NULL) { +// // No block greater or equal than the key, but there are data in the .last file, need to merge the last file block +// // and merge the data +// pMergeBlock = TSDB_COMPBLOCK_AT(pCompInfo, pIdx->numOfSuperBlocks - 1); +// } else { +// pMergeBlock = (SCompBlock *)ptr; +// } + +// if (pMergeBlock->last) { +// if (pMergeBlock->last + pCols->numOfPoints > pCfg->minRowsPerFileBlock) { +// // Need to load the data from .last and combine data in pCols to write to .data file + +// } else { // Just append the block to .last or .l file +// if (lFile) { +// // read the block from .last file and merge with pCols, write to .l file + +// } else { +// // tsdbWriteBlockToFileImpl(); +// } +// } +// } else { // The block need to merge in .data file + +// } + +// } + +// return numOfPointsToWrite; +// } diff --git a/src/vnode/tsdb/src/tsdbRWHelper.c b/src/vnode/tsdb/src/tsdbRWHelper.c new file mode 100644 index 0000000000..79fe3c49f7 --- /dev/null +++ b/src/vnode/tsdb/src/tsdbRWHelper.c @@ -0,0 +1,1057 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ +#include "tsdbMain.h" + +#define adjustMem(ptr, size, expectedSize) \ + do { \ + if ((size) < (expectedSize)) { \ + (ptr) = realloc((void *)(ptr), (expectedSize)); \ + if ((ptr) == NULL) return -1; \ + (size) = (expectedSize); \ + } \ + } while (0) + +// Local function definitions +static int tsdbCheckHelperCfg(SHelperCfg *pCfg); +static void tsdbInitHelperFile(SHelperFile *pHFile); +static int tsdbInitHelperRead(SRWHelper *pHelper); +static int tsdbInitHelperWrite(SRWHelper *pHelper); +static void tsdbClearHelperFile(SHelperFile *pHFile); +static void tsdbDestroyHelperRead(SRWHelper *pHelper); +static void tsdbDestroyHelperWrite(SRWHelper *pHelper); +static void tsdbClearHelperRead(SRWHelper *pHelper); +static void tsdbClearHelperWrite(SRWHelper *pHelper); +static bool tsdbShouldCreateNewLast(SRWHelper *pHelper); +static int tsdbWriteBlockToFile(SRWHelper *pHelper, SFile *pFile, SDataCols *pDataCols, int rowsToWrite, SCompBlock *pCompBlock, + bool isLast); +static int compareKeyBlock(const void *arg1, const void *arg2); +static int tsdbMergeDataWithBlock(SRWHelper *pHelper, int blkIdx, SDataCols *pDataCols); +static int nRowsLEThan(SDataCols *pDataCols, int maxKey); +static int tsdbGetRowsCanBeMergedWithBlock(SRWHelper *pHelper, int blkIdx, SDataCols *pDataCols); + +int tsdbInitHelper(SRWHelper *pHelper, SHelperCfg *pCfg) { + if (pHelper == NULL || pCfg == NULL || tsdbCheckHelperCfg(pCfg) < 0) return -1; + + memset((void *)pHelper, 0, sizeof(*pHelper)); + + pHelper->config = *pCfg; + + tsdbInitHelperFile(&(pHelper->files)); + + if (tsdbInitHelperRead(pHelper) < 0) goto _err; + + if ((TSDB_HELPER_TYPE(pHelper) == TSDB_WRITE_HELPER) && tsdbInitHelperWrite(pHelper) < 0) goto _err; + + pHelper->state = TSDB_HELPER_CLEAR_STATE; + + return 0; + +_err: + tsdbDestroyHelper(pHelper); + return -1; +} + +void tsdbDestroyHelper(SRWHelper *pHelper) { + if (pHelper == NULL) return; + + tsdbClearHelperFile(&(pHelper->files)); + tsdbDestroyHelperRead(pHelper); + tsdbDestroyHelperWrite(pHelper); +} + +void tsdbClearHelper(SRWHelper *pHelper) { + if (pHelper == NULL) return; + tsdbClearHelperFile(&(pHelper->files)); + tsdbClearHelperRead(pHelper); + tsdbClearHelperWrite(pHelper); +} + +int tsdbSetHelperFile(SRWHelper *pHelper, SFileGroup *pGroup) { + // TODO: reset the helper object + + pHelper->files.fid = pGroup->fileId; + + pHelper->files.headF = pGroup->files[TSDB_FILE_TYPE_HEAD]; + pHelper->files.dataF = pGroup->files[TSDB_FILE_TYPE_DATA]; + pHelper->files.lastF = pGroup->files[TSDB_FILE_TYPE_LAST]; + + if (TSDB_HELPER_TYPE(pHelper) == TSDB_WRITE_HELPER) { + char *fnameDup = strdup(pHelper->files.headF.fname); + if (fnameDup == NULL) return -1; + char *dataDir = dirname(fnameDup); + + tsdbGetFileName(dataDir, pHelper->files.fid, ".h", pHelper->files.nHeadF.fname); + tsdbGetFileName(dataDir, pHelper->files.fid, ".l", pHelper->files.nLastF.fname); + free((void *)fnameDup); + } + return 0; +} + +int tsdbOpenHelperFile(SRWHelper *pHelper) { + // TODO: check if the file is set + {} + + if (TSDB_HELPER_TYPE(pHelper) == TSDB_WRITE_HELPER) { + if (tsdbOpenFile(&(pHelper->files.headF), O_RDONLY) < 0) goto _err; + if (tsdbOpenFile(&(pHelper->files.dataF), O_RDWR) < 0) goto _err; + if (tsdbOpenFile(&(pHelper->files.lastF), O_RDWR) < 0) goto _err; + // TODO: need to write head and compIdx part + if (tsdbOpenFile(&(pHelper->files.nHeadF), O_WRONLY | O_CREAT) < 0) goto _err; + if (tsdbShouldCreateNewLast(pHelper)) { + if (tsdbOpenFile(&(pHelper->files.nLastF), O_WRONLY | O_CREAT) < 0) goto _err; + } + } else { + if (tsdbOpenFile(&(pHelper->files.headF), O_RDONLY) < 0) goto _err; + if (tsdbOpenFile(&(pHelper->files.dataF), O_RDONLY) < 0) goto _err; + if (tsdbOpenFile(&(pHelper->files.lastF), O_RDONLY) < 0) goto _err; + } + + return 0; + +_err: + tsdbCloseHelperFile(pHelper, true); + return -1; +} + +int tsdbCloseHelperFile(SRWHelper *pHelper, bool hasError) { + if (pHelper->files.headF.fd > 0) { + close(pHelper->files.headF.fd); + pHelper->files.headF.fd = -1; + } + if (pHelper->files.dataF.fd > 0) { + close(pHelper->files.dataF.fd); + pHelper->files.dataF.fd = -1; + } + if (pHelper->files.lastF.fd > 0) { + close(pHelper->files.lastF.fd); + pHelper->files.lastF.fd = -1; + } + if (pHelper->files.nHeadF.fd > 0) { + close(pHelper->files.nHeadF.fd); + pHelper->files.nHeadF.fd = -1; + if (hasError) remove(pHelper->files.nHeadF.fname); + } + + if (pHelper->files.nLastF.fd > 0) { + close(pHelper->files.nLastF.fd); + pHelper->files.nLastF.fd = -1; + if (hasError) remove(pHelper->files.nLastF.fname); + } + return 0; +} + +void tsdbSetHelperTable(SRWHelper *pHelper, SHelperTable *pHelperTable, STSchema *pSchema) { + // TODO: check if it is available to set the table + + pHelper->tableInfo = *pHelperTable; + // TODO: Set the pDataCols according to schema + + // TODO: set state +} + +int tsdbWriteDataBlock(SRWHelper *pHelper, SDataCols *pDataCols) { + ASSERT(TSDB_HELPER_TYPE(pHelper) == TSDB_WRITE_HELPER); + ASSERT(helperHasState(pHelper, TSDB_HELPER_FILE_SET) && helperHasState(pHelper, TSDB_HELPER_FILE_OPEN)); + SCompBlock compBlock; + int rowsToWrite = 0; + TSKEY keyFirst = dataColsKeyFirst(pDataCols); + + // Load SCompIdx part if not loaded yet + if ((!helperHasState(pHelper, TSDB_HELPER_IDX_LOAD)) && (tsdbLoadCompIdx(pHelper, NULL) < 0)) goto _err; + + // Load the SCompInfo part if neccessary + SCompIdx *pIdx = pHelper->pCompIdx + pHelper->tableInfo.tid; + if ((pIdx->offset > 0) && (pIdx->hasLast || dataColsKeyFirst(pDataCols) <= pIdx->maxKey)) { + if (tsdbLoadCompInfo(pHelper, NULL) < 0) goto _err; + } + + SCompIdx *pWIdx = pHelper->pWCompIdx + pHelper->tableInfo.tid; + + if (!pIdx->hasLast && keyFirst > pIdx->maxKey) { + // Just need to append as a super block + rowsToWrite = pDataCols->numOfPoints; + SFile *pWFile = NULL; + bool isLast = false; + + if (rowsToWrite > pHelper->config.minRowsPerFileBlock) { + pWFile = &(pHelper->files.dataF); + } else { + isLast = true; + pWFile = (pHelper->files.nLastF.fd > 0) ? &(pHelper->files.nLastF) : &(pHelper->files.nLastF); + } + + if (tsdbWriteBlockToFile(pHelper, pWFile, pDataCols, rowsToWrite, &compBlock, isLast) < 0) goto _err; + + // TODO: may need to reallocate the memory + pHelper->pCompInfo->blocks[pHelper->blockIter++] = compBlock; + + pIdx->hasLast = compBlock.last; + pIdx->numOfSuperBlocks++; + pIdx->maxKey = dataColsKeyLast(pDataCols); + // pIdx->len = ?????? + } else { // (pIdx->hasLast) OR (keyFirst <= pIdx->maxKey) + if (keyFirst > pIdx->maxKey) { + int blkIdx = pIdx->numOfSuperBlocks - 1; + ASSERT(pIdx->hasLast && pHelper->pCompInfo->blocks[blkIdx].last); + + // Need to merge with the last block + if (tsdbMergeDataWithBlock(pHelper, blkIdx, pDataCols) < 0) goto _err; + } else { + // Find the first block greater or equal to the block + SCompBlock *pCompBlock = taosbsearch((void *)(&keyFirst), (void *)(pHelper->pCompInfo->blocks), + pIdx->numOfSuperBlocks, sizeof(SCompBlock), compareKeyBlock, TD_GE); + if (pCompBlock == NULL) { + if (tsdbMergeDataWithBlock(pHelper, pIdx->numOfSuperBlocks-1, pDataCols) < 0) goto _err; + } else { + if (compareKeyBlock((void *)(&keyFirst), (void *)pCompBlock) == 0) { + SCompBlock *pNextBlock = NULL; + TSKEY keyLimit = (pNextBlock == NULL) ? INT_MAX : (pNextBlock->keyFirst - 1); + rowsToWrite = + MIN(nRowsLEThan(pDataCols, keyLimit), pHelper->config.maxRowsPerFileBlock - pCompBlock->numOfPoints); + + if (tsdbMergeDataWithBlock(pHelper, pCompBlock-pHelper->pCompInfo->blocks, pDataCols) < 0) goto _err; + } else { + // There options: 1. merge with previous block + // 2. commit as one block + // 3. merge with current block + int nRows1 = INT_MAX; + int nRows2 = nRowsLEThan(pDataCols, pCompBlock->keyFirst); + int nRows3 = MIN(nRowsLEThan(pDataCols, (pCompBlock + 1)->keyFirst), (pHelper->config.maxRowsPerFileBlock - pCompBlock->numOfPoints)); + + // TODO: find the block with max rows can merge + if (tsdbMergeDataWithBlock(pHelper, pCompBlock, pDataCols) < 0) goto _err; + } + } + } + } + + return rowsToWrite; + + _err: + return -1; +} + +int tsdbMoveLastBlockIfNeccessary(SRWHelper *pHelper) { + // TODO + return 0; +} + +int tsdbWriteCompInfo(SRWHelper *pHelper) { + // TODO + return 0; +} + +int tsdbWriteCompIdx(SRWHelper *pHelper) { + // TODO + return 0; +} + +int tsdbLoadCompIdx(SRWHelper *pHelper, void *target) { + // TODO: check helper state + ASSERT(!helperHasState(pHelper, TSDB_HELPER_IDX_LOAD)); + + int fd = pHelper->files.headF.fd; + + if (lseek(fd, TSDB_FILE_HEAD_SIZE, SEEK_SET) < 0) return -1; + if (tread(fd, pHelper->pCompIdx, pHelper->compIdxSize) < pHelper->compIdxSize) return -1; + // TODO: check the checksum + + if (target) memcpy(target, pHelper->pCompIdx, pHelper->compIdxSize); + helperSetState(pHelper, TSDB_HELPER_IDX_LOAD); + + return 0; +} + +int tsdbLoadCompInfo(SRWHelper *pHelper, void *target) { + SCompIdx *pIdx = pHelper->pCompIdx + pHelper->tableInfo.tid; + + ASSERT(pIdx->offset > 0); + + int fd = pHelper->files.headF.fd; + + if (lseek(fd, pIdx->offset, SEEK_SET) < 0) return -1; + ASSERT(pIdx->len > 0); + + adjustMem(pHelper->pCompInfo, pHelper->compInfoSize, pIdx->len); + if (tread(fd, (void *)(pHelper->pCompInfo), pIdx->len) < 0) return -1; + // TODO: check the checksum + + // TODO: think about when target has no space for the content + if (target) memcpy(target, (void *)(pHelper->pCompInfo), pIdx->len); + + helperSetState(pHelper, TSDB_HELPER_INFO_LOAD); + + return 0; +} + +int tsdbLoadCompData(SRWHelper *pHelper, int blkIdx, void *target) { + // TODO + return 0; +} + +int tsdbLoadBlockDataCols(SRWHelper *pHelper, SDataCols *pDataCols, int32_t *colIds, int numOfColIds) { + // TODO + return 0; +} + +int tsdbLoadBlockData(SRWHelper *pHelper, int blkIdx, SDataCols *pDataCols) { + // TODO + return 0; +} + +static int tsdbCheckHelperCfg(SHelperCfg *pCfg) { + // TODO + return 0; +} + +static void tsdbInitHelperFile(SHelperFile *pHFile) { + pHFile->fid = -1; + pHFile->headF.fd = -1; + pHFile->dataF.fd = -1; + pHFile->lastF.fd = -1; + pHFile->nHeadF.fd = -1; + pHFile->nLastF.fd = -1; +} + +static void tsdbClearHelperFile(SHelperFile *pHFile) { + pHFile->fid = -1; + if (pHFile->headF.fd > 0) { + close(pHFile->headF.fd); + pHFile->headF.fd = -1; + } + if (pHFile->dataF.fd > 0) { + close(pHFile->dataF.fd); + pHFile->dataF.fd = -1; + } + if (pHFile->lastF.fd > 0) { + close(pHFile->lastF.fd); + pHFile->lastF.fd = -1; + } + if (pHFile->nHeadF.fd > 0) { + close(pHFile->nHeadF.fd); + pHFile->nHeadF.fd = -1; + } + if (pHFile->nLastF.fd > 0) { + close(pHFile->nLastF.fd); + pHFile->nLastF.fd = -1; + } + +} + +static int tsdbInitHelperRead(SRWHelper *pHelper) { + SHelperCfg *pCfg = &(pHelper->config); + + pHelper->compIdxSize = pCfg->maxTables * sizeof(SCompIdx); + if ((pHelper->pCompIdx = (SCompIdx *)malloc(pHelper->compIdxSize)) == NULL) return -1; + + return 0; +} + +static void tsdbDestroyHelperRead(SRWHelper *pHelper) { + tfree(pHelper->pCompIdx); + pHelper->compIdxSize = 0; + + tfree(pHelper->pCompInfo); + pHelper->compInfoSize = 0; + + tfree(pHelper->pCompData); + pHelper->compDataSize = 0; + + tdFreeDataCols(pHelper->pDataCols[0]); + tdFreeDataCols(pHelper->pDataCols[1]); +} + +static int tsdbInitHelperWrite(SRWHelper *pHelper) { + SHelperCfg *pCfg = &(pHelper->config); + + pHelper->wCompIdxSize = pCfg->maxTables * sizeof(SCompIdx); + if ((pHelper->pWCompIdx = (SCompIdx *)malloc(pHelper->wCompIdxSize)) == NULL) return -1; + + return 0; +} + +static void tsdbDestroyHelperWrite(SRWHelper *pHelper) { + tfree(pHelper->pWCompIdx); + pHelper->wCompIdxSize = 0; + + tfree(pHelper->pWCompInfo); + pHelper->wCompInfoSize = 0; + + tfree(pHelper->pWCompData); + pHelper->wCompDataSize = 0; +} + +static void tsdbClearHelperRead(SRWHelper *pHelper) { + // TODO +} + +static void tsdbClearHelperWrite(SRWHelper *pHelper) { + // TODO +} + +static bool tsdbShouldCreateNewLast(SRWHelper *pHelper) { + // TODO + return 0; +} + +static int tsdbWriteBlockToFile(SRWHelper *pHelper, SFile *pFile, SDataCols *pDataCols, int rowsToWrite, SCompBlock *pCompBlock, + bool isLast) { + ASSERT(rowsToWrite > 0 && rowsToWrite <= pDataCols->numOfPoints); + + int64_t offset = lseek(pFile->fd, 0, SEEK_END); + if (offset < 0) goto _err; + + SCompData *pCompData = (SCompData *)malloc(sizeof(SCompData) + sizeof(SCompCol) * pDataCols->numOfCols); + if (pCompData == NULL) goto _err; + + int nColsNotAllNull = 0; + int32_t toffset; + for (int ncol = 0; ncol < pDataCols->numOfCols; ncol++) { + SDataCol *pDataCol = pDataCols->cols + ncol; + SCompCol *pCompCol = pCompData->cols + nColsNotAllNull; + + if (0) { + // TODO: all data are NULL + continue; + } + + pCompCol->colId = pDataCol->colId; + pCompCol->type = pDataCol->type; + pCompCol->len = pDataCol->len; + pCompCol->offset = toffset; + nColsNotAllNull++; + toffset += pCompCol->len; + } + + pCompData->delimiter = TSDB_FILE_DELIMITER; + pCompData->uid = pHelper->tableInfo.uid; + pCompData->numOfCols = nColsNotAllNull; + + size_t tsize = sizeof(SCompData) + sizeof(SCompCol) * nColsNotAllNull; + if (twrite(pFile->fd, (void *)pCompData, tsize) < tsize) goto _err; + for (int i = 0; i < pDataCols->numOfCols; i++) { + SDataCol *pDataCol = pCompData->cols + i; + SCompCol *pCompCol = NULL; + if (twrite(pFile->fd, (void *)(pDataCol->pData), pCompCol->len) < pCompCol->len) goto _err; + } + + pCompBlock->last = isLast; + pCompBlock->offset = offset; + // pCOmpBlock->algorithm = ; + pCompBlock->numOfPoints = rowsToWrite; + pCompBlock->sversion = pHelper->tableInfo.sversion; + // pCompBlock->len = ; + // pCompBlock->numOfSubBlocks = ; + pCompBlock->numOfCols = nColsNotAllNull; + // pCompBlock->keyFirst = ; + // pCompBlock->keyLast = ; + + return 0; + + _err: + return -1; +} + +// static int compareKeyBlock(const void *arg1, const void *arg2); + +// /** +// * Init a read-write helper object for read or write usage. +// */ +// int tsdbInitHelper(SRWHelper *pHelper, int maxTables, tsdb_rwhelper_t type, int maxRowSize, int maxRows, +// int maxCols) { +// if (pHelper == NULL) return -1; + +// memset((void *)pHelper, 0, sizeof(SRWHelper)); +// for (int ftype = TSDB_RW_HEADF; ftype <= TSDB_RW_LF; ftype++) { +// pHelper->files[ftype] = -1; +// } + +// // Set type +// pHelper->type = type; + +// // Set global configuration +// pHelper->maxTables = maxTables; +// pHelper->maxRowSize = maxRowSize; +// pHelper->maxRows = maxRows; +// pHelper->maxCols = maxCols; + +// // Allocate SCompIdx part memory +// pHelper->compIdxSize = sizeof(SCompIdx) * maxTables; +// pHelper->pCompIdx = (SCompIdx *)malloc(pHelper->compIdxSize); +// if (pHelper->pCompIdx == NULL) goto _err; + +// pHelper->compDataSize = sizeof(SCompData) + sizeof(SCompCol) * maxCols; +// pHelper->pCompData = (SCompData *)malloc(pHelper->compDataSize); + +// pHelper->pDataCols = tdNewDataCols(maxRowSize, maxCols, maxRows); +// if (pHelper->pDataCols == NULL) goto _err; + +// return 0; + +// _err: +// tsdbDestroyHelper(pHelper); +// return -1; +// } + +// void tsdbResetHelper(SRWHelper *pHelper) { +// if (pHelper->headF.fd > 0) { +// close(pHelper->headF.fd); +// pHelper->headF.fd = -1; +// } +// if (pHelper->dataF.fd > 0) { +// close(pHelper->dataF.fd); +// pHelper->dataF.fd = -1; +// } +// if (pHelper->lastF.fd > 0) { +// close(pHelper->lastF.fd); +// pHelper->lastF.fd = -1; +// } +// if (pHelper->hF.fd > 0) { +// close(pHelper->hF.fd); +// pHelper->hF.fd = -1; +// } +// if (pHelper->lF.fd > 0) { +// close(pHelper->lF.fd); +// pHelper->lF.fd = -1; +// } + +// pHelper->state = 0; +// tdResetDataCols(pHelper->pDataCols); +// } + +// int tsdbDestroyHelper(SRWHelper *pHelper) { +// if (pHelper->headF.fd > 0) close(pHelper->headF.fd); +// if (pHelper->dataF.fd > 0) close(pHelper->dataF.fd); +// if (pHelper->lastF.fd > 0) close(pHelper->lastF.fd); +// if (pHelper->hF.fd > 0) close(pHelper->hF.fd); +// if (pHelper->lF.fd > 0) close(pHelper->lF.fd); + +// if (pHelper->pCompIdx) free(pHelper->pCompIdx); +// if (pHelper->pCompInfo) free(pHelper->pCompInfo); +// if (pHelper->pCompData) free(pHelper->pCompData); +// memset((void *)pHelper, 0, sizeof(SRWHelper)); +// return 0; +// } + +// int tsdbSetHelperFile(SRWHelper *pHelper, SFileGroup *pGroup) { +// if (pHelper->state != 0) return -1; + +// pHelper->fid = pGroup->fileId; + +// pHelper->headF = pGroup->files[TSDB_FILE_TYPE_HEAD]; +// pHelper->headF.fd = -1; +// pHelper->dataF = pGroup->files[TSDB_FILE_TYPE_DATA]; +// pHelper->dataF.fd = -1; +// pHelper->lastF = pGroup->files[TSDB_FILE_TYPE_LAST]; +// pHelper->lastF.fd = -1; + +// if (pHelper->mode == TSDB_WRITE_HELPER) { +// char *fnameCpy = strdup(pHelper->headF.fname); +// if (fnameCpy == NULL) return -1; +// char *dataDir = dirname(fnameCpy); + +// memset((void *)(&pHelper->hF), 0, sizeof(SFile)); +// memset((void *)(&pHelper->lF), 0, sizeof(SFile)); +// pHelper->hF.fd = -1; +// pHelper->lF.fd = -1; + +// tsdbGetFileName(dataDir, pHelper->fid, ".h", pHelper->hF.fname); +// tsdbGetFileName(dataDir, pHelper->fid, ".l", pHelper->lF.fname); +// free((char *)fnameCpy); +// } + +// TSDB_SET_RWHELPER_STATE(pHelper, TSDB_RWHELPER_FILE_SET); + +// return 0; +// } + +// static int tsdbNeedToCreateNewLastFile() { +// // TODO +// return 0; +// } + +// int tsdbCloseHelperFile(SRWHelper *pHelper, int hasErr) { +// int ret = 0; +// if (pHelper->headF.fd > 0) { +// close(pHelper->headF.fd); +// pHelper->headF.fd = -1; +// } +// if (pHelper->dataF.fd > 0) { +// close(pHelper->dataF.fd); +// pHelper->dataF.fd = -1; +// } +// if (pHelper->lastF.fd > 0) { +// close(pHelper->lastF.fd); +// pHelper->lastF.fd = -1; +// } +// if (pHelper->hF.fd > 0) { +// close(pHelper->hF.fd); +// pHelper->hF.fd = -1; +// if (hasErr) remove(pHelper->hF.fname); +// } +// if (pHelper->lF.fd > 0) { +// close(pHelper->lF.fd); +// pHelper->lF.fd = -1; +// if (hasErr) remove(pHelper->hF.fname); +// } +// return 0; +// } + +// int tsdbOpenHelperFile(SRWHelper *pHelper) { +// if (pHelper->state != TSDB_RWHELPER_FILE_SET) return -1; + +// if (pHelper->mode == TSDB_READ_HELPER) { // The read helper +// if (tsdbOpenFile(&pHelper->headF, O_RDONLY) < 0) goto _err; +// if (tsdbOpenFile(&pHelper->dataF, O_RDONLY) < 0) goto _err; +// if (tsdbOpenFile(&pHelper->lastF, O_RDONLY) < 0) goto _err; +// } else { +// if (tsdbOpenFile(&pHelper->headF, O_RDONLY) < 0) goto _err; +// if (tsdbOpenFile(&pHelper->dataF, O_RDWR) < 0) goto _err; +// if (tsdbOpenFile(&pHelper->lastF, O_RDWR) < 0) goto _err; +// // Open .h and .l file +// if (tsdbOpenFile(&pHelper->hF, O_WRONLY | O_CREAT) < 0) goto _err; +// if (tsdbNeedToCreateNewLastFile()) { +// if (tsdbOpenFile(&pHelper->lF, O_WRONLY | O_CREAT) < 0) goto _err; +// } +// } + +// TSDB_SET_RWHELPER_STATE(pHelper, TSDB_RWHELPER_FILE_OPENED); + +// return 0; +// _err: +// tsdbCloseHelperFile(pHelper, 1); +// return -1; +// } + +// int tsdbLoadCompIdx(SRWHelper *pHelper) { +// if (pHelper->state != (TSDB_RWHELPER_FILE_SET | TSDB_RWHELPER_FILE_OPENED)) return -1; + +// if (lseek(pHelper->headF.fd, TSDB_FILE_HEAD_SIZE, SEEK_SET) < 0) return -1; +// if (tread(pHelper->headF.fd, (void *)(pHelper->pCompIdx), pHelper->compIdxSize) < pHelper->compIdxSize) return -1; + +// TSDB_SET_RWHELPER_STATE(pHelper, TSDB_RWHELPER_COMPIDX_LOADED); + +// return 0; +// } + +// int tsdbSetHelperTable(SRWHelper *pHelper, int32_t tid, int64_t uid, STSchema *pSchema) { +// // TODO: add some check information +// pHelper->tid = tid; +// pHelper->uid = uid; + +// tdInitDataCols(pHelper->pDataCols, pSchema); + +// TSDB_SET_RWHELPER_STATE(pHelper, TSDB_RWHELPER_TABLE_SET); + +// return 0; +// } + +// int tsdbLoadCompBlocks(SRWHelper *pHelper) { +// SCompIdx *pIdx = pHelper->pCompIdx + pHelper->tid; +// if (pIdx->offset <= 0) return 0; + +// if (lseek(pHelper->headF.fd, pIdx->offset, SEEK_SET) < 0) return -1; +// if (pHelper->compInfoSize < pIdx->len) { +// pHelper->pCompInfo = (SCompInfo *)realloc((void *)(pHelper->pCompInfo), pIdx->len); +// if (pHelper->pCompInfo == NULL) return -1; +// pHelper->compInfoSize = pIdx->len; +// } + +// if (tread(pHelper->headF.fd, (void *)(pHelper->pCompInfo), pIdx->len) < pIdx->len) return -1; + +// TSDB_SET_RWHELPER_STATE(pHelper, TSDB_RWHELPER_COMPBLOCK_LOADED); + +// return 0; +// } + +// int tsdbRWHelperSetBlockIdx(SRWHelper *pHelper, int blkIdx) { +// SCompIdx *pIdx = pHelper->pCompIdx + pHelper->tid; +// if (blkIdx > pIdx->numOfSuperBlocks) return -1; + +// pHelper->blkIdx = blkIdx; + +// TSDB_SET_RWHELPER_STATE(pHelper, TSDB_RWHELPER_BLOCKIDX_SET); + +// return 0; +// } + +// int tsdbRWHelperLoadCompData(SRWHelper *pHelper) { +// SCompBlock *pBlock = pHelper->pCompInfo->blocks + pHelper->blkIdx; + +// if (pBlock->numOfSubBlocks == 1) { // Only one super block +// size_t size = sizeof(SCompData) + sizeof(SCompCol) * pBlock->numOfCols; +// if (size > pHelper->compDataSize) { +// pHelper->pCompData = (SCompData *)realloc((void *)pHelper->pCompData, size); +// if (pHelper->pCompData == NULL) return -1; +// pHelper->compDataSize = size; +// } + +// if (lseek(pHelper->dataF.fd, pBlock->offset, SEEK_SET) < 0) return -1; +// if (tread(pHelper->dataF.fd, (void *)(pHelper->pCompData), size) < size) return -1; +// } else { // TODO: More sub blocks +// } + +// TSDB_SET_RWHELPER_STATE(pHelper, TSDB_RWHELPER_COMPCOL_LOADED); + +// return 0; +// } + + +// static int compColIdCompCol(const void *arg1, const void *arg2) { +// int colId = *(int *)arg1; +// SCompCol *pCompCol = (SCompCol *)arg2; + +// return (int)(colId - pCompCol->colId); +// } + +// static int compColIdDataCol(const void *arg1, const void *arg2) { +// int colId = *(int *)arg1; +// SDataCol *pDataCol = (SDataCol *)arg2; + +// return (int)(colId - pDataCol->colId); +// } + +// int tsdbRWHelperLoadColData(SRWHelper *pHelper, int colId) { +// SCompBlock *pBlock = pHelper->pCompInfo->blocks + pHelper->blkIdx; + +// if (pBlock->numOfSubBlocks == 1) { // only one super block +// SCompCol *pCompCol = bsearch((void *)(&colId), (void *)(pHelper->pCompData->cols), pBlock->numOfCols, compColIdCompCol, compColIdCompCol); +// if (pCompCol == NULL) return 0; // No data to read from this block , but we still return 0 + +// SDataCol *pDataCol = bsearch((void *)(&colId), (void *)(pHelper->pDataCols->cols), pHelper->pDataCols->numOfCols, sizeof(SDataCol), compColIdDataCol); +// assert(pDataCol != NULL); + +// int fd = (pBlock->last) ? pHelper->lastF.fd : pHelper->dataF.fd; +// if (lseek(fd, pBlock->offset + pCompCol->offset, SEEK_SET) < 0) return -1; +// if (tread(fd, (void *)pDataCol->pData, pCompCol->len) < pCompCol->len) return -1; +// pDataCol->len = pCompCol->len; +// } else { +// // TODO: more than 1 blocks +// } +// return 0; +// } + +// int tsdbRWHelperLoadBlockData(SRWHelper *pHelper, int blkIdx) { +// SCompBlock *pBlock = pHelper->pCompInfo->blocks + pHelper->blkIdx; + +// if (pBlock->numOfSubBlocks == 1) { +// for (int i = 0; i < pHelper->pDataCols->numOfCols; i++) { +// if (tsdbRWHelperLoadBlockData(pHelper, pHelper->pDataCols->cols[i].colId) < 0) return -1; +// } +// } else { +// // TODO: more than 1 block of data +// } +// return 0; +// } + +// int tsdbRWHelperCopyCompBlockPart(SRWHelper *pHelper) { +// // TODO +// return 0; +// } + +// int tsdbRWHelperCopyDataBlockPart(SRWHelper *pHelper ) { +// // TODO +// return 0; +// } + +// int tsdbRWHelperWriteCompIdx(SRWHelper *pHelper) { +// // TODO +// if (lseek(pHelper->hF.fd, TSDB_FILE_HEAD_SIZE, SEEK_SET) < 0) return -1; +// if (twrite(pHelper->hF.fd, (void *)(pHelper->pCompIdx), pHelper->compIdxSize) < pHelper->compIdxSize) return -1; + +// return 0; +// } + +// /** +// * Load the data block from file +// * +// * @return 0 for success +// * -1 for failure +// */ +// int tsdbLoadDataBlock(SRWHelper *pHelper, int bldIdx) { +// SCompIdx *pIdx = pHelper->pCompIdx + pHelper->tid; +// if (pIdx->) + +// return 0; +// } + +// /** +// * Append the block to a file, either .data +// */ +// int tsdbAppendBlockToFile(SRWHelper *pHelper, tsdb_rw_file_t toFile, SDataCols *pDataCols, SCompBlock *pCompBlock, bool isSuper) { +// SFile *pFile = pHelper->files + toFile; + +// int64_t offset = lseek(pFile->fd, 0, SEEK_END); +// if (*offset < 0) return -1; + +// SCompData *pCompData = (SCompData *)malloc(sizeof(SCompData) + sizeof(SCompCol) * pDataCols->numOfCols); +// if (pCompData == NULL) return -1; + + +// int numOfNotAllNullCols = 0; +// int32_t toffset = 0; +// for (int i = 0; i < pDataCols->numOfCols; i++) { +// SDataCol *pDataCol = pDataCols->cols + i; +// SCompCol *pCompCol = pCompData->cols + numOfNotAllNullCols; + +// if (0 /* All data in this column are NULL value */) { +// continue; +// } +// pCompCol->colId = pDataCol->colId; +// pCompCol->type = pDataCol->type; +// pCompCol->len = pDataCol->len; +// // pCompCol->offset = toffset; +// numOfNotAllNullCols++; +// // toffset += pDataCol->len; +// } + +// pCompData->delimiter = TSDB_FILE_DELIMITER; +// pCompData->numOfCols = numOfNotAllNullCols; +// pCompData->uid = pHelper->uid; + +// size_t tsize = sizeof(SCompData) + sizeof(SCompCol) * numOfNotAllNullCols; +// if (twrite(pFile->fd, (void *)pCompData, tsize) < 0) return -1; +// for (int i = 0; i < numOfNotAllNullCols; i++) { +// SCompCol *pCompCol = pCompData->cols + i; +// SDataCol *pDataCol = NULL; // bsearch() +// tassert(pDataCol != NULL); +// if (twrite(pFile->fd, (void *)(pDataCol->pData), pDataCol->len) < pDataCol->len) return -1; +// } + +// pCompBlock->last = (toFile == TSDB_RW_DATAF) ? 0 : 1; +// pCompBlock->offset = offset; +// pCompBlock->algorithm = pHelper->compression; +// pCompBlock->numOfPoints = pDataCols->numOfPoints; +// pCompBlock->sversion = pHelper->sversion; +// // pCompBlock->len = ; +// pCompBlock->numOfSubBlocks = isSuper ? 1 : 0; +// pCompBlock->numOfCols = numOfNotAllNullCols; +// pCompBlock->keyFirst = dataColsKeyFirst(pDataCols); +// pCompBlock->keyLast = dataColsKeyLast(pDataCols); + +// return 0; +// } + +// /** +// * Write the whole or part of the cached data block to file. +// * +// * There are four options: +// * 1. Append the whole block as a SUPER-BLOCK at the end +// * 2. Append part/whole block as a SUPER-BLOCK and insert in the middle +// * 3. Append part/whole block as a SUB-BLOCK +// * 4. Merge part/whole block as a SUPER-BLOCK +// */ +// int tsdbWriteDataBlock(SRWHelper *pHelper, SDataCols *pDataCols) { +// tassert(pHelper->type == TSDB_WRITE_HELPER); + +// int rowsWritten = 0; +// SCompBlock compBlock; + +// SCompIdx *pIdx = pHelper->pCompIdx + pHelper->tid; +// // if ((no old data) OR (no last block AND cached first key is larger than the max key)) +// if ((pIdx->offset == 0) || (pIdx->hasLast && dataColsKeyFirst(pDataCols) > pIdx->maxKey)) { +// // Append the whole block as a SUPER-BLOCK at the end +// if (pDataCols->numOfPoints >= pHelper->minRowPerFileBlock) { +// if (tsdbAppendBlockToFile(pHelper, TSDB_RW_DATAF, pDataCols, &compBlock, true) < 0) goto _err; +// } else { +// tsdb_rw_file_t ftype = (pHelper->files[TSDB_RW_LF].fd > 0) ? TSDB_RW_LF : TSDB_RW_LASTF; +// if (tsdbAppendBlockToFile(pHelper, ftype, pDataCols, &compBlock, true) < 0) goto _err; +// } +// // Copy the compBlock part to the end +// if (IS_COMPBLOCK_LOADED(pHelper)) { + +// } else { + +// } + +// pIdx->hasLast = compBlock.last; +// pIdx->len += sizeof(compBlock); +// pIdx->numOfSuperBlocks++; +// pIdx->maxKey = compBlock.keyLast; + +// rowsWritten = pDataCols->numOfPoints; +// } else { +// // Need to find a block to merge with +// int blkIdx = 0; +// // if (has last block AND cached Key is larger than the max Key) +// if (pIdx->hasLast && dataColsKeyFirst(pDataCols) > pIdx->maxKey) { +// blkIdx = pIdx->numOfSuperBlocks - 1; +// rowsWritten = tsdbMergeDataWithBlock(pHelper, pDataCols, blkIdx); +// if (rowsWritten < 0) goto _err; +// } else { +// ASSERT(IS_COMPBLOCK_LOADED(pHelper)); +// // SCompBlock *pMergeBlock = taosbsearch(); +// } +// } + +// return numOfPointsWritten; + +// _err: +// return -1; +// } + +static int compareKeyBlock(const void *arg1, const void *arg2) { + TSKEY key = *(TSKEY *)arg1; + SCompBlock *pBlock = (SCompBlock *)arg2; + + if (key < pBlock->keyFirst) { + return -1; + } else if (key > pBlock->keyLast) { + return 1; + } + + return 0; +} + +static int nRowsLEThan(SDataCols *pDataCols, int maxKey) { + return 0; +} + +static int tsdbMergeDataWithBlock(SRWHelper *pHelper, int blkIdx, SDataCols *pDataCols) { + int rowsWritten = 0; + TSKEY keyFirst = dataColsKeyFirst(pDataCols); + SCompBlock compBlock = {0}; + + SCompIdx *pIdx = pHelper->pCompIdx + pHelper->tableInfo.tid; + ASSERT(blkIdx < pIdx->numOfSuperBlocks); + + SCompBlock *pCompBlock = pHelper->pCompInfo->blocks + blkIdx; + ASSERT(pCompBlock->numOfSubBlocks >= 1); + + int rowsCanMerge = tsdbGetRowsCanBeMergedWithBlock(pHelper, blkIdx, pDataCols); + if (rowsCanMerge < 0) goto _err; + + ASSERT(rowsCanMerge > 0); + + if (pCompBlock->numOfSubBlocks <= TSDB_MAX_SUBBLOCKS && + ((!pCompBlock->last) || (pHelper->files.nLastF.fd < 0 && + pCompBlock->numOfPoints + rowsCanMerge < pHelper->config.minRowsPerFileBlock))) { + + SFile *pFile = NULL; + + if (!pCompBlock->last) { + pFile = &(pHelper->files.dataF); + } else { + pFile = &(pHelper->files.lastF); + } + + if (tsdbWriteBlockToFile(pHelper, pFile, pDataCols, rowsCanMerge, &compBlock, pCompBlock->last) < 0) goto _err; + + // TODO: Add the sub-block + if (pCompBlock->numOfSubBlocks == 1) { + pCompBlock->numOfSubBlocks += 2; + // pCompBlock->offset = ; + // pCompBlock->len = ; + } else { + pCompBlock->numOfSubBlocks++; + } + pCompBlock->numOfPoints += rowsCanMerge; + pCompBlock->keyFirst = MIN(pCompBlock->keyFirst, dataColsKeyFirst(pDataCols)); + pCompBlock->keyLast = MAX(pCompBlock->keyLast, dataColsKeyAt(pDataCols, rowsCanMerge - 1)); + + // Update the Idx + // pIdx->hasLast = ; + // pIdx->len =; + // pIdx->numOfSuperBlocks = ; + + rowsWritten = rowsCanMerge; + } else { + // Read-Merge-Write as a super block + if (tsdbLoadBlockData(pHelper, blkIdx, NULL) < 0) goto _err; + tdMergeDataCols(pHelper->pDataCols[0], pDataCols, rowsCanMerge); + + int isLast = 0; + SFile *pFile = NULL; + if (!pCompBlock->last || (pCompBlock->numOfPoints + rowsCanMerge >= pHelper->config.minRowsPerFileBlock)) { + pFile = &(pHelper->files.dataF); + } else { + isLast = 1; + if (pHelper->files.nLastF.fd > 0) { + pFile = &(pHelper->files.nLastF); + } else { + pFile = &(pHelper->files.lastF); + } + } + + if (tsdbWriteBlockToFile(pHelper, pFile, pHelper->pDataCols[0], pCompBlock->numOfPoints + rowsCanMerge, &compBlock, isLast) < 0) goto _err; + + *pCompBlock = compBlock; + + pIdx->maxKey = MAX(pIdx->maxKey, compBlock.keyLast); + // pIdx->hasLast = ; + // pIdx-> + } + + return rowsWritten; + + _err: + return -1; +} + +static int compTSKEY(const void *key1, const void *key2) { return ((TSKEY *)key1 - (TSKEY *)key2); } + +// Get the number of rows the data can be merged into the block +static int tsdbGetRowsCanBeMergedWithBlock(SRWHelper *pHelper, int blkIdx, SDataCols *pDataCols) { + int rowsCanMerge = 0; + TSKEY keyFirst = dataColsKeyFirst(pDataCols); + + SCompIdx * pIdx = pHelper->pCompIdx + pHelper->tableInfo.tid; + SCompBlock *pCompBlock = pHelper->pCompInfo->blocks + blkIdx; + + ASSERT(blkIdx < pIdx->numOfSuperBlocks); + + TSKEY keyMax = (blkIdx < pIdx->numOfSuperBlocks + 1) ? (pCompBlock + 1)->keyFirst - 1 : pHelper->files.maxKey; + + if (keyFirst > pCompBlock->keyLast) { + void *ptr = taosbsearch((void *)(&keyMax), pDataCols->cols[0].pData, pDataCols->numOfPoints, sizeof(TSKEY), + compTSKEY, TD_LE); + ASSERT(ptr != NULL); + + rowsCanMerge = + MIN((TSKEY *)ptr - (TSKEY *)pDataCols->cols[0].pData, pHelper->config.minRowsPerFileBlock - pCompBlock->numOfPoints); + + } else { + int32_t colId[1] = {0}; + if (tsdbLoadBlockDataCols(pHelper, NULL, &colId, 1) < 0) goto _err; + + int iter1 = 0; // For pDataCols + int iter2 = 0; // For loaded data cols + + while (1) { + if (iter1 >= pDataCols->numOfPoints || iter2 >= pHelper->pDataCols[0]->numOfPoints) break; + if (pCompBlock->numOfPoints + rowsCanMerge >= pHelper->config.maxRowsPerFileBlock) break; + + TSKEY key1 = dataColsKeyAt(pDataCols, iter1); + TSKEY key2 = dataColsKeyAt(pHelper->pDataCols[0], iter2); + + if (key1 > keyMax) break; + + if (key1 < key2) { + iter1++; + } else if (key1 == key2) { + iter1++; + iter2++; + } else { + iter2++; + rowsCanMerge++; + } + } + } + + return rowsCanMerge; + +_err: + return -1; +} \ No newline at end of file diff --git a/src/vnode/tsdb/src/tsdbRead.c b/src/vnode/tsdb/src/tsdbRead.c index 30d0e94950..ed12299050 100644 --- a/src/vnode/tsdb/src/tsdbRead.c +++ b/src/vnode/tsdb/src/tsdbRead.c @@ -369,14 +369,14 @@ static int32_t getFileCompInfo(STableCheckInfo* pCheckInfo, SFileGroup* fileGrou fileGroup->files[TSDB_FILE_TYPE_HEAD].fd = open(fileGroup->files[TSDB_FILE_TYPE_HEAD].fname, O_RDONLY); } - tsdbLoadCompIdx(fileGroup, pCheckInfo->compIndex, 10000); // todo set dynamic max tables - SCompIdx* compIndex = &pCheckInfo->compIndex[pCheckInfo->tableId.tid]; + // tsdbLoadCompIdx(fileGroup, pCheckInfo->compIndex, 10000); // todo set dynamic max tables + // SCompIdx* compIndex = &pCheckInfo->compIndex[pCheckInfo->tableId.tid]; - if (compIndex->len == 0 || compIndex->numOfSuperBlocks == 0) { // no data block in this file, try next file + // if (compIndex->len == 0 || compIndex->numOfSuperBlocks == 0) { // no data block in this file, try next file - } else { - tsdbLoadCompBlocks(fileGroup, compIndex, pCheckInfo->pCompInfo); - } + // } else { + // tsdbLoadCompBlocks(fileGroup, compIndex, pCheckInfo->pCompInfo); + // } return TSDB_CODE_SUCCESS; } @@ -444,7 +444,7 @@ static bool doLoadDataFromFileBlock(STsdbQueryHandle *pQueryHandle) { pFile->fd = open(pFile->fname, O_RDONLY); } - tsdbLoadDataBlock(pFile, pBlock, 1, pCheckInfo->pDataCols, data); + // tsdbLoadDataBlock(pFile, pBlock, 1, pCheckInfo->pDataCols, data); return true; } @@ -810,10 +810,10 @@ static bool getQualifiedDataBlock(STsdbQueryHandle *pQueryHandle, STableCheckInf pFile->fd = open(pFile->fname, O_RDONLY); } - if (tsdbLoadDataBlock(pFile, &pCheckInfo->pCompInfo->blocks[cur->slot], 1, - pCheckInfo->pDataCols, data) == 0) { - blockLoaded = true; - } + // if (tsdbLoadDataBlock(pFile, &pCheckInfo->pCompInfo->blocks[cur->slot], 1, + // pCheckInfo->pDataCols, data) == 0) { + // blockLoaded = true; + // } // dError("QInfo:%p fileId:%d total numOfBlks:%d blockId:%d load into memory failed due to error in disk files", // GET_QINFO_ADDR(pQuery), pQuery->fileId, pQuery->numOfBlocks, blkIdx); diff --git a/src/vnode/tsdb/tests/tsdbTests.cpp b/src/vnode/tsdb/tests/tsdbTests.cpp index 9ee49d6a70..77f944cea2 100644 --- a/src/vnode/tsdb/tests/tsdbTests.cpp +++ b/src/vnode/tsdb/tests/tsdbTests.cpp @@ -54,7 +54,8 @@ TEST(TsdbTest, createRepo) { // 1. Create a tsdb repository tsdbSetDefaultCfg(&config); - tsdb_repo_t *pRepo = tsdbCreateRepo("/home/ubuntu/work/ttest/vnode0", &config, NULL); + tsdbCreateRepo("/home/ubuntu/work/ttest/vnode0", &config, NULL); + tsdb_repo_t *pRepo = tsdbOpenRepo("/home/ubuntu/work/ttest/vnode0", NULL); ASSERT_NE(pRepo, nullptr); // 2. Create a normal table @@ -139,42 +140,42 @@ TEST(TsdbTest, createRepo) { } // TEST(TsdbTest, DISABLED_openRepo) { -TEST(TsdbTest, openRepo) { - tsdb_repo_t *repo = tsdbOpenRepo("/home/ubuntu/work/ttest/vnode0", NULL); - ASSERT_NE(repo, nullptr); +// TEST(TsdbTest, openRepo) { +// tsdb_repo_t *repo = tsdbOpenRepo("/home/ubuntu/work/ttest/vnode0", NULL); +// ASSERT_NE(repo, nullptr); - STsdbRepo *pRepo = (STsdbRepo *)repo; +// STsdbRepo *pRepo = (STsdbRepo *)repo; - SFileGroup *pGroup = tsdbSearchFGroup(pRepo->tsdbFileH, 1833); +// SFileGroup *pGroup = tsdbSearchFGroup(pRepo->tsdbFileH, 1833); - for (int type = TSDB_FILE_TYPE_HEAD; type < TSDB_FILE_TYPE_MAX; type++) { - tsdbOpenFile(&pGroup->files[type], O_RDONLY); - } +// for (int type = TSDB_FILE_TYPE_HEAD; type < TSDB_FILE_TYPE_MAX; type++) { +// tsdbOpenFile(&pGroup->files[type], O_RDONLY); +// } - SCompIdx *pIdx = (SCompIdx *)calloc(pRepo->config.maxTables, sizeof(SCompIdx)); - tsdbLoadCompIdx(pGroup, (void *)pIdx, pRepo->config.maxTables); +// SCompIdx *pIdx = (SCompIdx *)calloc(pRepo->config.maxTables, sizeof(SCompIdx)); +// tsdbLoadCompIdx(pGroup, (void *)pIdx, pRepo->config.maxTables); - SCompInfo *pCompInfo = (SCompInfo *)malloc(sizeof(SCompInfo) + pIdx[1].len); +// SCompInfo *pCompInfo = (SCompInfo *)malloc(sizeof(SCompInfo) + pIdx[1].len); - tsdbLoadCompBlocks(pGroup, &pIdx[0], (void *)pCompInfo); +// tsdbLoadCompBlocks(pGroup, &pIdx[0], (void *)pCompInfo); - int blockIdx = 0; - SCompBlock *pBlock = &(pCompInfo->blocks[blockIdx]); +// int blockIdx = 0; +// SCompBlock *pBlock = &(pCompInfo->blocks[blockIdx]); - SCompData *pCompData = (SCompData *)malloc(sizeof(SCompData) + sizeof(SCompCol) * pBlock->numOfCols); +// SCompData *pCompData = (SCompData *)malloc(sizeof(SCompData) + sizeof(SCompCol) * pBlock->numOfCols); - tsdbLoadCompCols(&pGroup->files[TSDB_FILE_TYPE_DATA], pBlock, (void *)pCompData); +// tsdbLoadCompCols(&pGroup->files[TSDB_FILE_TYPE_DATA], pBlock, (void *)pCompData); - STable *pTable = tsdbGetTableByUid(pRepo->tsdbMeta, pCompData->uid); - SDataCols *pDataCols = tdNewDataCols(tdMaxRowBytesFromSchema(pTable->schema), 5, 10); - tdInitDataCols(pDataCols, pTable->schema); +// STable *pTable = tsdbGetTableByUid(pRepo->tsdbMeta, pCompData->uid); +// SDataCols *pDataCols = tdNewDataCols(tdMaxRowBytesFromSchema(pTable->schema), 5, 10); +// tdInitDataCols(pDataCols, pTable->schema); - tsdbLoadDataBlock(&pGroup->files[TSDB_FILE_TYPE_DATA], pBlock, 1, pDataCols, pCompData); +// tsdbLoadDataBlock(&pGroup->files[TSDB_FILE_TYPE_DATA], pBlock, 1, pDataCols, pCompData); - int k = 0; +// int k = 0; -} +// } TEST(TsdbTest, DISABLED_createFileGroup) { SFileGroup fGroup; -- GitLab