diff --git a/src/tsdb/inc/tsdbMain.h b/src/tsdb/inc/tsdbMain.h index f6bc44def339d6c0605985121da52cdd9be4575b..c7743e733d63c9a2835d430e93ae01014165e5aa 100644 --- a/src/tsdb/inc/tsdbMain.h +++ b/src/tsdb/inc/tsdbMain.h @@ -351,13 +351,14 @@ typedef enum { TSDB_WRITE_HELPER, TSDB_READ_HELPER } tsdb_rw_helper_t; typedef struct { tsdb_rw_helper_t type; // helper type - int maxTables; - int maxRowSize; - int maxRows; - int maxCols; - int minRowsPerFileBlock; - int maxRowsPerFileBlock; - int8_t compress; + + int maxTables; + int maxRowSize; + int maxRows; + int maxCols; + int minRowsPerFileBlock; + int maxRowsPerFileBlock; + int8_t compress; } SHelperCfg; typedef struct { @@ -388,17 +389,14 @@ typedef struct { // For file set usage SHelperFile files; SCompIdx * pCompIdx; - // size_t compIdxSize; // For table set usage SHelperTable tableInfo; SCompInfo * pCompInfo; - // size_t compInfoSize; bool hasOldLastBlock; // For block set usage SCompData *pCompData; - // size_t compDataSize; SDataCols *pDataCols[2]; } SRWHelper; diff --git a/src/tsdb/src/tsdbFile.c b/src/tsdb/src/tsdbFile.c index b4d7e169676c8d6adf045baba58d2b31413eed00..afe0582c1b9c58a1866553f885b2f536d81cd98f 100644 --- a/src/tsdb/src/tsdbFile.c +++ b/src/tsdb/src/tsdbFile.c @@ -25,6 +25,7 @@ #include "tutil.h" #include "tsdbMain.h" +#include "tchecksum.h" const char *tsdbFileSuffix[] = { ".head", // TSDB_FILE_TYPE_HEAD @@ -310,7 +311,7 @@ static int tsdbWriteFileHead(SFile *pFile) { } static int tsdbWriteHeadFileIdx(SFile *pFile, int maxTables) { - int size = sizeof(SCompIdx) * maxTables; + int size = sizeof(SCompIdx) * maxTables + sizeof(TSCKSUM); void *buf = calloc(1, size); if (buf == NULL) return -1; @@ -319,6 +320,8 @@ static int tsdbWriteHeadFileIdx(SFile *pFile, int maxTables) { return -1; } + taosCalcChecksumAppend(0, (uint8_t *)buf, size); + if (write(pFile->fd, buf, size) < 0) { free(buf); return -1; diff --git a/src/tsdb/src/tsdbMain.c b/src/tsdb/src/tsdbMain.c index 292d2eabb91863b13a4e8c3b689ec226fdb3f4d4..55f80a8d22b03ac0a922e9361cf2e65766b158d5 100644 --- a/src/tsdb/src/tsdbMain.c +++ b/src/tsdb/src/tsdbMain.c @@ -56,7 +56,8 @@ static int32_t tsdbInsertDataToTable(tsdb_repo_t *repo, SSubmitBlk *pBlock); static int32_t tsdbRestoreCfg(STsdbRepo *pRepo, STsdbCfg *pCfg); static int32_t tsdbGetDataDirName(STsdbRepo *pRepo, char *fname); static void * tsdbCommitData(void *arg); -static int tsdbCommitToFile(STsdbRepo *pRepo, int fid, SSkipListIterator **iters, SRWHelper *pHelper, SDataCols *pDataCols); +static int tsdbCommitToFile(STsdbRepo *pRepo, int fid, SSkipListIterator **iters, SRWHelper *pHelper, + SDataCols *pDataCols); static TSKEY tsdbNextIterKey(SSkipListIterator *pIter); static int tsdbHasDataToCommit(SSkipListIterator **iters, int nIters, TSKEY minKey, TSKEY maxKey); // static int tsdbWriteBlockToFileImpl(SFile *pFile, SDataCols *pCols, int pointsToWrite, int64_t *offset, int32_t *len, @@ -847,6 +848,7 @@ static void *tsdbCommitData(void *arg) { .maxRows = pCfg->maxRowsPerFileBlock, .maxCols = pMeta->maxCols, .minRowsPerFileBlock = pCfg->minRowsPerFileBlock, + .maxRowsPerFileBlock = pCfg->maxRowsPerFileBlock, .compress = 2 // TODO make it a configuration }; if (tsdbInitHelper(&whelper, &hcfg) < 0) goto _exit; @@ -911,6 +913,8 @@ static int tsdbCommitToFile(STsdbRepo *pRepo, int fid, SSkipListIterator **iters // Loop to commit data in each table for (int tid = 0; tid < pCfg->maxTables; tid++) { STable * pTable = pMeta->tables[tid]; + if (pTable == NULL) continue; + SSkipListIterator *pIter = iters[tid]; // Set the helper and the buffer dataCols object to help to write this table @@ -929,6 +933,7 @@ static int tsdbCommitToFile(STsdbRepo *pRepo, int fid, SSkipListIterator **iters ASSERT(dataColsKeyLast(pDataCols) >= minKey && dataColsKeyLast(pDataCols) <= maxKey); int rowsWritten = tsdbWriteDataBlock(pHelper, pDataCols); + ASSERT(rowsWritten != 0); if (rowsWritten < 0) goto _err; ASSERT(rowsWritten <= pDataCols->numOfPoints); diff --git a/src/tsdb/src/tsdbRWHelper.c b/src/tsdb/src/tsdbRWHelper.c index 540cef5719e04c1491fe00a419c0a2f01335f70d..e24a3c5786e9355d332878d90f6ef529d63fb434 100644 --- a/src/tsdb/src/tsdbRWHelper.c +++ b/src/tsdb/src/tsdbRWHelper.c @@ -14,15 +14,7 @@ */ #include "tsdbMain.h" #include "tchecksum.h" - -#define adjustMem(ptr, size, expectedSize) \ - do { \ - if ((size) < (expectedSize)) { \ - (ptr) = realloc((void *)(ptr), (expectedSize)); \ - if ((ptr) == NULL) return -1; \ - (size) = (expectedSize); \ - } \ - } while (0) +#include "tscompression.h" // Local function definitions static int tsdbCheckHelperCfg(SHelperCfg *pCfg); @@ -33,12 +25,11 @@ static int tsdbWriteBlockToFile(SRWHelper *pHelper, SFile *pFile, SDataCols *pD SCompBlock *pCompBlock, bool isLast, bool isSuperBlock); static int compareKeyBlock(const void *arg1, const void *arg2); static int tsdbMergeDataWithBlock(SRWHelper *pHelper, int blkIdx, SDataCols *pDataCols); -// static int nRowsLEThan(SDataCols *pDataCols, int maxKey); -// static int tsdbGetRowsCanBeMergedWithBlock(SRWHelper *pHelper, int blkIdx, SDataCols *pDataCols); static int tsdbInsertSuperBlock(SRWHelper *pHelper, SCompBlock *pCompBlock, int blkIdx); static int tsdbAddSubBlock(SRWHelper *pHelper, SCompBlock *pCompBlock, int blkIdx, int rowsAdded); static int tsdbUpdateSuperBlock(SRWHelper *pHelper, SCompBlock *pCompBlock, int blkIdx); static int tsdbGetRowsInRange(SDataCols *pDataCols, int minKey, int maxKey); +static void tsdbResetHelperBlock(SRWHelper *pHelper); // ---------- Operations on Helper File part static void tsdbResetHelperFileImpl(SRWHelper *pHelper) { @@ -72,6 +63,12 @@ static void tsdbResetHelperTableImpl(SRWHelper *pHelper) { pHelper->hasOldLastBlock = false; } +static void tsdbResetHelperTable(SRWHelper *pHelper) { + tsdbResetHelperBlock(pHelper); + tsdbResetHelperTableImpl(pHelper); + helperClearState(pHelper, TSDB_HELPER_TABLE_SET); +} + static void tsdbInitHelperTable(SRWHelper *pHelper) { tsdbResetHelperTableImpl(pHelper); } @@ -84,6 +81,10 @@ static void tsdbResetHelperBlockImpl(SRWHelper *pHelper) { tdResetDataCols(pHelper->pDataCols[1]); } +static void tsdbResetHelperBlock(SRWHelper *pHelper) { + // TODO +} + static int tsdbInitHelperBlock(SRWHelper *pHelper) { pHelper->pDataCols[0] = tdNewDataCols(pHelper->config.maxRowSize, pHelper->config.maxCols, pHelper->config.maxRows); pHelper->pDataCols[1] = tdNewDataCols(pHelper->config.maxRowSize, pHelper->config.maxCols, pHelper->config.maxRows); @@ -167,7 +168,6 @@ int tsdbSetAndOpenHelperFile(SRWHelper *pHelper, SFileGroup *pGroup) { pHelper->files.lastF = pGroup->files[TSDB_FILE_TYPE_LAST]; if (TSDB_HELPER_TYPE(pHelper) == TSDB_WRITE_HELPER) { char *fnameDup = strdup(pHelper->files.headF.fname); - if (fnameDup == NULL) goto _err; if (fnameDup == NULL) return -1; char *dataDir = dirname(fnameDup); @@ -183,8 +183,8 @@ int tsdbSetAndOpenHelperFile(SRWHelper *pHelper, SFileGroup *pGroup) { if (tsdbOpenFile(&(pHelper->files.lastF), O_RDWR) < 0) goto _err; // Create and open .h - if (tsdbOpenFile(&(pHelper->files.nHeadF), O_WRONLY | O_CREAT) < 0) goto _err; - size_t tsize = TSDB_FILE_HEAD_SIZE + sizeof(SCompIdx) * pHelper->config.maxTables; + if (tsdbOpenFile(&(pHelper->files.nHeadF), O_WRONLY | O_CREAT) < 0) return -1; + size_t tsize = TSDB_FILE_HEAD_SIZE + sizeof(SCompIdx) * pHelper->config.maxTables + sizeof(TSCKSUM); if (tsendfile(pHelper->files.nHeadF.fd, pHelper->files.headF.fd, NULL, tsize) < tsize) goto _err; // Create and open .l file if should @@ -221,23 +221,33 @@ int tsdbCloseHelperFile(SRWHelper *pHelper, bool hasError) { if (pHelper->files.nHeadF.fd > 0) { close(pHelper->files.nHeadF.fd); pHelper->files.nHeadF.fd = -1; - if (hasError) remove(pHelper->files.nHeadF.fname); + if (hasError) { + remove(pHelper->files.nHeadF.fname); + } else { + rename(pHelper->files.nHeadF.fname, pHelper->files.headF.fname); + pHelper->files.headF.info = pHelper->files.nHeadF.info; + } } if (pHelper->files.nLastF.fd > 0) { close(pHelper->files.nLastF.fd); pHelper->files.nLastF.fd = -1; - if (hasError) remove(pHelper->files.nLastF.fname); + if (hasError) { + remove(pHelper->files.nLastF.fname); + } else { + rename(pHelper->files.nLastF.fname, pHelper->files.lastF.fname); + pHelper->files.lastF.info = pHelper->files.nLastF.info; + } } return 0; } void tsdbSetHelperTable(SRWHelper *pHelper, SHelperTable *pHelperTable, STSchema *pSchema) { - ASSERT(helperHasState(pHelper, TSDB_HELPER_FILE_SET_AND_OPEN)); + ASSERT(helperHasState(pHelper, TSDB_HELPER_FILE_SET_AND_OPEN | TSDB_HELPER_IDX_LOAD)); // Clear members and state used by previous table - // pHelper->blockIter = 0; - pHelper->state &= (TSDB_HELPER_TABLE_SET - 1); + tsdbResetHelperTable(pHelper); + ASSERT(pHelper->state == (TSDB_HELPER_FILE_SET_AND_OPEN | TSDB_HELPER_IDX_LOAD)); pHelper->tableInfo = *pHelperTable; tdInitDataCols(pHelper->pDataCols[0], pSchema); @@ -248,9 +258,8 @@ void tsdbSetHelperTable(SRWHelper *pHelper, SHelperTable *pHelperTable, STSchema pHelper->hasOldLastBlock = true; } - // pHelper->compIdx = pHelper->pCompIdx[pHelper->tableInfo.tid]; - helperSetState(pHelper, TSDB_HELPER_TABLE_SET); + ASSERT(pHelper->state == ((TSDB_HELPER_TABLE_SET << 1) - 1)); } /** @@ -268,7 +277,6 @@ int tsdbWriteDataBlock(SRWHelper *pHelper, SDataCols *pDataCols) { TSKEY keyFirst = dataColsKeyFirst(pDataCols); ASSERT(helperHasState(pHelper, TSDB_HELPER_IDX_LOAD)); - // SCompIdx curIdx = pHelper->compIdx; // old table SCompIdx for sendfile usage SCompIdx *pIdx = pHelper->pCompIdx + pHelper->tableInfo.tid; // for change purpose // Load the SCompInfo part if neccessary @@ -283,7 +291,7 @@ int tsdbWriteDataBlock(SRWHelper *pHelper, SDataCols *pDataCols) { SFile *pWFile = NULL; bool isLast = false; - if (rowsToWrite > pHelper->config.minRowsPerFileBlock) { + if (rowsToWrite >= pHelper->config.minRowsPerFileBlock) { pWFile = &(pHelper->files.dataF); } else { isLast = true; @@ -314,7 +322,7 @@ int tsdbWriteDataBlock(SRWHelper *pHelper, SDataCols *pDataCols) { rowsToWrite = tsdbGetRowsInRange(pDataCols, 0, pCompBlock->keyFirst-1); ASSERT(rowsToWrite > 0); if (tsdbWriteBlockToFile(pHelper, &(pHelper->files.dataF), pDataCols, rowsToWrite, &compBlock, false, true) < 0) goto _err; - if (tsdbInsertSuperBlock(pHelper, pCompBlock, pCompBlock - pHelper->pCompInfo->blocks) < 0) goto _err; + if (tsdbInsertSuperBlock(pHelper, pCompBlock, blkIdx) < 0) goto _err; } } } @@ -367,6 +375,7 @@ int tsdbWriteCompInfo(SRWHelper *pHelper) { if (tsendfile(pHelper->files.nHeadF.fd, pHelper->files.headF.fd, NULL, pIdx->len) < pIdx->len) return -1; } } else { + taosCalcChecksumAppend(0, (uint8_t *)pHelper->pCompInfo, pIdx->len); pIdx->offset = lseek(pHelper->files.nHeadF.fd, 0, SEEK_END); if (pIdx->offset < 0) return -1; @@ -379,6 +388,8 @@ int tsdbWriteCompInfo(SRWHelper *pHelper) { int tsdbWriteCompIdx(SRWHelper *pHelper) { if (lseek(pHelper->files.nHeadF.fd, TSDB_FILE_HEAD_SIZE, SEEK_SET) < 0) return -1; + taosCalcChecksumAppend(0, (uint8_t *)pHelper->pCompIdx, tsizeof(pHelper->pCompIdx)); + if (twrite(pHelper->files.nHeadF.fd, (void *)pHelper->pCompIdx, tsizeof(pHelper->pCompIdx)) < tsizeof(pHelper->pCompIdx)) return -1; return 0; @@ -392,8 +403,11 @@ int tsdbLoadCompIdx(SRWHelper *pHelper, void *target) { int fd = pHelper->files.headF.fd; if (lseek(fd, TSDB_FILE_HEAD_SIZE, SEEK_SET) < 0) return -1; - if (tread(fd, (void *)(pHelper->pCompIdx), tsizeof(pHelper->pCompIdx)) < tsizeof(pHelper->pCompIdx)) return -1; - // TODO: check the correctness of the part + if (tread(fd, (void *)(pHelper->pCompIdx), tsizeof((void *)pHelper->pCompIdx)) < tsizeof(pHelper->pCompIdx)) return -1; + if (!taosCheckChecksumWhole((uint8_t *)(pHelper->pCompIdx), tsizeof((void *)pHelper->pCompIdx))) { + // TODO: File is broken, try to deal with it + return -1; + } } helperSetState(pHelper, TSDB_HELPER_IDX_LOAD); @@ -408,19 +422,16 @@ int tsdbLoadCompInfo(SRWHelper *pHelper, void *target) { SCompIdx *pIdx = pHelper->pCompIdx + pHelper->tableInfo.tid; - // SCompIdx curCompIdx = pHelper->compIdx; - - ASSERT(pIdx->offset > 0 && pIdx->len > 0); - int fd = pHelper->files.headF.fd; if (!helperHasState(pHelper, TSDB_HELPER_INFO_LOAD)) { - if (lseek(fd, pIdx->offset, SEEK_SET) < 0) return -1; + if (pIdx->offset > 0) { + if (lseek(fd, pIdx->offset, SEEK_SET) < 0) return -1; - // adjustMem(pHelper->pCompInfo, pHelper->compInfoSize, pIdx->len); - pHelper->pCompInfo = trealloc((void *)pHelper->pCompInfo, pIdx->len); - if (tread(fd, (void *)(pHelper->pCompInfo), pIdx->len) < pIdx->len) return -1; - // TODO: check the checksum + pHelper->pCompInfo = trealloc((void *)pHelper->pCompInfo, pIdx->len); + if (tread(fd, (void *)(pHelper->pCompInfo), pIdx->len) < pIdx->len) return -1; + if (!taosCheckChecksumWhole((uint8_t *)pHelper->pCompInfo, pIdx->len)) return -1; + } helperSetState(pHelper, TSDB_HELPER_INFO_LOAD); } @@ -519,16 +530,21 @@ static int tsdbLoadBlockDataImpl(SRWHelper *pHelper, SCompBlock *pCompBlock, SDa int fd = (pCompBlock->last) ? pHelper->files.lastF.fd : pHelper->files.dataF.fd; if (tread(fd, (void *)pCompData, pCompBlock->len) < pCompBlock->len) goto _err; + ASSERT(pCompData->numOfCols == pCompBlock->numOfCols); - { // TODO : check the correctness of the part + // TODO : check the checksum + size_t tsize = sizeof(SCompData) + sizeof(SCompCol) * pCompBlock->numOfCols + sizeof(TSCKSUM); + if (!taosCheckChecksumWhole((uint8_t *)pCompData, tsize)) goto _err; + for (int i = 0; i < pCompData->numOfCols; i++) { + // TODO: check the data checksum + // if (!taosCheckChecksumWhole()) } ASSERT(pCompBlock->numOfCols == pCompData->numOfCols); pDataCols->numOfPoints = pCompBlock->numOfPoints; - size_t tlen = sizeof(SCompData) + sizeof(SCompCol) * pCompBlock->numOfCols; - int ccol = 0, dcol = 0; + int ccol = 0, dcol = 0; while (true) { if (ccol >= pDataCols->numOfCols) { // TODO: Fill rest NULL @@ -541,7 +557,7 @@ static int tsdbLoadBlockDataImpl(SRWHelper *pHelper, SCompBlock *pCompBlock, SDa if (pCompCol->colId == pDataCol->colId) { // TODO: uncompress - memcpy(pDataCol->pData, (void *)(((char *)pCompData) + tlen + pCompCol->offset), pCompCol->len); + memcpy(pDataCol->pData, (void *)(((char *)pCompData) + tsize + pCompCol->offset), pCompCol->len); ccol++; dcol++; } else if (pCompCol->colId > pDataCol->colId) { @@ -567,8 +583,10 @@ int tsdbLoadBlockData(SRWHelper *pHelper, int blkIdx, SDataCols *target) { int numOfSubBlock = pCompBlock->numOfSubBlocks; if (numOfSubBlock > 1) pCompBlock = (SCompBlock *)((char *)pHelper->pCompInfo + pCompBlock->offset); + tdResetDataCols(pHelper->pDataCols[0]); if (tsdbLoadBlockDataImpl(pHelper, pCompBlock, pHelper->pDataCols[0]) < 0) goto _err; for (int i = 1; i < numOfSubBlock; i++) { + tdResetDataCols(pHelper->pDataCols[1]); pCompBlock++; if (tsdbLoadBlockDataImpl(pHelper, pCompBlock, pHelper->pDataCols[1]) < 0) goto _err; if (tdMergeDataCols(pHelper->pDataCols[0], pHelper->pDataCols[1], pHelper->pDataCols[1]->numOfPoints) < 0) goto _err; @@ -614,8 +632,11 @@ static void tsdbClearHelperFile(SHelperFile *pHFile) { } static bool tsdbShouldCreateNewLast(SRWHelper *pHelper) { - // TODO - return 0; + ASSERT(pHelper->files.lastF.fd > 0); + struct stat st; + fstat(pHelper->files.lastF.fd, &st); + if (st.st_size > 32 * 1024 + TSDB_FILE_HEAD_SIZE) return true; + return false; } static int tsdbWriteBlockToFile(SRWHelper *pHelper, SFile *pFile, SDataCols *pDataCols, int rowsToWrite, SCompBlock *pCompBlock, @@ -629,7 +650,7 @@ static int tsdbWriteBlockToFile(SRWHelper *pHelper, SFile *pFile, SDataCols *pDa offset = lseek(pFile->fd, 0, SEEK_END); if (offset < 0) goto _err; - pCompData = (SCompData *)malloc(sizeof(SCompData) + sizeof(SCompCol) * pDataCols->numOfCols); + pCompData = (SCompData *)malloc(sizeof(SCompData) + sizeof(SCompCol) * pDataCols->numOfCols + sizeof(TSCKSUM)); if (pCompData == NULL) goto _err; int nColsNotAllNull = 0; @@ -639,12 +660,14 @@ static int tsdbWriteBlockToFile(SRWHelper *pHelper, SFile *pFile, SDataCols *pDa SCompCol *pCompCol = pCompData->cols + nColsNotAllNull; if (0) { - // TODO: all data are NULL + // TODO: all data to commit are NULL continue; } // Compress the data here - {} + { + // TODO + } pCompCol->colId = pDataCol->colId; pCompCol->type = pDataCol->type; @@ -655,14 +678,17 @@ static int tsdbWriteBlockToFile(SRWHelper *pHelper, SFile *pFile, SDataCols *pDa toffset += pCompCol->len; } - ASSERT(nColsNotAllNull > 0); + ASSERT(nColsNotAllNull > 0 && nColsNotAllNull <= pDataCols->numOfCols); pCompData->delimiter = TSDB_FILE_DELIMITER; pCompData->uid = pHelper->tableInfo.uid; pCompData->numOfCols = nColsNotAllNull; - size_t tsize = sizeof(SCompData) + sizeof(SCompCol) * nColsNotAllNull; + // Write SCompData + SCompCol part + size_t tsize = sizeof(SCompData) + sizeof(SCompCol) * nColsNotAllNull + sizeof(TSCKSUM); + taosCalcChecksumAppend(0, (uint8_t *)pCompData, tsize); if (twrite(pFile->fd, (void *)pCompData, tsize) < tsize) goto _err; + // Write true data part int nCompCol = 0; for (int ncol = 0; ncol < pDataCols->numOfCols; ncol++) { ASSERT(nCompCol < nColsNotAllNull); @@ -679,7 +705,7 @@ static int tsdbWriteBlockToFile(SRWHelper *pHelper, SFile *pFile, SDataCols *pDa pCompBlock->last = isLast; pCompBlock->offset = offset; - pCompBlock->algorithm = 2; // TODO + pCompBlock->algorithm = 0; // TODO pCompBlock->numOfPoints = rowsToWrite; pCompBlock->sversion = pHelper->tableInfo.sversion; pCompBlock->len = (int32_t)tsize; @@ -713,18 +739,13 @@ static FORCE_INLINE int compKeyFunc(const void *arg1, const void *arg2) { return ((*(TSKEY *)arg1) - (*(TSKEY *)arg2)); } -// static int nRowsLEThan(SDataCols *pDataCols, int maxKey) { -// void *ptr = taosbsearch((void *)&maxKey, pDataCols->cols[0].pData, pDataCols->numOfPoints, sizeof(TSKEY), compKeyFunc, TD_LE); -// if (ptr == NULL) return 0; -// return ((TSKEY *)ptr - (TSKEY *)(pDataCols->cols[0].pData)) + 1; -// } - // Merge the data with a block in file static int tsdbMergeDataWithBlock(SRWHelper *pHelper, int blkIdx, SDataCols *pDataCols) { // TODO: set pHelper->hasOldBlock int rowsWritten = 0; SCompBlock compBlock = {0}; + ASSERT(pDataCols->numOfPoints > 0); TSKEY keyFirst = dataColsKeyFirst(pDataCols); SCompIdx *pIdx = pHelper->pCompIdx + pHelper->tableInfo.tid; @@ -733,7 +754,7 @@ static int tsdbMergeDataWithBlock(SRWHelper *pHelper, int blkIdx, SDataCols *pDa SCompBlock *pCompBlock = pHelper->pCompInfo->blocks + blkIdx; ASSERT(pCompBlock->numOfSubBlocks >= 1); ASSERT(keyFirst >= pCompBlock->keyFirst); - ASSERT(compareKeyBlock((void *)&keyFirst, (void *)pCompBlock) == 0); + // ASSERT(compareKeyBlock((void *)&keyFirst, (void *)pCompBlock) == 0); if (keyFirst > pCompBlock->keyLast) { // Merge the last block by append ASSERT(pCompBlock->last && pCompBlock->numOfPoints < pHelper->config.minRowsPerFileBlock); @@ -870,9 +891,13 @@ static int tsdbAdjustInfoSizeIfNeeded(SRWHelper *pHelper, size_t spaceNeeded) { ASSERT(spaceLeft >= 0); if (spaceLeft < spaceNeeded) { size_t tsize = tsizeof(pHelper->pCompInfo) + sizeof(SCompBlock) * 16; - if (tsizeof(pHelper->pCompInfo) == 0) tsize += sizeof(SCompInfo); + if (tsizeof(pHelper->pCompInfo) == 0) { + pIdx->len = sizeof(SCompData) + sizeof(TSCKSUM); + tsize = tsize + sizeof(SCompInfo) + sizeof(TSCKSUM); + } pHelper->pCompInfo = (SCompInfo *)trealloc(pHelper->pCompInfo, tsize); + if (pHelper->pCompInfo == NULL) return -1; } return 0; @@ -881,20 +906,23 @@ static int tsdbAdjustInfoSizeIfNeeded(SRWHelper *pHelper, size_t spaceNeeded) { static int tsdbInsertSuperBlock(SRWHelper *pHelper, SCompBlock *pCompBlock, int blkIdx) { SCompIdx *pIdx = pHelper->pCompIdx + pHelper->tableInfo.tid; - ASSERT(blkIdx >=0 && blkIdx <= pIdx->numOfSuperBlocks); + ASSERT(blkIdx >= 0 && blkIdx <= pIdx->numOfSuperBlocks); ASSERT(pCompBlock->numOfSubBlocks == 1); // Adjust memory if no more room - if (tsdbAdjustInfoSizeIfNeeded(pHelper, sizeof(SCompBlock)) < 0) goto _err; - - // Insert the block - if (blkIdx < pIdx->numOfSuperBlocks) { - SCompBlock *pTCompBlock = pHelper->pCompInfo->blocks + blkIdx; - memmove((void *)(pTCompBlock + 1), (void *)pTCompBlock, pIdx->len - sizeof(SCompInfo) - sizeof(SCompBlock) *blkIdx); - pTCompBlock++; - for (int i = 0; i < pIdx->numOfSuperBlocks - blkIdx; i++) { - if (pTCompBlock->numOfSubBlocks > 1) pTCompBlock->offset += sizeof(SCompBlock); - } + if (tsdbAdjustInfoSizeIfNeeded(pHelper, sizeof(SCompBlock)) < 0) goto _err; + + // Change the offset + for (int i = 0; i < pIdx->numOfSuperBlocks; i++) { + SCompBlock *pTCompBlock = &pHelper->pCompInfo->blocks[i]; + if (pTCompBlock->numOfSubBlocks > 1) pTCompBlock->offset += sizeof(SCompBlock); + } + + // Memmove if needed + int tsize = pIdx->len - (sizeof(SCompData) + sizeof(SCompCol) * blkIdx); + if (tsize > 0) { + memmove((void *)((char *)pHelper->pCompInfo + sizeof(SCompData) + sizeof(SCompBlock) * (blkIdx + 1)), + (void *)((char *)pHelper->pCompInfo + sizeof(SCompData) + sizeof(SCompBlock) * blkIdx), tsize); } pHelper->pCompInfo->blocks[blkIdx] = *pCompBlock; @@ -905,7 +933,7 @@ static int tsdbInsertSuperBlock(SRWHelper *pHelper, SCompBlock *pCompBlock, int return 0; - _err: +_err: return -1; } @@ -913,6 +941,8 @@ static int tsdbAddSubBlock(SRWHelper *pHelper, SCompBlock *pCompBlock, int blkId ASSERT(pCompBlock->numOfSubBlocks == 0); SCompIdx *pIdx = pHelper->pCompIdx + pHelper->tableInfo.tid; + ASSERT(blkIdx >= 0 && blkIdx < pIdx->numOfSuperBlocks); + SCompBlock *pSCompBlock = pHelper->pCompInfo->blocks + blkIdx; ASSERT(pSCompBlock->numOfSubBlocks >= 1 && pSCompBlock->numOfSubBlocks < TSDB_MAX_SUBBLOCKS); @@ -926,7 +956,7 @@ static int tsdbAddSubBlock(SRWHelper *pHelper, SCompBlock *pCompBlock, int blkId memmove((void *)((char *)(pHelper->pCompInfo) + pSCompBlock->offset + pSCompBlock->len + sizeof(SCompBlock)), (void *)((char *)(pHelper->pCompInfo) + pSCompBlock->offset + pSCompBlock->len), tsize); - for (int i = blkIdx; i < pIdx->numOfSuperBlocks; i++) { + for (int i = blkIdx + 1; i < pIdx->numOfSuperBlocks; i++) { SCompBlock *pTCompBlock = &pHelper->pCompInfo->blocks[i]; if (pTCompBlock->numOfSubBlocks > 1) pTCompBlock->offset += sizeof(SCompBlock); } @@ -936,11 +966,15 @@ static int tsdbAddSubBlock(SRWHelper *pHelper, SCompBlock *pCompBlock, int blkId *(SCompBlock *)((char *)(pHelper->pCompInfo) + pSCompBlock->offset + pSCompBlock->len) = *pCompBlock; pSCompBlock->numOfSubBlocks++; + ASSERT(pSCompBlock->numOfSubBlocks <= TSDB_MAX_SUBBLOCKS); pSCompBlock->len += sizeof(SCompBlock); + pSCompBlock->numOfPoints += rowsAdded; + pSCompBlock->keyFirst = MIN(pSCompBlock->keyFirst, pCompBlock->keyFirst); + pSCompBlock->keyLast = MAX(pSCompBlock->keyLast, pCompBlock->keyLast); pIdx->len += sizeof(SCompBlock); } else { // Need to create two sub-blocks void *ptr = NULL; - for (int i = blkIdx - 1; i >= 0; i--) { + for (int i = blkIdx + 1; i < pIdx->numOfSuperBlocks; i++) { SCompBlock *pTCompBlock = pHelper->pCompInfo->blocks + i; if (pTCompBlock->numOfSubBlocks > 1) { ptr = (void *)((char *)(pHelper->pCompInfo) + pTCompBlock->offset + pTCompBlock->len); @@ -948,8 +982,7 @@ static int tsdbAddSubBlock(SRWHelper *pHelper, SCompBlock *pCompBlock, int blkId } } - if (ptr == NULL) - ptr = (void *)((char *)(pHelper->pCompInfo) + sizeof(SCompInfo) + sizeof(SCompBlock) * pIdx->numOfSuperBlocks); + if (ptr == NULL) ptr = (void *)((char *)(pHelper->pCompInfo) + pIdx->len - sizeof(TSCKSUM)); size_t tsize = pIdx->len - ((char *)ptr - (char *)(pHelper->pCompInfo)); if (tsize > 0) { @@ -1040,5 +1073,5 @@ static int tsdbGetRowsInRange(SDataCols *pDataCols, int minKey, int maxKey) { if ((TSKEY *)ptr2 - (TSKEY *)ptr1 < 0) return 0; - return (TSKEY *)ptr2 - (TSKEY *)ptr1; + return ((TSKEY *)ptr2 - (TSKEY *)ptr1) + 1; } \ No newline at end of file diff --git a/src/tsdb/tests/tsdbTests.cpp b/src/tsdb/tests/tsdbTests.cpp index 28d511ae2b3473a1da96ebac79365b60a333a58c..b54db6febbad7a7361aa6977f755a47dcf740ea9 100644 --- a/src/tsdb/tests/tsdbTests.cpp +++ b/src/tsdb/tests/tsdbTests.cpp @@ -48,98 +48,99 @@ TEST(TsdbTest, DISABLED_tableEncodeDecode) { ASSERT_EQ(memcmp(pTable->schema, tTable->schema, sizeof(STSchema) + sizeof(STColumn) * nCols), 0); } -TEST(TsdbTest, DISABLED_createRepo) { -// TEST(TsdbTest, createRepo) { - // STsdbCfg config; - - // // 1. Create a tsdb repository - // tsdbSetDefaultCfg(&config); - // tsdb_repo_t *pRepo = tsdbCreateRepo("/home/ubuntu/work/ttest/vnode0", &config, NULL); - // ASSERT_NE(pRepo, nullptr); - - // // 2. Create a normal table - // STableCfg tCfg; - // ASSERT_EQ(tsdbInitTableCfg(&tCfg, TSDB_SUPER_TABLE, 987607499877672L, 0), -1); - // ASSERT_EQ(tsdbInitTableCfg(&tCfg, TSDB_NORMAL_TABLE, 987607499877672L, 0), 0); - - // int nCols = 5; - // STSchema *schema = tdNewSchema(nCols); - - // for (int i = 0; i < nCols; i++) { - // if (i == 0) { - // tdSchemaAppendCol(schema, TSDB_DATA_TYPE_TIMESTAMP, i, -1); - // } else { - // tdSchemaAppendCol(schema, TSDB_DATA_TYPE_INT, i, -1); - // } - // } - - // tsdbTableSetSchema(&tCfg, schema, true); - - // tsdbCreateTable(pRepo, &tCfg); - - // // // 3. Loop to write some simple data - // int nRows = 1; - // int rowsPerSubmit = 1; - // int64_t start_time = 1584081000000; - - // SSubmitMsg *pMsg = (SSubmitMsg *)malloc(sizeof(SSubmitMsg) + sizeof(SSubmitBlk) + tdMaxRowBytesFromSchema(schema) * rowsPerSubmit); - - // double stime = getCurTime(); - - // for (int k = 0; k < nRows/rowsPerSubmit; k++) { - // memset((void *)pMsg, 0, sizeof(SSubmitMsg)); - // SSubmitBlk *pBlock = pMsg->blocks; - // pBlock->uid = 987607499877672L; - // pBlock->tid = 0; - // pBlock->sversion = 0; - // pBlock->len = 0; - // for (int i = 0; i < rowsPerSubmit; i++) { - // // start_time += 1000; - // start_time += 1000; - // SDataRow row = (SDataRow)(pBlock->data + pBlock->len); - // tdInitDataRow(row, schema); - - // for (int j = 0; j < schemaNCols(schema); j++) { - // if (j == 0) { // Just for timestamp - // tdAppendColVal(row, (void *)(&start_time), schemaColAt(schema, j)); - // } else { // For int - // int val = 10; - // tdAppendColVal(row, (void *)(&val), schemaColAt(schema, j)); - // } - // } - // pBlock->len += dataRowLen(row); - // } - // pMsg->length = pMsg->length + sizeof(SSubmitBlk) + pBlock->len; - // pMsg->numOfBlocks = 1; - - // pBlock->len = htonl(pBlock->len); - // pBlock->numOfRows = htonl(pBlock->numOfRows); - // pBlock->uid = htobe64(pBlock->uid); - // pBlock->tid = htonl(pBlock->tid); - - // pBlock->sversion = htonl(pBlock->sversion); - // pBlock->padding = htonl(pBlock->padding); - - // pMsg->length = htonl(pMsg->length); - // pMsg->numOfBlocks = htonl(pMsg->numOfBlocks); - // pMsg->compressed = htonl(pMsg->numOfBlocks); - - // tsdbInsertData(pRepo, pMsg); - // } - - // double etime = getCurTime(); - - // void *ptr = malloc(150000); - // free(ptr); - - // printf("Spent %f seconds to write %d records\n", etime - stime, nRows); - - // tsdbCloseRepo(pRepo); +// TEST(TsdbTest, DISABLED_createRepo) { +TEST(TsdbTest, createRepo) { + STsdbCfg config; + // 1. Create a tsdb repository + tsdbSetDefaultCfg(&config); + ASSERT_EQ(tsdbCreateRepo("/home/ubuntu/work/ttest/vnode0", &config, NULL), 0); + + tsdb_repo_t *pRepo = tsdbOpenRepo("/home/ubuntu/work/ttest/vnode0", NULL); + ASSERT_NE(pRepo, nullptr); + + // 2. Create a normal table + STableCfg tCfg; + ASSERT_EQ(tsdbInitTableCfg(&tCfg, TSDB_SUPER_TABLE, 987607499877672L, 0), -1); + ASSERT_EQ(tsdbInitTableCfg(&tCfg, TSDB_NORMAL_TABLE, 987607499877672L, 0), 0); + + int nCols = 5; + STSchema *schema = tdNewSchema(nCols); + + for (int i = 0; i < nCols; i++) { + if (i == 0) { + tdSchemaAppendCol(schema, TSDB_DATA_TYPE_TIMESTAMP, i, -1); + } else { + tdSchemaAppendCol(schema, TSDB_DATA_TYPE_INT, i, -1); + } + } + + tsdbTableSetSchema(&tCfg, schema, true); + + tsdbCreateTable(pRepo, &tCfg); + + // // 3. Loop to write some simple data + int nRows = 10000000; + int rowsPerSubmit = 10; + int64_t start_time = 1584081000000; + + SSubmitMsg *pMsg = (SSubmitMsg *)malloc(sizeof(SSubmitMsg) + sizeof(SSubmitBlk) + tdMaxRowBytesFromSchema(schema) * rowsPerSubmit); + + double stime = getCurTime(); + + for (int k = 0; k < nRows/rowsPerSubmit; k++) { + memset((void *)pMsg, 0, sizeof(SSubmitMsg)); + SSubmitBlk *pBlock = pMsg->blocks; + pBlock->uid = 987607499877672L; + pBlock->tid = 0; + pBlock->sversion = 0; + pBlock->len = 0; + for (int i = 0; i < rowsPerSubmit; i++) { + // start_time += 1000; + start_time += 1000; + SDataRow row = (SDataRow)(pBlock->data + pBlock->len); + tdInitDataRow(row, schema); + + for (int j = 0; j < schemaNCols(schema); j++) { + if (j == 0) { // Just for timestamp + tdAppendColVal(row, (void *)(&start_time), schemaColAt(schema, j)); + } else { // For int + int val = 10; + tdAppendColVal(row, (void *)(&val), schemaColAt(schema, j)); + } + } + pBlock->len += dataRowLen(row); + } + pMsg->length = pMsg->length + sizeof(SSubmitBlk) + pBlock->len; + pMsg->numOfBlocks = 1; + + pBlock->len = htonl(pBlock->len); + pBlock->numOfRows = htonl(pBlock->numOfRows); + pBlock->uid = htobe64(pBlock->uid); + pBlock->tid = htonl(pBlock->tid); + + pBlock->sversion = htonl(pBlock->sversion); + pBlock->padding = htonl(pBlock->padding); + + pMsg->length = htonl(pMsg->length); + pMsg->numOfBlocks = htonl(pMsg->numOfBlocks); + pMsg->compressed = htonl(pMsg->numOfBlocks); + + tsdbInsertData(pRepo, pMsg); + } + + double etime = getCurTime(); + + void *ptr = malloc(150000); + free(ptr); + + printf("Spent %f seconds to write %d records\n", etime - stime, nRows); + + tsdbCloseRepo(pRepo); } -// TEST(TsdbTest, DISABLED_openRepo) { -TEST(TsdbTest, openRepo) { +TEST(TsdbTest, DISABLED_openRepo) { +// TEST(TsdbTest, openRepo) { // tsdb_repo_t *repo = tsdbOpenRepo("/home/ubuntu/work/build/test/data/vnode/vnode2/tsdb", NULL); // ASSERT_NE(repo, nullptr);