diff --git a/src/vnode/tsdb/src/tsdbMain.c b/src/vnode/tsdb/src/tsdbMain.c index 769fc238153987b4cf14e09ca2692d260fdf9b3a..6abee55582738beac6638cecda7e04fb21a3dd4f 100644 --- a/src/vnode/tsdb/src/tsdbMain.c +++ b/src/vnode/tsdb/src/tsdbMain.c @@ -900,6 +900,9 @@ static int tsdbCommitToFile(STsdbRepo *pRepo, int fid, SSkipListIterator **iters int hasDataToCommit = tsdbHasDataToCommit(iters, pCfg->maxTables, minKey, maxKey); if (!hasDataToCommit) return 0; // No data to commit, just return + // TODO: make it more flexible + pCompInfo = (SCompInfo *)malloc(sizeof(SCompInfo) + sizeof(SCompBlock) * 1000); + // Create and open files for commit tsdbGetDataDirName(pRepo, dataDir); if (tsdbCreateFGroup(pFileH, dataDir, fid, pCfg->maxTables) < 0) { /* TODO */ @@ -908,7 +911,7 @@ static int tsdbCommitToFile(STsdbRepo *pRepo, int fid, SSkipListIterator **iters if (pGroup == NULL) { /* TODO */ } tsdbCreateFile(dataDir, fid, ".h", pCfg->maxTables, &hFile, 1, 0); - if (1 /*pGroup->files[TSDB_FILE_TYPE_LAST].size > TSDB_MAX_LAST_FILE_SIZE*/) { + if (0 /*pGroup->files[TSDB_FILE_TYPE_LAST].size > TSDB_MAX_LAST_FILE_SIZE*/) { // TODO: make it not to write the last file every time tsdbCreateFile(dataDir, fid, ".l", pCfg->maxTables, &lFile, 0, 0); isNewLastFile = 1; @@ -929,6 +932,8 @@ static int tsdbCommitToFile(STsdbRepo *pRepo, int fid, SSkipListIterator **iters SSkipListIterator *pIter = iters[tid]; SCompIdx * pIdx = &pIndices[tid]; + int nNewBlocks = 0; + if (pTable == NULL || pIter == NULL) continue; /* If no new data to write for this table, just write the old data to new file @@ -936,8 +941,10 @@ static int tsdbCommitToFile(STsdbRepo *pRepo, int fid, SSkipListIterator **iters */ if (!tsdbHasDataInRange(pIter, minKey, maxKey)) { // has old data - if (pIdx->offset > 0) { - if (isNewLastFile && pIdx->hasLast) { + if (pIdx->len > 0) { + goto _table_over; + // if (isNewLastFile && pIdx->hasLast) { + if (0) { // need to move the last block to new file if ((pCompInfo = (SCompInfo *)realloc((void *)pCompInfo, pIdx->len)) == NULL) { /* TODO */ } @@ -976,9 +983,14 @@ static int tsdbCommitToFile(STsdbRepo *pRepo, int fid, SSkipListIterator **iters continue; } + pCompInfo->delimiter = TSDB_FILE_DELIMITER; + pCompInfo->checksum = 0; + pCompInfo->uid = pTable->tableId.uid; + // Load SCompBlock part if neccessary int isCompBlockLoaded = 0; - if (pIdx->offset > 0) { + if (0) { + // if (pIdx->offset > 0) { if (pIdx->hasLast || tsdbHasDataInRange(pIter, minKey, pIdx->maxKey)) { // has last block || cache key overlap with commit key pCompInfo = (SCompInfo *)realloc((void *)pCompInfo, pIdx->len + sizeof(SCompBlock) * 100); @@ -998,34 +1010,50 @@ static int tsdbCommitToFile(STsdbRepo *pRepo, int fid, SSkipListIterator **iters tsdbReadRowsFromCache(pIter, maxKey, maxRowsToRead, pCols); if (pCols->numOfPoints == 0) break; - int pointsWritten = 0; - // { // TODO : try to write the block data to file - // if (!isCompBlockLoaded) { // Just append - // if (pCols->numOfPoints > pCfg->minRowsPerFileBlock) { // write directly to .data file - // lseek(pGroup->files[TSDB_FILE_TYPE_DATA], 0, SEEK_END); - - // } else { - // if (isNewLastFile) { // write directly to .l file - - // } else { // write directly to .last file + int pointsWritten = pCols->numOfPoints; + // TODO: all write to the end of .data file + int64_t toffset = 0; + int32_t tlen = 0; + tsdbWriteBlockToFileImpl(&pGroup->files[TSDB_FILE_TYPE_DATA], pCols, pCols->numOfPoints, &toffset, &tlen, pTable->tableId.uid); + + // Make the compBlock + SCompBlock *pTBlock = pCompInfo->blocks + nNewBlocks++; + pTBlock->offset = toffset; + pTBlock->len = tlen; + pTBlock->keyFirst = dataColsKeyFirst(pCols); + pTBlock->keyLast = dataColsKeyLast(pCols); + pTBlock->last = 0; + pTBlock->algorithm = 0; + pTBlock->numOfPoints = pCols->numOfPoints; + pTBlock->sversion = pTable->sversion; + pTBlock->numOfSubBlocks = 1; + + if (dataColsKeyLast(pCols) > pIdx->maxKey) pIdx->maxKey = dataColsKeyLast(pCols); - // } - // } - // } else { // Need to append - // // SCompBlock *pTBlock = NULL; - // } - // } - // pointsWritten = pCols->numOfPoints; tdPopDataColsPoints(pCols, pointsWritten); maxRowsToRead = pCfg->maxRowsPerFileBlock * 4 / 5 - pCols->numOfPoints; } + +_table_over: // Write the SCompBlock part - if (isCompBlockLoaded) { - // merge the block into old and update pIdx + pIdx->offset = lseek(hFile.fd, 0, SEEK_END); + if (pIdx->len > 0) { + sendfile(pGroup->files[TSDB_FILE_TYPE_HEAD].fd, hFile.fd, NULL, pIdx->len); + if (nNewBlocks > 0) { + write(hFile.fd, (void *)(pCompInfo->blocks), sizeof(SCompBlock) * nNewBlocks); + pIdx->len += (sizeof(SCompBlock) * nNewBlocks); + } } else { - // sendfile the SCompBlock part and update the pIdx + if (nNewBlocks > 0) { + write(hFile.fd, (void *)pCompInfo, sizeof(SCompInfo) + sizeof(SCompBlock) * nNewBlocks); + pIdx->len += sizeof(SCompInfo) + sizeof(SCompBlock) * nNewBlocks; + } } + + pIdx->checksum = 0; + pIdx->numOfSuperBlocks += nNewBlocks; + pIdx->hasLast = 0; } // Write the SCompIdx part