diff --git a/src/vnode/tsdb/src/tsdbMain.c b/src/vnode/tsdb/src/tsdbMain.c index 599af738137fc4e3bd7a8a0addd227afc11a911d..a3367d2e516f09d88867e1974b95824f53327fd9 100644 --- a/src/vnode/tsdb/src/tsdbMain.c +++ b/src/vnode/tsdb/src/tsdbMain.c @@ -892,13 +892,15 @@ static int tsdbCommitToFile(STsdbRepo *pRepo, int fid, SSkipListIterator **iters // Check if there are data to commit to this file int hasDataToCommit = tsdbHasDataToCommit(iters, pCfg->maxTables, minKey, maxKey); - if (!hasDataToCommit) return 0; // No data to commit, just return + if (!hasDataToCommit) return 0; // No data to commit, just return // Create and open files for commit tsdbGetDataDirName(pRepo, dataDir); - if (tsdbCreateFGroup(pFileH, dataDir, fid, pCfg->maxTables) < 0) {/* TODO */} + if (tsdbCreateFGroup(pFileH, dataDir, fid, pCfg->maxTables) < 0) { /* TODO */ + } pGroup = tsdbOpenFilesForCommit(pFileH, fid); - if (pGroup == NULL) {/* TODO */} + if (pGroup == NULL) { /* TODO */ + } tsdbCreateFile(dataDir, fid, ".h", pCfg->maxTables, &hFile, 1, 0); if (1 /*pGroup->files[TSDB_FILE_TYPE_LAST].size > TSDB_MAX_LAST_FILE_SIZE*/) { // TODO: make it not to write the last file every time @@ -907,28 +909,32 @@ static int tsdbCommitToFile(STsdbRepo *pRepo, int fid, SSkipListIterator **iters } // Load the SCompIdx - pIndices = (SCompIdx *)malloc(sizeof(SCompIdx) *pCfg->maxTables); - if (pIndices == NULL) {/* TODO*/} - if (tsdbLoadCompIdx(pGroup, (void *)pIndices, pCfg->maxTables) < 0) {/* TODO */} + pIndices = (SCompIdx *)malloc(sizeof(SCompIdx) * pCfg->maxTables); + if (pIndices == NULL) { /* TODO*/ + } + if (tsdbLoadCompIdx(pGroup, (void *)pIndices, pCfg->maxTables) < 0) { /* TODO */ + } -// Loop to commit data in each table + // Loop to commit data in each table for (int tid = 0; tid < pCfg->maxTables; tid++) { - STable *pTable = pMeta->tables[tid]; + STable * pTable = pMeta->tables[tid]; SSkipListIterator *pIter = iters[tid]; - SCompIdx *pIdx = &pIndices[tid]; + SCompIdx * pIdx = &pIndices[tid]; if (!tsdbHasDataInRange(pIter, minKey, maxKey)) { // This table does not have data in this range, just copy its head part and last // part (if neccessary) to new file - if (pIdx->len != 0) { // has old data + if (pIdx->offset > 0) { // has old data if (isNewLastFile && pIdx->hasLast) { // Need to load SCompBlock part and copy to new file - if ((pCompInfo = (SCompInfo *)realloc((void *)pCompInfo, pIdx->len)) == NULL) {/* TODO */} - if (tsdbLoadCompBlocks(pGroup, pIdx, (void *)pCompInfo) < 0) {/* TODO */} + if ((pCompInfo = (SCompInfo *)realloc((void *)pCompInfo, pIdx->len)) == NULL) { /* TODO */ + } + if (tsdbLoadCompBlocks(pGroup, pIdx, (void *)pCompInfo) < 0) { /* TODO */ + } - // Copy the last block from old last file to new file + // TODO: Copy the last block from old last file to new file // tsdbCopyBlockData() - } else { + } else { pIdx->offset = lseek(hFile.fd, 0, SEEK_CUR); sendfile(pGroup->files[TSDB_FILE_TYPE_HEAD].fd, hFile.fd, NULL, pIdx->len); hFile.size += pIdx->len; @@ -937,141 +943,43 @@ static int tsdbCommitToFile(STsdbRepo *pRepo, int fid, SSkipListIterator **iters continue; } - // while () { + // Load SCompBlock part if neccessary + int isCompBlockLoaded = 0; + if (pIdx->offset > 0) { + if (pIdx->hasLast || tsdbHasDataInRange(pIter, minKey, pIdx->maxKey)) { + // has last block || cache key overlap with commit key + if (tsdbLoadCompBlocks(pGroup, pIdx, (void *)pCompInfo) < 0) { /* TODO */ + } + if (pCompInfo->uid == pTable->tableId.uid) isCompBlockLoaded = 1; + } else { + // TODO: No need to load the SCompBlock part, just sendfile the SCompBlock part + // and write those new blocks to it + } + } + + tdInitDataCols(pCols, pTable->schema); - // } - } + int maxRowsToRead = pCfg->maxRowsPerFileBlock * 4 / 5; + while (1) { + tsdbReadRowsFromCache(pIter, maxKey, maxRowsToRead, pCols); + if (pCols->numOfPoints == 0) break; - // for (int tid = 0; tid < pCfg->maxTables; tid++) { - // STable * pTable = pMeta->tables[tid]; - // SSkipListIterator *pIter = iters[tid]; - // int isLoadCompBlocks = 0; - // char dataDir[128] = "\0"; - - // if (pIter == NULL) { - // if (hasDataToCommit && isNewLastFile()) - // continue; - // } - // tdInitDataCols(pCols, pTable->schema); - - // int numOfWrites = 0; - // int maxRowsToRead = pCfg->maxRowsPerFileBlock * 4 / 5; // We keep 20% of space for merge purpose - // // Loop to read columns from cache - // while (tsdbReadRowsFromCache(pIter, maxKey, maxRowsToRead, pCols)) { - // if (!hasDataToCommit) { - // // There are data to commit to this fileId, we need to create/open it for read/write. - // // At the meantime, we set the flag to prevent further create/open operations - // tsdbGetDataDirName(pRepo, dataDir); - - // if (tsdbCreateFGroup(pFileH, dataDir, fid, pCfg->maxTables) < 0) { - // // TODO: deal with the ERROR here - // } - // // Open files for commit - // pGroup = tsdbOpenFilesForCommit(pFileH, fid); - // if (pGroup == NULL) { - // // TODO: deal with the ERROR here - // } - // // TODO: open .h file and if neccessary, open .l file - // tsdbCreateFile(dataDir, fid, ".h", pCfg->maxTables, &tFile, 1, 0); - // if (1 /*pGroup->files[TSDB_FILE_TYPE_LAST].size > TSDB_MAX_LAST_FILE_SIZE*/) { - // // TODO: make it not to write the last file every time - // tsdbCreateFile(dataDir, fid, ".l", pCfg->maxTables, &lFile, 0, 0); - // isNewLastFile = 1; - // } - - // // load the SCompIdx part - // pIndices = (SCompIdx *)malloc(sizeof(SCompIdx) * pCfg->maxTables); - // if (pIndices == NULL) { // TODO: deal with the ERROR - // } - // if (tsdbLoadCompIdx(pGroup, (void *)pIndices, pCfg->maxTables) < 0) { // TODO: deal with the ERROR here - // } - - // // sendfile those not need to changed table content - // lseek(pGroup->files[TSDB_FILE_TYPE_HEAD].fd, TSDB_FILE_HEAD_SIZE + sizeof(SCompIdx) * pCfg->maxTables, - // SEEK_SET); - // lseek(tFile.fd, TSDB_FILE_HEAD_SIZE + sizeof(SCompIdx) * pCfg->maxTables, SEEK_SET); - // for (int ttid = 0; ttid < tid; ttid++) { - // SCompIdx * tIdx= &pIndices[ttid]; - // if (tIdx->len <= 0) continue; - // if (isNewLastFile && tIdx->hasLast) { - // // TODO: Need to load the SCompBlock part and copy to new last file - // pCompInfo = (SCompInfo *)realloc((void *)pCompInfo, tIdx->len); - // if (pCompInfo == NULL) { /* TODO */} - // if (tsdbLoadCompBlocks(pGroup, tIdx, (void *)pCompInfo) < 0) {/* TODO */} - // SCompBlock *pLastBlock = TSDB_COMPBLOCK_AT(pCompInfo, tIdx->numOfSuperBlocks - 1); - // int numOfSubBlocks = pLastBlock->numOfSubBlocks; - // assert(pLastBlock->last); - // if (tsdbCopyCompBlockToFile(&pGroup->files[TSDB_FILE_TYPE_LAST], &lFile, pCompInfo, tIdx->numOfSuperBlocks, 1 /* isOutLast*/) < 0) {/* TODO */} - // { - // if (numOfSubBlocks > 1) tIdx->len -= (sizeof(SCompBlock) * numOfSubBlocks); - // tIdx->checksum = 0; - // } - // write(tFile.fd, (void *)pCompInfo, tIdx->len); - // tFile.size += tIdx->len; - // } else { - // sendfile(pGroup->files[TSDB_FILE_TYPE_HEAD].fd, tFile.fd, NULL, tIdx->len); - // tFile.size += tIdx->len; - // } - // } - - // hasDataToCommit = 1; - // } - - // SCompIdx *pIdx = &pIndices[tid]; - - // /* The first time to write to the table, need to decide - // * if it is neccessary to load the SComplock part. If it - // * is needed, just load it, or, just use sendfile and - // * append it. - // */ - // if (numOfWrites == 0 && pIdx->offset > 0) { - // if (dataColsKeyFirst(pCols) <= pIdx->maxKey || pIdx->hasLast) { - // pCompInfo = (SCompInfo *)realloc((void *)pCompInfo, pIdx->len); - // if (tsdbLoadCompBlocks(pGroup, pIdx, (void *)pCompInfo) < 0) { - // // TODO: deal with the ERROR here - // } - // if (pCompInfo->uid == pTable->tableId.uid) isLoadCompBlocks = 1; - // } else { - // // TODO: sendfile the prefix part - // } - // } - - // int numOfPointsWritten = tsdbWriteBlockToFile(pGroup, pCompInfo, pIdx, isLoadCompBlocks, pBlock, pCols); - // if (numOfPointsWritten < 0) { - // // TODO: deal with the ERROR here - // } - - // // pCompInfo = tsdbMergeBlock(pCompInfo, pBlock); - - - // // if (1 /* the SCompBlock part is not loaded*/) { - // // // Append to .data file generate a SCompBlock and record it - // // } else { - // // } - - // // // TODO: need to reset the pCols - - // numOfWrites++; - // } - - // if (pCols->numOfPoints > 0) { - // // TODO: still has data to commit, commit it - // } - - // if (1/* SCompBlock part is loaded, write it to .head file*/) { - // // TODO - // } else { - // // TODO: use sendfile send the old part and append the newly added part - // } - // } - - // Write the SCompIdx part - - // Close all files and return - if (hasDataToCommit) { - // TODO + // TODO + int pointsWritten = 0; /* tsdbWriteBlockToFile(); */ + tdPopDataColsPoints(pCols, pointsWritten); + } + + // Write the SCompBlock part + if (isCompBlockLoaded) { + // merge the block into old and update pIdx + } else { + // sendfile the SCompBlock part and update the pIdx + } } + // TODO: close the files and replace the .head and .last file + {} + if (pIndices) free(pIndices); if (pCompInfo) free(pCompInfo);