提交 ac209abb 编写于 作者: H hzcheng

TD-100

上级 b64daf73
......@@ -382,27 +382,28 @@ typedef struct {
// Global configuration
SHelperCfg config;
int8_t state;
// For file set usage
SHelperFile files;
SCompIdx * pCompIdx;
size_t compIdxSize;
// For table set usage
SHelperTable tableInfo;
SCompIdx compIdx; // SCompIdx of current table
int8_t state; // current loading state
// Information in .head file
SCompIdx *pCompIdx;
size_t compIdxSize;
SCompInfo * pCompInfo;
size_t compInfoSize;
bool hasOldLastBlock;
SCompInfo *pCompInfo;
size_t compInfoSize;
int blockIter; // For write purpose
// Information in .data or .last file
// For block set usage
SCompData *pCompData;
size_t compDataSize;
SDataCols *pDataCols[2];
// ------ Perhaps no usage
SCompIdx compIdx; // SCompIdx of current table
int blockIter; // For write purpose
// Compression buffer
void * cBuffer;
size_t cBufSize;
......@@ -434,8 +435,8 @@ int tsdbCloseHelperFile(SRWHelper *pHelper, bool hasError);
// --------- For read operations
int tsdbLoadCompIdx(SRWHelper *pHelper, void *target);
int tsdbLoadCompInfo(SRWHelper *pHelper, void *target);
int tsdbLoadCompData(SRWHelper *pHelper, int blkIdx, void *target);
int tsdbLoadBlockDataCols(SRWHelper *pHelper, SDataCols *pDataCols, int32_t *colIds, int numOfColIds);
int tsdbLoadCompData(SRWHelper *pHelper, SCompBlock *pCompBlock, void *target);
int tsdbLoadBlockDataCols(SRWHelper *pHelper, SDataCols *pDataCols, int blkIdx, int16_t *colIds, int numOfColIds);
int tsdbLoadBlockData(SRWHelper *pHelper, int blkIdx, SDataCols *pDataCols);
// --------- For write operations
......
......@@ -175,7 +175,12 @@ void tsdbSetHelperTable(SRWHelper *pHelper, SHelperTable *pHelperTable, STSchema
tdInitDataCols(pHelper->pDataCols[0], pSchema);
tdInitDataCols(pHelper->pDataCols[1], pSchema);
pHelper->compIdx = pHelper->pCompIdx[pHelper->tableInfo.tid];
SCompIdx *pIdx = pHelper->pCompIdx + pHelperTable->tid;
if (pIdx->offset > 0 && pIdx->hasLast) {
pHelper->hasOldLastBlock = true;
}
// pHelper->compIdx = pHelper->pCompIdx[pHelper->tableInfo.tid];
helperSetState(pHelper, TSDB_HELPER_TABLE_SET);
}
......@@ -205,6 +210,7 @@ int tsdbWriteDataBlock(SRWHelper *pHelper, SDataCols *pDataCols) {
}
if (pIdx->offset == 0 || (!pIdx->hasLast && keyFirst > pIdx->maxKey)) { // Just append as a super block
ASSERT(pHelper->hasOldLastBlock == false);
rowsToWrite = pDataCols->numOfPoints;
SFile *pWFile = NULL;
bool isLast = false;
......@@ -231,9 +237,8 @@ int tsdbWriteDataBlock(SRWHelper *pHelper, SDataCols *pDataCols) {
if (rowsToWrite < 0) goto _err;
} else { // Has key overlap
if (compareKeyBlock((void *)(&keyFirst), (void *)pCompBlock) == 0) { // Key overlap with the block
// TSKEY keyLimit =
// (blkIdx == pIdx->numOfSuperBlocks - 1) ? INT_MAX : (pHelper->pCompInfo->blocks[blkIdx + 1].keyFirst - 1);
if (compareKeyBlock((void *)(&keyFirst), (void *)pCompBlock) == 0) {
// Key overlap with the block, must merge with the block
rowsToWrite = tsdbMergeDataWithBlock(pHelper, blkIdx, pDataCols);
if (rowsToWrite < 0) goto _err;
......@@ -269,17 +274,61 @@ _err:
}
int tsdbMoveLastBlockIfNeccessary(SRWHelper *pHelper) {
// TODO
SCompIdx *pIdx = pHelper->pCompIdx + pHelper->tableInfo.tid;
SCompBlock compBlock;
if ((pHelper->files.nHeadF.fd > 0) && (pHelper->hasOldLastBlock)) {
if (tsdbLoadCompInfo(pHelper, NULL) < 0) return -1;
SCompBlock *pCompBlock = pHelper->pCompInfo->blocks + pIdx->numOfSuperBlocks - 1;
ASSERT(pCompBlock->last);
if (pCompBlock->numOfSubBlocks > 1) {
if (tsdbLoadBlockData(pHelper, pIdx->numOfSuperBlocks - 1, NULL) < 0) return -1;
if (tsdbWriteBlockToFile(pHelper, &(pHelper->files.nLastF), pHelper->pDataCols[0],
pHelper->pDataCols[0]->numOfPoints, &compBlock, true, true) < 0)
return -1;
if (tsdbUpdateSuperBlock(pHelper, &compBlock, pIdx->numOfSuperBlocks - 1) < 0) return -1;
} else {
if (lseek(pHelper->files.lastF.fd, pCompBlock->offset, SEEK_SET) < 0) return -1;
pCompBlock->offset = lseek(pHelper->files.nLastF.fd, 0, SEEK_END);
if (pCompBlock->offset < 0) return -1;
if (tsendfile(pHelper->files.nLastF.fd, pHelper->files.lastF.fd, NULL, pCompBlock->len) < pCompBlock->len)
return -1;
}
pHelper->hasOldLastBlock = false;
}
return 0;
}
int tsdbWriteCompInfo(SRWHelper *pHelper) {
// TODO
SCompIdx *pIdx = pHelper->pCompIdx + pHelper->tableInfo.tid;
if (!helperHasState(pHelper, TSDB_HELPER_INFO_LOAD)) {
if (pIdx->offset > 0) {
pIdx->offset = lseek(pHelper->files.nHeadF.fd, 0, SEEK_END);
if (pIdx->offset < 0) return -1;
if (tsendfile(pHelper->files.nHeadF.fd, pHelper->files.headF.fd, NULL, pIdx->len) < pIdx->len) return -1;
}
} else {
pIdx->offset = lseek(pHelper->files.nHeadF.fd, 0, SEEK_END);
if (pIdx->offset < 0) return -1;
if (twrite(pHelper->files.nHeadF.fd, (void *)(pHelper->pCompInfo), pIdx->len) < pIdx->len) return -1;
}
return 0;
}
int tsdbWriteCompIdx(SRWHelper *pHelper) {
// TODO
if (lseek(pHelper->files.nHeadF.fd, TSDB_FILE_HEAD_SIZE, SEEK_SET) < 0) return -1;
if (twrite(pHelper->files.nHeadF.fd, (void *)pHelper->pCompIdx, pHelper->compIdxSize) < pHelper->compIdxSize)
return -1;
return 0;
}
......@@ -326,13 +375,78 @@ int tsdbLoadCompInfo(SRWHelper *pHelper, void *target) {
return 0;
}
int tsdbLoadCompData(SRWHelper *pHelper, int blkIdx, void *target) {
// TODO
int tsdbLoadCompData(SRWHelper *pHelper, SCompBlock *pCompBlock, void *target) {
ASSERT(pCompBlock->numOfSubBlocks <= 1);
int fd = (pCompBlock->last) ? pHelper->files.lastF.fd : pHelper->files.dataF.fd;
if (lseek(fd, pCompBlock->offset, SEEK_SET) < 0) return -1;
size_t tsize = sizeof(SCompData) + sizeof(SCompCol) * pCompBlock->numOfCols;
adjustMem(pHelper->pCompData, pHelper->compDataSize, tsize);
if (tread(fd, (void *)pHelper->pCompData, tsize) < tsize) return -1;
ASSERT(pCompBlock->numOfCols == pHelper->pCompData->numOfCols);
if (target) memcpy(target, pHelper->pCompData, tsize);
return 0;
}
int tsdbLoadBlockDataCols(SRWHelper *pHelper, SDataCols *pDataCols, int32_t *colIds, int numOfColIds) {
// TODO
static int comparColIdCompCol(const void *arg1, const void *arg2) {
return (*(int16_t *)arg1) - ((SCompCol *)arg2)->colId;
}
static int comparColIdDataCol(const void *arg1, const void *arg2) {
return (*(int16_t *)arg1) - ((SDataCol *)arg2)->colId;
}
static int tsdbLoadSingleColumnData(int fd, SCompBlock *pCompBlock, SCompCol *pCompCol, void *buf) {
size_t tsize = sizeof(SCompData) + sizeof(SCompCol) * pCompBlock->numOfCols;
if (lseek(fd, pCompBlock->offset + tsize + pCompCol->offset) < 0) return -1;
if (tread(fd, buf, pCompCol->len) < pCompCol->len) return -1;
return 0;
}
static int tsdbLoadSingleBlockDataCols(SRWHelper *pHelper, SCompBlock *pCompBlock, int16_t *colIds, int numOfColIds,
SDataCols *pDataCols) {
if (tsdbLoadCompData(pHelper, pCompBlock, NULL) < 0) return -1;
size_t tsize = sizeof(SCompData) + sizeof(SCompCol) * pCompBlock->numOfCols;
int fd = (pCompBlock->last) ? pHelper->files.lastF.fd : pHelper->files.dataF.fd;
void *ptr = NULL;
for (int i = 0; i < numOfColIds; i++) {
int16_t colId = colIds[i];
ptr = bsearch((void *)&colId, (void *)pHelper->pCompData->cols, pHelper->pCompData->numOfCols, sizeof(SCompCol), comparColIdCompCol);
if (ptr == NULL) continue;
SCompCol *pCompCol = (SCompCol *)ptr;
ptr = bsearch((void *)&colId, (void *)(pDataCols->cols), pDataCols->numOfCols, sizeof(SDataCol), comparColIdDataCol);
ASSERT(ptr != NULL);
SDataCol *pDataCol = (SDataCol *)ptr;
pDataCol->len = pCompCol->len;
if (tsdbLoadSingleColumnData(fd, pCompBlock, pCompCol, pDataCol->pData) < 0) return -1;
}
return 0;
}
// Load specific column data from file
int tsdbLoadBlockDataCols(SRWHelper *pHelper, SDataCols *pDataCols, int blkIdx, int16_t *colIds, int numOfColIds) {
SCompIdx * pIdx = pHelper->pCompIdx + pHelper->tableInfo.tid;
SCompBlock *pCompBlock = pHelper->pCompInfo->blocks + blkIdx;
ASSERT(pCompBlock->numOfSubBlocks >= 1); // Must be super block
if (pCompBlock->numOfSubBlocks == 1) {
}
return 0;
}
......@@ -537,44 +651,74 @@ static int nRowsLEThan(SDataCols *pDataCols, int maxKey) {
return ((TSKEY *)ptr - (TSKEY *)(pDataCols->cols[0].pData)) + 1;
}
// Merge the data with a block
static int tsdbMergeDataWithBlock(SRWHelper *pHelper, int blkIdx, SDataCols *pDataCols) {
// TODO: set pHelper->hasOldBlock
int rowsWritten = 0;
SCompBlock compBlock = {0};
TSKEY keyFirst = dataColsKeyFirst(pDataCols);
SCompIdx *pIdx = pHelper->pCompIdx + pHelper->tableInfo.tid;
ASSERT(blkIdx < pIdx->numOfSuperBlocks);
SCompBlock *pCompBlock = pHelper->pCompInfo->blocks + blkIdx;
ASSERT(pCompBlock->numOfSubBlocks >= 1);
ASSERT(keyFirst >= pCompBlock->keyFirst);
int rowsCanMerge = tsdbGetRowsCanBeMergedWithBlock(pHelper, blkIdx, pDataCols);
if (rowsCanMerge < 0) goto _err;
// Start here
TSKEY keyLimit =
(blkIdx == pIdx->numOfSuperBlocks - 1) ? INT_MAX : pHelper->pCompInfo->blocks[blkIdx + 1].keyLast - 1;
ASSERT(rowsCanMerge > 0);
int rowsMustMerge = tsdbGetRowsInRange(pDataCols, 0, pCompBlock->keyLast);
int maxRowsCanMerge =
MIN(pHelper->config.maxRowsPerFileBlock - pCompBlock->numOfPoints, tsdbGetRowsInRange(pDataCols, keyLimit));
if (pCompBlock->numOfPoints + rowsMustMerge > pHelper->config.maxRowsPerFileBlock) {
// Need to load the block and split as two super block
} else {
}
if (rowsMustMerge + pCompBlock->numOfPoints > pHelper->config.maxRowsPerFileBlock) {
// Load the block and merge as two super block
}
if (rowsMustMerge > maxRowsCanMerge) {
ASSERT(pCompBlock->numOfPoints + rowsMustMerge > pHelper->config.maxRowsPerFileBlock);
} else {
}
int rowsToMerge = tsdbGetRowsCanBeMergedWithBlock(pHelper, blkIdx, pDataCols);
if (rowsToMerge < 0) goto _err;
ASSERT(rowsToMerge > 0);
if (pCompBlock->numOfSubBlocks <= TSDB_MAX_SUBBLOCKS &&
((!pCompBlock->last) || (pHelper->files.nLastF.fd < 0 &&
pCompBlock->numOfPoints + rowsCanMerge < pHelper->config.minRowsPerFileBlock))) {
pCompBlock->numOfPoints + rowsToMerge < pHelper->config.minRowsPerFileBlock))) {
SFile *pFile = NULL;
if (!pCompBlock->last) {
if ((!pCompBlock->last) || (pCompBlock->numOfPoints + rowsToMerge >= pHelper->config.minRowsPerFileBlock)) {
pFile = &(pHelper->files.dataF);
} else {
pFile = &(pHelper->files.lastF);
}
if (tsdbWriteBlockToFile(pHelper, pFile, pDataCols, rowsCanMerge, &compBlock, pCompBlock->last, false) < 0) goto _err;
if (tsdbWriteBlockToFile(pHelper, pFile, pDataCols, rowsToMerge, &compBlock, pCompBlock->last, false) < 0) goto _err;
if (tsdbAddSubBlock(pHelper, &compBlock, blkIdx, rowsCanMerge) < 0) goto _err;
if (tsdbAddSubBlock(pHelper, &compBlock, blkIdx, rowsToMerge) < 0) goto _err;
} else {
// Read-Merge-Write as a super block
if (tsdbLoadBlockData(pHelper, blkIdx, NULL) < 0) goto _err;
tdMergeDataCols(pHelper->pDataCols[0], pDataCols, rowsCanMerge);
tdMergeDataCols(pHelper->pDataCols[0], pDataCols, rowsToMerge);
int isLast = 0;
SFile *pFile = NULL;
if (!pCompBlock->last || (pCompBlock->numOfPoints + rowsCanMerge >= pHelper->config.minRowsPerFileBlock)) {
if (!pCompBlock->last || (pCompBlock->numOfPoints + rowsToMerge >= pHelper->config.minRowsPerFileBlock)) {
pFile = &(pHelper->files.dataF);
} else {
isLast = 1;
......@@ -585,7 +729,7 @@ static int tsdbMergeDataWithBlock(SRWHelper *pHelper, int blkIdx, SDataCols *pDa
}
}
if (tsdbWriteBlockToFile(pHelper, pFile, pHelper->pDataCols[0], pCompBlock->numOfPoints + rowsCanMerge, &compBlock, isLast, true) < 0) goto _err;
if (tsdbWriteBlockToFile(pHelper, pFile, pHelper->pDataCols[0], pCompBlock->numOfPoints + rowsToMerge, &compBlock, isLast, true) < 0) goto _err;
if (tsdbUpdateSuperBlock(pHelper, &compBlock, blkIdx) < 0) goto _err;
}
......@@ -619,7 +763,7 @@ static int tsdbGetRowsCanBeMergedWithBlock(SRWHelper *pHelper, int blkIdx, SData
} else {
int32_t colId[1] = {0};
if (tsdbLoadBlockDataCols(pHelper, NULL, colId, 1) < 0) goto _err;
if (tsdbLoadBlockDataCols(pHelper, NULL, blkIdx, colId, 1) < 0) goto _err;
int iter1 = 0; // For pDataCols
int iter2 = 0; // For loaded data cols
......@@ -808,4 +952,28 @@ static int tsdbUpdateSuperBlock(SRWHelper *pHelper, SCompBlock *pCompBlock, int
pIdx->hasLast = pHelper->pCompInfo->blocks[pIdx->numOfSuperBlocks - 1].last;
return 0;
}
// Get the number of rows in range [minKey, maxKey]
static int tsdbGetRowsInRange(SDataCols *pDataCols, int minKey, int maxKey) {
if (pDataCols->numOfPoints == 0) return 0;
ASSERT(minKey <= maxKey);
TSKEY keyFirst = dataColsKeyFirst(pDataCols);
TSKEY keyLast = dataColsKeyLast(pDataCols);
ASSERT(keyFirst <= keyLast);
if (minKey > keyLast || maxKey < keyFirst) return 0;
void *ptr1 = taosbsearch((void *)&minKey, (void *)pDataCols->cols[0].pData, pDataCols->numOfPoints, sizeof(TSKEY),
compTSKEY, TD_GE);
ASSERT(ptr1 != NULL);
void *ptr2 = taosbsearch((void *)&maxKey, (void *)pDataCols->cols[0].pData, pDataCols->numOfPoints, sizeof(TSKEY),
compTSKEY, TD_LE);
ASSERT(ptr2 != NULL);
if ((TSKEY *)ptr2 - (TSKEY *)ptr1 < 0) return 0;
return (TSKEY *)ptr2 - (TSKEY *)ptr1;
}
\ No newline at end of file
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册