提交 709f6d09 编写于 作者: H hzcheng

TD-100

上级 5380a36a
...@@ -137,6 +137,7 @@ void tdFreeDataCols(SDataCols *pCols); ...@@ -137,6 +137,7 @@ void tdFreeDataCols(SDataCols *pCols);
void tdAppendDataRowToDataCol(SDataRow row, SDataCols *pCols); void tdAppendDataRowToDataCol(SDataRow row, SDataCols *pCols);
void tdPopDataColsPoints(SDataCols *pCols, int pointsToPop); void tdPopDataColsPoints(SDataCols *pCols, int pointsToPop);
int tdMergeDataCols(SDataCols *target, SDataCols *src, int rowsToMerge); int tdMergeDataCols(SDataCols *target, SDataCols *src, int rowsToMerge);
void tdMergeTwoDataCols(SDataCols *target, SDataCols *src1, int *iter1, SDataCols *src2, int *iter2, int tRows);
#ifdef __cplusplus #ifdef __cplusplus
} }
......
...@@ -406,52 +406,90 @@ static int tdFLenFromSchema(STSchema *pSchema) { ...@@ -406,52 +406,90 @@ static int tdFLenFromSchema(STSchema *pSchema) {
} }
int tdMergeDataCols(SDataCols *target, SDataCols *source, int rowsToMerge) { int tdMergeDataCols(SDataCols *target, SDataCols *source, int rowsToMerge) {
// TODO
ASSERT(rowsToMerge > 0 && rowsToMerge <= source->numOfPoints); ASSERT(rowsToMerge > 0 && rowsToMerge <= source->numOfPoints);
SDataCols *pTarget = tdDupDataCols(target, true); SDataCols *pTarget = tdDupDataCols(target, true);
if (pTarget == NULL) goto _err; if (pTarget == NULL) goto _err;
tdResetDataCols(target); // tdResetDataCols(target);
int iter1 = 0; int iter1 = 0;
int iter2 = 0; int iter2 = 0;
while (true) { tdMergeTwoDataCols(target,pTarget, &iter1, source, &iter2, pTarget->numOfPoints + rowsToMerge);
if (iter1 >= pTarget->numOfPoints && iter2 >= source->numOfPoints) break; // while (true) {
// if (iter1 >= pTarget->numOfPoints && iter2 >= source->numOfPoints) break;
// TSKEY key1 = (iter1 >= pTarget->numOfPoints) ? INT64_MAX : ((TSKEY *)(pTarget->cols[0].pData))[iter1];
// TSKEY key2 = (iter2 >= rowsToMerge) ? INT64_MAX : ((TSKEY *)(source->cols[0].pData))[iter2];
// if (key1 < key2) { // Copy from pTarget
// for (int i = 0; i < pTarget->numOfCols; i++) {
// ASSERT(target->cols[i].type == pTarget->cols[i].type);
// memcpy((void *)((char *)(target->cols[i].pData) + TYPE_BYTES[target->cols[i].type] * target->numOfPoints),
// (void *)((char *)(pTarget->cols[i].pData) + TYPE_BYTES[pTarget->cols[i].type] * iter1),
// TYPE_BYTES[target->cols[i].type]);
// target->cols[i].len += TYPE_BYTES[target->cols[i].type];
// }
// target->numOfPoints++;
// iter1++;
// } else if (key1 > key2) { // Copy from source
// for (int i = 0; i < source->numOfCols; i++) {
// ASSERT(target->cols[i].type == pTarget->cols[i].type);
// memcpy((void *)((char *)(target->cols[i].pData) + TYPE_BYTES[target->cols[i].type] * target->numOfPoints),
// (void *)((char *)(source->cols[i].pData) + TYPE_BYTES[source->cols[i].type] * iter2),
// TYPE_BYTES[target->cols[i].type]);
// target->cols[i].len += TYPE_BYTES[target->cols[i].type];
// }
// target->numOfPoints++;
// iter2++;
// } else {
// // TODO
// ASSERT(false);
// }
// }
tdFreeDataCols(pTarget);
return 0;
_err:
tdFreeDataCols(pTarget);
return -1;
}
TSKEY key1 = (iter1 >= pTarget->numOfPoints) ? INT64_MAX : ((TSKEY *)(pTarget->cols[0].pData))[iter1]; void tdMergeTwoDataCols(SDataCols *target, SDataCols *src1, int *iter1, SDataCols *src2, int *iter2, int tRows) {
TSKEY key2 = (iter2 >= rowsToMerge) ? INT64_MAX : ((TSKEY *)(source->cols[0].pData))[iter2]; tdResetDataCols(target);
while (target->numOfPoints < tRows) {
if (*iter1 >= src1->numOfPoints && *iter2 >= src2->numOfPoints) break;
TSKEY key1 = (*iter1 >= src1->numOfPoints) ? INT64_MAX : ((TSKEY *)(src1->cols[0].pData))[*iter1];
TSKEY key2 = (*iter2 >= src2->numOfPoints) ? INT64_MAX : ((TSKEY *)(src2->cols[0].pData))[*iter2];
if (key1 < key2) { // Copy from pTarget if (key1 < key2) {
for (int i = 0; i < pTarget->numOfCols; i++) { for (int i = 0; i < src1->numOfCols; i++) {
ASSERT(target->cols[i].type == pTarget->cols[i].type); ASSERT(target->cols[i].type == src1->cols[i].type);
memcpy((void *)((char *)(target->cols[i].pData) + TYPE_BYTES[target->cols[i].type] * target->numOfPoints), memcpy((void *)((char *)(target->cols[i].pData) + TYPE_BYTES[target->cols[i].type] * target->numOfPoints),
(void *)((char *)(pTarget->cols[i].pData) + TYPE_BYTES[pTarget->cols[i].type] * iter1), (void *)((char *)(src1->cols[i].pData) + TYPE_BYTES[target->cols[i].type] * (*iter1)),
TYPE_BYTES[target->cols[i].type]); TYPE_BYTES[target->cols[i].type]);
target->cols[i].len += TYPE_BYTES[target->cols[i].type]; target->cols[i].len += TYPE_BYTES[target->cols[i].type];
} }
target->numOfPoints++; target->numOfPoints++;
iter1++; *iter1++;
} else if (key1 > key2) { // Copy from source } else if (key1 > key2) {
for (int i = 0; i < source->numOfCols; i++) { for (int i = 0; i < src2->numOfCols; i++) {
ASSERT(target->cols[i].type == pTarget->cols[i].type); ASSERT(target->cols[i].type == src2->cols[i].type);
memcpy((void *)((char *)(target->cols[i].pData) + TYPE_BYTES[target->cols[i].type] * target->numOfPoints), memcpy((void *)((char *)(target->cols[i].pData) + TYPE_BYTES[target->cols[i].type] * target->numOfPoints),
(void *)((char *)(source->cols[i].pData) + TYPE_BYTES[source->cols[i].type] * iter2), (void *)((char *)(src2->cols[i].pData) + TYPE_BYTES[src2->cols[i].type] * (*iter2)),
TYPE_BYTES[target->cols[i].type]); TYPE_BYTES[target->cols[i].type]);
target->cols[i].len += TYPE_BYTES[target->cols[i].type]; target->cols[i].len += TYPE_BYTES[target->cols[i].type];
} }
target->numOfPoints++; target->numOfPoints++;
iter2++; *iter2++;
} else { } else {
assert(false); ASSERT(false);
} }
} }
tdFreeDataCols(pTarget);
return 0;
_err:
tdFreeDataCols(pTarget);
return -1;
} }
\ No newline at end of file
...@@ -440,6 +440,7 @@ typedef struct { ...@@ -440,6 +440,7 @@ typedef struct {
#define helperSetState(h, s) (((h)->state) |= (s)) #define helperSetState(h, s) (((h)->state) |= (s))
#define helperClearState(h, s) ((h)->state &= (~(s))) #define helperClearState(h, s) ((h)->state &= (~(s)))
#define helperHasState(h, s) ((((h)->state) & (s)) == (s)) #define helperHasState(h, s) ((((h)->state) & (s)) == (s))
#define blockAtIdx(h, idx) ((h)->pCompInfo->blocks + idx)
int tsdbInitHelper(SRWHelper *pHelper, SHelperCfg *pCfg); int tsdbInitHelper(SRWHelper *pHelper, SHelperCfg *pCfg);
void tsdbDestroyHelper(SRWHelper *pHelper); void tsdbDestroyHelper(SRWHelper *pHelper);
......
...@@ -832,7 +832,6 @@ static void tsdbFreeMemTable(SMemTable *pMemTable) { ...@@ -832,7 +832,6 @@ static void tsdbFreeMemTable(SMemTable *pMemTable) {
// Commit to file // Commit to file
static void *tsdbCommitData(void *arg) { static void *tsdbCommitData(void *arg) {
// TODO
printf("Starting to commit....\n"); printf("Starting to commit....\n");
STsdbRepo * pRepo = (STsdbRepo *)arg; STsdbRepo * pRepo = (STsdbRepo *)arg;
STsdbMeta * pMeta = pRepo->tsdbMeta; STsdbMeta * pMeta = pRepo->tsdbMeta;
...@@ -849,7 +848,7 @@ static void *tsdbCommitData(void *arg) { ...@@ -849,7 +848,7 @@ static void *tsdbCommitData(void *arg) {
return NULL; return NULL;
} }
// Create a write helper for commit data // Create a write helper to commit data
SHelperCfg hcfg = {.type = TSDB_WRITE_HELPER, SHelperCfg hcfg = {.type = TSDB_WRITE_HELPER,
.maxTables = pCfg->maxTables, .maxTables = pCfg->maxTables,
.maxRowSize = pMeta->maxRowBytes, .maxRowSize = pMeta->maxRowBytes,
...@@ -883,7 +882,6 @@ _exit: ...@@ -883,7 +882,6 @@ _exit:
free(pCache->imem); free(pCache->imem);
pCache->imem = NULL; pCache->imem = NULL;
pRepo->commit = 0; pRepo->commit = 0;
// TODO: free the skiplist
for (int i = 0; i < pCfg->maxTables; i++) { for (int i = 0; i < pCfg->maxTables; i++) {
STable *pTable = pMeta->tables[i]; STable *pTable = pMeta->tables[i];
if (pTable && pTable->imem) { if (pTable && pTable->imem) {
......
...@@ -333,6 +333,7 @@ _err: ...@@ -333,6 +333,7 @@ _err:
} }
int tsdbMoveLastBlockIfNeccessary(SRWHelper *pHelper) { int tsdbMoveLastBlockIfNeccessary(SRWHelper *pHelper) {
ASSERT(TSDB_HELPER_TYPE(pHelper) == TSDB_WRITE_HELPER);
SCompIdx *pIdx = pHelper->pCompIdx + pHelper->tableInfo.tid; SCompIdx *pIdx = pHelper->pCompIdx + pHelper->tableInfo.tid;
SCompBlock compBlock; SCompBlock compBlock;
if ((pHelper->files.nLastF.fd > 0) && (pHelper->hasOldLastBlock)) { if ((pHelper->files.nLastF.fd > 0) && (pHelper->hasOldLastBlock)) {
...@@ -343,6 +344,8 @@ int tsdbMoveLastBlockIfNeccessary(SRWHelper *pHelper) { ...@@ -343,6 +344,8 @@ int tsdbMoveLastBlockIfNeccessary(SRWHelper *pHelper) {
if (pCompBlock->numOfSubBlocks > 1) { if (pCompBlock->numOfSubBlocks > 1) {
if (tsdbLoadBlockData(pHelper, pIdx->numOfSuperBlocks - 1, NULL) < 0) return -1; if (tsdbLoadBlockData(pHelper, pIdx->numOfSuperBlocks - 1, NULL) < 0) return -1;
ASSERT(pHelper->pDataCols[0]->numOfPoints > 0 &&
pHelper->pDataCols[0]->numOfPoints < pHelper->config.minRowsPerFileBlock);
if (tsdbWriteBlockToFile(pHelper, &(pHelper->files.nLastF), pHelper->pDataCols[0], if (tsdbWriteBlockToFile(pHelper, &(pHelper->files.nLastF), pHelper->pDataCols[0],
pHelper->pDataCols[0]->numOfPoints, &compBlock, true, true) < 0) pHelper->pDataCols[0]->numOfPoints, &compBlock, true, true) < 0)
return -1; return -1;
...@@ -369,18 +372,19 @@ int tsdbWriteCompInfo(SRWHelper *pHelper) { ...@@ -369,18 +372,19 @@ int tsdbWriteCompInfo(SRWHelper *pHelper) {
if (!helperHasState(pHelper, TSDB_HELPER_INFO_LOAD)) { if (!helperHasState(pHelper, TSDB_HELPER_INFO_LOAD)) {
if (pIdx->offset > 0) { if (pIdx->offset > 0) {
pIdx->offset = lseek(pHelper->files.nHeadF.fd, 0, SEEK_END); pIdx->offset = lseek(pHelper->files.nHeadF.fd, 0, SEEK_END);
ASSERT(pIdx->offset > TSDB_FILE_HEAD_SIZE);
if (pIdx->offset < 0) return -1; if (pIdx->offset < 0) return -1;
ASSERT(pIdx->offset >= tsizeof(pHelper->pCompIdx));
if (tsendfile(pHelper->files.nHeadF.fd, pHelper->files.headF.fd, NULL, pIdx->len) < pIdx->len) return -1; if (tsendfile(pHelper->files.nHeadF.fd, pHelper->files.headF.fd, NULL, pIdx->len) < pIdx->len) return -1;
} }
} else { } else {
pHelper->pCompInfo->delimiter = TSDB_FILE_DELIMITER; pHelper->pCompInfo->delimiter = TSDB_FILE_DELIMITER;
pHelper->pCompInfo->uid = pHelper->tableInfo.uid; pHelper->pCompInfo->uid = pHelper->tableInfo.uid;
ASSERT((pIdx->len - sizeof(SCompInfo) - sizeof(TSCKSUM)) % sizeof(SCompBlock) == 0);
taosCalcChecksumAppend(0, (uint8_t *)pHelper->pCompInfo, pIdx->len); taosCalcChecksumAppend(0, (uint8_t *)pHelper->pCompInfo, pIdx->len);
pIdx->offset = lseek(pHelper->files.nHeadF.fd, 0, SEEK_END); pIdx->offset = lseek(pHelper->files.nHeadF.fd, 0, SEEK_END);
ASSERT(pIdx->offset > TSDB_FILE_HEAD_SIZE);
if (pIdx->offset < 0) return -1; if (pIdx->offset < 0) return -1;
ASSERT(pIdx->offset >= tsizeof(pHelper->pCompIdx));
if (twrite(pHelper->files.nHeadF.fd, (void *)(pHelper->pCompInfo), pIdx->len) < pIdx->len) return -1; if (twrite(pHelper->files.nHeadF.fd, (void *)(pHelper->pCompInfo), pIdx->len) < pIdx->len) return -1;
} }
...@@ -389,8 +393,10 @@ int tsdbWriteCompInfo(SRWHelper *pHelper) { ...@@ -389,8 +393,10 @@ int tsdbWriteCompInfo(SRWHelper *pHelper) {
} }
int tsdbWriteCompIdx(SRWHelper *pHelper) { int tsdbWriteCompIdx(SRWHelper *pHelper) {
ASSERT(TSDB_HELPER_TYPE(pHelper) == TSDB_WRITE_HELPER);
if (lseek(pHelper->files.nHeadF.fd, TSDB_FILE_HEAD_SIZE, SEEK_SET) < 0) return -1; if (lseek(pHelper->files.nHeadF.fd, TSDB_FILE_HEAD_SIZE, SEEK_SET) < 0) return -1;
ASSERT(tsizeof(pHelper->pCompIdx) == sizeof(SCompIdx) * pHelper->config.maxTables + sizeof(TSCKSUM));
taosCalcChecksumAppend(0, (uint8_t *)pHelper->pCompIdx, tsizeof(pHelper->pCompIdx)); taosCalcChecksumAppend(0, (uint8_t *)pHelper->pCompIdx, tsizeof(pHelper->pCompIdx));
if (twrite(pHelper->files.nHeadF.fd, (void *)pHelper->pCompIdx, tsizeof(pHelper->pCompIdx)) < tsizeof(pHelper->pCompIdx)) if (twrite(pHelper->files.nHeadF.fd, (void *)pHelper->pCompIdx, tsizeof(pHelper->pCompIdx)) < tsizeof(pHelper->pCompIdx))
...@@ -755,62 +761,68 @@ static int tsdbMergeDataWithBlock(SRWHelper *pHelper, int blkIdx, SDataCols *pDa ...@@ -755,62 +761,68 @@ static int tsdbMergeDataWithBlock(SRWHelper *pHelper, int blkIdx, SDataCols *pDa
SCompIdx *pIdx = pHelper->pCompIdx + pHelper->tableInfo.tid; SCompIdx *pIdx = pHelper->pCompIdx + pHelper->tableInfo.tid;
ASSERT(blkIdx < pIdx->numOfSuperBlocks); ASSERT(blkIdx < pIdx->numOfSuperBlocks);
SCompBlock *pCompBlock = pHelper->pCompInfo->blocks + blkIdx; // SCompBlock *pCompBlock = pHelper->pCompInfo->blocks + blkIdx;
ASSERT(pCompBlock->numOfSubBlocks >= 1); ASSERT(blockAtIdx(pHelper, blkIdx)->numOfSubBlocks >= 1);
ASSERT(keyFirst >= pCompBlock->keyFirst); ASSERT(keyFirst >= blockAtIdx(pHelper, blkIdx)->keyFirst);
// ASSERT(compareKeyBlock((void *)&keyFirst, (void *)pCompBlock) == 0); // ASSERT(compareKeyBlock((void *)&keyFirst, (void *)pCompBlock) == 0);
if (keyFirst > pCompBlock->keyLast) { // Merge the last block by append if (keyFirst > blockAtIdx(pHelper, blkIdx)->keyLast) { // Merge with the last block by append
ASSERT(pCompBlock->last && pCompBlock->numOfPoints < pHelper->config.minRowsPerFileBlock); ASSERT(blockAtIdx(pHelper, blkIdx)->numOfPoints < pHelper->config.minRowsPerFileBlock && blkIdx == pIdx->numOfSuperBlocks-1);
int defaultRowsToWrite = pHelper->config.maxRowsPerFileBlock * 4 / 5; // TODO: make a interface int defaultRowsToWrite = pHelper->config.maxRowsPerFileBlock * 4 / 5; // TODO: make a interface
rowsWritten = MIN((defaultRowsToWrite - pCompBlock->numOfPoints), pDataCols->numOfPoints); rowsWritten = MIN((defaultRowsToWrite - blockAtIdx(pHelper, blkIdx)->numOfPoints), pDataCols->numOfPoints);
if (rowsWritten + pCompBlock->numOfPoints >= pHelper->config.minRowsPerFileBlock) { if ((blockAtIdx(pHelper, blkIdx)->numOfSubBlocks < TSDB_MAX_SUBBLOCKS) &&
// Need to write to .data file (blockAtIdx(pHelper, blkIdx)->numOfPoints + rowsWritten < pHelper->config.minRowsPerFileBlock) && (pHelper->files.nLastF.fd) > 0) {
if (tsdbLoadBlockData(pHelper, blkIdx, NULL) < 0) goto _err; if (tsdbWriteBlockToFile(pHelper, &(pHelper->files.lastF), pDataCols, rowsWritten, &compBlock, true, false) < 0)
if (tdMergeDataCols(pHelper->pDataCols[0], pDataCols, rowsWritten) < 0) goto _err;
if (tsdbWriteBlockToFile(pHelper, &pHelper->files.dataF, pHelper->pDataCols[0],
rowsWritten + pCompBlock->numOfPoints, &compBlock, false, true) < 0)
goto _err; goto _err;
if (tsdbUpdateSuperBlock(pHelper, &compBlock, blkIdx) < 0) goto _err; if (tsdbAddSubBlock(pHelper, &compBlock, blkIdx, rowsWritten) < 0) goto _err;
} else { } else {
// Need still write the .last or .l file // Load
if (pHelper->files.nLastF.fd > 0) { if (tsdbLoadBlockData(pHelper, blkIdx, NULL) < 0) goto _err;
if (tsdbLoadBlockData(pHelper, blkIdx, NULL) < 0) goto _err; ASSERT(pHelper->pDataCols[0]->numOfPoints == blockAtIdx(pHelper, blkIdx)->numOfPoints);
if (tdMergeDataCols(pHelper->pDataCols[0], pDataCols, rowsWritten) < 0) goto _err; // Merge
if (tsdbWriteBlockToFile(pHelper, &pHelper->files.nLastF, pHelper->pDataCols[0], if (tdMergeDataCols(pHelper->pDataCols[0], pDataCols, rowsWritten) < 0) goto _err;
rowsWritten + pCompBlock->numOfPoints, &compBlock, false, true) < 0) // Write
goto _err; SFile *pWFile = NULL;
if (tsdbUpdateSuperBlock(pHelper, &compBlock, blkIdx) < 0) goto _err; bool isLast = false;
if (pHelper->pDataCols[0]->numOfPoints >= pHelper->config.minRowsPerFileBlock) {
pWFile = &(pHelper->files.dataF);
} else { } else {
// Write to .last file and append as a sub-block isLast = true;
if (tsdbWriteBlockToFile(pHelper, &pHelper->files.lastF, pDataCols, rowsWritten, &compBlock, true, false) < 0) pWFile = (pHelper->files.nLastF.fd > 0) ? &(pHelper->files.nLastF) : &(pHelper->files.lastF);
goto _err;
if (tsdbAddSubBlock(pHelper, &compBlock, blkIdx, rowsWritten) < 0) goto _err;
} }
if (tsdbWriteBlockToFile(pHelper, pWFile, pHelper->pDataCols[0],
pHelper->pDataCols[0]->numOfPoints, &compBlock, isLast, true) < 0)
goto _err;
if (tsdbUpdateSuperBlock(pHelper, &compBlock, blkIdx) < 0) goto _err;
} }
ASSERT(pHelper->hasOldLastBlock);
pHelper->hasOldLastBlock = false;
} else { } else {
// TODO: key overlap, must merge with the block // Key must overlap with the block
ASSERT(keyFirst <= pCompBlock->keyLast); ASSERT(keyFirst <= blockAtIdx(pHelper, blkIdx)->keyLast);
TSKEY keyLimit = TSKEY keyLimit =
(blkIdx == pIdx->numOfSuperBlocks - 1) ? INT64_MAX : pHelper->pCompInfo->blocks[blkIdx + 1].keyFirst - 1; (blkIdx == pIdx->numOfSuperBlocks - 1) ? INT64_MAX : pHelper->pCompInfo->blocks[blkIdx + 1].keyFirst - 1;
int rows1 = tsdbGetRowsInRange(pDataCols, pCompBlock->keyFirst, // rows1: number of rows must merge in this block
pCompBlock->keyLast); // number of rows must merge in this block int rows1 = tsdbGetRowsInRange(pDataCols, blockAtIdx(pHelper, blkIdx)->keyFirst, blockAtIdx(pHelper, blkIdx)->keyLast);
int rows2 = // rows2: max nuber of rows the block can have more
pHelper->config.maxRowsPerFileBlock - pCompBlock->numOfPoints; // max nuber of rows the block can have more int rows2 = pHelper->config.maxRowsPerFileBlock - blockAtIdx(pHelper, blkIdx)->numOfPoints;
int rows3 = tsdbGetRowsInRange(pDataCols, pCompBlock->keyFirst, // rows3: number of rows between this block and the next block
keyLimit); // number of rows between this block and the next block int rows3 = tsdbGetRowsInRange(pDataCols, blockAtIdx(pHelper, blkIdx)->keyFirst, keyLimit);
ASSERT(rows3 >= rows1); ASSERT(rows3 >= rows1);
if ((rows2 >= rows1) && ((!pCompBlock->last) || (pHelper->files.nLastF.fd < 0))) { if ((rows2 >= rows1) &&
(( blockAtIdx(pHelper, blkIdx)->last) ||
((rows1 + blockAtIdx(pHelper, blkIdx)->numOfPoints < pHelper->config.minRowsPerFileBlock) && (pHelper->files.nLastF.fd < 0)))) {
rowsWritten = rows1; rowsWritten = rows1;
bool isLast = false; bool isLast = false;
SFile *pFile = NULL; SFile *pFile = NULL;
if (pCompBlock->last) { if (blockAtIdx(pHelper, blkIdx)->last) {
isLast = true; isLast = true;
pFile = &(pHelper->files.lastF); pFile = &(pHelper->files.lastF);
} else { } else {
...@@ -819,63 +831,79 @@ static int tsdbMergeDataWithBlock(SRWHelper *pHelper, int blkIdx, SDataCols *pDa ...@@ -819,63 +831,79 @@ static int tsdbMergeDataWithBlock(SRWHelper *pHelper, int blkIdx, SDataCols *pDa
if (tsdbWriteBlockToFile(pHelper, pFile, pDataCols, rows1, &compBlock, isLast, false) < 0) goto _err; if (tsdbWriteBlockToFile(pHelper, pFile, pDataCols, rows1, &compBlock, isLast, false) < 0) goto _err;
if (tsdbAddSubBlock(pHelper, &compBlock, blkIdx, rowsWritten) < 0) goto _err; if (tsdbAddSubBlock(pHelper, &compBlock, blkIdx, rowsWritten) < 0) goto _err;
} else { } else { // Load-Merge-Write
// Need to read the data block and merge with pCompDataCol to write as super block // Load
// Read
if (tsdbLoadBlockData(pHelper, blkIdx, NULL) < 0) goto _err; if (tsdbLoadBlockData(pHelper, blkIdx, NULL) < 0) goto _err;
if (blockAtIdx(pHelper, blkIdx)->last) pHelper->hasOldLastBlock = false;
rowsWritten = rows3; rowsWritten = rows3;
int iter1 = 0; // iter over pHelper->pDataCols[0] int iter1 = 0; // iter over pHelper->pDataCols[0]
int iter2 = 0; // iter over pDataCols int iter2 = 0; // iter over pDataCols
tdResetDataCols(pHelper->pDataCols[1]); int round = 0;
// tdResetDataCols(pHelper->pDataCols[1]);
while (true) { while (true) {
if (iter1 >= pHelper->pDataCols[0]->numOfPoints && iter2 >= rows3) { if (iter1 >= pHelper->pDataCols[0]->numOfPoints && iter2 >= rows3) break;
if (pHelper->pDataCols[1]->numOfPoints > 0) { tdMergeTwoDataCols(pHelper->pDataCols[1], pHelper->pDataCols[0], &iter1, pDataCols, &iter2, pHelper->config.maxRowsPerFileBlock * 4 / 5);
if (tsdbWriteBlockToFile(pHelper, &pHelper->files.dataF, pHelper->pDataCols[1], ASSERT(pHelper->pDataCols[1]->numOfPoints > 0);
pHelper->pDataCols[1]->numOfPoints, &compBlock, false, true) < 0) if (tsdbWriteBlockToFile(pHelper, &(pHelper->files.dataF), pHelper->pDataCols[1],
goto _err; pHelper->pDataCols[1]->numOfPoints, &compBlock, false, true) < 0)
// TODO: the blkIdx here is not correct goto _err;
tsdbAddSubBlock(pHelper, &compBlock, blkIdx, pHelper->pDataCols[1]->numOfPoints); if (round == 0) {
} tsdbUpdateSuperBlock(pHelper, &compBlock, blkIdx);
}
TSKEY key1 = iter1 >= pHelper->pDataCols[0]->numOfPoints
? INT64_MAX
: ((int64_t *)(pHelper->pDataCols[0]->cols[0].pData))[iter1];
TSKEY key2 = iter2 >= rowsWritten ? INT64_MAX : ((int64_t *)(pDataCols->cols[0].pData))[iter2];
if (key1 < key2) {
for (int i = 0; i < pDataCols->numOfCols; i++) {
SDataCol *pDataCol = pHelper->pDataCols[1]->cols + i;
memcpy(((char *)pDataCol->pData + TYPE_BYTES[pDataCol->type] * pHelper->pDataCols[1]->numOfPoints),
((char *)pHelper->pDataCols[0]->cols[i].pData + TYPE_BYTES[pDataCol->type] * iter1),
TYPE_BYTES[pDataCol->type]);
}
pHelper->pDataCols[1]->numOfPoints++;
iter1++;
} else if (key1 == key2) {
// TODO: think about duplicate key cases
ASSERT(false);
} else { } else {
for (int i = 0; i < pDataCols->numOfCols; i++) {
SDataCol *pDataCol = pHelper->pDataCols[1]->cols + i;
memcpy(((char *)pDataCol->pData + TYPE_BYTES[pDataCol->type] * pHelper->pDataCols[1]->numOfPoints),
((char *)pDataCols->cols[i].pData +
TYPE_BYTES[pDataCol->type] * iter2),
TYPE_BYTES[pDataCol->type]);
}
pHelper->pDataCols[1]->numOfPoints++;
iter2++;
}
if (pHelper->pDataCols[0]->numOfPoints >= pHelper->config.maxRowsPerFileBlock * 4 / 5) {
if (tsdbWriteBlockToFile(pHelper, &pHelper->files.dataF, pHelper->pDataCols[1], pHelper->pDataCols[1]->numOfPoints, &compBlock, false, true) < 0) goto _err;
// TODO: blkIdx here is not correct, fix it
tsdbInsertSuperBlock(pHelper, &compBlock, blkIdx); tsdbInsertSuperBlock(pHelper, &compBlock, blkIdx);
tdResetDataCols(pHelper->pDataCols[1]);
} }
round++;
blkIdx++;
// TODO: the blkIdx here is not correct
// if (iter1 >= pHelper->pDataCols[0]->numOfPoints && iter2 >= rows3) {
// if (pHelper->pDataCols[1]->numOfPoints > 0) {
// if (tsdbWriteBlockToFile(pHelper, &pHelper->files.dataF, pHelper->pDataCols[1],
// pHelper->pDataCols[1]->numOfPoints, &compBlock, false, true) < 0)
// goto _err;
// // TODO: the blkIdx here is not correct
// tsdbAddSubBlock(pHelper, &compBlock, blkIdx, pHelper->pDataCols[1]->numOfPoints);
// }
// }
// TSKEY key1 = iter1 >= pHelper->pDataCols[0]->numOfPoints
// ? INT64_MAX
// : ((int64_t *)(pHelper->pDataCols[0]->cols[0].pData))[iter1];
// TSKEY key2 = iter2 >= rowsWritten ? INT64_MAX : ((int64_t *)(pDataCols->cols[0].pData))[iter2];
// if (key1 < key2) {
// for (int i = 0; i < pDataCols->numOfCols; i++) {
// SDataCol *pDataCol = pHelper->pDataCols[1]->cols + i;
// memcpy(((char *)pDataCol->pData + TYPE_BYTES[pDataCol->type] * pHelper->pDataCols[1]->numOfPoints),
// ((char *)pHelper->pDataCols[0]->cols[i].pData + TYPE_BYTES[pDataCol->type] * iter1),
// TYPE_BYTES[pDataCol->type]);
// }
// pHelper->pDataCols[1]->numOfPoints++;
// iter1++;
// } else if (key1 == key2) {
// // TODO: think about duplicate key cases
// ASSERT(false);
// } else {
// for (int i = 0; i < pDataCols->numOfCols; i++) {
// SDataCol *pDataCol = pHelper->pDataCols[1]->cols + i;
// memcpy(((char *)pDataCol->pData + TYPE_BYTES[pDataCol->type] * pHelper->pDataCols[1]->numOfPoints),
// ((char *)pDataCols->cols[i].pData +
// TYPE_BYTES[pDataCol->type] * iter2),
// TYPE_BYTES[pDataCol->type]);
// }
// pHelper->pDataCols[1]->numOfPoints++;
// iter2++;
// }
// if (pHelper->pDataCols[0]->numOfPoints >= pHelper->config.maxRowsPerFileBlock * 4 / 5) {
// if (tsdbWriteBlockToFile(pHelper, &pHelper->files.dataF, pHelper->pDataCols[1], pHelper->pDataCols[1]->numOfPoints, &compBlock, false, true) < 0) goto _err;
// // TODO: blkIdx here is not correct, fix it
// tsdbInsertSuperBlock(pHelper, &compBlock, blkIdx);
// tdResetDataCols(pHelper->pDataCols[1]);
// }
} }
} }
} }
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册