Commit 016b5e85 authored by hzcheng

TD-34

Parent: f29b0fca
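The diff below reworks tsdbCommitToFile, which flushes rows cached in per-table skip-list iterators into the file group of one file id. As an orientation aid only, here is a compilable outline of the control flow the function follows; every type and helper name in it is an invented stand-in, not TDengine's real API.

#include <stdio.h>

/* Invented stand-in for the real file-group handle (.head/.data/.last files). */
typedef struct { int fid; } FileGroup;

/* Stubs that mark the steps the real code performs. */
static int has_data_to_commit(void)               { return 1; }
static int open_file_group(FileGroup *g, int fid) { g->fid = fid; return 0; }
static int load_comp_idx(FileGroup *g)            { (void)g; return 0; }
static int commit_table(FileGroup *g, int tid)    { (void)g; (void)tid; return 0; }
static int write_comp_idx(FileGroup *g)           { (void)g; return 0; }

/* Shape of the commit path in the diff: bail out early when there is nothing
 * to write, create/open the file group, load the per-table index (SCompIdx),
 * commit each table's cached rows, then write the index back; closing and
 * replacing the .head/.last files is still marked TODO in the diff. */
static int commit_to_file(int fid, int maxTables) {
  if (!has_data_to_commit()) return 0;

  FileGroup group;
  if (open_file_group(&group, fid) < 0) return -1;
  if (load_comp_idx(&group) < 0) return -1;

  for (int tid = 0; tid < maxTables; tid++) {
    if (commit_table(&group, tid) < 0) return -1;
  }
  return write_comp_idx(&group);
}

int main(void) {
  printf("commit result: %d\n", commit_to_file(0, 4));
  return 0;
}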
@@ -892,13 +892,15 @@ static int tsdbCommitToFile(STsdbRepo *pRepo, int fid, SSkipListIterator **iters
// Check if there are data to commit to this file
int hasDataToCommit = tsdbHasDataToCommit(iters, pCfg->maxTables, minKey, maxKey);
if (!hasDataToCommit) return 0; // No data to commit, just return
// Create and open files for commit
tsdbGetDataDirName(pRepo, dataDir);
- if (tsdbCreateFGroup(pFileH, dataDir, fid, pCfg->maxTables) < 0) {/* TODO */}
+ if (tsdbCreateFGroup(pFileH, dataDir, fid, pCfg->maxTables) < 0) { /* TODO */
+ }
pGroup = tsdbOpenFilesForCommit(pFileH, fid);
- if (pGroup == NULL) {/* TODO */}
+ if (pGroup == NULL) { /* TODO */
+ }
tsdbCreateFile(dataDir, fid, ".h", pCfg->maxTables, &hFile, 1, 0);
if (1 /*pGroup->files[TSDB_FILE_TYPE_LAST].size > TSDB_MAX_LAST_FILE_SIZE*/) {
// TODO: make it not to write the last file every time
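Note on the hunk above: the hard-coded if (1 /* ... */) means a new .l (last) file is created on every commit; the commented-out size condition and the TODO suggest the intent is to do so only once the existing last file exceeds TSDB_MAX_LAST_FILE_SIZE. A standalone sketch of such a size check with POSIX fstat(2) follows; the threshold value and helper name are placeholders, not TDengine symbols.

#include <fcntl.h>
#include <stdio.h>
#include <sys/stat.h>
#include <unistd.h>

#define LAST_FILE_SIZE_LIMIT (32 * 1024 * 1024) /* placeholder threshold, not the real TSDB_MAX_LAST_FILE_SIZE */

/* Return 1 if the file behind fd has outgrown the limit, 0 if not, -1 on error.
 * A commit could use such a check to roll over to a fresh last file only when
 * needed, instead of rewriting it on every commit (the TODO above). */
static int last_file_too_big(int fd) {
  struct stat st;
  if (fstat(fd, &st) < 0) return -1;
  return st.st_size > LAST_FILE_SIZE_LIMIT;
}

int main(void) {
  int fd = open("/etc/hosts", O_RDONLY); /* any existing file, just to exercise the check */
  if (fd < 0) return 1;
  printf("needs a new last file: %d\n", last_file_too_big(fd));
  close(fd);
  return 0;
}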
@@ -907,28 +909,32 @@ static int tsdbCommitToFile(STsdbRepo *pRepo, int fid, SSkipListIterator **iters
}
// Load the SCompIdx
- pIndices = (SCompIdx *)malloc(sizeof(SCompIdx) *pCfg->maxTables);
- if (pIndices == NULL) {/* TODO*/}
- if (tsdbLoadCompIdx(pGroup, (void *)pIndices, pCfg->maxTables) < 0) {/* TODO */}
+ pIndices = (SCompIdx *)malloc(sizeof(SCompIdx) * pCfg->maxTables);
+ if (pIndices == NULL) { /* TODO*/
+ }
+ if (tsdbLoadCompIdx(pGroup, (void *)pIndices, pCfg->maxTables) < 0) { /* TODO */
+ }
// Loop to commit data in each table
for (int tid = 0; tid < pCfg->maxTables; tid++) {
- STable *pTable = pMeta->tables[tid];
+ STable * pTable = pMeta->tables[tid];
SSkipListIterator *pIter = iters[tid];
- SCompIdx *pIdx = &pIndices[tid];
+ SCompIdx * pIdx = &pIndices[tid];
if (!tsdbHasDataInRange(pIter, minKey, maxKey)) {
// This table does not have data in this range, just copy its head part and last
// part (if necessary) to new file
- if (pIdx->len != 0) { // has old data
+ if (pIdx->offset > 0) { // has old data
if (isNewLastFile && pIdx->hasLast) {
// Need to load SCompBlock part and copy to new file
- if ((pCompInfo = (SCompInfo *)realloc((void *)pCompInfo, pIdx->len)) == NULL) {/* TODO */}
- if (tsdbLoadCompBlocks(pGroup, pIdx, (void *)pCompInfo) < 0) {/* TODO */}
+ if ((pCompInfo = (SCompInfo *)realloc((void *)pCompInfo, pIdx->len)) == NULL) { /* TODO */
+ }
+ if (tsdbLoadCompBlocks(pGroup, pIdx, (void *)pCompInfo) < 0) { /* TODO */
+ }
- // Copy the last block from old last file to new file
+ // TODO: Copy the last block from old last file to new file
+ // tsdbCopyBlockData()
} else {
pIdx->offset = lseek(hFile.fd, 0, SEEK_CUR);
sendfile(pGroup->files[TSDB_FILE_TYPE_HEAD].fd, hFile.fd, NULL, pIdx->len);
hFile.size += pIdx->len;
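For a table with no rows in the commit window, the code above records the new head file's current write offset (lseek with SEEK_CUR) into pIdx->offset and then bulk-copies the table's old index region with sendfile, growing hFile.size by pIdx->len. A minimal standalone sketch of that record-then-copy pattern is below; the file names are placeholders, and note that Linux sendfile(2) takes the destination fd first and the source fd second (TDengine may route this call through its own portability wrapper).

#include <fcntl.h>
#include <stdio.h>
#include <sys/sendfile.h>
#include <unistd.h>

/* Record where the copied region will land in dst_fd, then copy len bytes
 * from src_fd, mirroring the lseek()/sendfile() pair in the diff. A real
 * implementation would loop until all len bytes have been transferred. */
static off_t copy_region(int dst_fd, int src_fd, size_t len) {
  off_t dst_off = lseek(dst_fd, 0, SEEK_CUR); /* new offset of this table's data */
  if (dst_off < 0) return -1;
  if (sendfile(dst_fd, src_fd, NULL, len) < 0) return -1;
  return dst_off;
}

int main(void) {
  int src = open("old.head", O_RDONLY);                 /* placeholder file names */
  int dst = open("new.head", O_WRONLY | O_CREAT, 0644);
  if (src < 0 || dst < 0) return 1;
  lseek(dst, 0, SEEK_END);                              /* append to the new file */
  off_t at = copy_region(dst, src, 4096);
  printf("copied region starts at offset %lld\n", (long long)at);
  close(src);
  close(dst);
  return 0;
}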
@@ -937,141 +943,43 @@ static int tsdbCommitToFile(STsdbRepo *pRepo, int fid, SSkipListIterator **iters
continue;
}
// while () {
// Load SCompBlock part if necessary
int isCompBlockLoaded = 0;
if (pIdx->offset > 0) {
if (pIdx->hasLast || tsdbHasDataInRange(pIter, minKey, pIdx->maxKey)) {
// has last block || cache key overlap with commit key
if (tsdbLoadCompBlocks(pGroup, pIdx, (void *)pCompInfo) < 0) { /* TODO */
}
if (pCompInfo->uid == pTable->tableId.uid) isCompBlockLoaded = 1;
} else {
// TODO: No need to load the SCompBlock part, just sendfile the SCompBlock part
// and write those new blocks to it
}
}
tdInitDataCols(pCols, pTable->schema);
// }
}
int maxRowsToRead = pCfg->maxRowsPerFileBlock * 4 / 5;
while (1) {
tsdbReadRowsFromCache(pIter, maxKey, maxRowsToRead, pCols);
if (pCols->numOfPoints == 0) break;
// for (int tid = 0; tid < pCfg->maxTables; tid++) {
// STable * pTable = pMeta->tables[tid];
// SSkipListIterator *pIter = iters[tid];
// int isLoadCompBlocks = 0;
// char dataDir[128] = "\0";
// if (pIter == NULL) {
// if (hasDataToCommit && isNewLastFile())
// continue;
// }
// tdInitDataCols(pCols, pTable->schema);
// int numOfWrites = 0;
// int maxRowsToRead = pCfg->maxRowsPerFileBlock * 4 / 5; // We keep 20% of space for merge purpose
// // Loop to read columns from cache
// while (tsdbReadRowsFromCache(pIter, maxKey, maxRowsToRead, pCols)) {
// if (!hasDataToCommit) {
// // There are data to commit to this fileId, we need to create/open it for read/write.
// // At the meantime, we set the flag to prevent further create/open operations
// tsdbGetDataDirName(pRepo, dataDir);
// if (tsdbCreateFGroup(pFileH, dataDir, fid, pCfg->maxTables) < 0) {
// // TODO: deal with the ERROR here
// }
// // Open files for commit
// pGroup = tsdbOpenFilesForCommit(pFileH, fid);
// if (pGroup == NULL) {
// // TODO: deal with the ERROR here
// }
// // TODO: open .h file and if neccessary, open .l file
// tsdbCreateFile(dataDir, fid, ".h", pCfg->maxTables, &tFile, 1, 0);
// if (1 /*pGroup->files[TSDB_FILE_TYPE_LAST].size > TSDB_MAX_LAST_FILE_SIZE*/) {
// // TODO: make it not to write the last file every time
// tsdbCreateFile(dataDir, fid, ".l", pCfg->maxTables, &lFile, 0, 0);
// isNewLastFile = 1;
// }
// // load the SCompIdx part
// pIndices = (SCompIdx *)malloc(sizeof(SCompIdx) * pCfg->maxTables);
// if (pIndices == NULL) { // TODO: deal with the ERROR
// }
// if (tsdbLoadCompIdx(pGroup, (void *)pIndices, pCfg->maxTables) < 0) { // TODO: deal with the ERROR here
// }
// // sendfile those not need to changed table content
// lseek(pGroup->files[TSDB_FILE_TYPE_HEAD].fd, TSDB_FILE_HEAD_SIZE + sizeof(SCompIdx) * pCfg->maxTables,
// SEEK_SET);
// lseek(tFile.fd, TSDB_FILE_HEAD_SIZE + sizeof(SCompIdx) * pCfg->maxTables, SEEK_SET);
// for (int ttid = 0; ttid < tid; ttid++) {
// SCompIdx * tIdx= &pIndices[ttid];
// if (tIdx->len <= 0) continue;
// if (isNewLastFile && tIdx->hasLast) {
// // TODO: Need to load the SCompBlock part and copy to new last file
// pCompInfo = (SCompInfo *)realloc((void *)pCompInfo, tIdx->len);
// if (pCompInfo == NULL) { /* TODO */}
// if (tsdbLoadCompBlocks(pGroup, tIdx, (void *)pCompInfo) < 0) {/* TODO */}
// SCompBlock *pLastBlock = TSDB_COMPBLOCK_AT(pCompInfo, tIdx->numOfSuperBlocks - 1);
// int numOfSubBlocks = pLastBlock->numOfSubBlocks;
// assert(pLastBlock->last);
// if (tsdbCopyCompBlockToFile(&pGroup->files[TSDB_FILE_TYPE_LAST], &lFile, pCompInfo, tIdx->numOfSuperBlocks, 1 /* isOutLast*/) < 0) {/* TODO */}
// {
// if (numOfSubBlocks > 1) tIdx->len -= (sizeof(SCompBlock) * numOfSubBlocks);
// tIdx->checksum = 0;
// }
// write(tFile.fd, (void *)pCompInfo, tIdx->len);
// tFile.size += tIdx->len;
// } else {
// sendfile(pGroup->files[TSDB_FILE_TYPE_HEAD].fd, tFile.fd, NULL, tIdx->len);
// tFile.size += tIdx->len;
// }
// }
// hasDataToCommit = 1;
// }
// SCompIdx *pIdx = &pIndices[tid];
// /* The first time to write to the table, need to decide
// * if it is neccessary to load the SComplock part. If it
// * is needed, just load it, or, just use sendfile and
// * append it.
// */
// if (numOfWrites == 0 && pIdx->offset > 0) {
// if (dataColsKeyFirst(pCols) <= pIdx->maxKey || pIdx->hasLast) {
// pCompInfo = (SCompInfo *)realloc((void *)pCompInfo, pIdx->len);
// if (tsdbLoadCompBlocks(pGroup, pIdx, (void *)pCompInfo) < 0) {
// // TODO: deal with the ERROR here
// }
// if (pCompInfo->uid == pTable->tableId.uid) isLoadCompBlocks = 1;
// } else {
// // TODO: sendfile the prefix part
// }
// }
// int numOfPointsWritten = tsdbWriteBlockToFile(pGroup, pCompInfo, pIdx, isLoadCompBlocks, pBlock, pCols);
// if (numOfPointsWritten < 0) {
// // TODO: deal with the ERROR here
// }
// // pCompInfo = tsdbMergeBlock(pCompInfo, pBlock);
// // if (1 /* the SCompBlock part is not loaded*/) {
// // // Append to .data file generate a SCompBlock and record it
// // } else {
// // }
// // // TODO: need to reset the pCols
// numOfWrites++;
// }
// if (pCols->numOfPoints > 0) {
// // TODO: still has data to commit, commit it
// }
// if (1/* SCompBlock part is loaded, write it to .head file*/) {
// // TODO
// } else {
// // TODO: use sendfile send the old part and append the newly added part
// }
// }
// Write the SCompIdx part
// Close all files and return
if (hasDataToCommit) {
// TODO
int pointsWritten = 0; /* tsdbWriteBlockToFile(); */
tdPopDataColsPoints(pCols, pointsWritten);
}
// Write the SCompBlock part
if (isCompBlockLoaded) {
// merge the block into old and update pIdx
} else {
// sendfile the SCompBlock part and update the pIdx
}
}
// TODO: close the files and replace the .head and .last file
{}
if (pIndices) free(pIndices);
if (pCompInfo) free(pCompInfo);
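The commit loop introduced above reads at most maxRowsPerFileBlock * 4 / 5 rows from the cache iterator per pass (the removed draft's comment explains this keeps roughly 20% of a block free for later merges), writes them out as a block, and pops only the rows that were actually written before reading again. A self-contained toy version of that read/write/pop pattern follows; the buffer type and helper names are invented for illustration and are not TDengine APIs.

#include <stdio.h>
#include <string.h>

#define MAX_ROWS_PER_BLOCK 10

/* Toy stand-in for the skip-list cache iterator: a plain array cursor. */
typedef struct { const int *rows; int count; int pos; } Cache;
/* Toy stand-in for the SDataCols staging buffer. */
typedef struct { int rows[MAX_ROWS_PER_BLOCK]; int numOfPoints; } Cols;

/* Append rows from the cache until cols holds maxRows rows or the cache is
 * drained (roughly what tsdbReadRowsFromCache does for one key range). */
static void read_rows(Cache *c, int maxRows, Cols *cols) {
  while (c->pos < c->count && cols->numOfPoints < maxRows)
    cols->rows[cols->numOfPoints++] = c->rows[c->pos++];
}

/* Pretend to flush a block and report how many rows were written. */
static int write_block(const Cols *cols) {
  printf("wrote block of %d rows\n", cols->numOfPoints);
  return cols->numOfPoints;
}

/* Drop the first n rows, keeping any unwritten remainder for the next pass. */
static void pop_rows(Cols *cols, int n) {
  memmove(cols->rows, cols->rows + n, (size_t)(cols->numOfPoints - n) * sizeof(int));
  cols->numOfPoints -= n;
}

int main(void) {
  const int data[23] = {0};
  Cache cache = {data, 23, 0};
  Cols cols = {{0}, 0};
  int maxRowsToRead = MAX_ROWS_PER_BLOCK * 4 / 5; /* keep ~20% headroom for merges */
  while (1) {
    read_rows(&cache, maxRowsToRead, &cols);
    if (cols.numOfPoints == 0) break; /* cache drained for this file id */
    pop_rows(&cols, write_block(&cols));
  }
  return 0;
}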