提交 ab22c7f6 编写于 作者: H hzcheng

TD-34

上级 ffb02d97
......@@ -900,6 +900,9 @@ static int tsdbCommitToFile(STsdbRepo *pRepo, int fid, SSkipListIterator **iters
int hasDataToCommit = tsdbHasDataToCommit(iters, pCfg->maxTables, minKey, maxKey);
if (!hasDataToCommit) return 0; // No data to commit, just return
// TODO: make it more flexible
pCompInfo = (SCompInfo *)malloc(sizeof(SCompInfo) + sizeof(SCompBlock) * 1000);
// Create and open files for commit
tsdbGetDataDirName(pRepo, dataDir);
if (tsdbCreateFGroup(pFileH, dataDir, fid, pCfg->maxTables) < 0) { /* TODO */
......@@ -908,7 +911,7 @@ static int tsdbCommitToFile(STsdbRepo *pRepo, int fid, SSkipListIterator **iters
if (pGroup == NULL) { /* TODO */
}
tsdbCreateFile(dataDir, fid, ".h", pCfg->maxTables, &hFile, 1, 0);
if (1 /*pGroup->files[TSDB_FILE_TYPE_LAST].size > TSDB_MAX_LAST_FILE_SIZE*/) {
if (0 /*pGroup->files[TSDB_FILE_TYPE_LAST].size > TSDB_MAX_LAST_FILE_SIZE*/) {
// TODO: make it not to write the last file every time
tsdbCreateFile(dataDir, fid, ".l", pCfg->maxTables, &lFile, 0, 0);
isNewLastFile = 1;
......@@ -929,6 +932,8 @@ static int tsdbCommitToFile(STsdbRepo *pRepo, int fid, SSkipListIterator **iters
SSkipListIterator *pIter = iters[tid];
SCompIdx * pIdx = &pIndices[tid];
int nNewBlocks = 0;
if (pTable == NULL || pIter == NULL) continue;
/* If no new data to write for this table, just write the old data to new file
......@@ -936,8 +941,10 @@ static int tsdbCommitToFile(STsdbRepo *pRepo, int fid, SSkipListIterator **iters
*/
if (!tsdbHasDataInRange(pIter, minKey, maxKey)) {
// has old data
if (pIdx->offset > 0) {
if (isNewLastFile && pIdx->hasLast) {
if (pIdx->len > 0) {
goto _table_over;
// if (isNewLastFile && pIdx->hasLast) {
if (0) {
// need to move the last block to new file
if ((pCompInfo = (SCompInfo *)realloc((void *)pCompInfo, pIdx->len)) == NULL) { /* TODO */
}
......@@ -976,9 +983,14 @@ static int tsdbCommitToFile(STsdbRepo *pRepo, int fid, SSkipListIterator **iters
continue;
}
pCompInfo->delimiter = TSDB_FILE_DELIMITER;
pCompInfo->checksum = 0;
pCompInfo->uid = pTable->tableId.uid;
// Load SCompBlock part if neccessary
int isCompBlockLoaded = 0;
if (pIdx->offset > 0) {
if (0) {
// if (pIdx->offset > 0) {
if (pIdx->hasLast || tsdbHasDataInRange(pIter, minKey, pIdx->maxKey)) {
// has last block || cache key overlap with commit key
pCompInfo = (SCompInfo *)realloc((void *)pCompInfo, pIdx->len + sizeof(SCompBlock) * 100);
......@@ -998,34 +1010,50 @@ static int tsdbCommitToFile(STsdbRepo *pRepo, int fid, SSkipListIterator **iters
tsdbReadRowsFromCache(pIter, maxKey, maxRowsToRead, pCols);
if (pCols->numOfPoints == 0) break;
int pointsWritten = 0;
// { // TODO : try to write the block data to file
// if (!isCompBlockLoaded) { // Just append
// if (pCols->numOfPoints > pCfg->minRowsPerFileBlock) { // write directly to .data file
// lseek(pGroup->files[TSDB_FILE_TYPE_DATA], 0, SEEK_END);
// } else {
// if (isNewLastFile) { // write directly to .l file
// } else { // write directly to .last file
int pointsWritten = pCols->numOfPoints;
// TODO: all write to the end of .data file
int64_t toffset = 0;
int32_t tlen = 0;
tsdbWriteBlockToFileImpl(&pGroup->files[TSDB_FILE_TYPE_DATA], pCols, pCols->numOfPoints, &toffset, &tlen, pTable->tableId.uid);
// Make the compBlock
SCompBlock *pTBlock = pCompInfo->blocks + nNewBlocks++;
pTBlock->offset = toffset;
pTBlock->len = tlen;
pTBlock->keyFirst = dataColsKeyFirst(pCols);
pTBlock->keyLast = dataColsKeyLast(pCols);
pTBlock->last = 0;
pTBlock->algorithm = 0;
pTBlock->numOfPoints = pCols->numOfPoints;
pTBlock->sversion = pTable->sversion;
pTBlock->numOfSubBlocks = 1;
if (dataColsKeyLast(pCols) > pIdx->maxKey) pIdx->maxKey = dataColsKeyLast(pCols);
// }
// }
// } else { // Need to append
// // SCompBlock *pTBlock = NULL;
// }
// }
// pointsWritten = pCols->numOfPoints;
tdPopDataColsPoints(pCols, pointsWritten);
maxRowsToRead = pCfg->maxRowsPerFileBlock * 4 / 5 - pCols->numOfPoints;
}
_table_over:
// Write the SCompBlock part
if (isCompBlockLoaded) {
// merge the block into old and update pIdx
pIdx->offset = lseek(hFile.fd, 0, SEEK_END);
if (pIdx->len > 0) {
sendfile(pGroup->files[TSDB_FILE_TYPE_HEAD].fd, hFile.fd, NULL, pIdx->len);
if (nNewBlocks > 0) {
write(hFile.fd, (void *)(pCompInfo->blocks), sizeof(SCompBlock) * nNewBlocks);
pIdx->len += (sizeof(SCompBlock) * nNewBlocks);
}
} else {
// sendfile the SCompBlock part and update the pIdx
if (nNewBlocks > 0) {
write(hFile.fd, (void *)pCompInfo, sizeof(SCompInfo) + sizeof(SCompBlock) * nNewBlocks);
pIdx->len += sizeof(SCompInfo) + sizeof(SCompBlock) * nNewBlocks;
}
}
pIdx->checksum = 0;
pIdx->numOfSuperBlocks += nNewBlocks;
pIdx->hasLast = 0;
}
// Write the SCompIdx part
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册