diff --git a/src/vnode/tsdb/src/tsdbMain.c b/src/vnode/tsdb/src/tsdbMain.c index d18e766c38960b07669f6a66daac408e524c770a..984f2ca203c27d17b59e5ee495446c31999dfc60 100644 --- a/src/vnode/tsdb/src/tsdbMain.c +++ b/src/vnode/tsdb/src/tsdbMain.c @@ -1069,10 +1069,12 @@ static int tsdbWriteBlockToFileImpl(SFile *pFile, SDataCols *pCols, int pointsTo // Write the block if (write(pFile->fd, (void *)pCompData, size) < 0) goto _err; + *len += size; for (int iCol = 0; iCol < pCols->numOfCols; iCol++) { SDataCol *pDataCol = pCols->cols + iCol; SCompCol *pCompCol = pCompData->cols + iCol; if (write(pFile->fd, pDataCol->pData, pCompCol->len) < 0) goto _err; + *len += pCompCol->len; } if (pCompData == NULL) free((void *)pCompData); @@ -1083,22 +1085,46 @@ _err: return -1; } -static int tsdbWriteBlockToFile(STsdbRepo *pRepo, SCompIdx *pIdx, SCompInfo *pCompInfo, SDataCols *pCols, SCompBlock *pCompBlock) { - STsdbCfg *pCfg = &(pRepo->config); +static int compareKeyBlock(const void *arg1, const void *arg2) { + TSKEY key = *(TSKEY *)arg1; + SCompBlock *pBlock = (SCompBlock *)arg2; + + if (key < pBlock->keyFirst) { + return -1; + } else if (key > pBlock->keyLast) { + return 1; + } + + return 0; +} + +static int tsdbWriteBlockToFile(STsdbRepo *pRepo, SFileGroup *pGroup, SCompIdx *pIdx, SCompInfo *pCompInfo, SDataCols *pCols, SCompBlock *pCompBlock, SFile *lFile, int64_t uid) { + STsdbCfg * pCfg = &(pRepo->config); SCompData *pCompData = NULL; + SFile * pFile = NULL; + int numOfPointsToWrite = 0; + int64_t offset = 0; + int32_t len = 0; memset((void *)pCompBlock, 0, sizeof(SCompBlock)); if (pCompInfo == NULL) { - if (pCols->numOfPoints > pCfg->minRowsPerFileBlock) { // Write to .data file - // tsdbWriteBlockToFileImpl() - } else { // Write to .last or .l file + // Just append the data block to .data or .l or .last file + numOfPointsToWrite = pCols->numOfPoints; + if (pCols->numOfPoints > pCfg->minRowsPerFileBlock) { // Write to .data file + pFile = &(pGroup->files[TSDB_FILE_TYPE_DATA]); + } else { // Write to .last or .l file pCompBlock->last = 1; - + if (lFile) { + pFile = lFile; + } else { + pFile = &(pGroup->files[TSDB_FILE_TYPE_LAST]); + } } - // pCompBlock->offset = ; - // pCompBlock->len = ; - pCompBlock->algorithm = 2; // TODO : add to configuration + tsdbWriteBlockToFileImpl(pFile, pCols, numOfPointsToWrite, &offset, &len, uid); + pCompBlock->offset = offset; + pCompBlock->len = len; + pCompBlock->algorithm = 2; // TODO : add to configuration pCompBlock->sversion = pCols->sversion; pCompBlock->numOfPoints = pCols->numOfPoints; pCompBlock->numOfSubBlocks = 1; @@ -1106,7 +1132,38 @@ static int tsdbWriteBlockToFile(STsdbRepo *pRepo, SCompIdx *pIdx, SCompInfo *pCo pCompBlock->keyFirst = dataColsKeyFirst(pCols); pCompBlock->keyLast = dataColsKeyLast(pCols); } else { - // Need to merge + // Need to merge the block to either the last block or the other block + TSKEY keyFirst = dataColsKeyFirst(pCols); + SCompBlock *pMergeBlock = NULL; + + // Search the block to merge in + void *ptr = taosbsearch((void *)&keyFirst, (void *)(pCompInfo->blocks), sizeof(SCompBlock), pIdx->numOfSuperBlocks, + compareKeyBlock, TD_GE); + if (ptr == NULL) { + // No block greater or equal than the key, but there are data in the .last file, need to merge the last file block + // and merge the data + pMergeBlock = TSDB_COMPBLOCK_AT(pCompInfo, pIdx->numOfSuperBlocks - 1); + } else { + pMergeBlock = (SCompBlock *)ptr; + } + + if (pMergeBlock->last) { + if (pMergeBlock->last + pCols->numOfPoints > pCfg->minRowsPerFileBlock) { + // Need to load the data from .last and combine data in pCols to write to .data file + + } else { // Just append the block to .last or .l file + if (lFile) { + // read the block from .last file and merge with pCols, write to .l file + + } else { + // tsdbWriteBlockToFileImpl(); + } + } + } else { // The block need to merge in .data file + + } + } - return 0; + + return numOfPointsToWrite; } \ No newline at end of file