diff --git a/src/common/src/dataformat.c b/src/common/src/dataformat.c index 56565b6e5c6e16cb26b4e8469132ee2a3f833d11..0496fc6feb313597b343f10e366e838a46f1697f 100644 --- a/src/common/src/dataformat.c +++ b/src/common/src/dataformat.c @@ -355,7 +355,18 @@ void tdAppendDataRowToDataCol(SDataRow row, SDataCols *pCols) { } // Pop pointsToPop points from the SDataCols void tdPopDataColsPoints(SDataCols *pCols, int pointsToPop) { - + int pointsLeft = pCols->numOfPoints - pointsToPop; + + for (int iCol = 0; iCol < pCols->numOfCols; iCol++) { + SDataCol *p_col = pCols->cols + iCol; + if (p_col->len > 0) { + p_col->len = TYPE_BYTES[p_col->type] * pointsLeft; + if (pointsLeft > 0) { + memmove((void *)(p_col->pData), (void *)((char *)(p_col->pData) + TYPE_BYTES[p_col->type] * pointsToPop), p_col->len); + } + } + } + pCols->numOfPoints = pointsLeft; } /** diff --git a/src/vnode/tsdb/inc/tsdbFile.h b/src/vnode/tsdb/inc/tsdbFile.h index ac96d15126796dde5456decc8cac72dd796be007..0f654ad0bde2bf5d2656761b8b2775f18c028dc2 100644 --- a/src/vnode/tsdb/inc/tsdbFile.h +++ b/src/vnode/tsdb/inc/tsdbFile.h @@ -148,8 +148,6 @@ int tsdbLoadDataBlock(SFile *pFile, SCompBlock *pStartBlock, int numOfBlocks, SD // TODO: need an API to merge all sub-block data into one -int tsdbWriteBlockToFile(SFileGroup *pGroup, SCompInfo *pCompInfo, SCompIdx *pIdx, int isMerge, SCompBlock *pBlock, SDataCols *pCols); - void tsdbGetKeyRangeOfFileId(int32_t daysPerFile, int8_t precision, int32_t fileId, TSKEY *minKey, TSKEY *maxKey); #ifdef __cplusplus } diff --git a/src/vnode/tsdb/src/tsdbFile.c b/src/vnode/tsdb/src/tsdbFile.c index 3567517d79119a8a446716725dc60595f29e3777..088a164933865382ae33e1b15518818c6d385e27 100644 --- a/src/vnode/tsdb/src/tsdbFile.c +++ b/src/vnode/tsdb/src/tsdbFile.c @@ -196,65 +196,6 @@ int tsdbLoadColData(SFile *pFile, SCompCol *pCol, int64_t blockBaseOffset, void return 0; } -static int tsdbWriteBlockToFileImpl(SFile * pFile, // File to write - SDataCols * pCols, // Data column buffer - int numOfPointsToWrie, // Number of points to write to the file - SCompBlock *pBlock // SCompBlock to hold block information to return - ) { - // pBlock->last = 0; - // pBlock->offset = lseek(pFile->fd, 0, SEEK_END); - // // pBlock->algorithm = ; - // pBlock->numOfPoints = pCols->numOfPoints; - // // pBlock->sversion = ; - // // pBlock->len = ; - // pBlock->numOfSubBlocks = 1; - // pBlock->keyFirst = dataColsKeyFirst(pCols); - // pBlock->keyLast = dataColsKeyLast(pCols); - // for (int i = 0; i < pCols->numOfCols; i++) { - // // TODO: if all col value is NULL, do not save it - // pBlock->numOfCols++; - // pCompData->numOfCols++; - // SCompCol *pCompCol = pCompData->cols + i; - // pCompCol->colId = pCols->cols[i].colId; - // pCompCol->type = pCols->cols[i].type; - - // // pCompCol->len = ; - // // pCompCol->offset = ; - // } - - return 0; -} - -int tsdbWriteBlockToFile(SFileGroup *pGroup, SCompInfo *pCompInfo, SCompIdx *pIdx, int isMerge, SCompBlock *pBlock, SDataCols *pCols) { - memset((void *)pBlock, 0, sizeof(SCompBlock)); - SFile *pFile = NULL; - SCompData *pCompData = (SCompData *)malloc(sizeof(SCompData) + sizeof(SCompCol) * pCols->numOfCols); - if (pCompData == NULL) return -1; - pCompData->delimiter = TSDB_FILE_DELIMITER; - // pCompData->uid = ; - - if (isMerge) { - TSKEY keyFirst = dataColsKeyFirst(pCols); - // 1. Binary search the block the data can merged into - - if (1/* the data should only merged into last file */) { - } else { - } - } else { - // Write directly to the file without merge - if (1/*pCols->numOfPoints < pCfg->minRowsPerFileBlock*/) { - // TODO: write the data to the last file - } else { - // TODO: wirte the data to the data file - } - } - - // TODO: need to update pIdx - - if (pCompData) free(pCompData); - return 0; -} - static int compFGroupKey(const void *key, const void *fgroup) { int fid = *(int *)key; SFileGroup *pFGroup = (SFileGroup *)fgroup; diff --git a/src/vnode/tsdb/src/tsdbMain.c b/src/vnode/tsdb/src/tsdbMain.c index 42792bc1b8bb28e4f00e00b6861d9d2822db419c..5f7dd2513f312bbb5907bac4df8f700a6cfbe789 100644 --- a/src/vnode/tsdb/src/tsdbMain.c +++ b/src/vnode/tsdb/src/tsdbMain.c @@ -967,9 +967,26 @@ static int tsdbCommitToFile(STsdbRepo *pRepo, int fid, SSkipListIterator **iters tsdbReadRowsFromCache(pIter, maxKey, maxRowsToRead, pCols); if (pCols->numOfPoints == 0) break; - // TODO - int pointsWritten = 0; /* tsdbWriteBlockToFile(); */ + int pointsWritten = 0; + // { // TODO : try to write the block data to file + // if (!isCompBlockLoaded) { // Just append + // if (pCols->numOfPoints > pCfg->minRowsPerFileBlock) { // write directly to .data file + // lseek(pGroup->files[TSDB_FILE_TYPE_DATA], 0, SEEK_END); + + // } else { + // if (isNewLastFile) { // write directly to .l file + + // } else { // write directly to .last file + + // } + // } + // } else { // Need to append + // // SCompBlock *pTBlock = NULL; + // } + // } + pointsWritten = pCols->numOfPoints; tdPopDataColsPoints(pCols, pointsWritten); + maxRowsToRead = pCfg->maxRowsPerFileBlock * 4 / 5 - pCols->numOfPoints; } // Write the SCompBlock part @@ -1022,4 +1039,17 @@ static int tsdbHasDataToCommit(SSkipListIterator **iters, int nIters, TSKEY minK if (tsdbHasDataInRange(pIter, minKey, maxKey)) return 1; } return 0; +} + +static int tsdbWriteBlockToFile(STsdbRepo *pRepo, SCompIdx *pIdx, SCompInfo *pCompInfo, SDataCols *pCols, SCompBlock *pCompBlock) { + STsdbCfg *pCfg = &(pRepo->config); + + memset((void *)pCompBlock, 0, sizeof(SCompBlock)); + + if (pCompInfo == NULL) { + // Just need to append to file + } else { + // Need to merge + } + return 0; } \ No newline at end of file