提交 f0bd200e 编写于 作者: H hzcheng

TD-34

上级 12d2e463
...@@ -1069,10 +1069,12 @@ static int tsdbWriteBlockToFileImpl(SFile *pFile, SDataCols *pCols, int pointsTo ...@@ -1069,10 +1069,12 @@ static int tsdbWriteBlockToFileImpl(SFile *pFile, SDataCols *pCols, int pointsTo
// Write the block // Write the block
if (write(pFile->fd, (void *)pCompData, size) < 0) goto _err; if (write(pFile->fd, (void *)pCompData, size) < 0) goto _err;
*len += size;
for (int iCol = 0; iCol < pCols->numOfCols; iCol++) { for (int iCol = 0; iCol < pCols->numOfCols; iCol++) {
SDataCol *pDataCol = pCols->cols + iCol; SDataCol *pDataCol = pCols->cols + iCol;
SCompCol *pCompCol = pCompData->cols + iCol; SCompCol *pCompCol = pCompData->cols + iCol;
if (write(pFile->fd, pDataCol->pData, pCompCol->len) < 0) goto _err; if (write(pFile->fd, pDataCol->pData, pCompCol->len) < 0) goto _err;
*len += pCompCol->len;
} }
if (pCompData == NULL) free((void *)pCompData); if (pCompData == NULL) free((void *)pCompData);
...@@ -1083,21 +1085,45 @@ _err: ...@@ -1083,21 +1085,45 @@ _err:
return -1; return -1;
} }
static int tsdbWriteBlockToFile(STsdbRepo *pRepo, SCompIdx *pIdx, SCompInfo *pCompInfo, SDataCols *pCols, SCompBlock *pCompBlock) { static int compareKeyBlock(const void *arg1, const void *arg2) {
STsdbCfg *pCfg = &(pRepo->config); TSKEY key = *(TSKEY *)arg1;
SCompBlock *pBlock = (SCompBlock *)arg2;
if (key < pBlock->keyFirst) {
return -1;
} else if (key > pBlock->keyLast) {
return 1;
}
return 0;
}
static int tsdbWriteBlockToFile(STsdbRepo *pRepo, SFileGroup *pGroup, SCompIdx *pIdx, SCompInfo *pCompInfo, SDataCols *pCols, SCompBlock *pCompBlock, SFile *lFile, int64_t uid) {
STsdbCfg * pCfg = &(pRepo->config);
SCompData *pCompData = NULL; SCompData *pCompData = NULL;
SFile * pFile = NULL;
int numOfPointsToWrite = 0;
int64_t offset = 0;
int32_t len = 0;
memset((void *)pCompBlock, 0, sizeof(SCompBlock)); memset((void *)pCompBlock, 0, sizeof(SCompBlock));
if (pCompInfo == NULL) { if (pCompInfo == NULL) {
// Just append the data block to .data or .l or .last file
numOfPointsToWrite = pCols->numOfPoints;
if (pCols->numOfPoints > pCfg->minRowsPerFileBlock) { // Write to .data file if (pCols->numOfPoints > pCfg->minRowsPerFileBlock) { // Write to .data file
// tsdbWriteBlockToFileImpl() pFile = &(pGroup->files[TSDB_FILE_TYPE_DATA]);
} else { // Write to .last or .l file } else { // Write to .last or .l file
pCompBlock->last = 1; pCompBlock->last = 1;
if (lFile) {
pFile = lFile;
} else {
pFile = &(pGroup->files[TSDB_FILE_TYPE_LAST]);
}
} }
// pCompBlock->offset = ; tsdbWriteBlockToFileImpl(pFile, pCols, numOfPointsToWrite, &offset, &len, uid);
// pCompBlock->len = ; pCompBlock->offset = offset;
pCompBlock->len = len;
pCompBlock->algorithm = 2; // TODO : add to configuration pCompBlock->algorithm = 2; // TODO : add to configuration
pCompBlock->sversion = pCols->sversion; pCompBlock->sversion = pCols->sversion;
pCompBlock->numOfPoints = pCols->numOfPoints; pCompBlock->numOfPoints = pCols->numOfPoints;
...@@ -1106,7 +1132,38 @@ static int tsdbWriteBlockToFile(STsdbRepo *pRepo, SCompIdx *pIdx, SCompInfo *pCo ...@@ -1106,7 +1132,38 @@ static int tsdbWriteBlockToFile(STsdbRepo *pRepo, SCompIdx *pIdx, SCompInfo *pCo
pCompBlock->keyFirst = dataColsKeyFirst(pCols); pCompBlock->keyFirst = dataColsKeyFirst(pCols);
pCompBlock->keyLast = dataColsKeyLast(pCols); pCompBlock->keyLast = dataColsKeyLast(pCols);
} else { } else {
// Need to merge // Need to merge the block to either the last block or the other block
TSKEY keyFirst = dataColsKeyFirst(pCols);
SCompBlock *pMergeBlock = NULL;
// Search the block to merge in
void *ptr = taosbsearch((void *)&keyFirst, (void *)(pCompInfo->blocks), sizeof(SCompBlock), pIdx->numOfSuperBlocks,
compareKeyBlock, TD_GE);
if (ptr == NULL) {
// No block greater or equal than the key, but there are data in the .last file, need to merge the last file block
// and merge the data
pMergeBlock = TSDB_COMPBLOCK_AT(pCompInfo, pIdx->numOfSuperBlocks - 1);
} else {
pMergeBlock = (SCompBlock *)ptr;
} }
return 0;
if (pMergeBlock->last) {
if (pMergeBlock->last + pCols->numOfPoints > pCfg->minRowsPerFileBlock) {
// Need to load the data from .last and combine data in pCols to write to .data file
} else { // Just append the block to .last or .l file
if (lFile) {
// read the block from .last file and merge with pCols, write to .l file
} else {
// tsdbWriteBlockToFileImpl();
}
}
} else { // The block need to merge in .data file
}
}
return numOfPointsToWrite;
} }
\ No newline at end of file
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册