diff --git a/src/common/inc/dataformat.h b/src/common/inc/dataformat.h index 04fa7dcc7dd5e1cb8498b4fa4eb033dfcc92e7c3..17aa19cce7be5d5a35707ef68cb297c7ba6889a0 100644 --- a/src/common/inc/dataformat.h +++ b/src/common/inc/dataformat.h @@ -111,7 +111,6 @@ typedef struct SDataCol { int len; int offset; void * pData; // Original data - void * pCData; // Compressed data } SDataCol; typedef struct { @@ -133,9 +132,12 @@ typedef struct { SDataCols *tdNewDataCols(int maxRowSize, int maxCols, int maxRows); void tdResetDataCols(SDataCols *pCols); void tdInitDataCols(SDataCols *pCols, STSchema *pSchema); +SDataCols *tdDupDataCols(SDataCols *pCols, bool keepData); void tdFreeDataCols(SDataCols *pCols); void tdAppendDataRowToDataCol(SDataRow row, SDataCols *pCols); void tdPopDataColsPoints(SDataCols *pCols, int pointsToPop); +int tdMergeDataCols(SDataCols *target, SDataCols *src, int rowsToMerge); +void tdMergeTwoDataCols(SDataCols *target, SDataCols *src1, int *iter1, SDataCols *src2, int *iter2, int tRows); #ifdef __cplusplus } diff --git a/src/common/src/dataformat.c b/src/common/src/dataformat.c index 45850d1788d55cf25796d0d51c9518db38bf126a..fb20892452641e932dbbcf1d6ea9902efa091b2a 100644 --- a/src/common/src/dataformat.c +++ b/src/common/src/dataformat.c @@ -13,6 +13,7 @@ * along with this program. If not, see . */ #include "dataformat.h" +#include "tutil.h" static int tdFLenFromSchema(STSchema *pSchema); @@ -338,6 +339,28 @@ void tdFreeDataCols(SDataCols *pCols) { } } +SDataCols *tdDupDataCols(SDataCols *pDataCols, bool keepData) { + SDataCols *pRet = tdNewDataCols(pDataCols->maxRowSize, pDataCols->maxCols, pDataCols->maxPoints); + if (pRet == NULL) return NULL; + + pRet->numOfCols = pDataCols->numOfCols; + pRet->sversion = pDataCols->sversion; + if (keepData) pRet->numOfPoints = pDataCols->numOfPoints; + + for (int i = 0; i < pDataCols->numOfCols; i++) { + pRet->cols[i].type = pDataCols->cols[i].type; + pRet->cols[i].colId = pDataCols->cols[i].colId; + pRet->cols[i].bytes = pDataCols->cols[i].bytes; + pRet->cols[i].len = pDataCols->cols[i].len; + pRet->cols[i].offset = pDataCols->cols[i].offset; + pRet->cols[i].pData = (void *)((char *)pRet->buf + ((char *)(pDataCols->cols[i].pData) - (char *)(pDataCols->buf))); + + if (keepData) memcpy(pRet->cols[i].pData, pDataCols->cols[i].pData, pRet->cols[i].bytes * pDataCols->numOfPoints); + } + + return pRet; +} + void tdResetDataCols(SDataCols *pCols) { pCols->numOfPoints = 0; for (int i = 0; i < pCols->maxCols; i++) { @@ -382,6 +405,58 @@ static int tdFLenFromSchema(STSchema *pSchema) { return ret; } -int tdMergeDataCols(SDataCols *target, SDataCols *source) { +int tdMergeDataCols(SDataCols *target, SDataCols *source, int rowsToMerge) { + ASSERT(rowsToMerge > 0 && rowsToMerge <= source->numOfPoints); + + SDataCols *pTarget = tdDupDataCols(target, true); + if (pTarget == NULL) goto _err; + // tdResetDataCols(target); + + int iter1 = 0; + int iter2 = 0; + tdMergeTwoDataCols(target,pTarget, &iter1, source, &iter2, pTarget->numOfPoints + rowsToMerge); + + tdFreeDataCols(pTarget); return 0; + +_err: + tdFreeDataCols(pTarget); + return -1; +} + +void tdMergeTwoDataCols(SDataCols *target, SDataCols *src1, int *iter1, SDataCols *src2, int *iter2, int tRows) { + tdResetDataCols(target); + + while (target->numOfPoints < tRows) { + if (*iter1 >= src1->numOfPoints && *iter2 >= src2->numOfPoints) break; + + TSKEY key1 = (*iter1 >= src1->numOfPoints) ? INT64_MAX : ((TSKEY *)(src1->cols[0].pData))[*iter1]; + TSKEY key2 = (*iter2 >= src2->numOfPoints) ? INT64_MAX : ((TSKEY *)(src2->cols[0].pData))[*iter2]; + + if (key1 < key2) { + for (int i = 0; i < src1->numOfCols; i++) { + ASSERT(target->cols[i].type == src1->cols[i].type); + memcpy((void *)((char *)(target->cols[i].pData) + TYPE_BYTES[target->cols[i].type] * target->numOfPoints), + (void *)((char *)(src1->cols[i].pData) + TYPE_BYTES[target->cols[i].type] * (*iter1)), + TYPE_BYTES[target->cols[i].type]); + target->cols[i].len += TYPE_BYTES[target->cols[i].type]; + } + + target->numOfPoints++; + (*iter1)++; + } else if (key1 > key2) { + for (int i = 0; i < src2->numOfCols; i++) { + ASSERT(target->cols[i].type == src2->cols[i].type); + memcpy((void *)((char *)(target->cols[i].pData) + TYPE_BYTES[target->cols[i].type] * target->numOfPoints), + (void *)((char *)(src2->cols[i].pData) + TYPE_BYTES[src2->cols[i].type] * (*iter2)), + TYPE_BYTES[target->cols[i].type]); + target->cols[i].len += TYPE_BYTES[target->cols[i].type]; + } + + target->numOfPoints++; + (*iter2)++; + } else { + ASSERT(false); + } + } } \ No newline at end of file diff --git a/src/inc/tsdb.h b/src/inc/tsdb.h index cc1e4daf12232d6d9d6954f0f5c3551d535c5fc3..62f9605b6aa81623052bd2de59c6ef2dafe73f77 100644 --- a/src/inc/tsdb.h +++ b/src/inc/tsdb.h @@ -46,6 +46,7 @@ typedef struct { // --------- TSDB REPOSITORY CONFIGURATION DEFINITION typedef struct { int8_t precision; + int8_t compression; int32_t tsdbId; int32_t maxTables; // maximum number of tables this repository can have int32_t daysPerFile; // day per file sharding policy diff --git a/src/tsdb/inc/tsdbMain.h b/src/tsdb/inc/tsdbMain.h index 37963a33226813816893c7a855c74b63325ddf6e..756a206af0f4303207522120fe788c84bd598f8f 100644 --- a/src/tsdb/inc/tsdbMain.h +++ b/src/tsdb/inc/tsdbMain.h @@ -220,11 +220,12 @@ STsdbFileH *tsdbInitFileH(char *dataDir, int maxFiles); void tsdbCloseFileH(STsdbFileH *pFileH); int tsdbCreateFile(char *dataDir, int fileId, const char *suffix, int maxTables, SFile *pFile, int writeHeader, int toClose); -int tsdbCreateFGroup(STsdbFileH *pFileH, char *dataDir, int fid, int maxTables); +SFileGroup *tsdbCreateFGroup(STsdbFileH *pFileH, char *dataDir, int fid, int maxTables); int tsdbOpenFile(SFile *pFile, int oflag); int tsdbCloseFile(SFile *pFile); SFileGroup *tsdbOpenFilesForCommit(STsdbFileH *pFileH, int fid); int tsdbRemoveFileGroup(STsdbFileH *pFile, int fid); +int tsdbGetFileName(char *dataDir, int fileId, const char *suffix, char *fname); #define TSDB_FGROUP_ITER_FORWARD TSDB_ORDER_ASC #define TSDB_FGROUP_ITER_BACKWARD TSDB_ORDER_DESC @@ -270,6 +271,8 @@ typedef struct { TSKEY keyLast; } SCompBlock; +// Maximum number of sub-blocks a super-block can have +#define TSDB_MAX_SUBBLOCKS 8 #define IS_SUPER_BLOCK(pBlock) ((pBlock)->numOfSubBlocks >= 1) #define IS_SUB_BLOCK(pBlock) ((pBlock)->numOfSubBlocks == 0) @@ -309,17 +312,9 @@ typedef struct { STsdbFileH *tsdbGetFile(tsdb_repo_t *pRepo); -int tsdbCopyBlockDataInFile(SFile *pOutFile, SFile *pInFile, SCompInfo *pCompInfo, int idx, int isLast, - SDataCols *pCols); - -int tsdbLoadCompIdx(SFileGroup *pGroup, void *buf, int maxTables); -int tsdbLoadCompBlocks(SFileGroup *pGroup, SCompIdx *pIdx, void *buf); -int tsdbLoadCompCols(SFile *pFile, SCompBlock *pBlock, void *buf); -int tsdbLoadColData(SFile *pFile, SCompCol *pCol, int64_t blockBaseOffset, void *buf); -int tsdbLoadDataBlock(SFile *pFile, SCompBlock *pStartBlock, int numOfBlocks, SDataCols *pCols, SCompData *pCompData); - +int tsdbCopyBlockDataInFile(SFile *pOutFile, SFile *pInFile, SCompInfo *pCompInfo, int idx, int isLast, + SDataCols *pCols); SFileGroup *tsdbSearchFGroup(STsdbFileH *pFileH, int fid); - void tsdbGetKeyRangeOfFileId(int32_t daysPerFile, int8_t precision, int32_t fileId, TSKEY *minKey, TSKEY *maxKey); // TSDB repository definition @@ -379,6 +374,100 @@ int32_t tsdbTriggerCommit(tsdb_repo_t *repo); int32_t tsdbLockRepo(tsdb_repo_t *repo); int32_t tsdbUnLockRepo(tsdb_repo_t *repo); +typedef enum { TSDB_WRITE_HELPER, TSDB_READ_HELPER } tsdb_rw_helper_t; + +typedef struct { + tsdb_rw_helper_t type; // helper type + + int maxTables; + int maxRowSize; + int maxRows; + int maxCols; + int minRowsPerFileBlock; + int maxRowsPerFileBlock; + int8_t compress; +} SHelperCfg; + +typedef struct { + int fid; + TSKEY minKey; + TSKEY maxKey; + // For read/write purpose + SFile headF; + SFile dataF; + SFile lastF; + // For write purpose only + SFile nHeadF; + SFile nLastF; +} SHelperFile; + +typedef struct { + int64_t uid; + int32_t tid; + int32_t sversion; +} SHelperTable; + +typedef struct { + // Global configuration + SHelperCfg config; + + int8_t state; + + // For file set usage + SHelperFile files; + SCompIdx * pCompIdx; + + // For table set usage + SHelperTable tableInfo; + SCompInfo * pCompInfo; + bool hasOldLastBlock; + + // For block set usage + SCompData *pCompData; + SDataCols *pDataCols[2]; + +} SRWHelper; + +// --------- Helper state +#define TSDB_HELPER_CLEAR_STATE 0x0 // Clear state +#define TSDB_HELPER_FILE_SET_AND_OPEN 0x1 // File is set +#define TSDB_HELPER_IDX_LOAD 0x2 // SCompIdx part is loaded +#define TSDB_HELPER_TABLE_SET 0x4 // Table is set +#define TSDB_HELPER_INFO_LOAD 0x8 // SCompInfo part is loaded +#define TSDB_HELPER_FILE_DATA_LOAD 0x10 // SCompData part is loaded + +#define TSDB_HELPER_TYPE(h) ((h)->config.type) + +#define helperSetState(h, s) (((h)->state) |= (s)) +#define helperClearState(h, s) ((h)->state &= (~(s))) +#define helperHasState(h, s) ((((h)->state) & (s)) == (s)) +#define blockAtIdx(h, idx) ((h)->pCompInfo->blocks + idx) + +int tsdbInitReadHelper(SRWHelper *pHelper, STsdbRepo *pRepo); +int tsdbInitWriteHelper(SRWHelper *pHelper, STsdbRepo *pRepo); +// int tsdbInitHelper(SRWHelper *pHelper, SHelperCfg *pCfg); +void tsdbDestroyHelper(SRWHelper *pHelper); +void tsdbResetHelper(SRWHelper *pHelper); + +// --------- For set operations +int tsdbSetAndOpenHelperFile(SRWHelper *pHelper, SFileGroup *pGroup); +// void tsdbSetHelperTable(SRWHelper *pHelper, SHelperTable *pHelperTable, STSchema *pSchema); +void tsdbSetHelperTable(SRWHelper *pHelper, STable *pTable, STsdbRepo *pRepo); +int tsdbCloseHelperFile(SRWHelper *pHelper, bool hasError); + +// --------- For read operations +int tsdbLoadCompIdx(SRWHelper *pHelper, void *target); +int tsdbLoadCompInfo(SRWHelper *pHelper, void *target); +int tsdbLoadCompData(SRWHelper *pHelper, SCompBlock *pCompBlock, void *target); +int tsdbLoadBlockDataCols(SRWHelper *pHelper, SDataCols *pDataCols, int blkIdx, int16_t *colIds, int numOfColIds); +int tsdbLoadBlockData(SRWHelper *pHelper, SCompBlock *pCompBlock, SDataCols *target); + +// --------- For write operations +int tsdbWriteDataBlock(SRWHelper *pHelper, SDataCols *pDataCols); +int tsdbMoveLastBlockIfNeccessary(SRWHelper *pHelper); +int tsdbWriteCompInfo(SRWHelper *pHelper); +int tsdbWriteCompIdx(SRWHelper *pHelper); + #ifdef __cplusplus } #endif diff --git a/src/tsdb/src/tsdbFile.c b/src/tsdb/src/tsdbFile.c index 7576717dd1a9eeef30d3ef9aa7eaee6373d74fff..ab76f69bedf1d289be57f7b2539ff1b9fbcac466 100644 --- a/src/tsdb/src/tsdbFile.c +++ b/src/tsdb/src/tsdbFile.c @@ -21,10 +21,12 @@ #include #include #include +#include +#include "talgo.h" +#include "tchecksum.h" #include "tsdbMain.h" #include "tutil.h" -#include "talgo.h" const char *tsdbFileSuffix[] = { ".head", // TSDB_FILE_TYPE_HEAD @@ -34,7 +36,6 @@ const char *tsdbFileSuffix[] = { static int compFGroupKey(const void *key, const void *fgroup); static int compFGroup(const void *arg1, const void *arg2); -static int tsdbGetFileName(char *dataDir, int fileId, const char *suffix, char *fname); static int tsdbWriteFileHead(SFile *pFile); static int tsdbWriteHeadFileIdx(SFile *pFile, int maxTables); static int tsdbOpenFGroup(STsdbFileH *pFileH, char *dataDir, int fid); @@ -93,24 +94,36 @@ static int tsdbOpenFGroup(STsdbFileH *pFileH, char *dataDir, int fid) { return 0; } -int tsdbCreateFGroup(STsdbFileH *pFileH, char *dataDir, int fid, int maxTables) { - if (pFileH->numOfFGroups >= pFileH->maxFGroups) return -1; +/** + * Create the file group if the file group not exists. + * + * @return A pointer to + */ +SFileGroup *tsdbCreateFGroup(STsdbFileH *pFileH, char *dataDir, int fid, int maxTables) { + if (pFileH->numOfFGroups >= pFileH->maxFGroups) return NULL; SFileGroup fGroup; SFileGroup *pFGroup = &fGroup; - if (tsdbSearchFGroup(pFileH, fid) == NULL) { // if not exists, create one + + SFileGroup *pGroup = tsdbSearchFGroup(pFileH, fid); + if (pGroup == NULL) { // if not exists, create one pFGroup->fileId = fid; for (int type = TSDB_FILE_TYPE_HEAD; type < TSDB_FILE_TYPE_MAX; type++) { - if (tsdbCreateFile(dataDir, fid, tsdbFileSuffix[type], maxTables, &(pFGroup->files[type]), type == TSDB_FILE_TYPE_HEAD ? 1 : 0, 1) < 0) { - // TODO: deal with the ERROR here, remove those creaed file - return -1; - } + if (tsdbCreateFile(dataDir, fid, tsdbFileSuffix[type], maxTables, &(pFGroup->files[type]), + type == TSDB_FILE_TYPE_HEAD ? 1 : 0, 1) < 0) + goto _err; } pFileH->fGroup[pFileH->numOfFGroups++] = fGroup; qsort((void *)(pFileH->fGroup), pFileH->numOfFGroups, sizeof(SFileGroup), compFGroup); + return tsdbSearchFGroup(pFileH, fid); } - return 0; + + return pGroup; + +_err: + // TODO: deal with the err here + return NULL; } int tsdbRemoveFileGroup(STsdbFileH *pFileH, int fid) { @@ -183,27 +196,27 @@ SFileGroup *tsdbGetFileGroupNext(SFileGroupIter *pIter) { return ret; } -int tsdbLoadDataBlock(SFile *pFile, SCompBlock *pStartBlock, int numOfBlocks, SDataCols *pCols, SCompData *pCompData) { - SCompBlock *pBlock = pStartBlock; - for (int i = 0; i < numOfBlocks; i++) { - if (tsdbLoadCompCols(pFile, pBlock, (void *)pCompData) < 0) return -1; - pCols->numOfPoints += (pCompData->cols[0].len / 8); - for (int iCol = 0; iCol < pBlock->numOfCols; iCol++) { - SCompCol *pCompCol = &(pCompData->cols[iCol]); - // pCols->numOfPoints += pBlock->numOfPoints; - int k = 0; - for (; k < pCols->numOfCols; k++) { - if (pCompCol->colId == pCols->cols[k].colId) break; - } - - if (tsdbLoadColData(pFile, pCompCol, pBlock->offset, - (void *)((char *)(pCols->cols[k].pData) + pCols->cols[k].len)) < 0) - return -1; - } - pStartBlock++; - } - return 0; -} +// int tsdbLoadDataBlock(SFile *pFile, SCompBlock *pStartBlock, int numOfBlocks, SDataCols *pCols, SCompData *pCompData) { +// SCompBlock *pBlock = pStartBlock; +// for (int i = 0; i < numOfBlocks; i++) { +// if (tsdbLoadCompCols(pFile, pBlock, (void *)pCompData) < 0) return -1; +// pCols->numOfPoints += (pCompData->cols[0].len / 8); +// for (int iCol = 0; iCol < pBlock->numOfCols; iCol++) { +// SCompCol *pCompCol = &(pCompData->cols[iCol]); +// // pCols->numOfPoints += pBlock->numOfPoints; +// int k = 0; +// for (; k < pCols->numOfCols; k++) { +// if (pCompCol->colId == pCols->cols[k].colId) break; +// } + +// if (tsdbLoadColData(pFile, pCompCol, pBlock->offset, +// (void *)((char *)(pCols->cols[k].pData) + pCols->cols[k].len)) < 0) +// return -1; +// } +// pStartBlock++; +// } +// return 0; +// } int tsdbCopyBlockDataInFile(SFile *pOutFile, SFile *pInFile, SCompInfo *pCompInfo, int idx, int isLast, SDataCols *pCols) { SCompBlock *pSuperBlock = TSDB_COMPBLOCK_AT(pCompInfo, idx); @@ -239,42 +252,42 @@ int tsdbCopyBlockDataInFile(SFile *pOutFile, SFile *pInFile, SCompInfo *pCompInf return 0; } -int tsdbLoadCompIdx(SFileGroup *pGroup, void *buf, int maxTables) { - SFile *pFile = &(pGroup->files[TSDB_FILE_TYPE_HEAD]); - if (lseek(pFile->fd, TSDB_FILE_HEAD_SIZE, SEEK_SET) < 0) return -1; +// int tsdbLoadCompIdx(SFileGroup *pGroup, void *buf, int maxTables) { +// SFile *pFile = &(pGroup->files[TSDB_FILE_TYPE_HEAD]); +// if (lseek(pFile->fd, TSDB_FILE_HEAD_SIZE, SEEK_SET) < 0) return -1; - if (read(pFile->fd, buf, sizeof(SCompIdx) * maxTables) < 0) return -1; - // TODO: need to check the correctness - return 0; -} +// if (read(pFile->fd, buf, sizeof(SCompIdx) * maxTables) < 0) return -1; +// // TODO: need to check the correctness +// return 0; +// } -int tsdbLoadCompBlocks(SFileGroup *pGroup, SCompIdx *pIdx, void *buf) { - SFile *pFile = &(pGroup->files[TSDB_FILE_TYPE_HEAD]); +// int tsdbLoadCompBlocks(SFileGroup *pGroup, SCompIdx *pIdx, void *buf) { +// SFile *pFile = &(pGroup->files[TSDB_FILE_TYPE_HEAD]); - if (lseek(pFile->fd, pIdx->offset, SEEK_SET) < 0) return -1; +// if (lseek(pFile->fd, pIdx->offset, SEEK_SET) < 0) return -1; - if (read(pFile->fd, buf, pIdx->len) < 0) return -1; +// if (read(pFile->fd, buf, pIdx->len) < 0) return -1; - // TODO: need to check the correctness +// // TODO: need to check the correctness - return 0; -} +// return 0; +// } -int tsdbLoadCompCols(SFile *pFile, SCompBlock *pBlock, void *buf) { - // assert(pBlock->numOfSubBlocks == 0 || pBlock->numOfSubBlocks == 1); +// int tsdbLoadCompCols(SFile *pFile, SCompBlock *pBlock, void *buf) { +// // assert(pBlock->numOfSubBlocks == 0 || pBlock->numOfSubBlocks == 1); - if (lseek(pFile->fd, pBlock->offset, SEEK_SET) < 0) return -1; - size_t size = sizeof(SCompData) + sizeof(SCompCol) * pBlock->numOfCols; - if (read(pFile->fd, buf, size) < 0) return -1; +// if (lseek(pFile->fd, pBlock->offset, SEEK_SET) < 0) return -1; +// size_t size = sizeof(SCompData) + sizeof(SCompCol) * pBlock->numOfCols; +// if (read(pFile->fd, buf, size) < 0) return -1; - return 0; -} +// return 0; +// } -int tsdbLoadColData(SFile *pFile, SCompCol *pCol, int64_t blockBaseOffset, void *buf) { - if (lseek(pFile->fd, blockBaseOffset + pCol->offset, SEEK_SET) < 0) return -1; - if (read(pFile->fd, buf, pCol->len) < 0) return -1; - return 0; -} +// int tsdbLoadColData(SFile *pFile, SCompCol *pCol, int64_t blockBaseOffset, void *buf) { +// if (lseek(pFile->fd, blockBaseOffset + pCol->offset, SEEK_SET) < 0) return -1; +// if (read(pFile->fd, buf, pCol->len) < 0) return -1; +// return 0; +// } static int compFGroupKey(const void *key, const void *fgroup) { int fid = *(int *)key; @@ -299,7 +312,7 @@ static int tsdbWriteFileHead(SFile *pFile) { } static int tsdbWriteHeadFileIdx(SFile *pFile, int maxTables) { - int size = sizeof(SCompIdx) * maxTables; + int size = sizeof(SCompIdx) * maxTables + sizeof(TSCKSUM); void *buf = calloc(1, size); if (buf == NULL) return -1; @@ -308,6 +321,8 @@ static int tsdbWriteHeadFileIdx(SFile *pFile, int maxTables) { return -1; } + taosCalcChecksumAppend(0, (uint8_t *)buf, size); + if (write(pFile->fd, buf, size) < 0) { free(buf); return -1; @@ -319,7 +334,7 @@ static int tsdbWriteHeadFileIdx(SFile *pFile, int maxTables) { return 0; } -static int tsdbGetFileName(char *dataDir, int fileId, const char *suffix, char *fname) { +int tsdbGetFileName(char *dataDir, int fileId, const char *suffix, char *fname) { if (dataDir == NULL || fname == NULL) return -1; sprintf(fname, "%s/f%d%s", dataDir, fileId, suffix); diff --git a/src/tsdb/src/tsdbMain.c b/src/tsdb/src/tsdbMain.c index 9c0050b38fc2f331ff7ca754cc6d86656b4166d5..7375a35796161251bf0f7c07e64920fe75205396 100644 --- a/src/tsdb/src/tsdbMain.c +++ b/src/tsdb/src/tsdbMain.c @@ -12,15 +12,16 @@ #include #include -// #include "taosdef.h" -// #include "disk.h" #include "os.h" #include "talgo.h" #include "tsdb.h" #include "tsdbMain.h" +#include "tscompression.h" #define TSDB_DEFAULT_PRECISION TSDB_PRECISION_MILLI // default precision #define IS_VALID_PRECISION(precision) (((precision) >= TSDB_PRECISION_MILLI) && ((precision) <= TSDB_PRECISION_NANO)) +#define TSDB_DEFAULT_COMPRESSION TWO_STAGE_COMP +#define IS_VALID_COMPRESSION(compression) (((compression) >= NO_COMPRESSION) && ((compression) <= TWO_STAGE_COMP)) #define TSDB_MIN_ID 0 #define TSDB_MAX_ID INT_MAX #define TSDB_MIN_TABLES 10 @@ -57,11 +58,12 @@ static int32_t tsdbInsertDataToTable(tsdb_repo_t *repo, SSubmitBlk *pBlock); static int32_t tsdbRestoreCfg(STsdbRepo *pRepo, STsdbCfg *pCfg); static int32_t tsdbGetDataDirName(STsdbRepo *pRepo, char *fname); static void * tsdbCommitData(void *arg); -static int tsdbCommitToFile(STsdbRepo *pRepo, int fid, SSkipListIterator **iters, SDataCols *pCols); -static int tsdbHasDataInRange(SSkipListIterator *pIter, TSKEY minKey, TSKEY maxKey); +static int tsdbCommitToFile(STsdbRepo *pRepo, int fid, SSkipListIterator **iters, SRWHelper *pHelper, + SDataCols *pDataCols); +static TSKEY tsdbNextIterKey(SSkipListIterator *pIter); static int tsdbHasDataToCommit(SSkipListIterator **iters, int nIters, TSKEY minKey, TSKEY maxKey); -static int tsdbWriteBlockToFileImpl(SFile *pFile, SDataCols *pCols, int pointsToWrite, int64_t *offset, int32_t *len, - int64_t uid); +// static int tsdbWriteBlockToFileImpl(SFile *pFile, SDataCols *pCols, int pointsToWrite, int64_t *offset, int32_t *len, +// int64_t uid); #define TSDB_GET_TABLE_BY_ID(pRepo, sid) (((STSDBRepo *)pRepo)->pTableList)[sid] #define TSDB_GET_TABLE_BY_NAME(pRepo, name) @@ -82,6 +84,7 @@ void tsdbSetDefaultCfg(STsdbCfg *pCfg) { pCfg->maxRowsPerFileBlock = -1; pCfg->keep = -1; pCfg->maxCacheSize = -1; + pCfg->compression = TWO_STAGE_COMP; } /** @@ -397,6 +400,7 @@ int tsdbInitTableCfg(STableCfg *config, ETableType type, int64_t uid, int32_t ti config->superUid = TSDB_INVALID_SUPER_TABLE_ID; config->tableId.uid = uid; config->tableId.tid = tid; + config->name = strdup("test1"); return 0; } @@ -571,6 +575,13 @@ static int32_t tsdbCheckAndSetDefaultCfg(STsdbCfg *pCfg) { if (!IS_VALID_PRECISION(pCfg->precision)) return -1; } + // Check compression + if (pCfg->compression == -1) { + pCfg->compression = TSDB_DEFAULT_COMPRESSION; + } else { + if (!IS_VALID_COMPRESSION(pCfg->compression)) return -1; + } + // Check tsdbId if (pCfg->tsdbId < 0) return -1; @@ -785,6 +796,9 @@ static int32_t tsdbInsertDataToTable(tsdb_repo_t *repo, SSubmitBlk *pBlock) { } static int tsdbReadRowsFromCache(SSkipListIterator *pIter, TSKEY maxKey, int maxRowsToRead, SDataCols *pCols) { + ASSERT(maxRowsToRead > 0); + if (pIter == NULL) return 0; + int numOfRows = 0; do { @@ -823,19 +837,16 @@ static SSkipListIterator **tsdbCreateTableIters(STsdbMeta *pMeta, int maxTables) if (pTable == NULL || pTable->imem == NULL) continue; iters[tid] = tSkipListCreateIter(pTable->imem->pData); - if (iters[tid] == NULL) { - tsdbDestroyTableIters(iters, maxTables); - return NULL; - } + if (iters[tid] == NULL) goto _err; - if (!tSkipListIterNext(iters[tid])) { - // No data in this iterator - tSkipListDestroyIter(iters[tid]); - iters[tid] = NULL; - } + if (!tSkipListIterNext(iters[tid])) goto _err; } return iters; + + _err: + tsdbDestroyTableIters(iters, maxTables); + return NULL; } static void tsdbFreeMemTable(SMemTable *pMemTable) { @@ -847,14 +858,17 @@ static void tsdbFreeMemTable(SMemTable *pMemTable) { // Commit to file static void *tsdbCommitData(void *arg) { - // TODO printf("Starting to commit....\n"); STsdbRepo * pRepo = (STsdbRepo *)arg; STsdbMeta * pMeta = pRepo->tsdbMeta; STsdbCache *pCache = pRepo->tsdbCache; - STsdbCfg * pCfg = &(pRepo->config); + STsdbCfg * pCfg = &(pRepo->config); + SDataCols * pDataCols = NULL; + SRWHelper whelper = {0}; if (pCache->imem == NULL) return NULL; + pRepo->appH.walCallBack(pRepo->appH.appH); + // Create the iterator to read from cache SSkipListIterator **iters = tsdbCreateTableIters(pMeta, pCfg->maxTables); if (iters == NULL) { @@ -862,23 +876,21 @@ static void *tsdbCommitData(void *arg) { return NULL; } - // Create a data column buffer for commit - SDataCols *pDataCols = tdNewDataCols(pMeta->maxRowBytes, pMeta->maxCols, pCfg->maxRowsPerFileBlock); - if (pDataCols == NULL) { - // TODO: deal with the error - return NULL; - } + if (tsdbInitWriteHelper(&whelper, pRepo) < 0) goto _exit; + if ((pDataCols = tdNewDataCols(pMeta->maxRowBytes, pMeta->maxCols, pCfg->maxRowsPerFileBlock)) == NULL) goto _exit; int sfid = tsdbGetKeyFileId(pCache->imem->keyFirst, pCfg->daysPerFile, pCfg->precision); int efid = tsdbGetKeyFileId(pCache->imem->keyLast, pCfg->daysPerFile, pCfg->precision); + // Loop to commit to each file for (int fid = sfid; fid <= efid; fid++) { - if (tsdbCommitToFile(pRepo, fid, iters, pDataCols) < 0) { - // TODO: deal with the error here - // assert(0); + if (tsdbCommitToFile(pRepo, fid, iters, &whelper, pDataCols) < 0) { + ASSERT(false); + goto _exit; } } +_exit: tdFreeDataCols(pDataCols); tsdbDestroyTableIters(iters, pCfg->maxTables); @@ -888,7 +900,6 @@ static void *tsdbCommitData(void *arg) { free(pCache->imem); pCache->imem = NULL; pRepo->commit = 0; - // TODO: free the skiplist for (int i = 0; i < pCfg->maxTables; i++) { STable *pTable = pMeta->tables[i]; if (pTable && pTable->imem) { @@ -901,19 +912,12 @@ static void *tsdbCommitData(void *arg) { return NULL; } -static int tsdbCommitToFile(STsdbRepo *pRepo, int fid, SSkipListIterator **iters, SDataCols *pCols) { - int isNewLastFile = 0; +static int tsdbCommitToFile(STsdbRepo *pRepo, int fid, SSkipListIterator **iters, SRWHelper *pHelper, SDataCols *pDataCols) { STsdbMeta * pMeta = pRepo->tsdbMeta; STsdbFileH *pFileH = pRepo->tsdbFileH; STsdbCfg * pCfg = &pRepo->config; - SFile hFile, lFile; SFileGroup *pGroup = NULL; - SCompIdx * pIndices = NULL; - SCompInfo * pCompInfo = NULL; - // size_t compInfoSize = 0; - // SCompBlock compBlock; - // SCompBlock *pBlock = &compBlock; TSKEY minKey = 0, maxKey = 0; tsdbGetKeyRangeOfFileId(pCfg->daysPerFile, pCfg->precision, fid, &minKey, &maxKey); @@ -922,334 +926,93 @@ static int tsdbCommitToFile(STsdbRepo *pRepo, int fid, SSkipListIterator **iters int hasDataToCommit = tsdbHasDataToCommit(iters, pCfg->maxTables, minKey, maxKey); if (!hasDataToCommit) return 0; // No data to commit, just return - // TODO: make it more flexible - pCompInfo = (SCompInfo *)malloc(sizeof(SCompInfo) + sizeof(SCompBlock) * 1000); - // Create and open files for commit tsdbGetDataDirName(pRepo, dataDir); - if (tsdbCreateFGroup(pFileH, dataDir, fid, pCfg->maxTables) < 0) { /* TODO */ - } - pGroup = tsdbOpenFilesForCommit(pFileH, fid); - if (pGroup == NULL) { /* TODO */ - } - tsdbCreateFile(dataDir, fid, ".h", pCfg->maxTables, &hFile, 1, 1); - tsdbOpenFile(&hFile, O_RDWR); - if (0 /*pGroup->files[TSDB_FILE_TYPE_LAST].size > TSDB_MAX_LAST_FILE_SIZE*/) { - // TODO: make it not to write the last file every time - tsdbCreateFile(dataDir, fid, ".l", pCfg->maxTables, &lFile, 0, 0); - isNewLastFile = 1; - } + if ((pGroup = tsdbCreateFGroup(pFileH, dataDir, fid, pCfg->maxTables)) == NULL) goto _err; - // Load the SCompIdx - pIndices = (SCompIdx *)malloc(sizeof(SCompIdx) * pCfg->maxTables); - if (pIndices == NULL) { /* TODO*/ - } - if (tsdbLoadCompIdx(pGroup, (void *)pIndices, pCfg->maxTables) < 0) { /* TODO */ - } - - lseek(hFile.fd, TSDB_FILE_HEAD_SIZE + sizeof(SCompIdx) * pCfg->maxTables, SEEK_SET); + // Open files for write/read + if (tsdbSetAndOpenHelperFile(pHelper, pGroup) < 0) goto _err; // Loop to commit data in each table for (int tid = 0; tid < pCfg->maxTables; tid++) { STable * pTable = pMeta->tables[tid]; - SSkipListIterator *pIter = iters[tid]; - SCompIdx * pIdx = &pIndices[tid]; - - int nNewBlocks = 0; - - if (pTable == NULL || pIter == NULL) continue; - - /* If no new data to write for this table, just write the old data to new file - * if there are. - */ - if (!tsdbHasDataInRange(pIter, minKey, maxKey)) { - // has old data - if (pIdx->len > 0) { - goto _table_over; - // if (isNewLastFile && pIdx->hasLast) { - if (0) { - // need to move the last block to new file - if ((pCompInfo = (SCompInfo *)realloc((void *)pCompInfo, pIdx->len)) == NULL) { /* TODO */ - } - if (tsdbLoadCompBlocks(pGroup, pIdx, (void *)pCompInfo) < 0) { /* TODO */ - } - - tdInitDataCols(pCols, tsdbGetTableSchema(pMeta, pTable)); - - SCompBlock *pTBlock = TSDB_COMPBLOCK_AT(pCompInfo, pIdx->numOfSuperBlocks); - int nBlocks = 0; - - TSDB_COMPBLOCK_GET_START_AND_SIZE(pCompInfo, pTBlock, nBlocks); - - SCompData tBlock; - int64_t toffset; - int32_t tlen; - tsdbLoadDataBlock(&pGroup->files[TSDB_FILE_TYPE_LAST], pTBlock, nBlocks, pCols, &tBlock); - - tsdbWriteBlockToFileImpl(&lFile, pCols, pCols->numOfPoints, &toffset, &tlen, pTable->tableId.uid); - pTBlock = TSDB_COMPBLOCK_AT(pCompInfo, pIdx->numOfSuperBlocks); - pTBlock->offset = toffset; - pTBlock->len = tlen; - pTBlock->numOfPoints = pCols->numOfPoints; - pTBlock->numOfSubBlocks = 1; - - pIdx->offset = lseek(hFile.fd, 0, SEEK_CUR); - if (nBlocks > 1) { - pIdx->len -= (sizeof(SCompBlock) * nBlocks); - } - write(hFile.fd, (void *)pCompInfo, pIdx->len); - } else { - pIdx->offset = lseek(hFile.fd, 0, SEEK_CUR); - sendfile(pGroup->files[TSDB_FILE_TYPE_HEAD].fd, hFile.fd, NULL, pIdx->len); - hFile.info.size += pIdx->len; - } - } - continue; - } + if (pTable == NULL) continue; - pCompInfo->delimiter = TSDB_FILE_DELIMITER; - pCompInfo->checksum = 0; - pCompInfo->uid = pTable->tableId.uid; - - // Load SCompBlock part if neccessary - // int isCompBlockLoaded = 0; - if (0) { - // if (pIdx->offset > 0) { - if (pIdx->hasLast || tsdbHasDataInRange(pIter, minKey, pIdx->maxKey)) { - // has last block || cache key overlap with commit key - pCompInfo = (SCompInfo *)realloc((void *)pCompInfo, pIdx->len + sizeof(SCompBlock) * 100); - if (tsdbLoadCompBlocks(pGroup, pIdx, (void *)pCompInfo) < 0) { /* TODO */ - } - // if (pCompInfo->uid == pTable->tableId.uid) isCompBlockLoaded = 1; - } else { - // TODO: No need to load the SCompBlock part, just sendfile the SCompBlock part - // and write those new blocks to it - } - } + SSkipListIterator *pIter = iters[tid]; - tdInitDataCols(pCols, tsdbGetTableSchema(pMeta, pTable)); + // Set the helper and the buffer dataCols object to help to write this table + tsdbSetHelperTable(pHelper, pTable, pRepo); + tdInitDataCols(pDataCols, tsdbGetTableSchema(pMeta, pTable)); + // Loop to write the data in the cache to files. If no data to write, just break the loop int maxRowsToRead = pCfg->maxRowsPerFileBlock * 4 / 5; - while (1) { - tsdbReadRowsFromCache(pIter, maxKey, maxRowsToRead, pCols); - if (pCols->numOfPoints == 0) break; - - int pointsWritten = pCols->numOfPoints; - // TODO: all write to the end of .data file - int64_t toffset = 0; - int32_t tlen = 0; - tsdbWriteBlockToFileImpl(&pGroup->files[TSDB_FILE_TYPE_DATA], pCols, pCols->numOfPoints, &toffset, &tlen, pTable->tableId.uid); - - // Make the compBlock - SCompBlock *pTBlock = pCompInfo->blocks + nNewBlocks++; - pTBlock->offset = toffset; - pTBlock->len = tlen; - pTBlock->keyFirst = dataColsKeyFirst(pCols); - pTBlock->keyLast = dataColsKeyLast(pCols); - pTBlock->last = 0; - pTBlock->algorithm = 0; - pTBlock->numOfPoints = pCols->numOfPoints; - pTBlock->sversion = pTable->sversion; - pTBlock->numOfSubBlocks = 1; - pTBlock->numOfCols = pCols->numOfCols; - - if (dataColsKeyLast(pCols) > pIdx->maxKey) pIdx->maxKey = dataColsKeyLast(pCols); - - tdPopDataColsPoints(pCols, pointsWritten); - maxRowsToRead = pCfg->maxRowsPerFileBlock * 4 / 5 - pCols->numOfPoints; + int nLoop = 0; + while (true) { + int rowsRead = tsdbReadRowsFromCache(pIter, maxKey, maxRowsToRead, pDataCols); + assert(rowsRead >= 0); + if (pDataCols->numOfPoints == 0) break; + nLoop++; + + ASSERT(dataColsKeyFirst(pDataCols) >= minKey && dataColsKeyFirst(pDataCols) <= maxKey); + ASSERT(dataColsKeyLast(pDataCols) >= minKey && dataColsKeyLast(pDataCols) <= maxKey); + + int rowsWritten = tsdbWriteDataBlock(pHelper, pDataCols); + ASSERT(rowsWritten != 0); + if (rowsWritten < 0) goto _err; + ASSERT(rowsWritten <= pDataCols->numOfPoints); + + tdPopDataColsPoints(pDataCols, rowsWritten); + maxRowsToRead = pCfg->maxRowsPerFileBlock * 4 / 5 - pDataCols->numOfPoints; } + ASSERT(pDataCols->numOfPoints == 0); -_table_over: - // Write the SCompBlock part - pIdx->offset = lseek(hFile.fd, 0, SEEK_END); - if (pIdx->len > 0) { - int bytes = tsendfile(hFile.fd, pGroup->files[TSDB_FILE_TYPE_HEAD].fd, NULL, pIdx->len); - if (bytes < pIdx->len) { - printf("Failed to send file, reason: %s\n", strerror(errno)); - } - if (nNewBlocks > 0) { - write(hFile.fd, (void *)(pCompInfo->blocks), sizeof(SCompBlock) * nNewBlocks); - pIdx->len += (sizeof(SCompBlock) * nNewBlocks); - } - } else { - if (nNewBlocks > 0) { - write(hFile.fd, (void *)pCompInfo, sizeof(SCompInfo) + sizeof(SCompBlock) * nNewBlocks); - pIdx->len += sizeof(SCompInfo) + sizeof(SCompBlock) * nNewBlocks; - } - } + // Move the last block to the new .l file if neccessary + if (tsdbMoveLastBlockIfNeccessary(pHelper) < 0) goto _err; - pIdx->checksum = 0; - pIdx->numOfSuperBlocks += nNewBlocks; - pIdx->hasLast = 0; + // Write the SCompBlock part + if (tsdbWriteCompInfo(pHelper) < 0) goto _err; + } - // Write the SCompIdx part - if (lseek(hFile.fd, TSDB_FILE_HEAD_SIZE, SEEK_SET) < 0) {/* TODO */} - if (write(hFile.fd, (void *)pIndices, sizeof(SCompIdx) * pCfg->maxTables) < 0) {/* TODO */} - - // close the files - for (int type = TSDB_FILE_TYPE_HEAD; type < TSDB_FILE_TYPE_MAX; type++) { - tsdbCloseFile(&pGroup->files[type]); - } - tsdbCloseFile(&hFile); - if (isNewLastFile) tsdbCloseFile(&lFile); - // TODO: replace the .head and .last file - rename(hFile.fname, pGroup->files[TSDB_FILE_TYPE_HEAD].fname); - pGroup->files[TSDB_FILE_TYPE_HEAD].info = hFile.info; - if (isNewLastFile) { - rename(lFile.fname, pGroup->files[TSDB_FILE_TYPE_LAST].fname); - pGroup->files[TSDB_FILE_TYPE_LAST].info = lFile.info; - } + if (tsdbWriteCompIdx(pHelper) < 0) goto _err; - if (pIndices) free(pIndices); - if (pCompInfo) free(pCompInfo); + tsdbCloseHelperFile(pHelper, 0); + // TODO: make it atomic with some methods + pGroup->files[TSDB_FILE_TYPE_HEAD] = pHelper->files.headF; + pGroup->files[TSDB_FILE_TYPE_DATA] = pHelper->files.dataF; + pGroup->files[TSDB_FILE_TYPE_LAST] = pHelper->files.lastF; return 0; + + _err: + ASSERT(false); + tsdbCloseHelperFile(pHelper, 1); + return -1; } -static int tsdbHasDataInRange(SSkipListIterator *pIter, TSKEY minKey, TSKEY maxKey) { - if (pIter == NULL) return 0; +/** + * Return the next iterator key. + * + * @return the next key if iter has + * -1 if iter not + */ +static TSKEY tsdbNextIterKey(SSkipListIterator *pIter) { + if (pIter == NULL) return -1; SSkipListNode *node = tSkipListIterGet(pIter); - if (node == NULL) return 0; + if (node == NULL) return -1; SDataRow row = SL_GET_NODE_DATA(node); - if (dataRowKey(row) >= minKey && dataRowKey(row) <= maxKey) return 1; - - return 0; + return dataRowKey(row); } static int tsdbHasDataToCommit(SSkipListIterator **iters, int nIters, TSKEY minKey, TSKEY maxKey) { + TSKEY nextKey; for (int i = 0; i < nIters; i++) { SSkipListIterator *pIter = iters[i]; - if (tsdbHasDataInRange(pIter, minKey, maxKey)) return 1; - } - return 0; -} - -static int tsdbWriteBlockToFileImpl(SFile *pFile, SDataCols *pCols, int pointsToWrite, int64_t *offset, int32_t *len, int64_t uid) { - size_t size = sizeof(SCompData) + sizeof(SCompCol) * pCols->numOfCols; - SCompData *pCompData = (SCompData *)malloc(size); - if (pCompData == NULL) return -1; - - pCompData->delimiter = TSDB_FILE_DELIMITER; - pCompData->uid = uid; - pCompData->numOfCols = pCols->numOfCols; - - *offset = lseek(pFile->fd, 0, SEEK_END); - *len = size; - - int toffset = size; - for (int iCol = 0; iCol < pCols->numOfCols; iCol++) { - SCompCol *pCompCol = pCompData->cols + iCol; - SDataCol *pDataCol = pCols->cols + iCol; - - pCompCol->colId = pDataCol->colId; - pCompCol->type = pDataCol->type; - pCompCol->offset = toffset; - - // TODO: add compression - pCompCol->len = TYPE_BYTES[pCompCol->type] * pointsToWrite; - toffset += pCompCol->len; - } - - // Write the block - if (write(pFile->fd, (void *)pCompData, size) < 0) goto _err; - *len += size; - for (int iCol = 0; iCol < pCols->numOfCols; iCol++) { - SDataCol *pDataCol = pCols->cols + iCol; - SCompCol *pCompCol = pCompData->cols + iCol; - if (write(pFile->fd, pDataCol->pData, pCompCol->len) < 0) goto _err; - *len += pCompCol->len; + nextKey = tsdbNextIterKey(pIter); + if (nextKey > 0 && (nextKey >= minKey && nextKey <= maxKey)) return 1; } - - tfree(pCompData); return 0; - -_err: - tfree(pCompData); - return -1; -} - -static int compareKeyBlock(const void *arg1, const void *arg2) { - TSKEY key = *(TSKEY *)arg1; - SCompBlock *pBlock = (SCompBlock *)arg2; - - if (key < pBlock->keyFirst) { - return -1; - } else if (key > pBlock->keyLast) { - return 1; - } - - return 0; -} - -int tsdbWriteBlockToFile(STsdbRepo *pRepo, SFileGroup *pGroup, SCompIdx *pIdx, SCompInfo *pCompInfo, SDataCols *pCols, SCompBlock *pCompBlock, SFile *lFile, int64_t uid) { - STsdbCfg * pCfg = &(pRepo->config); - SFile * pFile = NULL; - int numOfPointsToWrite = 0; - int64_t offset = 0; - int32_t len = 0; - - memset((void *)pCompBlock, 0, sizeof(SCompBlock)); - - if (pCompInfo == NULL) { - // Just append the data block to .data or .l or .last file - numOfPointsToWrite = pCols->numOfPoints; - if (pCols->numOfPoints > pCfg->minRowsPerFileBlock) { // Write to .data file - pFile = &(pGroup->files[TSDB_FILE_TYPE_DATA]); - } else { // Write to .last or .l file - pCompBlock->last = 1; - if (lFile) { - pFile = lFile; - } else { - pFile = &(pGroup->files[TSDB_FILE_TYPE_LAST]); - } - } - tsdbWriteBlockToFileImpl(pFile, pCols, numOfPointsToWrite, &offset, &len, uid); - pCompBlock->offset = offset; - pCompBlock->len = len; - pCompBlock->algorithm = 2; // TODO : add to configuration - pCompBlock->sversion = pCols->sversion; - pCompBlock->numOfPoints = pCols->numOfPoints; - pCompBlock->numOfSubBlocks = 1; - pCompBlock->numOfCols = pCols->numOfCols; - pCompBlock->keyFirst = dataColsKeyFirst(pCols); - pCompBlock->keyLast = dataColsKeyLast(pCols); - } else { - // Need to merge the block to either the last block or the other block - TSKEY keyFirst = dataColsKeyFirst(pCols); - SCompBlock *pMergeBlock = NULL; - - // Search the block to merge in - void *ptr = taosbsearch((void *)&keyFirst, (void *)(pCompInfo->blocks), sizeof(SCompBlock), pIdx->numOfSuperBlocks, - compareKeyBlock, TD_GE); - if (ptr == NULL) { - // No block greater or equal than the key, but there are data in the .last file, need to merge the last file block - // and merge the data - pMergeBlock = TSDB_COMPBLOCK_AT(pCompInfo, pIdx->numOfSuperBlocks - 1); - } else { - pMergeBlock = (SCompBlock *)ptr; - } - - if (pMergeBlock->last) { - if (pMergeBlock->last + pCols->numOfPoints > pCfg->minRowsPerFileBlock) { - // Need to load the data from .last and combine data in pCols to write to .data file - - } else { // Just append the block to .last or .l file - if (lFile) { - // read the block from .last file and merge with pCols, write to .l file - - } else { - // tsdbWriteBlockToFileImpl(); - } - } - } else { // The block need to merge in .data file - - } - - } - - return numOfPointsToWrite; -} +} \ No newline at end of file diff --git a/src/tsdb/src/tsdbRWHelper.c b/src/tsdb/src/tsdbRWHelper.c new file mode 100644 index 0000000000000000000000000000000000000000..9c35ebb40fbcfe4a5e98f107ce71bad341562089 --- /dev/null +++ b/src/tsdb/src/tsdbRWHelper.c @@ -0,0 +1,1128 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ +#include "tsdbMain.h" +#include "tchecksum.h" +#include "tscompression.h" +#include "talgo.h" + +// Local function definitions +// static int tsdbCheckHelperCfg(SHelperCfg *pCfg); +static int tsdbInitHelperFile(SRWHelper *pHelper); +// static void tsdbClearHelperFile(SHelperFile *pHFile); +static bool tsdbShouldCreateNewLast(SRWHelper *pHelper); +static int tsdbWriteBlockToFile(SRWHelper *pHelper, SFile *pFile, SDataCols *pDataCols, int rowsToWrite, + SCompBlock *pCompBlock, bool isLast, bool isSuperBlock); +static int compareKeyBlock(const void *arg1, const void *arg2); +static int tsdbMergeDataWithBlock(SRWHelper *pHelper, int blkIdx, SDataCols *pDataCols); +static int tsdbInsertSuperBlock(SRWHelper *pHelper, SCompBlock *pCompBlock, int blkIdx); +static int tsdbAddSubBlock(SRWHelper *pHelper, SCompBlock *pCompBlock, int blkIdx, int rowsAdded); +static int tsdbUpdateSuperBlock(SRWHelper *pHelper, SCompBlock *pCompBlock, int blkIdx); +static int tsdbGetRowsInRange(SDataCols *pDataCols, int minKey, int maxKey); +static void tsdbResetHelperBlock(SRWHelper *pHelper); + +// ---------- Operations on Helper File part +static void tsdbResetHelperFileImpl(SRWHelper *pHelper) { + memset((void *)&pHelper->files, 0, sizeof(pHelper->files)); + pHelper->files.fid = -1; + pHelper->files.headF.fd = -1; + pHelper->files.dataF.fd = -1; + pHelper->files.lastF.fd = -1; + pHelper->files.nHeadF.fd = -1; + pHelper->files.nLastF.fd = -1; +} + +static int tsdbInitHelperFile(SRWHelper *pHelper) { + // pHelper->compIdxSize = sizeof(SCompIdx) * pHelper->config.maxTables + sizeof(TSCKSUM); + size_t tsize = sizeof(SCompIdx) * pHelper->config.maxTables + sizeof(TSCKSUM); + pHelper->pCompIdx = (SCompIdx *)tmalloc(tsize); + if (pHelper->pCompIdx == NULL) return -1; + + tsdbResetHelperFileImpl(pHelper); + return 0; +} + +static void tsdbDestroyHelperFile(SRWHelper *pHelper) { + tsdbCloseHelperFile(pHelper, false); + tzfree(pHelper->pCompIdx); +} + +// ---------- Operations on Helper Table part +static void tsdbResetHelperTableImpl(SRWHelper *pHelper) { + memset((void *)&pHelper->tableInfo, 0, sizeof(SHelperTable)); + pHelper->hasOldLastBlock = false; +} + +static void tsdbResetHelperTable(SRWHelper *pHelper) { + tsdbResetHelperBlock(pHelper); + tsdbResetHelperTableImpl(pHelper); + helperClearState(pHelper, (TSDB_HELPER_TABLE_SET|TSDB_HELPER_INFO_LOAD)); +} + +static void tsdbInitHelperTable(SRWHelper *pHelper) { + tsdbResetHelperTableImpl(pHelper); +} + +static void tsdbDestroyHelperTable(SRWHelper *pHelper) { tzfree((void *)pHelper->pCompInfo); } + +// ---------- Operations on Helper Block part +static void tsdbResetHelperBlockImpl(SRWHelper *pHelper) { + tdResetDataCols(pHelper->pDataCols[0]); + tdResetDataCols(pHelper->pDataCols[1]); +} + +static void tsdbResetHelperBlock(SRWHelper *pHelper) { + tsdbResetHelperBlockImpl(pHelper); + // helperClearState(pHelper, TSDB_HELPER_) +} + +static int tsdbInitHelperBlock(SRWHelper *pHelper) { + pHelper->pDataCols[0] = tdNewDataCols(pHelper->config.maxRowSize, pHelper->config.maxCols, pHelper->config.maxRows); + pHelper->pDataCols[1] = tdNewDataCols(pHelper->config.maxRowSize, pHelper->config.maxCols, pHelper->config.maxRows); + if (pHelper->pDataCols[0] == NULL || pHelper->pDataCols[1] == NULL) return -1; + + tsdbResetHelperBlockImpl(pHelper); + + return 0; +} + +static void tsdbDestroyHelperBlock(SRWHelper *pHelper) { + tzfree(pHelper->pCompData); + tdFreeDataCols(pHelper->pDataCols[0]); + tdFreeDataCols(pHelper->pDataCols[1]); +} + +static int tsdbInitHelper(SRWHelper *pHelper, STsdbRepo *pRepo, tsdb_rw_helper_t type) { + if (pHelper == NULL || pRepo == NULL) return -1; + + memset((void *)pHelper, 0, sizeof(*pHelper)); + + // Init global configuration + pHelper->config.type = type; + pHelper->config.maxTables = pRepo->config.maxTables; + pHelper->config.maxRowSize = pRepo->tsdbMeta->maxRowBytes; + pHelper->config.maxRows = pRepo->config.maxRowsPerFileBlock; + pHelper->config.maxCols = pRepo->tsdbMeta->maxCols; + pHelper->config.minRowsPerFileBlock = pRepo->config.minRowsPerFileBlock; + pHelper->config.maxRowsPerFileBlock = pRepo->config.maxRowsPerFileBlock; + pHelper->config.compress = pRepo->config.compression; + + pHelper->state = TSDB_HELPER_CLEAR_STATE; + + // Init file part + if (tsdbInitHelperFile(pHelper) < 0) goto _err; + + // Init table part + tsdbInitHelperTable(pHelper); + + // Init block part + if (tsdbInitHelperBlock(pHelper) < 0) goto _err; + + return 0; + +_err: + tsdbDestroyHelper(pHelper); + return -1; +} + +// ------------------------------------------ OPERATIONS FOR OUTSIDE ------------------------------------------ +int tsdbInitReadHelper(SRWHelper *pHelper, STsdbRepo *pRepo) { + return tsdbInitHelper(pHelper, pRepo, TSDB_READ_HELPER); +} + +int tsdbInitWriteHelper(SRWHelper *pHelper, STsdbRepo *pRepo) { + return tsdbInitHelper(pHelper, pRepo, TSDB_WRITE_HELPER); +} + +void tsdbDestroyHelper(SRWHelper *pHelper) { + if (pHelper) { + tsdbDestroyHelperFile(pHelper); + tsdbDestroyHelperTable(pHelper); + tsdbDestroyHelperBlock(pHelper); + memset((void *)pHelper, 0, sizeof(*pHelper)); + } +} + +void tsdbResetHelper(SRWHelper *pHelper) { + if (pHelper) { + // Reset the block part + tsdbResetHelperBlockImpl(pHelper); + + // Reset the table part + tsdbResetHelperTableImpl(pHelper); + + // Reset the file part + tsdbCloseHelperFile(pHelper, false); + tsdbResetHelperFileImpl(pHelper); + + pHelper->state = TSDB_HELPER_CLEAR_STATE; + } +} + +// ------------ Operations for read/write purpose +int tsdbSetAndOpenHelperFile(SRWHelper *pHelper, SFileGroup *pGroup) { + ASSERT(pHelper != NULL && pGroup != NULL); + + // Clear the helper object + tsdbResetHelper(pHelper); + + ASSERT(pHelper->state == TSDB_HELPER_CLEAR_STATE); + + // Set the files + pHelper->files.fid = pGroup->fileId; + pHelper->files.headF = pGroup->files[TSDB_FILE_TYPE_HEAD]; + pHelper->files.dataF = pGroup->files[TSDB_FILE_TYPE_DATA]; + pHelper->files.lastF = pGroup->files[TSDB_FILE_TYPE_LAST]; + if (TSDB_HELPER_TYPE(pHelper) == TSDB_WRITE_HELPER) { + char *fnameDup = strdup(pHelper->files.headF.fname); + if (fnameDup == NULL) return -1; + char *dataDir = dirname(fnameDup); + + tsdbGetFileName(dataDir, pHelper->files.fid, ".h", pHelper->files.nHeadF.fname); + tsdbGetFileName(dataDir, pHelper->files.fid, ".l", pHelper->files.nLastF.fname); + free((void *)fnameDup); + } + + // Open the files + if (tsdbOpenFile(&(pHelper->files.headF), O_RDONLY) < 0) goto _err; + if (TSDB_HELPER_TYPE(pHelper) == TSDB_WRITE_HELPER) { + if (tsdbOpenFile(&(pHelper->files.dataF), O_RDWR) < 0) goto _err; + if (tsdbOpenFile(&(pHelper->files.lastF), O_RDWR) < 0) goto _err; + + // Create and open .h + if (tsdbOpenFile(&(pHelper->files.nHeadF), O_WRONLY | O_CREAT) < 0) return -1; + size_t tsize = TSDB_FILE_HEAD_SIZE + sizeof(SCompIdx) * pHelper->config.maxTables + sizeof(TSCKSUM); + if (tsendfile(pHelper->files.nHeadF.fd, pHelper->files.headF.fd, NULL, tsize) < tsize) goto _err; + + // Create and open .l file if should + if (tsdbShouldCreateNewLast(pHelper)) { + if (tsdbOpenFile(&(pHelper->files.nLastF), O_WRONLY | O_CREAT) < 0) goto _err; + if (tsendfile(pHelper->files.nLastF.fd, pHelper->files.lastF.fd, NULL, TSDB_FILE_HEAD_SIZE) < TSDB_FILE_HEAD_SIZE) goto _err; + } + } else { + if (tsdbOpenFile(&(pHelper->files.dataF), O_RDONLY) < 0) goto _err; + if (tsdbOpenFile(&(pHelper->files.lastF), O_RDONLY) < 0) goto _err; + } + + helperSetState(pHelper, TSDB_HELPER_FILE_SET_AND_OPEN); + + return tsdbLoadCompIdx(pHelper, NULL); + + _err: + return -1; +} + +int tsdbCloseHelperFile(SRWHelper *pHelper, bool hasError) { + if (pHelper->files.headF.fd > 0) { + close(pHelper->files.headF.fd); + pHelper->files.headF.fd = -1; + } + if (pHelper->files.dataF.fd > 0) { + close(pHelper->files.dataF.fd); + pHelper->files.dataF.fd = -1; + } + if (pHelper->files.lastF.fd > 0) { + close(pHelper->files.lastF.fd); + pHelper->files.lastF.fd = -1; + } + if (pHelper->files.nHeadF.fd > 0) { + close(pHelper->files.nHeadF.fd); + pHelper->files.nHeadF.fd = -1; + if (hasError) { + remove(pHelper->files.nHeadF.fname); + } else { + rename(pHelper->files.nHeadF.fname, pHelper->files.headF.fname); + pHelper->files.headF.info = pHelper->files.nHeadF.info; + } + } + + if (pHelper->files.nLastF.fd > 0) { + close(pHelper->files.nLastF.fd); + pHelper->files.nLastF.fd = -1; + if (hasError) { + remove(pHelper->files.nLastF.fname); + } else { + rename(pHelper->files.nLastF.fname, pHelper->files.lastF.fname); + pHelper->files.lastF.info = pHelper->files.nLastF.info; + } + } + return 0; +} + +void tsdbSetHelperTable(SRWHelper *pHelper, STable *pTable, STsdbRepo *pRepo) { + ASSERT(helperHasState(pHelper, TSDB_HELPER_FILE_SET_AND_OPEN | TSDB_HELPER_IDX_LOAD)); + + // Clear members and state used by previous table + tsdbResetHelperTable(pHelper); + ASSERT(helperHasState(pHelper, (TSDB_HELPER_FILE_SET_AND_OPEN | TSDB_HELPER_IDX_LOAD))); + + pHelper->tableInfo.tid = pTable->tableId.tid; + pHelper->tableInfo.uid = pTable->tableId.uid; + pHelper->tableInfo.sversion = pTable->sversion; + STSchema *pSchema = tsdbGetTableSchema(pRepo->tsdbMeta, pTable); + + tdInitDataCols(pHelper->pDataCols[0], pSchema); + tdInitDataCols(pHelper->pDataCols[1], pSchema); + + SCompIdx *pIdx = pHelper->pCompIdx + pTable->tableId.tid; + if (pIdx->offset > 0 && pIdx->hasLast) { + pHelper->hasOldLastBlock = true; + } + + helperSetState(pHelper, TSDB_HELPER_TABLE_SET); + ASSERT(pHelper->state == ((TSDB_HELPER_TABLE_SET << 1) - 1)); +} + +/** + * Write part of of points from pDataCols to file + * + * @return: number of points written to file successfully + * -1 for failure + */ +int tsdbWriteDataBlock(SRWHelper *pHelper, SDataCols *pDataCols) { + ASSERT(TSDB_HELPER_TYPE(pHelper) == TSDB_WRITE_HELPER); + ASSERT(pDataCols->numOfPoints > 0); + + SCompBlock compBlock; + int rowsToWrite = 0; + TSKEY keyFirst = dataColsKeyFirst(pDataCols); + + ASSERT(helperHasState(pHelper, TSDB_HELPER_IDX_LOAD)); + SCompIdx *pIdx = pHelper->pCompIdx + pHelper->tableInfo.tid; // for change purpose + + // Load the SCompInfo part if neccessary + ASSERT(helperHasState(pHelper, TSDB_HELPER_TABLE_SET)); + if (tsdbLoadCompInfo(pHelper, NULL) < 0) goto _err; + + if (pIdx->offset == 0 || (!pIdx->hasLast && keyFirst > pIdx->maxKey)) { // Just append as a super block + ASSERT(pHelper->hasOldLastBlock == false); + rowsToWrite = pDataCols->numOfPoints; + SFile *pWFile = NULL; + bool isLast = false; + + if (rowsToWrite >= pHelper->config.minRowsPerFileBlock) { + pWFile = &(pHelper->files.dataF); + } else { + isLast = true; + pWFile = (pHelper->files.nLastF.fd > 0) ? &(pHelper->files.nLastF) : &(pHelper->files.lastF); + } + + if (tsdbWriteBlockToFile(pHelper, pWFile, pDataCols, rowsToWrite, &compBlock, isLast, true) < 0) goto _err; + + if (tsdbInsertSuperBlock(pHelper, &compBlock, pIdx->numOfSuperBlocks) < 0) goto _err; + } else { // (Has old data) AND ((has last block) OR (key overlap)), need to merge the block + SCompBlock *pCompBlock = taosbsearch((void *)(&keyFirst), (void *)(pHelper->pCompInfo->blocks), + pIdx->numOfSuperBlocks, sizeof(SCompBlock), compareKeyBlock, TD_GE); + + int blkIdx = (pCompBlock == NULL) ? (pIdx->numOfSuperBlocks - 1) : (pCompBlock - pHelper->pCompInfo->blocks); + + if (pCompBlock == NULL) { // No key overlap, must has last block, just merge with the last block + ASSERT(pIdx->hasLast && pHelper->pCompInfo->blocks[pIdx->numOfSuperBlocks - 1].last); + rowsToWrite = tsdbMergeDataWithBlock(pHelper, blkIdx, pDataCols); + if (rowsToWrite < 0) goto _err; + } else { // Has key overlap + + if (compareKeyBlock((void *)(&keyFirst), (void *)pCompBlock) == 0) { + // Key overlap with the block, must merge with the block + + rowsToWrite = tsdbMergeDataWithBlock(pHelper, blkIdx, pDataCols); + if (rowsToWrite < 0) goto _err; + } else { // Save as a super block in the middle + rowsToWrite = tsdbGetRowsInRange(pDataCols, 0, pCompBlock->keyFirst-1); + ASSERT(rowsToWrite > 0); + if (tsdbWriteBlockToFile(pHelper, &(pHelper->files.dataF), pDataCols, rowsToWrite, &compBlock, false, true) < 0) goto _err; + if (tsdbInsertSuperBlock(pHelper, pCompBlock, blkIdx) < 0) goto _err; + } + } + } + + return rowsToWrite; + +_err: + return -1; +} + +int tsdbMoveLastBlockIfNeccessary(SRWHelper *pHelper) { + ASSERT(TSDB_HELPER_TYPE(pHelper) == TSDB_WRITE_HELPER); + SCompIdx *pIdx = pHelper->pCompIdx + pHelper->tableInfo.tid; + SCompBlock compBlock; + if ((pHelper->files.nLastF.fd > 0) && (pHelper->hasOldLastBlock)) { + if (tsdbLoadCompInfo(pHelper, NULL) < 0) return -1; + + SCompBlock *pCompBlock = pHelper->pCompInfo->blocks + pIdx->numOfSuperBlocks - 1; + ASSERT(pCompBlock->last); + + if (pCompBlock->numOfSubBlocks > 1) { + if (tsdbLoadBlockData(pHelper, blockAtIdx(pHelper, pIdx->numOfSuperBlocks - 1), NULL) < 0) return -1; + ASSERT(pHelper->pDataCols[0]->numOfPoints > 0 && + pHelper->pDataCols[0]->numOfPoints < pHelper->config.minRowsPerFileBlock); + if (tsdbWriteBlockToFile(pHelper, &(pHelper->files.nLastF), pHelper->pDataCols[0], + pHelper->pDataCols[0]->numOfPoints, &compBlock, true, true) < 0) + return -1; + + if (tsdbUpdateSuperBlock(pHelper, &compBlock, pIdx->numOfSuperBlocks - 1) < 0) return -1; + + } else { + if (lseek(pHelper->files.lastF.fd, pCompBlock->offset, SEEK_SET) < 0) return -1; + pCompBlock->offset = lseek(pHelper->files.nLastF.fd, 0, SEEK_END); + if (pCompBlock->offset < 0) return -1; + + if (tsendfile(pHelper->files.nLastF.fd, pHelper->files.lastF.fd, NULL, pCompBlock->len) < pCompBlock->len) + return -1; + } + + pHelper->hasOldLastBlock = false; + } + + return 0; +} + +int tsdbWriteCompInfo(SRWHelper *pHelper) { + SCompIdx *pIdx = pHelper->pCompIdx + pHelper->tableInfo.tid; + if (!helperHasState(pHelper, TSDB_HELPER_INFO_LOAD)) { + if (pIdx->offset > 0) { + pIdx->offset = lseek(pHelper->files.nHeadF.fd, 0, SEEK_END); + if (pIdx->offset < 0) return -1; + ASSERT(pIdx->offset >= tsizeof(pHelper->pCompIdx)); + + if (tsendfile(pHelper->files.nHeadF.fd, pHelper->files.headF.fd, NULL, pIdx->len) < pIdx->len) return -1; + } + } else { + pHelper->pCompInfo->delimiter = TSDB_FILE_DELIMITER; + pHelper->pCompInfo->uid = pHelper->tableInfo.uid; + ASSERT((pIdx->len - sizeof(SCompInfo) - sizeof(TSCKSUM)) % sizeof(SCompBlock) == 0); + taosCalcChecksumAppend(0, (uint8_t *)pHelper->pCompInfo, pIdx->len); + pIdx->offset = lseek(pHelper->files.nHeadF.fd, 0, SEEK_END); + if (pIdx->offset < 0) return -1; + ASSERT(pIdx->offset >= tsizeof(pHelper->pCompIdx)); + + if (twrite(pHelper->files.nHeadF.fd, (void *)(pHelper->pCompInfo), pIdx->len) < pIdx->len) return -1; + } + + return 0; +} + +int tsdbWriteCompIdx(SRWHelper *pHelper) { + ASSERT(TSDB_HELPER_TYPE(pHelper) == TSDB_WRITE_HELPER); + if (lseek(pHelper->files.nHeadF.fd, TSDB_FILE_HEAD_SIZE, SEEK_SET) < 0) return -1; + + ASSERT(tsizeof(pHelper->pCompIdx) == sizeof(SCompIdx) * pHelper->config.maxTables + sizeof(TSCKSUM)); + taosCalcChecksumAppend(0, (uint8_t *)pHelper->pCompIdx, tsizeof(pHelper->pCompIdx)); + + if (twrite(pHelper->files.nHeadF.fd, (void *)pHelper->pCompIdx, tsizeof(pHelper->pCompIdx)) < tsizeof(pHelper->pCompIdx)) + return -1; + return 0; +} + +int tsdbLoadCompIdx(SRWHelper *pHelper, void *target) { + ASSERT(pHelper->state == TSDB_HELPER_FILE_SET_AND_OPEN); + + if (!helperHasState(pHelper, TSDB_HELPER_IDX_LOAD)) { + // If not load from file, just load it in object + int fd = pHelper->files.headF.fd; + + if (lseek(fd, TSDB_FILE_HEAD_SIZE, SEEK_SET) < 0) return -1; + if (tread(fd, (void *)(pHelper->pCompIdx), tsizeof((void *)pHelper->pCompIdx)) < tsizeof(pHelper->pCompIdx)) return -1; + if (!taosCheckChecksumWhole((uint8_t *)(pHelper->pCompIdx), tsizeof((void *)pHelper->pCompIdx))) { + // TODO: File is broken, try to deal with it + return -1; + } + } + helperSetState(pHelper, TSDB_HELPER_IDX_LOAD); + + // Copy the memory for outside usage + if (target) memcpy(target, pHelper->pCompIdx, tsizeof(pHelper->pCompIdx)); + + return 0; +} + +int tsdbLoadCompInfo(SRWHelper *pHelper, void *target) { + ASSERT(helperHasState(pHelper, TSDB_HELPER_TABLE_SET)); + + SCompIdx *pIdx = pHelper->pCompIdx + pHelper->tableInfo.tid; + + int fd = pHelper->files.headF.fd; + + if (!helperHasState(pHelper, TSDB_HELPER_INFO_LOAD)) { + if (pIdx->offset > 0) { + if (lseek(fd, pIdx->offset, SEEK_SET) < 0) return -1; + + pHelper->pCompInfo = trealloc((void *)pHelper->pCompInfo, pIdx->len); + if (tread(fd, (void *)(pHelper->pCompInfo), pIdx->len) < pIdx->len) return -1; + if (!taosCheckChecksumWhole((uint8_t *)pHelper->pCompInfo, pIdx->len)) return -1; + } + + helperSetState(pHelper, TSDB_HELPER_INFO_LOAD); + } + + if (target) memcpy(target, (void *)(pHelper->pCompInfo), pIdx->len); + + return 0; +} + +int tsdbLoadCompData(SRWHelper *pHelper, SCompBlock *pCompBlock, void *target) { + ASSERT(pCompBlock->numOfSubBlocks <= 1); + int fd = (pCompBlock->last) ? pHelper->files.lastF.fd : pHelper->files.dataF.fd; + + if (lseek(fd, pCompBlock->offset, SEEK_SET) < 0) return -1; + + size_t tsize = sizeof(SCompData) + sizeof(SCompCol) * pCompBlock->numOfCols + sizeof(TSCKSUM); + pHelper->pCompData = trealloc((void *)pHelper->pCompData, tsize); + if (pHelper->pCompData == NULL) return -1; + if (tread(fd, (void *)pHelper->pCompData, tsize) < tsize) return -1; + + ASSERT(pCompBlock->numOfCols == pHelper->pCompData->numOfCols); + + if (target) memcpy(target, pHelper->pCompData, tsize); + + return 0; +} + +static int comparColIdCompCol(const void *arg1, const void *arg2) { + return (*(int16_t *)arg1) - ((SCompCol *)arg2)->colId; +} + +static int comparColIdDataCol(const void *arg1, const void *arg2) { + return (*(int16_t *)arg1) - ((SDataCol *)arg2)->colId; +} + +static int tsdbLoadSingleColumnData(int fd, SCompBlock *pCompBlock, SCompCol *pCompCol, void *buf) { + size_t tsize = sizeof(SCompData) + sizeof(SCompCol) * pCompBlock->numOfCols; + if (lseek(fd, pCompBlock->offset + tsize + pCompCol->offset, SEEK_SET) < 0) return -1; + if (tread(fd, buf, pCompCol->len) < pCompCol->len) return -1; + + return 0; +} + +static int tsdbLoadSingleBlockDataCols(SRWHelper *pHelper, SCompBlock *pCompBlock, int16_t *colIds, int numOfColIds, + SDataCols *pDataCols) { + if (tsdbLoadCompData(pHelper, pCompBlock, NULL) < 0) return -1; + int fd = (pCompBlock->last) ? pHelper->files.lastF.fd : pHelper->files.dataF.fd; + + void *ptr = NULL; + for (int i = 0; i < numOfColIds; i++) { + int16_t colId = colIds[i]; + + ptr = bsearch((void *)&colId, (void *)pHelper->pCompData->cols, pHelper->pCompData->numOfCols, sizeof(SCompCol), comparColIdCompCol); + if (ptr == NULL) continue; + SCompCol *pCompCol = (SCompCol *)ptr; + + ptr = bsearch((void *)&colId, (void *)(pDataCols->cols), pDataCols->numOfCols, sizeof(SDataCol), comparColIdDataCol); + ASSERT(ptr != NULL); + SDataCol *pDataCol = (SDataCol *)ptr; + + pDataCol->len = pCompCol->len; + if (tsdbLoadSingleColumnData(fd, pCompBlock, pCompCol, pDataCol->pData) < 0) return -1; + } + + return 0; +} + +// Load specific column data from file +int tsdbLoadBlockDataCols(SRWHelper *pHelper, SDataCols *pDataCols, int blkIdx, int16_t *colIds, int numOfColIds) { + SCompBlock *pCompBlock = pHelper->pCompInfo->blocks + blkIdx; + + ASSERT(pCompBlock->numOfSubBlocks >= 1); // Must be super block + + int numOfSubBlocks = pCompBlock->numOfSubBlocks; + SCompBlock *pStartBlock = + (numOfSubBlocks == 1) ? pCompBlock : (SCompBlock *)((char *)pHelper->pCompInfo->blocks + pCompBlock->offset); + + if (tsdbLoadSingleBlockDataCols(pHelper, pStartBlock, colIds, numOfColIds, pDataCols) < 0) return -1; + for (int i = 1; i < numOfSubBlocks; i++) { + pStartBlock++; + if (tsdbLoadSingleBlockDataCols(pHelper, pStartBlock, colIds, numOfColIds, pHelper->pDataCols[1]) < 0) return -1; + tdMergeDataCols(pDataCols, pHelper->pDataCols[1], pHelper->pDataCols[1]->numOfPoints); + } + + return 0; +} + +/** + * Interface to read the data of a sub-block OR the data of a super-block of which (numOfSubBlocks == 1) + */ +static int tsdbLoadBlockDataImpl(SRWHelper *pHelper, SCompBlock *pCompBlock, SDataCols *pDataCols) { + ASSERT(pCompBlock->numOfSubBlocks <= 1); + + SCompData *pCompData = (SCompData *)malloc(pCompBlock->len); + if (pCompData == NULL) return -1; + + int fd = (pCompBlock->last) ? pHelper->files.lastF.fd : pHelper->files.dataF.fd; + if (lseek(fd, pCompBlock->offset, SEEK_SET) < 0) goto _err; + if (tread(fd, (void *)pCompData, pCompBlock->len) < pCompBlock->len) goto _err; + ASSERT(pCompData->numOfCols == pCompBlock->numOfCols); + + // TODO : check the checksum + size_t tsize = sizeof(SCompData) + sizeof(SCompCol) * pCompBlock->numOfCols + sizeof(TSCKSUM); + if (!taosCheckChecksumWhole((uint8_t *)pCompData, tsize)) goto _err; + for (int i = 0; i < pCompData->numOfCols; i++) { + // TODO: check the data checksum + // if (!taosCheckChecksumWhole()) + } + + ASSERT(pCompBlock->numOfCols == pCompData->numOfCols); + + pDataCols->numOfPoints = pCompBlock->numOfPoints; + + int ccol = 0, dcol = 0; + while (true) { + if (ccol >= pDataCols->numOfCols) { + // TODO: Fill rest NULL + break; + } + if (dcol >= pCompData->numOfCols) break; + + SCompCol *pCompCol = &(pCompData->cols[ccol]); + SDataCol *pDataCol = &(pDataCols->cols[dcol]); + + if (pCompCol->colId == pDataCol->colId) { + // TODO: uncompress + memcpy(pDataCol->pData, (void *)(((char *)pCompData) + tsize + pCompCol->offset), pCompCol->len); + ccol++; + dcol++; + } else if (pCompCol->colId > pDataCol->colId) { + // TODO: Fill NULL + dcol++; + } else { + ccol++; + } + } + + tfree(pCompData); + return 0; + +_err: + tfree(pCompData); + return -1; +} + +// Load the whole block data +int tsdbLoadBlockData(SRWHelper *pHelper, SCompBlock *pCompBlock, SDataCols *target) { + // SCompBlock *pCompBlock = pHelper->pCompInfo->blocks + blkIdx; + + int numOfSubBlock = pCompBlock->numOfSubBlocks; + if (numOfSubBlock > 1) pCompBlock = (SCompBlock *)((char *)pHelper->pCompInfo + pCompBlock->offset); + + tdResetDataCols(pHelper->pDataCols[0]); + if (tsdbLoadBlockDataImpl(pHelper, pCompBlock, pHelper->pDataCols[0]) < 0) goto _err; + for (int i = 1; i < numOfSubBlock; i++) { + tdResetDataCols(pHelper->pDataCols[1]); + pCompBlock++; + if (tsdbLoadBlockDataImpl(pHelper, pCompBlock, pHelper->pDataCols[1]) < 0) goto _err; + if (tdMergeDataCols(pHelper->pDataCols[0], pHelper->pDataCols[1], pHelper->pDataCols[1]->numOfPoints) < 0) goto _err; + } + + // if (target) TODO + + return 0; + +_err: + return -1; +} + +// static int tsdbCheckHelperCfg(SHelperCfg *pCfg) { +// // TODO +// return 0; +// } + +// static void tsdbClearHelperFile(SHelperFile *pHFile) { +// pHFile->fid = -1; +// if (pHFile->headF.fd > 0) { +// close(pHFile->headF.fd); +// pHFile->headF.fd = -1; +// } +// if (pHFile->dataF.fd > 0) { +// close(pHFile->dataF.fd); +// pHFile->dataF.fd = -1; +// } +// if (pHFile->lastF.fd > 0) { +// close(pHFile->lastF.fd); +// pHFile->lastF.fd = -1; +// } +// if (pHFile->nHeadF.fd > 0) { +// close(pHFile->nHeadF.fd); +// pHFile->nHeadF.fd = -1; +// } +// if (pHFile->nLastF.fd > 0) { +// close(pHFile->nLastF.fd); +// pHFile->nLastF.fd = -1; +// } + +// } + +static bool tsdbShouldCreateNewLast(SRWHelper *pHelper) { + ASSERT(pHelper->files.lastF.fd > 0); + struct stat st; + fstat(pHelper->files.lastF.fd, &st); + if (st.st_size > 32 * 1024 + TSDB_FILE_HEAD_SIZE) return true; + return false; +} + +static int tsdbWriteBlockToFile(SRWHelper *pHelper, SFile *pFile, SDataCols *pDataCols, int rowsToWrite, SCompBlock *pCompBlock, + bool isLast, bool isSuperBlock) { + ASSERT(rowsToWrite > 0 && rowsToWrite <= pDataCols->numOfPoints && + rowsToWrite <= pHelper->config.maxRowsPerFileBlock); + + SCompData *pCompData = NULL; + int64_t offset = 0; + + offset = lseek(pFile->fd, 0, SEEK_END); + if (offset < 0) goto _err; + + pCompData = (SCompData *)malloc(sizeof(SCompData) + sizeof(SCompCol) * pDataCols->numOfCols + sizeof(TSCKSUM)); + if (pCompData == NULL) goto _err; + + int nColsNotAllNull = 0; + int32_t toffset = 0; + for (int ncol = 0; ncol < pDataCols->numOfCols; ncol++) { + SDataCol *pDataCol = pDataCols->cols + ncol; + SCompCol *pCompCol = pCompData->cols + nColsNotAllNull; + + if (0) { + // TODO: all data to commit are NULL + continue; + } + + // Compress the data here + { + // TODO + } + + pCompCol->colId = pDataCol->colId; + pCompCol->type = pDataCol->type; + pCompCol->len = TYPE_BYTES[pCompCol->type] * rowsToWrite; // TODO: change it + pCompCol->offset = toffset; + nColsNotAllNull++; + + toffset += pCompCol->len; + } + + ASSERT(nColsNotAllNull > 0 && nColsNotAllNull <= pDataCols->numOfCols); + + pCompData->delimiter = TSDB_FILE_DELIMITER; + pCompData->uid = pHelper->tableInfo.uid; + pCompData->numOfCols = nColsNotAllNull; + + // Write SCompData + SCompCol part + size_t tsize = sizeof(SCompData) + sizeof(SCompCol) * nColsNotAllNull + sizeof(TSCKSUM); + taosCalcChecksumAppend(0, (uint8_t *)pCompData, tsize); + if (twrite(pFile->fd, (void *)pCompData, tsize) < tsize) goto _err; + // Write true data part + int nCompCol = 0; + for (int ncol = 0; ncol < pDataCols->numOfCols; ncol++) { + ASSERT(nCompCol < nColsNotAllNull); + + SDataCol *pDataCol = pDataCols->cols + ncol; + SCompCol *pCompCol = pCompData->cols + nCompCol; + + if (pDataCol->colId == pCompCol->colId) { + if (twrite(pFile->fd, (void *)(pDataCol->pData), pCompCol->len) < pCompCol->len) goto _err; + tsize += pCompCol->len; + nCompCol++; + } + } + + pCompBlock->last = isLast; + pCompBlock->offset = offset; + pCompBlock->algorithm = pHelper->config.compress; + pCompBlock->numOfPoints = rowsToWrite; + pCompBlock->sversion = pHelper->tableInfo.sversion; + pCompBlock->len = (int32_t)tsize; + pCompBlock->numOfSubBlocks = isSuperBlock ? 1 : 0; + pCompBlock->numOfCols = nColsNotAllNull; + pCompBlock->keyFirst = dataColsKeyFirst(pDataCols); + pCompBlock->keyLast = dataColsKeyAt(pDataCols, rowsToWrite - 1); + + tfree(pCompData); + return 0; + + _err: + tfree(pCompData); + return -1; +} + +static int compareKeyBlock(const void *arg1, const void *arg2) { + TSKEY key = *(TSKEY *)arg1; + SCompBlock *pBlock = (SCompBlock *)arg2; + + if (key < pBlock->keyFirst) { + return -1; + } else if (key > pBlock->keyLast) { + return 1; + } + + return 0; +} + +static FORCE_INLINE int compKeyFunc(const void *arg1, const void *arg2) { + return ((*(TSKEY *)arg1) - (*(TSKEY *)arg2)); +} + +// Merge the data with a block in file +static int tsdbMergeDataWithBlock(SRWHelper *pHelper, int blkIdx, SDataCols *pDataCols) { + // TODO: set pHelper->hasOldBlock + int rowsWritten = 0; + SCompBlock compBlock = {0}; + + ASSERT(pDataCols->numOfPoints > 0); + TSKEY keyFirst = dataColsKeyFirst(pDataCols); + + SCompIdx *pIdx = pHelper->pCompIdx + pHelper->tableInfo.tid; + ASSERT(blkIdx < pIdx->numOfSuperBlocks); + + // SCompBlock *pCompBlock = pHelper->pCompInfo->blocks + blkIdx; + ASSERT(blockAtIdx(pHelper, blkIdx)->numOfSubBlocks >= 1); + ASSERT(keyFirst >= blockAtIdx(pHelper, blkIdx)->keyFirst); + // ASSERT(compareKeyBlock((void *)&keyFirst, (void *)pCompBlock) == 0); + + if (keyFirst > blockAtIdx(pHelper, blkIdx)->keyLast) { // Merge with the last block by append + ASSERT(blockAtIdx(pHelper, blkIdx)->numOfPoints < pHelper->config.minRowsPerFileBlock && blkIdx == pIdx->numOfSuperBlocks-1); + int defaultRowsToWrite = pHelper->config.maxRowsPerFileBlock * 4 / 5; // TODO: make a interface + + rowsWritten = MIN((defaultRowsToWrite - blockAtIdx(pHelper, blkIdx)->numOfPoints), pDataCols->numOfPoints); + if ((blockAtIdx(pHelper, blkIdx)->numOfSubBlocks < TSDB_MAX_SUBBLOCKS) && + (blockAtIdx(pHelper, blkIdx)->numOfPoints + rowsWritten < pHelper->config.minRowsPerFileBlock) && (pHelper->files.nLastF.fd) > 0) { + if (tsdbWriteBlockToFile(pHelper, &(pHelper->files.lastF), pDataCols, rowsWritten, &compBlock, true, false) < 0) + goto _err; + if (tsdbAddSubBlock(pHelper, &compBlock, blkIdx, rowsWritten) < 0) goto _err; + } else { + // Load + if (tsdbLoadBlockData(pHelper, blockAtIdx(pHelper, blkIdx), NULL) < 0) goto _err; + ASSERT(pHelper->pDataCols[0]->numOfPoints == blockAtIdx(pHelper, blkIdx)->numOfPoints); + // Merge + if (tdMergeDataCols(pHelper->pDataCols[0], pDataCols, rowsWritten) < 0) goto _err; + // Write + SFile *pWFile = NULL; + bool isLast = false; + if (pHelper->pDataCols[0]->numOfPoints >= pHelper->config.minRowsPerFileBlock) { + pWFile = &(pHelper->files.dataF); + } else { + isLast = true; + pWFile = (pHelper->files.nLastF.fd > 0) ? &(pHelper->files.nLastF) : &(pHelper->files.lastF); + } + if (tsdbWriteBlockToFile(pHelper, pWFile, pHelper->pDataCols[0], + pHelper->pDataCols[0]->numOfPoints, &compBlock, isLast, true) < 0) + goto _err; + if (tsdbUpdateSuperBlock(pHelper, &compBlock, blkIdx) < 0) goto _err; + } + + ASSERT(pHelper->hasOldLastBlock); + pHelper->hasOldLastBlock = false; + } else { + // Key must overlap with the block + ASSERT(keyFirst <= blockAtIdx(pHelper, blkIdx)->keyLast); + + TSKEY keyLimit = + (blkIdx == pIdx->numOfSuperBlocks - 1) ? INT64_MAX : pHelper->pCompInfo->blocks[blkIdx + 1].keyFirst - 1; + + // rows1: number of rows must merge in this block + int rows1 = tsdbGetRowsInRange(pDataCols, blockAtIdx(pHelper, blkIdx)->keyFirst, blockAtIdx(pHelper, blkIdx)->keyLast); + // rows2: max nuber of rows the block can have more + int rows2 = pHelper->config.maxRowsPerFileBlock - blockAtIdx(pHelper, blkIdx)->numOfPoints; + // rows3: number of rows between this block and the next block + int rows3 = tsdbGetRowsInRange(pDataCols, blockAtIdx(pHelper, blkIdx)->keyFirst, keyLimit); + + ASSERT(rows3 >= rows1); + + if ((rows2 >= rows1) && + (( blockAtIdx(pHelper, blkIdx)->last) || + ((rows1 + blockAtIdx(pHelper, blkIdx)->numOfPoints < pHelper->config.minRowsPerFileBlock) && (pHelper->files.nLastF.fd < 0)))) { + rowsWritten = rows1; + bool isLast = false; + SFile *pFile = NULL; + + if (blockAtIdx(pHelper, blkIdx)->last) { + isLast = true; + pFile = &(pHelper->files.lastF); + } else { + pFile = &(pHelper->files.dataF); + } + + if (tsdbWriteBlockToFile(pHelper, pFile, pDataCols, rows1, &compBlock, isLast, false) < 0) goto _err; + if (tsdbAddSubBlock(pHelper, &compBlock, blkIdx, rowsWritten) < 0) goto _err; + } else { // Load-Merge-Write + // Load + if (tsdbLoadBlockData(pHelper, blockAtIdx(pHelper, blkIdx), NULL) < 0) goto _err; + if (blockAtIdx(pHelper, blkIdx)->last) pHelper->hasOldLastBlock = false; + + rowsWritten = rows3; + + int iter1 = 0; // iter over pHelper->pDataCols[0] + int iter2 = 0; // iter over pDataCols + int round = 0; + // tdResetDataCols(pHelper->pDataCols[1]); + while (true) { + if (iter1 >= pHelper->pDataCols[0]->numOfPoints && iter2 >= rows3) break; + tdMergeTwoDataCols(pHelper->pDataCols[1], pHelper->pDataCols[0], &iter1, pDataCols, &iter2, pHelper->config.maxRowsPerFileBlock * 4 / 5); + ASSERT(pHelper->pDataCols[1]->numOfPoints > 0); + if (tsdbWriteBlockToFile(pHelper, &(pHelper->files.dataF), pHelper->pDataCols[1], + pHelper->pDataCols[1]->numOfPoints, &compBlock, false, true) < 0) + goto _err; + if (round == 0) { + tsdbUpdateSuperBlock(pHelper, &compBlock, blkIdx); + } else { + tsdbInsertSuperBlock(pHelper, &compBlock, blkIdx); + } + round++; + blkIdx++; + // TODO: the blkIdx here is not correct + + // if (iter1 >= pHelper->pDataCols[0]->numOfPoints && iter2 >= rows3) { + // if (pHelper->pDataCols[1]->numOfPoints > 0) { + // if (tsdbWriteBlockToFile(pHelper, &pHelper->files.dataF, pHelper->pDataCols[1], + // pHelper->pDataCols[1]->numOfPoints, &compBlock, false, true) < 0) + // goto _err; + // // TODO: the blkIdx here is not correct + // tsdbAddSubBlock(pHelper, &compBlock, blkIdx, pHelper->pDataCols[1]->numOfPoints); + // } + // } + + // TSKEY key1 = iter1 >= pHelper->pDataCols[0]->numOfPoints + // ? INT64_MAX + // : ((int64_t *)(pHelper->pDataCols[0]->cols[0].pData))[iter1]; + // TSKEY key2 = iter2 >= rowsWritten ? INT64_MAX : ((int64_t *)(pDataCols->cols[0].pData))[iter2]; + + // if (key1 < key2) { + // for (int i = 0; i < pDataCols->numOfCols; i++) { + // SDataCol *pDataCol = pHelper->pDataCols[1]->cols + i; + // memcpy(((char *)pDataCol->pData + TYPE_BYTES[pDataCol->type] * pHelper->pDataCols[1]->numOfPoints), + // ((char *)pHelper->pDataCols[0]->cols[i].pData + TYPE_BYTES[pDataCol->type] * iter1), + // TYPE_BYTES[pDataCol->type]); + // } + // pHelper->pDataCols[1]->numOfPoints++; + // iter1++; + // } else if (key1 == key2) { + // // TODO: think about duplicate key cases + // ASSERT(false); + // } else { + // for (int i = 0; i < pDataCols->numOfCols; i++) { + // SDataCol *pDataCol = pHelper->pDataCols[1]->cols + i; + // memcpy(((char *)pDataCol->pData + TYPE_BYTES[pDataCol->type] * pHelper->pDataCols[1]->numOfPoints), + // ((char *)pDataCols->cols[i].pData + + // TYPE_BYTES[pDataCol->type] * iter2), + // TYPE_BYTES[pDataCol->type]); + // } + // pHelper->pDataCols[1]->numOfPoints++; + // iter2++; + // } + + // if (pHelper->pDataCols[0]->numOfPoints >= pHelper->config.maxRowsPerFileBlock * 4 / 5) { + // if (tsdbWriteBlockToFile(pHelper, &pHelper->files.dataF, pHelper->pDataCols[1], pHelper->pDataCols[1]->numOfPoints, &compBlock, false, true) < 0) goto _err; + // // TODO: blkIdx here is not correct, fix it + // tsdbInsertSuperBlock(pHelper, &compBlock, blkIdx); + + // tdResetDataCols(pHelper->pDataCols[1]); + // } + } + } + } + + return rowsWritten; + + _err: + return -1; +} + +static int compTSKEY(const void *key1, const void *key2) { return ((TSKEY *)key1 - (TSKEY *)key2); } + +static int tsdbAdjustInfoSizeIfNeeded(SRWHelper *pHelper, size_t esize) { + + if (tsizeof((void *)pHelper->pCompInfo) <= esize) { + size_t tsize = esize + sizeof(SCompBlock) * 16; + pHelper->pCompInfo = (SCompInfo *)trealloc(pHelper->pCompInfo, tsize); + if (pHelper->pCompInfo == NULL) return -1; + } + + return 0; +} + +static int tsdbInsertSuperBlock(SRWHelper *pHelper, SCompBlock *pCompBlock, int blkIdx) { + SCompIdx *pIdx = pHelper->pCompIdx + pHelper->tableInfo.tid; + + ASSERT(blkIdx >= 0 && blkIdx <= pIdx->numOfSuperBlocks); + ASSERT(pCompBlock->numOfSubBlocks == 1); + + // Adjust memory if no more room + if (pIdx->len == 0) pIdx->len = sizeof(SCompData) + sizeof(TSCKSUM); + if (tsdbAdjustInfoSizeIfNeeded(pHelper, pIdx->len + sizeof(SCompInfo)) < 0) goto _err; + + // Change the offset + for (int i = 0; i < pIdx->numOfSuperBlocks; i++) { + SCompBlock *pTCompBlock = &pHelper->pCompInfo->blocks[i]; + if (pTCompBlock->numOfSubBlocks > 1) pTCompBlock->offset += sizeof(SCompBlock); + } + + // Memmove if needed + int tsize = pIdx->len - (sizeof(SCompInfo) + sizeof(SCompBlock) * blkIdx); + if (tsize > 0) { + ASSERT(sizeof(SCompInfo) + sizeof(SCompBlock) * (blkIdx + 1) < tsizeof(pHelper->pCompInfo)); + ASSERT(sizeof(SCompInfo) + sizeof(SCompBlock) * (blkIdx + 1) + tsize <= tsizeof(pHelper->pCompInfo)); + memmove((void *)((char *)pHelper->pCompInfo + sizeof(SCompInfo) + sizeof(SCompBlock) * (blkIdx + 1)), + (void *)((char *)pHelper->pCompInfo + sizeof(SCompInfo) + sizeof(SCompBlock) * blkIdx), tsize); + } + pHelper->pCompInfo->blocks[blkIdx] = *pCompBlock; + + pIdx->numOfSuperBlocks++; + pIdx->len += sizeof(SCompBlock); + ASSERT(pIdx->len <= tsizeof(pHelper->pCompInfo)); + pIdx->maxKey = pHelper->pCompInfo->blocks[pIdx->numOfSuperBlocks - 1].keyLast; + pIdx->hasLast = pHelper->pCompInfo->blocks[pIdx->numOfSuperBlocks - 1].last; + + return 0; + +_err: + return -1; +} + +static int tsdbAddSubBlock(SRWHelper *pHelper, SCompBlock *pCompBlock, int blkIdx, int rowsAdded) { + ASSERT(pCompBlock->numOfSubBlocks == 0); + + SCompIdx *pIdx = pHelper->pCompIdx + pHelper->tableInfo.tid; + ASSERT(blkIdx >= 0 && blkIdx < pIdx->numOfSuperBlocks); + + SCompBlock *pSCompBlock = pHelper->pCompInfo->blocks + blkIdx; + ASSERT(pSCompBlock->numOfSubBlocks >= 1 && pSCompBlock->numOfSubBlocks < TSDB_MAX_SUBBLOCKS); + + size_t spaceNeeded = + (pSCompBlock->numOfSubBlocks == 1) ? pIdx->len + sizeof(SCompBlock) * 2 : pIdx->len + sizeof(SCompBlock); + if (tsdbAdjustInfoSizeIfNeeded(pHelper, spaceNeeded) < 0) goto _err; + + pSCompBlock = pHelper->pCompInfo->blocks + blkIdx; + + // Add the sub-block + if (pSCompBlock->numOfSubBlocks > 1) { + size_t tsize = pIdx->len - (pSCompBlock->offset + pSCompBlock->len); + if (tsize > 0) { + memmove((void *)((char *)(pHelper->pCompInfo) + pSCompBlock->offset + pSCompBlock->len + sizeof(SCompBlock)), + (void *)((char *)(pHelper->pCompInfo) + pSCompBlock->offset + pSCompBlock->len), tsize); + + for (int i = blkIdx + 1; i < pIdx->numOfSuperBlocks; i++) { + SCompBlock *pTCompBlock = &pHelper->pCompInfo->blocks[i]; + if (pTCompBlock->numOfSubBlocks > 1) pTCompBlock->offset += sizeof(SCompBlock); + } + } + + + *(SCompBlock *)((char *)(pHelper->pCompInfo) + pSCompBlock->offset + pSCompBlock->len) = *pCompBlock; + + pSCompBlock->numOfSubBlocks++; + ASSERT(pSCompBlock->numOfSubBlocks <= TSDB_MAX_SUBBLOCKS); + pSCompBlock->len += sizeof(SCompBlock); + pSCompBlock->numOfPoints += rowsAdded; + pSCompBlock->keyFirst = MIN(pSCompBlock->keyFirst, pCompBlock->keyFirst); + pSCompBlock->keyLast = MAX(pSCompBlock->keyLast, pCompBlock->keyLast); + pIdx->len += sizeof(SCompBlock); + } else { // Need to create two sub-blocks + void *ptr = NULL; + for (int i = blkIdx + 1; i < pIdx->numOfSuperBlocks; i++) { + SCompBlock *pTCompBlock = pHelper->pCompInfo->blocks + i; + if (pTCompBlock->numOfSubBlocks > 1) { + ptr = (void *)((char *)(pHelper->pCompInfo) + pTCompBlock->offset + pTCompBlock->len); + break; + } + } + + if (ptr == NULL) ptr = (void *)((char *)(pHelper->pCompInfo) + pIdx->len - sizeof(TSCKSUM)); + + size_t tsize = pIdx->len - ((char *)ptr - (char *)(pHelper->pCompInfo)); + if (tsize > 0) { + memmove((void *)((char *)ptr + sizeof(SCompBlock) * 2), ptr, tsize); + for (int i = blkIdx + 1; i < pIdx->numOfSuperBlocks; i++) { + SCompBlock *pTCompBlock = pHelper->pCompInfo->blocks + i; + if (pTCompBlock->numOfSubBlocks > 1) pTCompBlock->offset += (sizeof(SCompBlock) * 2); + } + } + + ((SCompBlock *)ptr)[0] = *pSCompBlock; + ((SCompBlock *)ptr)[0].numOfSubBlocks = 0; + + ((SCompBlock *)ptr)[1] = *pCompBlock; + + pSCompBlock->numOfSubBlocks = 2; + pSCompBlock->numOfPoints += rowsAdded; + pSCompBlock->offset = ((char *)ptr) - ((char *)pHelper->pCompInfo); + pSCompBlock->len = sizeof(SCompBlock) * 2; + pSCompBlock->keyFirst = MIN(((SCompBlock *)ptr)[0].keyFirst, ((SCompBlock *)ptr)[1].keyFirst); + pSCompBlock->keyLast = MAX(((SCompBlock *)ptr)[0].keyLast, ((SCompBlock *)ptr)[1].keyLast); + + pIdx->len += (sizeof(SCompBlock) * 2); + } + + pIdx->maxKey = pHelper->pCompInfo->blocks[pIdx->numOfSuperBlocks - 1].keyLast; + pIdx->hasLast = pHelper->pCompInfo->blocks[pIdx->numOfSuperBlocks - 1].last; + + return 0; + +_err: + return -1; +} + +static int tsdbUpdateSuperBlock(SRWHelper *pHelper, SCompBlock *pCompBlock, int blkIdx) { + ASSERT(pCompBlock->numOfSubBlocks == 1); + + SCompIdx *pIdx = pHelper->pCompIdx + pHelper->tableInfo.tid; + + ASSERT(blkIdx >= 0 && blkIdx < pIdx->numOfSuperBlocks); + + SCompBlock *pSCompBlock = pHelper->pCompInfo->blocks + blkIdx; + + ASSERT(pSCompBlock->numOfSubBlocks >= 1); + + // Delete the sub blocks it has + if (pSCompBlock->numOfSubBlocks > 1) { + size_t tsize = pIdx->len - (pSCompBlock->offset + pSCompBlock->len); + if (tsize > 0) { + memmove((void *)((char *)(pHelper->pCompInfo) + pSCompBlock->offset), + (void *)((char *)(pHelper->pCompInfo) + pSCompBlock->offset + pSCompBlock->len), tsize); + } + + for (int i = blkIdx + 1; i < pIdx->numOfSuperBlocks; i++) { + SCompBlock *pTCompBlock = &pHelper->pCompInfo->blocks[i]; + if (pTCompBlock->numOfSubBlocks > 1) pTCompBlock->offset -= (sizeof(SCompBlock) * pSCompBlock->numOfSubBlocks); + } + + pIdx->len -= (sizeof(SCompBlock) * pSCompBlock->numOfSubBlocks); + } + + *pSCompBlock = *pCompBlock; + + pIdx->maxKey = pHelper->pCompInfo->blocks[pIdx->numOfSuperBlocks - 1].keyLast; + pIdx->hasLast = pHelper->pCompInfo->blocks[pIdx->numOfSuperBlocks - 1].last; + + return 0; +} + +// Get the number of rows in range [minKey, maxKey] +static int tsdbGetRowsInRange(SDataCols *pDataCols, int minKey, int maxKey) { + if (pDataCols->numOfPoints == 0) return 0; + + ASSERT(minKey <= maxKey); + TSKEY keyFirst = dataColsKeyFirst(pDataCols); + TSKEY keyLast = dataColsKeyLast(pDataCols); + ASSERT(keyFirst <= keyLast); + + if (minKey > keyLast || maxKey < keyFirst) return 0; + + void *ptr1 = taosbsearch((void *)&minKey, (void *)pDataCols->cols[0].pData, pDataCols->numOfPoints, sizeof(TSKEY), + compTSKEY, TD_GE); + ASSERT(ptr1 != NULL); + + void *ptr2 = taosbsearch((void *)&maxKey, (void *)pDataCols->cols[0].pData, pDataCols->numOfPoints, sizeof(TSKEY), + compTSKEY, TD_LE); + ASSERT(ptr2 != NULL); + + if ((TSKEY *)ptr2 - (TSKEY *)ptr1 < 0) return 0; + + return ((TSKEY *)ptr2 - (TSKEY *)ptr1) + 1; +} \ No newline at end of file diff --git a/src/tsdb/src/tsdbRead.c b/src/tsdb/src/tsdbRead.c index 600ed2ba8d20a5d76f46745c2a35e07ce7f2f533..de1fc74ead1cabefaa4a0e90f523eb672a5bdb5d 100644 --- a/src/tsdb/src/tsdbRead.c +++ b/src/tsdb/src/tsdbRead.c @@ -120,6 +120,7 @@ typedef struct STsdbQueryHandle { SFileGroup* pFileGroup; SFileGroupIter fileIter; SCompIdx* compIndex; + SRWHelper rhelper; } STsdbQueryHandle; static void tsdbInitDataBlockLoadInfo(SDataBlockLoadInfo* pBlockLoadInfo) { @@ -142,7 +143,8 @@ tsdb_query_handle_t* tsdbQueryTables(tsdb_repo_t* tsdb, STsdbQueryCond* pCond, S pQueryHandle->order = pCond->order; pQueryHandle->window = pCond->twindow; pQueryHandle->pTsdb = tsdb; - pQueryHandle->compIndex = calloc(10000, sizeof(SCompIdx)), + pQueryHandle->compIndex = calloc(10000, sizeof(SCompIdx)); + tsdbInitReadHelper(&pQueryHandle->rhelper, (STsdbRepo*) tsdb); pQueryHandle->cur.fid = -1; @@ -289,14 +291,10 @@ static int32_t getFileCompInfo(STsdbQueryHandle* pQueryHandle, int32_t* numOfBlo SFileGroup* fileGroup = pQueryHandle->pFileGroup; assert(fileGroup->files[TSDB_FILE_TYPE_HEAD].fname > 0); - if (fileGroup->files[TSDB_FILE_TYPE_HEAD].fd == FD_INITIALIZER) { - fileGroup->files[TSDB_FILE_TYPE_HEAD].fd = open(fileGroup->files[TSDB_FILE_TYPE_HEAD].fname, O_RDONLY); - } else { - assert(FD_VALID(fileGroup->files[TSDB_FILE_TYPE_HEAD].fd)); - } + tsdbSetAndOpenHelperFile(&pQueryHandle->rhelper, fileGroup); // load all the comp offset value for all tables in this file - tsdbLoadCompIdx(fileGroup, pQueryHandle->compIndex, 10000); // todo set dynamic max tables + // tsdbLoadCompIdx(fileGroup, pQueryHandle->compIndex, 10000); // todo set dynamic max tables *numOfBlocks = 0; size_t numOfTables = taosArrayGetSize(pQueryHandle->pTableCheckInfo); @@ -304,7 +302,7 @@ static int32_t getFileCompInfo(STsdbQueryHandle* pQueryHandle, int32_t* numOfBlo for (int32_t i = 0; i < numOfTables; ++i) { STableCheckInfo* pCheckInfo = taosArrayGet(pQueryHandle->pTableCheckInfo, i); - SCompIdx* compIndex = &pQueryHandle->compIndex[pCheckInfo->tableId.tid]; + SCompIdx* compIndex = &pQueryHandle->rhelper.pCompIdx[pCheckInfo->tableId.tid]; if (compIndex->len == 0 || compIndex->numOfSuperBlocks == 0) { // no data block in this file, try next file continue;//no data blocks in the file belongs to pCheckInfo->pTable } else { @@ -318,8 +316,13 @@ static int32_t getFileCompInfo(STsdbQueryHandle* pQueryHandle, int32_t* numOfBlo pCheckInfo->compSize = compIndex->len; } - tsdbLoadCompBlocks(fileGroup, compIndex, pCheckInfo->pCompInfo); - + // tsdbLoadCompBlocks(fileGroup, compIndex, pCheckInfo->pCompInfo); + STable* pTable = tsdbGetTableByUid(tsdbGetMeta(pQueryHandle->pTsdb), pCheckInfo->tableId.uid); + assert(pTable != NULL); + + tsdbSetHelperTable(&pQueryHandle->rhelper, pTable, pQueryHandle->pTsdb); + + tsdbLoadCompInfo(&(pQueryHandle->rhelper), (void *)(pCheckInfo->pCompInfo)); SCompInfo* pCompInfo = pCheckInfo->pCompInfo; TSKEY s = MIN(pCheckInfo->lastKey, pQueryHandle->window.ekey); @@ -410,12 +413,12 @@ static bool doLoadFileDataBlock(STsdbQueryHandle* pQueryHandle, SCompBlock* pBlo tdInitDataCols(pCheckInfo->pDataCols, tsdbGetTableSchema(tsdbGetMeta(pQueryHandle->pTsdb), pCheckInfo->pTableObj)); - SFile* pFile = &pQueryHandle->pFileGroup->files[TSDB_FILE_TYPE_DATA]; - if (pFile->fd == FD_INITIALIZER) { - pFile->fd = open(pFile->fname, O_RDONLY); - } + // SFile* pFile = &pQueryHandle->pFileGroup->files[TSDB_FILE_TYPE_DATA]; + // if (pFile->fd == FD_INITIALIZER) { + // pFile->fd = open(pFile->fname, O_RDONLY); + // } - if (tsdbLoadDataBlock(pFile, pBlock, 1, pCheckInfo->pDataCols, data) == 0) { + if (tsdbLoadBlockData(&(pQueryHandle->rhelper), pBlock, NULL) == 0) { SDataBlockLoadInfo* pBlockLoadInfo = &pQueryHandle->dataBlockLoadInfo; pBlockLoadInfo->fileGroup = pQueryHandle->pFileGroup; @@ -428,8 +431,8 @@ static bool doLoadFileDataBlock(STsdbQueryHandle* pQueryHandle, SCompBlock* pBlo taosArrayDestroy(sa); tfree(data); - TSKEY* d = (TSKEY*)pCheckInfo->pDataCols->cols[PRIMARYKEY_TIMESTAMP_COL_INDEX].pData; - assert(d[0] == pBlock->keyFirst && d[pBlock->numOfPoints - 1] == pBlock->keyLast); + // TSKEY* d = (TSKEY*)pCheckInfo->pDataCols->cols[PRIMARYKEY_TIMESTAMP_COL_INDEX].pData; + // assert(d[0] == pBlock->keyFirst && d[pBlock->numOfPoints - 1] == pBlock->keyLast); return blockLoaded; } @@ -585,7 +588,7 @@ static void filterDataInDataBlock(STsdbQueryHandle* pQueryHandle, STableCheckInf } } - int32_t start = MIN(cur->pos, endPos); + // int32_t start = MIN(cur->pos, endPos); // move the data block in the front to data block if needed int32_t numOfCols = QH_GET_NUM_OF_COLS(pQueryHandle); @@ -597,9 +600,10 @@ static void filterDataInDataBlock(STsdbQueryHandle* pQueryHandle, STableCheckInf SColumnInfoData* pCol = taosArrayGet(pQueryHandle->pColumns, j); if (pCol->info.colId == colId) { - SDataCol* pDataCol = &pCols->cols[i]; - memmove(pCol->pData, pDataCol->pData + pCol->info.bytes * start, - pQueryHandle->realNumOfRows * pCol->info.bytes); + // SDataCol* pDataCol = &pCols->cols[i]; + pCol->pData = pQueryHandle->rhelper.pDataCols[0]->cols[i].pData; + // memmove(pCol->pData, pDataCol->pData + pCol->info.bytes * start, + // pQueryHandle->realNumOfRows * pCol->info.bytes); break; } } @@ -1517,14 +1521,15 @@ void tsdbCleanupQueryHandle(tsdb_query_handle_t queryHandle) { taosArrayDestroy(pQueryHandle->pTableCheckInfo); tfree(pQueryHandle->compIndex); - size_t cols = taosArrayGetSize(pQueryHandle->pColumns); - for (int32_t i = 0; i < cols; ++i) { - SColumnInfoData* pColInfo = taosArrayGet(pQueryHandle->pColumns, i); - tfree(pColInfo->pData); - } + // size_t cols = taosArrayGetSize(pQueryHandle->pColumns); + // for (int32_t i = 0; i < cols; ++i) { + // SColumnInfoData* pColInfo = taosArrayGet(pQueryHandle->pColumns, i); + // // tfree(pColInfo->pData); + // } taosArrayDestroy(pQueryHandle->pColumns); tfree(pQueryHandle->pDataBlockInfo); + tsdbDestroyHelper(&pQueryHandle->rhelper); tfree(pQueryHandle); } diff --git a/src/tsdb/tests/tsdbTests.cpp b/src/tsdb/tests/tsdbTests.cpp index eac29af097da67706cbcfff222a6e18c75d2e41c..85fca7d94f3422a8eb57321f9e7379c0f397f86c 100644 --- a/src/tsdb/tests/tsdbTests.cpp +++ b/src/tsdb/tests/tsdbTests.cpp @@ -5,12 +5,84 @@ #include "dataformat.h" #include "tsdbMain.h" -double getCurTime() { +static double getCurTime() { struct timeval tv; gettimeofday(&tv, NULL); return tv.tv_sec + tv.tv_usec * 1E-6; } +typedef struct { + tsdb_repo_t *pRepo; + int tid; + int64_t uid; + int sversion; + TSKEY startTime; + TSKEY interval; + int totalRows; + int rowsPerSubmit; + STSchema * pSchema; +} SInsertInfo; + +static int insertData(SInsertInfo *pInfo) { + SSubmitMsg *pMsg = + (SSubmitMsg *)malloc(sizeof(SSubmitMsg) + sizeof(SSubmitBlk) + tdMaxRowBytesFromSchema(pInfo->pSchema) * pInfo->rowsPerSubmit); + if (pMsg == NULL) return -1; + TSKEY start_time = pInfo->startTime; + + // Loop to write data + double stime = getCurTime(); + + for (int k = 0; k < pInfo->totalRows/pInfo->rowsPerSubmit; k++) { + memset((void *)pMsg, 0, sizeof(SSubmitMsg)); + SSubmitBlk *pBlock = pMsg->blocks; + pBlock->uid = pInfo->uid; + pBlock->tid = pInfo->tid; + pBlock->sversion = pInfo->sversion; + pBlock->len = 0; + for (int i = 0; i < pInfo->rowsPerSubmit; i++) { + // start_time += 1000; + start_time += pInfo->interval; + SDataRow row = (SDataRow)(pBlock->data + pBlock->len); + tdInitDataRow(row, pInfo->pSchema); + + for (int j = 0; j < schemaNCols(pInfo->pSchema); j++) { + if (j == 0) { // Just for timestamp + tdAppendColVal(row, (void *)(&start_time), schemaColAt(pInfo->pSchema, j)); + } else { // For int + int val = 10; + tdAppendColVal(row, (void *)(&val), schemaColAt(pInfo->pSchema, j)); + } + } + pBlock->len += dataRowLen(row); + } + pMsg->length = pMsg->length + sizeof(SSubmitBlk) + pBlock->len; + pMsg->numOfBlocks = 1; + + pBlock->len = htonl(pBlock->len); + pBlock->numOfRows = htonl(pBlock->numOfRows); + pBlock->uid = htobe64(pBlock->uid); + pBlock->tid = htonl(pBlock->tid); + + pBlock->sversion = htonl(pBlock->sversion); + pBlock->padding = htonl(pBlock->padding); + + pMsg->length = htonl(pMsg->length); + pMsg->numOfBlocks = htonl(pMsg->numOfBlocks); + pMsg->compressed = htonl(pMsg->numOfBlocks); + + if (tsdbInsertData(pInfo->pRepo, pMsg) < 0) { + tfree(pMsg); + return -1; + } + } + + double etime = getCurTime(); + + printf("Spent %f seconds to write %d records\n", etime - stime, pInfo->totalRows); + tfree(pMsg); + return 0; +} + TEST(TsdbTest, DISABLED_tableEncodeDecode) { // TEST(TsdbTest, tableEncodeDecode) { STable *pTable = (STable *)malloc(sizeof(STable)); @@ -48,135 +120,132 @@ TEST(TsdbTest, DISABLED_tableEncodeDecode) { ASSERT_EQ(memcmp(pTable->schema, tTable->schema, sizeof(STSchema) + sizeof(STColumn) * nCols), 0); } -TEST(TsdbTest, DISABLED_createRepo) { -// TEST(TsdbTest, createRepo) { - // STsdbCfg config; - - // // 1. Create a tsdb repository - // tsdbSetDefaultCfg(&config); - // tsdb_repo_t *pRepo = tsdbCreateRepo("/home/ubuntu/work/ttest/vnode0", &config, NULL); - // ASSERT_NE(pRepo, nullptr); - - // // 2. Create a normal table - // STableCfg tCfg; - // ASSERT_EQ(tsdbInitTableCfg(&tCfg, TSDB_SUPER_TABLE, 987607499877672L, 0), -1); - // ASSERT_EQ(tsdbInitTableCfg(&tCfg, TSDB_NORMAL_TABLE, 987607499877672L, 0), 0); - - // int nCols = 5; - // STSchema *schema = tdNewSchema(nCols); - - // for (int i = 0; i < nCols; i++) { - // if (i == 0) { - // tdSchemaAppendCol(schema, TSDB_DATA_TYPE_TIMESTAMP, i, -1); - // } else { - // tdSchemaAppendCol(schema, TSDB_DATA_TYPE_INT, i, -1); - // } - // } - - // tsdbTableSetSchema(&tCfg, schema, true); - - // tsdbCreateTable(pRepo, &tCfg); - - // // // 3. Loop to write some simple data - // int nRows = 1; - // int rowsPerSubmit = 1; - // int64_t start_time = 1584081000000; - - // SSubmitMsg *pMsg = (SSubmitMsg *)malloc(sizeof(SSubmitMsg) + sizeof(SSubmitBlk) + tdMaxRowBytesFromSchema(schema) * rowsPerSubmit); - - // double stime = getCurTime(); - - // for (int k = 0; k < nRows/rowsPerSubmit; k++) { - // memset((void *)pMsg, 0, sizeof(SSubmitMsg)); - // SSubmitBlk *pBlock = pMsg->blocks; - // pBlock->uid = 987607499877672L; - // pBlock->tid = 0; - // pBlock->sversion = 0; - // pBlock->len = 0; - // for (int i = 0; i < rowsPerSubmit; i++) { - // // start_time += 1000; - // start_time += 1000; - // SDataRow row = (SDataRow)(pBlock->data + pBlock->len); - // tdInitDataRow(row, schema); - - // for (int j = 0; j < schemaNCols(schema); j++) { - // if (j == 0) { // Just for timestamp - // tdAppendColVal(row, (void *)(&start_time), schemaColAt(schema, j)); - // } else { // For int - // int val = 10; - // tdAppendColVal(row, (void *)(&val), schemaColAt(schema, j)); - // } - // } - // pBlock->len += dataRowLen(row); - // } - // pMsg->length = pMsg->length + sizeof(SSubmitBlk) + pBlock->len; - // pMsg->numOfBlocks = 1; - - // pBlock->len = htonl(pBlock->len); - // pBlock->numOfRows = htonl(pBlock->numOfRows); - // pBlock->uid = htobe64(pBlock->uid); - // pBlock->tid = htonl(pBlock->tid); - - // pBlock->sversion = htonl(pBlock->sversion); - // pBlock->padding = htonl(pBlock->padding); - - // pMsg->length = htonl(pMsg->length); - // pMsg->numOfBlocks = htonl(pMsg->numOfBlocks); - // pMsg->compressed = htonl(pMsg->numOfBlocks); - - // tsdbInsertData(pRepo, pMsg); - // } - - // double etime = getCurTime(); - - // void *ptr = malloc(150000); - // free(ptr); - - // printf("Spent %f seconds to write %d records\n", etime - stime, nRows); - - // tsdbCloseRepo(pRepo); +// TEST(TsdbTest, DISABLED_createRepo) { +TEST(TsdbTest, createRepo) { + STsdbCfg config; + STsdbRepo *repo; -} + // 1. Create a tsdb repository + tsdbSetDefaultCfg(&config); + ASSERT_EQ(tsdbCreateRepo("/home/ubuntu/work/ttest/vnode0", &config, NULL), 0); -// TEST(TsdbTest, DISABLED_openRepo) { -TEST(TsdbTest, openRepo) { - tsdb_repo_t *repo = tsdbOpenRepo("/home/ubuntu/work/build/test/data/vnode/vnode2/tsdb", NULL); - ASSERT_NE(repo, nullptr); + tsdb_repo_t *pRepo = tsdbOpenRepo("/home/ubuntu/work/ttest/vnode0", NULL); + ASSERT_NE(pRepo, nullptr); - STsdbRepo *pRepo = (STsdbRepo *)repo; + // 2. Create a normal table + STableCfg tCfg; + ASSERT_EQ(tsdbInitTableCfg(&tCfg, TSDB_SUPER_TABLE, 987607499877672L, 0), -1); + ASSERT_EQ(tsdbInitTableCfg(&tCfg, TSDB_NORMAL_TABLE, 987607499877672L, 0), 0); - SFileGroup *pGroup = tsdbSearchFGroup(pRepo->tsdbFileH, 1655); + int nCols = 5; + STSchema *schema = tdNewSchema(nCols); - for (int type = TSDB_FILE_TYPE_HEAD; type < TSDB_FILE_TYPE_MAX; type++) { - tsdbOpenFile(&pGroup->files[type], O_RDONLY); + for (int i = 0; i < nCols; i++) { + if (i == 0) { + tdSchemaAppendCol(schema, TSDB_DATA_TYPE_TIMESTAMP, i, -1); + } else { + tdSchemaAppendCol(schema, TSDB_DATA_TYPE_INT, i, -1); + } } - SCompIdx *pIdx = (SCompIdx *)calloc(pRepo->config.maxTables, sizeof(SCompIdx)); - tsdbLoadCompIdx(pGroup, (void *)pIdx, pRepo->config.maxTables); + tsdbTableSetSchema(&tCfg, schema, true); - SCompInfo *pCompInfo = (SCompInfo *)malloc(sizeof(SCompInfo) + pIdx[1].len); + tsdbCreateTable(pRepo, &tCfg); - tsdbLoadCompBlocks(pGroup, &pIdx[1], (void *)pCompInfo); + // Insert Some Data + SInsertInfo iInfo = { + .pRepo = pRepo, + .tid = tCfg.tableId.tid, + .uid = tCfg.tableId.uid, + .sversion = tCfg.sversion, + .startTime = 1584081000000, + .interval = 1000, + .totalRows = 50, + .rowsPerSubmit = 1, + .pSchema = schema + }; - int blockIdx = 0; - SCompBlock *pBlock = &(pCompInfo->blocks[blockIdx]); + ASSERT_EQ(insertData(&iInfo), 0); - SCompData *pCompData = (SCompData *)malloc(sizeof(SCompData) + sizeof(SCompCol) * pBlock->numOfCols); + // Close the repository + tsdbCloseRepo(pRepo); - tsdbLoadCompCols(&pGroup->files[TSDB_FILE_TYPE_DATA], pBlock, (void *)pCompData); + // Open the repository again + pRepo = tsdbOpenRepo("/home/ubuntu/work/ttest/vnode0", NULL); + repo = (STsdbRepo *)pRepo; + ASSERT_NE(pRepo, nullptr); - STable *pTable = tsdbGetTableByUid(pRepo->tsdbMeta, pCompData->uid); - SDataCols *pDataCols = tdNewDataCols(tdMaxRowBytesFromSchema(tsdbGetTableSchema(pRepo->tsdbMeta, pTable)), 5, 10); - tdInitDataCols(pDataCols, tsdbGetTableSchema(pRepo->tsdbMeta, pTable)); + // Insert more data + iInfo.startTime = iInfo.startTime + iInfo.interval * iInfo.totalRows; + iInfo.totalRows = 10; + iInfo.pRepo = pRepo; + ASSERT_EQ(insertData(&iInfo), 0); - tsdbLoadDataBlock(&pGroup->files[TSDB_FILE_TYPE_DATA], pBlock, 1, pDataCols, pCompData); + // Close the repository + tsdbCloseRepo(pRepo); - tdResetDataCols(pDataCols); + // Open the repository again + pRepo = tsdbOpenRepo("/home/ubuntu/work/ttest/vnode0", NULL); + repo = (STsdbRepo *)pRepo; + ASSERT_NE(pRepo, nullptr); - tsdbLoadDataBlock(&pGroup->files[TSDB_FILE_TYPE_DATA], pBlock + 1, 1, pDataCols, pCompData); + // Read from file + SRWHelper rhelper; + tsdbInitReadHelper(&rhelper, repo); + SFileGroup *pFGroup = tsdbSearchFGroup(repo->tsdbFileH, 1833); + ASSERT_NE(pFGroup, nullptr); + ASSERT_GE(tsdbSetAndOpenHelperFile(&rhelper, pFGroup), 0); + + STable *pTable = tsdbGetTableByUid(repo->tsdbMeta, tCfg.tableId.uid); + ASSERT_NE(pTable, nullptr); + tsdbSetHelperTable(&rhelper, pTable, repo); + + ASSERT_EQ(tsdbLoadCompInfo(&rhelper, NULL), 0); + ASSERT_EQ(tsdbLoadBlockData(&rhelper, blockAtIdx(&rhelper, 0), NULL), 0); int k = 0; +} + +TEST(TsdbTest, DISABLED_openRepo) { +// TEST(TsdbTest, openRepo) { + // tsdb_repo_t *repo = tsdbOpenRepo("/home/ubuntu/work/build/test/data/vnode/vnode2/tsdb", NULL); + // ASSERT_NE(repo, nullptr); + + // STsdbRepo *pRepo = (STsdbRepo *)repo; + + // SFileGroup *pGroup = tsdbSearchFGroup(pRepo->tsdbFileH, 1655); + +// for (int type = TSDB_FILE_TYPE_HEAD; type < TSDB_FILE_TYPE_MAX; type++) { +// tsdbOpenFile(&pGroup->files[type], O_RDONLY); +// } + +// SCompIdx *pIdx = (SCompIdx *)calloc(pRepo->config.maxTables, sizeof(SCompIdx)); +// tsdbLoadCompIdx(pGroup, (void *)pIdx, pRepo->config.maxTables); + +// SCompInfo *pCompInfo = (SCompInfo *)malloc(sizeof(SCompInfo) + pIdx[1].len); + + // tsdbLoadCompBlocks(pGroup, &pIdx[1], (void *)pCompInfo); + +// int blockIdx = 0; +// SCompBlock *pBlock = &(pCompInfo->blocks[blockIdx]); + +// SCompData *pCompData = (SCompData *)malloc(sizeof(SCompData) + sizeof(SCompCol) * pBlock->numOfCols); + +// tsdbLoadCompCols(&pGroup->files[TSDB_FILE_TYPE_DATA], pBlock, (void *)pCompData); + + // STable *pTable = tsdbGetTableByUid(pRepo->tsdbMeta, pCompData->uid); + // SDataCols *pDataCols = tdNewDataCols(tdMaxRowBytesFromSchema(tsdbGetTableSchema(pRepo->tsdbMeta, pTable)), 5, 10); + // tdInitDataCols(pDataCols, tsdbGetTableSchema(pRepo->tsdbMeta, pTable)); + +// tsdbLoadDataBlock(&pGroup->files[TSDB_FILE_TYPE_DATA], pBlock, 1, pDataCols, pCompData); + + // tdResetDataCols(pDataCols); + + // tsdbLoadDataBlock(&pGroup->files[TSDB_FILE_TYPE_DATA], pBlock + 1, 1, pDataCols, pCompData); + + +// int k = 0; } diff --git a/src/util/inc/tutil.h b/src/util/inc/tutil.h index 18a277c7fa19e7819279ee8fd0f0542bedb826e9..ed58c2e60d86b9bb11d71c132295718657b4add4 100644 --- a/src/util/inc/tutil.h +++ b/src/util/inc/tutil.h @@ -176,6 +176,13 @@ uint32_t ip2uint(const char *const ip_addr); void taosSetAllocMode(int mode, const char* path, bool autoDump); void taosDumpMemoryLeak(); +void * tmalloc(size_t size); +void * tcalloc(size_t nmemb, size_t size); +size_t tsizeof(void *ptr); +void tmemset(void *ptr, int c); +void * trealloc(void *ptr, size_t size); +void tzfree(void *ptr); + #ifdef TAOS_MEM_CHECK void * taos_malloc(size_t size, const char *file, uint32_t line); diff --git a/src/util/src/tutil.c b/src/util/src/tutil.c index 7496ad482bdb49348bc0214f24767f5a12b4aafb..1c55ab945756528f8cf4ff743d654614d6d5edbe 100644 --- a/src/util/src/tutil.c +++ b/src/util/src/tutil.c @@ -618,3 +618,48 @@ char *taosCharsetReplace(char *charsetstr) { return strdup(charsetstr); } + +void *tmalloc(size_t size) { + if (size <= 0) return NULL; + + void *ret = malloc(size + sizeof(size_t)); + if (ret == NULL) return NULL; + + *(size_t *)ret = size; + + return (void *)((char *)ret + sizeof(size_t)); +} + +void *tcalloc(size_t nmemb, size_t size) { + size_t tsize = nmemb * size; + void * ret = tmalloc(tsize); + if (ret == NULL) return NULL; + + tmemset(ret, 0); + return ret; +} + +size_t tsizeof(void *ptr) { return (ptr) ? (*(size_t *)((char *)ptr - sizeof(size_t))) : 0; } + +void tmemset(void *ptr, int c) { memset(ptr, c, tsizeof(ptr)); } + +void * trealloc(void *ptr, size_t size) { + if (ptr == NULL) return tmalloc(size); + + if (size <= tsizeof(ptr)) return ptr; + + void * tptr = (void *)((char *)ptr - sizeof(size_t)); + size_t tsize = size + sizeof(size_t); + tptr = realloc(tptr, tsize); + if (tptr == NULL) return NULL; + + *(size_t *)tptr = size; + + return (void *)((char *)tptr + sizeof(size_t)); +} + +void tzfree(void *ptr) { + if (ptr) { + free((void *)((char *)ptr - sizeof(size_t))); + } +} \ No newline at end of file diff --git a/src/util/tests/taosbsearchTest.cpp b/src/util/tests/taosbsearchTest.cpp new file mode 100644 index 0000000000000000000000000000000000000000..0b250c9ecc277506fc2ba4fd7cac49b2bf57fe7f --- /dev/null +++ b/src/util/tests/taosbsearchTest.cpp @@ -0,0 +1,414 @@ +#include + +#include "talgo.h" + +static int compareFunc(const void *arg1, const void *arg2) { return (*(int *)arg1) - (*(int *)arg2); } + +TEST(testCase, taosbsearch_equal) { + // For equal test + int key = 3; + void *pRet = NULL; + + pRet = taosbsearch((void *)&key, NULL, 0, sizeof(int), compareFunc, TD_EQ); + ASSERT_EQ(pRet, nullptr); + + // 1 element + int array1[1] = {5}; + + key = 1; + pRet = taosbsearch((void *)&key, (void *)array1, 1, sizeof(int), compareFunc, TD_EQ); + ASSERT_EQ(pRet, nullptr); + + key = 6; + pRet = taosbsearch((void *)&key, (void *)array1, 1, sizeof(int), compareFunc, TD_EQ); + ASSERT_EQ(pRet, nullptr); + + key = 5; + pRet = taosbsearch((void *)&key, (void *)array1, 1, sizeof(int), compareFunc, TD_EQ); + ASSERT_NE(pRet, nullptr); + ASSERT_EQ(*(int *)pRet, key); + + // 2 element + int array2[2] = {3, 6}; + + key = 1; + pRet = taosbsearch((void *)&key, (void *)array2, 2, sizeof(int), compareFunc, TD_EQ); + ASSERT_EQ(pRet, nullptr); + + key = 3; + pRet = taosbsearch((void *)&key, (void *)array2, 2, sizeof(int), compareFunc, TD_EQ); + ASSERT_EQ(*(int *)pRet, key); + + key = 4; + pRet = taosbsearch((void *)&key, (void *)array2, 2, sizeof(int), compareFunc, TD_EQ); + ASSERT_EQ(pRet, nullptr); + + key = 6; + pRet = taosbsearch((void *)&key, (void *)array2, 2, sizeof(int), compareFunc, TD_EQ); + ASSERT_EQ(*(int *)pRet, 6); + + key = 7; + pRet = taosbsearch((void *)&key, (void *)array2, 2, sizeof(int), compareFunc, TD_EQ); + ASSERT_EQ(pRet, nullptr); + + // 3 element + int array3[3] = {3, 6, 8}; + + key = 1; + pRet = taosbsearch((void *)&key, (void *)array3, 3, sizeof(int), compareFunc, TD_EQ); + ASSERT_EQ(pRet, nullptr); + + key = 3; + pRet = taosbsearch((void *)&key, (void *)array3, 3, sizeof(int), compareFunc, TD_EQ); + ASSERT_EQ(*(int *)pRet, 3); + + key = 4; + pRet = taosbsearch((void *)&key, (void *)array3, 3, sizeof(int), compareFunc, TD_EQ); + ASSERT_EQ(pRet, nullptr); + + key = 6; + pRet = taosbsearch((void *)&key, (void *)array3, 3, sizeof(int), compareFunc, TD_EQ); + ASSERT_EQ(*(int *)pRet, 6); + + key = 7; + pRet = taosbsearch((void *)&key, (void *)array3, 3, sizeof(int), compareFunc, TD_EQ); + ASSERT_EQ(pRet, nullptr); + + key = 8; + pRet = taosbsearch((void *)&key, (void *)array3, 3, sizeof(int), compareFunc, TD_EQ); + ASSERT_EQ(*(int *)pRet, 8); + + key = 9; + pRet = taosbsearch((void *)&key, (void *)array3, 3, sizeof(int), compareFunc, TD_EQ); + ASSERT_EQ(pRet, nullptr); +} + +TEST(testCase, taosbsearch_greater_or_equal) { + // For equal test + int key = 3; + void *pRet = NULL; + + pRet = taosbsearch((void *)&key, NULL, 0, sizeof(int), compareFunc, TD_GE); + ASSERT_EQ(pRet, nullptr); + + // 1 element + int array1[1] = {5}; + + key = 1; + pRet = taosbsearch((void *)&key, (void *)array1, 1, sizeof(int), compareFunc, TD_GE); + ASSERT_EQ(*(int *)pRet, 5); + + key = 6; + pRet = taosbsearch((void *)&key, (void *)array1, 1, sizeof(int), compareFunc, TD_GE); + ASSERT_EQ(pRet, nullptr); + + key = 5; + pRet = taosbsearch((void *)&key, (void *)array1, 1, sizeof(int), compareFunc, TD_GE); + ASSERT_NE(pRet, nullptr); + ASSERT_EQ(*(int *)pRet, 5); + + // 2 element + int array2[2] = {3, 6}; + + key = 1; + pRet = taosbsearch((void *)&key, (void *)array2, 2, sizeof(int), compareFunc, TD_GE); + ASSERT_EQ(*(int *)pRet, 3); + + key = 3; + pRet = taosbsearch((void *)&key, (void *)array2, 2, sizeof(int), compareFunc, TD_GE); + ASSERT_EQ(*(int *)pRet, 3); + + key = 4; + pRet = taosbsearch((void *)&key, (void *)array2, 2, sizeof(int), compareFunc, TD_GE); + ASSERT_EQ(*(int *)pRet, 6); + + key = 6; + pRet = taosbsearch((void *)&key, (void *)array2, 2, sizeof(int), compareFunc, TD_GE); + ASSERT_EQ(*(int *)pRet, 6); + + key = 7; + pRet = taosbsearch((void *)&key, (void *)array2, 2, sizeof(int), compareFunc, TD_GE); + ASSERT_EQ(pRet, nullptr); + + // 3 element + int array3[3] = {3, 6, 8}; + + key = 1; + pRet = taosbsearch((void *)&key, (void *)array3, 3, sizeof(int), compareFunc, TD_GE); + ASSERT_EQ(*(int *)pRet, 3); + + key = 3; + pRet = taosbsearch((void *)&key, (void *)array3, 3, sizeof(int), compareFunc, TD_GE); + ASSERT_EQ(*(int *)pRet, 3); + + key = 4; + pRet = taosbsearch((void *)&key, (void *)array3, 3, sizeof(int), compareFunc, TD_GE); + ASSERT_EQ(*(int *)pRet, 6); + + key = 6; + pRet = taosbsearch((void *)&key, (void *)array3, 3, sizeof(int), compareFunc, TD_GE); + ASSERT_EQ(*(int *)pRet, 6); + + key = 7; + pRet = taosbsearch((void *)&key, (void *)array3, 3, sizeof(int), compareFunc, TD_GE); + ASSERT_EQ(*(int *)pRet, 8); + + key = 8; + pRet = taosbsearch((void *)&key, (void *)array3, 3, sizeof(int), compareFunc, TD_GE); + ASSERT_EQ(*(int *)pRet, 8); + + key = 9; + pRet = taosbsearch((void *)&key, (void *)array3, 3, sizeof(int), compareFunc, TD_GE); + ASSERT_EQ(pRet, nullptr); + + // 4 element + int array4[4] = {3, 6, 8, 11}; + + key = 1; + pRet = taosbsearch((void *)&key, (void *)array4, 4, sizeof(int), compareFunc, TD_GE); + ASSERT_EQ(*(int *)pRet, 3); + + key = 3; + pRet = taosbsearch((void *)&key, (void *)array4, 4, sizeof(int), compareFunc, TD_GE); + ASSERT_EQ(*(int *)pRet, 3); + + key = 4; + pRet = taosbsearch((void *)&key, (void *)array4, 4, sizeof(int), compareFunc, TD_GE); + ASSERT_EQ(*(int *)pRet, 6); + + key = 6; + pRet = taosbsearch((void *)&key, (void *)array4, 4, sizeof(int), compareFunc, TD_GE); + ASSERT_EQ(*(int *)pRet, 6); + + key = 7; + pRet = taosbsearch((void *)&key, (void *)array4, 4, sizeof(int), compareFunc, TD_GE); + ASSERT_EQ(*(int *)pRet, 8); + + key = 8; + pRet = taosbsearch((void *)&key, (void *)array4, 4, sizeof(int), compareFunc, TD_GE); + ASSERT_EQ(*(int *)pRet, 8); + + key = 9; + pRet = taosbsearch((void *)&key, (void *)array4, 4, sizeof(int), compareFunc, TD_GE); + ASSERT_EQ(*(int *)pRet, 11); + + key = 11; + pRet = taosbsearch((void *)&key, (void *)array4, 4, sizeof(int), compareFunc, TD_GE); + ASSERT_EQ(*(int *)pRet, 11); + + key = 13; + pRet = taosbsearch((void *)&key, (void *)array4, 4, sizeof(int), compareFunc, TD_GE); + ASSERT_EQ(pRet, nullptr); + + // 5 element + int array5[5] = {3, 6, 8, 11, 15}; + + key = 1; + pRet = taosbsearch((void *)&key, (void *)array5, 5, sizeof(int), compareFunc, TD_GE); + ASSERT_EQ(*(int *)pRet, 3); + + key = 3; + pRet = taosbsearch((void *)&key, (void *)array5, 5, sizeof(int), compareFunc, TD_GE); + ASSERT_EQ(*(int *)pRet, 3); + + key = 4; + pRet = taosbsearch((void *)&key, (void *)array5, 5, sizeof(int), compareFunc, TD_GE); + ASSERT_EQ(*(int *)pRet, 6); + + key = 6; + pRet = taosbsearch((void *)&key, (void *)array5, 5, sizeof(int), compareFunc, TD_GE); + ASSERT_EQ(*(int *)pRet, 6); + + key = 7; + pRet = taosbsearch((void *)&key, (void *)array5, 5, sizeof(int), compareFunc, TD_GE); + ASSERT_EQ(*(int *)pRet, 8); + + key = 8; + pRet = taosbsearch((void *)&key, (void *)array5, 5, sizeof(int), compareFunc, TD_GE); + ASSERT_EQ(*(int *)pRet, 8); + + key = 9; + pRet = taosbsearch((void *)&key, (void *)array5, 5, sizeof(int), compareFunc, TD_GE); + ASSERT_EQ(*(int *)pRet, 11); + + key = 11; + pRet = taosbsearch((void *)&key, (void *)array5, 5, sizeof(int), compareFunc, TD_GE); + ASSERT_EQ(*(int *)pRet, 11); + + key = 13; + pRet = taosbsearch((void *)&key, (void *)array5, 5, sizeof(int), compareFunc, TD_GE); + ASSERT_EQ(*(int *)pRet, 15); + + key = 15; + pRet = taosbsearch((void *)&key, (void *)array5, 5, sizeof(int), compareFunc, TD_GE); + ASSERT_EQ(*(int *)pRet, 15); + + key = 17; + pRet = taosbsearch((void *)&key, (void *)array5, 5, sizeof(int), compareFunc, TD_GE); + ASSERT_EQ(pRet, nullptr); +} + +TEST(testCase, taosbsearch_less_or_equal) { + // For equal test + int key = 3; + void *pRet = NULL; + + pRet = taosbsearch((void *)&key, NULL, 0, sizeof(int), compareFunc, TD_LE); + ASSERT_EQ(pRet, nullptr); + + // 1 element + int array1[1] = {5}; + + key = 1; + pRet = taosbsearch((void *)&key, (void *)array1, 1, sizeof(int), compareFunc, TD_LE); + ASSERT_EQ(pRet, nullptr); + + key = 6; + pRet = taosbsearch((void *)&key, (void *)array1, 1, sizeof(int), compareFunc, TD_LE); + ASSERT_EQ(*(int *)pRet, 5); + + key = 5; + pRet = taosbsearch((void *)&key, (void *)array1, 1, sizeof(int), compareFunc, TD_LE); + ASSERT_NE(pRet, nullptr); + ASSERT_EQ(*(int *)pRet, 5); + + // 2 element + int array2[2] = {3, 6}; + + key = 1; + pRet = taosbsearch((void *)&key, (void *)array2, 2, sizeof(int), compareFunc, TD_LE); + ASSERT_EQ(pRet, nullptr); + + key = 3; + pRet = taosbsearch((void *)&key, (void *)array2, 2, sizeof(int), compareFunc, TD_LE); + ASSERT_EQ(*(int *)pRet, 3); + + key = 4; + pRet = taosbsearch((void *)&key, (void *)array2, 2, sizeof(int), compareFunc, TD_LE); + ASSERT_EQ(*(int *)pRet, 3); + + key = 6; + pRet = taosbsearch((void *)&key, (void *)array2, 2, sizeof(int), compareFunc, TD_LE); + ASSERT_EQ(*(int *)pRet, 6); + + key = 7; + pRet = taosbsearch((void *)&key, (void *)array2, 2, sizeof(int), compareFunc, TD_LE); + ASSERT_EQ(*(int *)pRet, 6); + + // 3 element + int array3[3] = {3, 6, 8}; + + key = 1; + pRet = taosbsearch((void *)&key, (void *)array3, 3, sizeof(int), compareFunc, TD_LE); + ASSERT_EQ(pRet, nullptr); + + key = 3; + pRet = taosbsearch((void *)&key, (void *)array3, 3, sizeof(int), compareFunc, TD_LE); + ASSERT_EQ(*(int *)pRet, 3); + + key = 4; + pRet = taosbsearch((void *)&key, (void *)array3, 3, sizeof(int), compareFunc, TD_LE); + ASSERT_EQ(*(int *)pRet, 3); + + key = 6; + pRet = taosbsearch((void *)&key, (void *)array3, 3, sizeof(int), compareFunc, TD_LE); + ASSERT_EQ(*(int *)pRet, 6); + + key = 7; + pRet = taosbsearch((void *)&key, (void *)array3, 3, sizeof(int), compareFunc, TD_LE); + ASSERT_EQ(*(int *)pRet, 6); + + key = 8; + pRet = taosbsearch((void *)&key, (void *)array3, 3, sizeof(int), compareFunc, TD_LE); + ASSERT_EQ(*(int *)pRet, 8); + + key = 9; + pRet = taosbsearch((void *)&key, (void *)array3, 3, sizeof(int), compareFunc, TD_LE); + ASSERT_EQ(*(int *)pRet, 8); + + // 4 element + int array4[4] = {3, 6, 8, 11}; + + key = 1; + pRet = taosbsearch((void *)&key, (void *)array4, 4, sizeof(int), compareFunc, TD_LE); + ASSERT_EQ(pRet, nullptr); + + key = 3; + pRet = taosbsearch((void *)&key, (void *)array4, 4, sizeof(int), compareFunc, TD_LE); + ASSERT_EQ(*(int *)pRet, 3); + + key = 4; + pRet = taosbsearch((void *)&key, (void *)array4, 4, sizeof(int), compareFunc, TD_LE); + ASSERT_EQ(*(int *)pRet, 3); + + key = 6; + pRet = taosbsearch((void *)&key, (void *)array4, 4, sizeof(int), compareFunc, TD_LE); + ASSERT_EQ(*(int *)pRet, 6); + + key = 7; + pRet = taosbsearch((void *)&key, (void *)array4, 4, sizeof(int), compareFunc, TD_LE); + ASSERT_EQ(*(int *)pRet, 6); + + key = 8; + pRet = taosbsearch((void *)&key, (void *)array4, 4, sizeof(int), compareFunc, TD_LE); + ASSERT_EQ(*(int *)pRet, 8); + + key = 9; + pRet = taosbsearch((void *)&key, (void *)array4, 4, sizeof(int), compareFunc, TD_LE); + ASSERT_EQ(*(int *)pRet, 8); + + key = 11; + pRet = taosbsearch((void *)&key, (void *)array4, 4, sizeof(int), compareFunc, TD_LE); + ASSERT_EQ(*(int *)pRet, 11); + + key = 13; + pRet = taosbsearch((void *)&key, (void *)array4, 4, sizeof(int), compareFunc, TD_LE); + ASSERT_EQ(*(int *)pRet, 11); + + // 5 element + int array5[5] = {3, 6, 8, 11, 15}; + + key = 1; + pRet = taosbsearch((void *)&key, (void *)array5, 5, sizeof(int), compareFunc, TD_LE); + ASSERT_EQ(pRet, nullptr); + + key = 3; + pRet = taosbsearch((void *)&key, (void *)array5, 5, sizeof(int), compareFunc, TD_LE); + ASSERT_EQ(*(int *)pRet, 3); + + key = 4; + pRet = taosbsearch((void *)&key, (void *)array5, 5, sizeof(int), compareFunc, TD_LE); + ASSERT_EQ(*(int *)pRet, 3); + + key = 6; + pRet = taosbsearch((void *)&key, (void *)array5, 5, sizeof(int), compareFunc, TD_LE); + ASSERT_EQ(*(int *)pRet, 6); + + key = 7; + pRet = taosbsearch((void *)&key, (void *)array5, 5, sizeof(int), compareFunc, TD_LE); + ASSERT_EQ(*(int *)pRet, 6); + + key = 8; + pRet = taosbsearch((void *)&key, (void *)array5, 5, sizeof(int), compareFunc, TD_LE); + ASSERT_EQ(*(int *)pRet, 8); + + key = 9; + pRet = taosbsearch((void *)&key, (void *)array5, 5, sizeof(int), compareFunc, TD_LE); + ASSERT_EQ(*(int *)pRet, 8); + + key = 11; + pRet = taosbsearch((void *)&key, (void *)array5, 5, sizeof(int), compareFunc, TD_LE); + ASSERT_EQ(*(int *)pRet, 11); + + key = 13; + pRet = taosbsearch((void *)&key, (void *)array5, 5, sizeof(int), compareFunc, TD_LE); + ASSERT_EQ(*(int *)pRet, 11); + + key = 15; + pRet = taosbsearch((void *)&key, (void *)array5, 5, sizeof(int), compareFunc, TD_LE); + ASSERT_EQ(*(int *)pRet, 15); + + key = 17; + pRet = taosbsearch((void *)&key, (void *)array5, 5, sizeof(int), compareFunc, TD_LE); + ASSERT_EQ(*(int *)pRet, 15); +} \ No newline at end of file diff --git a/src/vnode/src/vnodeMain.c b/src/vnode/src/vnodeMain.c index 0827d90ebcee8732cd2fe867e7baa1084c6cbf5f..393758761d2fa63c22eaa0d93350b1f2ff58b661 100644 --- a/src/vnode/src/vnodeMain.c +++ b/src/vnode/src/vnodeMain.c @@ -88,6 +88,7 @@ int32_t vnodeCreate(SMDCreateVnodeMsg *pVnodeCfg) { STsdbCfg tsdbCfg = {0}; tsdbCfg.precision = pVnodeCfg->cfg.precision; + tsdbCfg.compression = -1; tsdbCfg.tsdbId = pVnodeCfg->cfg.vgId; tsdbCfg.maxTables = pVnodeCfg->cfg.maxSessions; tsdbCfg.daysPerFile = pVnodeCfg->cfg.daysPerFile;