From 709f6d094e52a6cf58fbbb20ca786d6b3cece774 Mon Sep 17 00:00:00 2001 From: hzcheng Date: Sat, 18 Apr 2020 00:46:33 +0800 Subject: [PATCH] TD-100 --- src/common/inc/dataformat.h | 1 + src/common/src/dataformat.c | 86 +++++++++++----- src/tsdb/inc/tsdbMain.h | 1 + src/tsdb/src/tsdbMain.c | 4 +- src/tsdb/src/tsdbRWHelper.c | 198 ++++++++++++++++++++---------------- 5 files changed, 178 insertions(+), 112 deletions(-) diff --git a/src/common/inc/dataformat.h b/src/common/inc/dataformat.h index 783e378eb7..17aa19cce7 100644 --- a/src/common/inc/dataformat.h +++ b/src/common/inc/dataformat.h @@ -137,6 +137,7 @@ void tdFreeDataCols(SDataCols *pCols); void tdAppendDataRowToDataCol(SDataRow row, SDataCols *pCols); void tdPopDataColsPoints(SDataCols *pCols, int pointsToPop); int tdMergeDataCols(SDataCols *target, SDataCols *src, int rowsToMerge); +void tdMergeTwoDataCols(SDataCols *target, SDataCols *src1, int *iter1, SDataCols *src2, int *iter2, int tRows); #ifdef __cplusplus } diff --git a/src/common/src/dataformat.c b/src/common/src/dataformat.c index b5451ae691..15dec2c71c 100644 --- a/src/common/src/dataformat.c +++ b/src/common/src/dataformat.c @@ -406,52 +406,90 @@ static int tdFLenFromSchema(STSchema *pSchema) { } int tdMergeDataCols(SDataCols *target, SDataCols *source, int rowsToMerge) { - // TODO ASSERT(rowsToMerge > 0 && rowsToMerge <= source->numOfPoints); SDataCols *pTarget = tdDupDataCols(target, true); if (pTarget == NULL) goto _err; - tdResetDataCols(target); + // tdResetDataCols(target); int iter1 = 0; int iter2 = 0; - while (true) { - if (iter1 >= pTarget->numOfPoints && iter2 >= source->numOfPoints) break; + tdMergeTwoDataCols(target,pTarget, &iter1, source, &iter2, pTarget->numOfPoints + rowsToMerge); + // while (true) { + // if (iter1 >= pTarget->numOfPoints && iter2 >= source->numOfPoints) break; + + // TSKEY key1 = (iter1 >= pTarget->numOfPoints) ? INT64_MAX : ((TSKEY *)(pTarget->cols[0].pData))[iter1]; + // TSKEY key2 = (iter2 >= rowsToMerge) ? INT64_MAX : ((TSKEY *)(source->cols[0].pData))[iter2]; + + // if (key1 < key2) { // Copy from pTarget + // for (int i = 0; i < pTarget->numOfCols; i++) { + // ASSERT(target->cols[i].type == pTarget->cols[i].type); + // memcpy((void *)((char *)(target->cols[i].pData) + TYPE_BYTES[target->cols[i].type] * target->numOfPoints), + // (void *)((char *)(pTarget->cols[i].pData) + TYPE_BYTES[pTarget->cols[i].type] * iter1), + // TYPE_BYTES[target->cols[i].type]); + // target->cols[i].len += TYPE_BYTES[target->cols[i].type]; + // } + + // target->numOfPoints++; + // iter1++; + // } else if (key1 > key2) { // Copy from source + // for (int i = 0; i < source->numOfCols; i++) { + // ASSERT(target->cols[i].type == pTarget->cols[i].type); + // memcpy((void *)((char *)(target->cols[i].pData) + TYPE_BYTES[target->cols[i].type] * target->numOfPoints), + // (void *)((char *)(source->cols[i].pData) + TYPE_BYTES[source->cols[i].type] * iter2), + // TYPE_BYTES[target->cols[i].type]); + // target->cols[i].len += TYPE_BYTES[target->cols[i].type]; + // } + + // target->numOfPoints++; + // iter2++; + // } else { + // // TODO + // ASSERT(false); + // } + // } + + tdFreeDataCols(pTarget); + return 0; + +_err: + tdFreeDataCols(pTarget); + return -1; +} - TSKEY key1 = (iter1 >= pTarget->numOfPoints) ? INT64_MAX : ((TSKEY *)(pTarget->cols[0].pData))[iter1]; - TSKEY key2 = (iter2 >= rowsToMerge) ? INT64_MAX : ((TSKEY *)(source->cols[0].pData))[iter2]; +void tdMergeTwoDataCols(SDataCols *target, SDataCols *src1, int *iter1, SDataCols *src2, int *iter2, int tRows) { + tdResetDataCols(target); + + while (target->numOfPoints < tRows) { + if (*iter1 >= src1->numOfPoints && *iter2 >= src2->numOfPoints) break; + + TSKEY key1 = (*iter1 >= src1->numOfPoints) ? INT64_MAX : ((TSKEY *)(src1->cols[0].pData))[*iter1]; + TSKEY key2 = (*iter2 >= src2->numOfPoints) ? INT64_MAX : ((TSKEY *)(src2->cols[0].pData))[*iter2]; - if (key1 < key2) { // Copy from pTarget - for (int i = 0; i < pTarget->numOfCols; i++) { - ASSERT(target->cols[i].type == pTarget->cols[i].type); + if (key1 < key2) { + for (int i = 0; i < src1->numOfCols; i++) { + ASSERT(target->cols[i].type == src1->cols[i].type); memcpy((void *)((char *)(target->cols[i].pData) + TYPE_BYTES[target->cols[i].type] * target->numOfPoints), - (void *)((char *)(pTarget->cols[i].pData) + TYPE_BYTES[pTarget->cols[i].type] * iter1), + (void *)((char *)(src1->cols[i].pData) + TYPE_BYTES[target->cols[i].type] * (*iter1)), TYPE_BYTES[target->cols[i].type]); target->cols[i].len += TYPE_BYTES[target->cols[i].type]; } target->numOfPoints++; - iter1++; - } else if (key1 > key2) { // Copy from source - for (int i = 0; i < source->numOfCols; i++) { - ASSERT(target->cols[i].type == pTarget->cols[i].type); + *iter1++; + } else if (key1 > key2) { + for (int i = 0; i < src2->numOfCols; i++) { + ASSERT(target->cols[i].type == src2->cols[i].type); memcpy((void *)((char *)(target->cols[i].pData) + TYPE_BYTES[target->cols[i].type] * target->numOfPoints), - (void *)((char *)(source->cols[i].pData) + TYPE_BYTES[source->cols[i].type] * iter2), + (void *)((char *)(src2->cols[i].pData) + TYPE_BYTES[src2->cols[i].type] * (*iter2)), TYPE_BYTES[target->cols[i].type]); target->cols[i].len += TYPE_BYTES[target->cols[i].type]; } target->numOfPoints++; - iter2++; + *iter2++; } else { - assert(false); + ASSERT(false); } } - - tdFreeDataCols(pTarget); - return 0; - -_err: - tdFreeDataCols(pTarget); - return -1; } \ No newline at end of file diff --git a/src/tsdb/inc/tsdbMain.h b/src/tsdb/inc/tsdbMain.h index 06313f0786..8af2026977 100644 --- a/src/tsdb/inc/tsdbMain.h +++ b/src/tsdb/inc/tsdbMain.h @@ -440,6 +440,7 @@ typedef struct { #define helperSetState(h, s) (((h)->state) |= (s)) #define helperClearState(h, s) ((h)->state &= (~(s))) #define helperHasState(h, s) ((((h)->state) & (s)) == (s)) +#define blockAtIdx(h, idx) ((h)->pCompInfo->blocks + idx) int tsdbInitHelper(SRWHelper *pHelper, SHelperCfg *pCfg); void tsdbDestroyHelper(SRWHelper *pHelper); diff --git a/src/tsdb/src/tsdbMain.c b/src/tsdb/src/tsdbMain.c index fd01e8ce71..75a0c4d2a8 100644 --- a/src/tsdb/src/tsdbMain.c +++ b/src/tsdb/src/tsdbMain.c @@ -832,7 +832,6 @@ static void tsdbFreeMemTable(SMemTable *pMemTable) { // Commit to file static void *tsdbCommitData(void *arg) { - // TODO printf("Starting to commit....\n"); STsdbRepo * pRepo = (STsdbRepo *)arg; STsdbMeta * pMeta = pRepo->tsdbMeta; @@ -849,7 +848,7 @@ static void *tsdbCommitData(void *arg) { return NULL; } - // Create a write helper for commit data + // Create a write helper to commit data SHelperCfg hcfg = {.type = TSDB_WRITE_HELPER, .maxTables = pCfg->maxTables, .maxRowSize = pMeta->maxRowBytes, @@ -883,7 +882,6 @@ _exit: free(pCache->imem); pCache->imem = NULL; pRepo->commit = 0; - // TODO: free the skiplist for (int i = 0; i < pCfg->maxTables; i++) { STable *pTable = pMeta->tables[i]; if (pTable && pTable->imem) { diff --git a/src/tsdb/src/tsdbRWHelper.c b/src/tsdb/src/tsdbRWHelper.c index ac96cff366..a5b93a5688 100644 --- a/src/tsdb/src/tsdbRWHelper.c +++ b/src/tsdb/src/tsdbRWHelper.c @@ -333,6 +333,7 @@ _err: } int tsdbMoveLastBlockIfNeccessary(SRWHelper *pHelper) { + ASSERT(TSDB_HELPER_TYPE(pHelper) == TSDB_WRITE_HELPER); SCompIdx *pIdx = pHelper->pCompIdx + pHelper->tableInfo.tid; SCompBlock compBlock; if ((pHelper->files.nLastF.fd > 0) && (pHelper->hasOldLastBlock)) { @@ -343,6 +344,8 @@ int tsdbMoveLastBlockIfNeccessary(SRWHelper *pHelper) { if (pCompBlock->numOfSubBlocks > 1) { if (tsdbLoadBlockData(pHelper, pIdx->numOfSuperBlocks - 1, NULL) < 0) return -1; + ASSERT(pHelper->pDataCols[0]->numOfPoints > 0 && + pHelper->pDataCols[0]->numOfPoints < pHelper->config.minRowsPerFileBlock); if (tsdbWriteBlockToFile(pHelper, &(pHelper->files.nLastF), pHelper->pDataCols[0], pHelper->pDataCols[0]->numOfPoints, &compBlock, true, true) < 0) return -1; @@ -369,18 +372,19 @@ int tsdbWriteCompInfo(SRWHelper *pHelper) { if (!helperHasState(pHelper, TSDB_HELPER_INFO_LOAD)) { if (pIdx->offset > 0) { pIdx->offset = lseek(pHelper->files.nHeadF.fd, 0, SEEK_END); - ASSERT(pIdx->offset > TSDB_FILE_HEAD_SIZE); if (pIdx->offset < 0) return -1; + ASSERT(pIdx->offset >= tsizeof(pHelper->pCompIdx)); if (tsendfile(pHelper->files.nHeadF.fd, pHelper->files.headF.fd, NULL, pIdx->len) < pIdx->len) return -1; } } else { pHelper->pCompInfo->delimiter = TSDB_FILE_DELIMITER; pHelper->pCompInfo->uid = pHelper->tableInfo.uid; + ASSERT((pIdx->len - sizeof(SCompInfo) - sizeof(TSCKSUM)) % sizeof(SCompBlock) == 0); taosCalcChecksumAppend(0, (uint8_t *)pHelper->pCompInfo, pIdx->len); pIdx->offset = lseek(pHelper->files.nHeadF.fd, 0, SEEK_END); - ASSERT(pIdx->offset > TSDB_FILE_HEAD_SIZE); if (pIdx->offset < 0) return -1; + ASSERT(pIdx->offset >= tsizeof(pHelper->pCompIdx)); if (twrite(pHelper->files.nHeadF.fd, (void *)(pHelper->pCompInfo), pIdx->len) < pIdx->len) return -1; } @@ -389,8 +393,10 @@ int tsdbWriteCompInfo(SRWHelper *pHelper) { } int tsdbWriteCompIdx(SRWHelper *pHelper) { + ASSERT(TSDB_HELPER_TYPE(pHelper) == TSDB_WRITE_HELPER); if (lseek(pHelper->files.nHeadF.fd, TSDB_FILE_HEAD_SIZE, SEEK_SET) < 0) return -1; + ASSERT(tsizeof(pHelper->pCompIdx) == sizeof(SCompIdx) * pHelper->config.maxTables + sizeof(TSCKSUM)); taosCalcChecksumAppend(0, (uint8_t *)pHelper->pCompIdx, tsizeof(pHelper->pCompIdx)); if (twrite(pHelper->files.nHeadF.fd, (void *)pHelper->pCompIdx, tsizeof(pHelper->pCompIdx)) < tsizeof(pHelper->pCompIdx)) @@ -755,62 +761,68 @@ static int tsdbMergeDataWithBlock(SRWHelper *pHelper, int blkIdx, SDataCols *pDa SCompIdx *pIdx = pHelper->pCompIdx + pHelper->tableInfo.tid; ASSERT(blkIdx < pIdx->numOfSuperBlocks); - SCompBlock *pCompBlock = pHelper->pCompInfo->blocks + blkIdx; - ASSERT(pCompBlock->numOfSubBlocks >= 1); - ASSERT(keyFirst >= pCompBlock->keyFirst); + // SCompBlock *pCompBlock = pHelper->pCompInfo->blocks + blkIdx; + ASSERT(blockAtIdx(pHelper, blkIdx)->numOfSubBlocks >= 1); + ASSERT(keyFirst >= blockAtIdx(pHelper, blkIdx)->keyFirst); // ASSERT(compareKeyBlock((void *)&keyFirst, (void *)pCompBlock) == 0); - if (keyFirst > pCompBlock->keyLast) { // Merge the last block by append - ASSERT(pCompBlock->last && pCompBlock->numOfPoints < pHelper->config.minRowsPerFileBlock); + if (keyFirst > blockAtIdx(pHelper, blkIdx)->keyLast) { // Merge with the last block by append + ASSERT(blockAtIdx(pHelper, blkIdx)->numOfPoints < pHelper->config.minRowsPerFileBlock && blkIdx == pIdx->numOfSuperBlocks-1); int defaultRowsToWrite = pHelper->config.maxRowsPerFileBlock * 4 / 5; // TODO: make a interface - rowsWritten = MIN((defaultRowsToWrite - pCompBlock->numOfPoints), pDataCols->numOfPoints); - if (rowsWritten + pCompBlock->numOfPoints >= pHelper->config.minRowsPerFileBlock) { - // Need to write to .data file - if (tsdbLoadBlockData(pHelper, blkIdx, NULL) < 0) goto _err; - if (tdMergeDataCols(pHelper->pDataCols[0], pDataCols, rowsWritten) < 0) goto _err; - if (tsdbWriteBlockToFile(pHelper, &pHelper->files.dataF, pHelper->pDataCols[0], - rowsWritten + pCompBlock->numOfPoints, &compBlock, false, true) < 0) + rowsWritten = MIN((defaultRowsToWrite - blockAtIdx(pHelper, blkIdx)->numOfPoints), pDataCols->numOfPoints); + if ((blockAtIdx(pHelper, blkIdx)->numOfSubBlocks < TSDB_MAX_SUBBLOCKS) && + (blockAtIdx(pHelper, blkIdx)->numOfPoints + rowsWritten < pHelper->config.minRowsPerFileBlock) && (pHelper->files.nLastF.fd) > 0) { + if (tsdbWriteBlockToFile(pHelper, &(pHelper->files.lastF), pDataCols, rowsWritten, &compBlock, true, false) < 0) goto _err; - if (tsdbUpdateSuperBlock(pHelper, &compBlock, blkIdx) < 0) goto _err; + if (tsdbAddSubBlock(pHelper, &compBlock, blkIdx, rowsWritten) < 0) goto _err; } else { - // Need still write the .last or .l file - if (pHelper->files.nLastF.fd > 0) { - if (tsdbLoadBlockData(pHelper, blkIdx, NULL) < 0) goto _err; - if (tdMergeDataCols(pHelper->pDataCols[0], pDataCols, rowsWritten) < 0) goto _err; - if (tsdbWriteBlockToFile(pHelper, &pHelper->files.nLastF, pHelper->pDataCols[0], - rowsWritten + pCompBlock->numOfPoints, &compBlock, false, true) < 0) - goto _err; - if (tsdbUpdateSuperBlock(pHelper, &compBlock, blkIdx) < 0) goto _err; + // Load + if (tsdbLoadBlockData(pHelper, blkIdx, NULL) < 0) goto _err; + ASSERT(pHelper->pDataCols[0]->numOfPoints == blockAtIdx(pHelper, blkIdx)->numOfPoints); + // Merge + if (tdMergeDataCols(pHelper->pDataCols[0], pDataCols, rowsWritten) < 0) goto _err; + // Write + SFile *pWFile = NULL; + bool isLast = false; + if (pHelper->pDataCols[0]->numOfPoints >= pHelper->config.minRowsPerFileBlock) { + pWFile = &(pHelper->files.dataF); } else { - // Write to .last file and append as a sub-block - if (tsdbWriteBlockToFile(pHelper, &pHelper->files.lastF, pDataCols, rowsWritten, &compBlock, true, false) < 0) - goto _err; - if (tsdbAddSubBlock(pHelper, &compBlock, blkIdx, rowsWritten) < 0) goto _err; + isLast = true; + pWFile = (pHelper->files.nLastF.fd > 0) ? &(pHelper->files.nLastF) : &(pHelper->files.lastF); } + if (tsdbWriteBlockToFile(pHelper, pWFile, pHelper->pDataCols[0], + pHelper->pDataCols[0]->numOfPoints, &compBlock, isLast, true) < 0) + goto _err; + if (tsdbUpdateSuperBlock(pHelper, &compBlock, blkIdx) < 0) goto _err; } + + ASSERT(pHelper->hasOldLastBlock); + pHelper->hasOldLastBlock = false; } else { - // TODO: key overlap, must merge with the block - ASSERT(keyFirst <= pCompBlock->keyLast); + // Key must overlap with the block + ASSERT(keyFirst <= blockAtIdx(pHelper, blkIdx)->keyLast); TSKEY keyLimit = (blkIdx == pIdx->numOfSuperBlocks - 1) ? INT64_MAX : pHelper->pCompInfo->blocks[blkIdx + 1].keyFirst - 1; - int rows1 = tsdbGetRowsInRange(pDataCols, pCompBlock->keyFirst, - pCompBlock->keyLast); // number of rows must merge in this block - int rows2 = - pHelper->config.maxRowsPerFileBlock - pCompBlock->numOfPoints; // max nuber of rows the block can have more - int rows3 = tsdbGetRowsInRange(pDataCols, pCompBlock->keyFirst, - keyLimit); // number of rows between this block and the next block + // rows1: number of rows must merge in this block + int rows1 = tsdbGetRowsInRange(pDataCols, blockAtIdx(pHelper, blkIdx)->keyFirst, blockAtIdx(pHelper, blkIdx)->keyLast); + // rows2: max nuber of rows the block can have more + int rows2 = pHelper->config.maxRowsPerFileBlock - blockAtIdx(pHelper, blkIdx)->numOfPoints; + // rows3: number of rows between this block and the next block + int rows3 = tsdbGetRowsInRange(pDataCols, blockAtIdx(pHelper, blkIdx)->keyFirst, keyLimit); ASSERT(rows3 >= rows1); - if ((rows2 >= rows1) && ((!pCompBlock->last) || (pHelper->files.nLastF.fd < 0))) { + if ((rows2 >= rows1) && + (( blockAtIdx(pHelper, blkIdx)->last) || + ((rows1 + blockAtIdx(pHelper, blkIdx)->numOfPoints < pHelper->config.minRowsPerFileBlock) && (pHelper->files.nLastF.fd < 0)))) { rowsWritten = rows1; bool isLast = false; SFile *pFile = NULL; - if (pCompBlock->last) { + if (blockAtIdx(pHelper, blkIdx)->last) { isLast = true; pFile = &(pHelper->files.lastF); } else { @@ -819,63 +831,79 @@ static int tsdbMergeDataWithBlock(SRWHelper *pHelper, int blkIdx, SDataCols *pDa if (tsdbWriteBlockToFile(pHelper, pFile, pDataCols, rows1, &compBlock, isLast, false) < 0) goto _err; if (tsdbAddSubBlock(pHelper, &compBlock, blkIdx, rowsWritten) < 0) goto _err; - } else { - // Need to read the data block and merge with pCompDataCol to write as super block - - // Read + } else { // Load-Merge-Write + // Load if (tsdbLoadBlockData(pHelper, blkIdx, NULL) < 0) goto _err; + if (blockAtIdx(pHelper, blkIdx)->last) pHelper->hasOldLastBlock = false; + rowsWritten = rows3; int iter1 = 0; // iter over pHelper->pDataCols[0] int iter2 = 0; // iter over pDataCols - tdResetDataCols(pHelper->pDataCols[1]); + int round = 0; + // tdResetDataCols(pHelper->pDataCols[1]); while (true) { - if (iter1 >= pHelper->pDataCols[0]->numOfPoints && iter2 >= rows3) { - if (pHelper->pDataCols[1]->numOfPoints > 0) { - if (tsdbWriteBlockToFile(pHelper, &pHelper->files.dataF, pHelper->pDataCols[1], - pHelper->pDataCols[1]->numOfPoints, &compBlock, false, true) < 0) - goto _err; - // TODO: the blkIdx here is not correct - tsdbAddSubBlock(pHelper, &compBlock, blkIdx, pHelper->pDataCols[1]->numOfPoints); - } - } - - TSKEY key1 = iter1 >= pHelper->pDataCols[0]->numOfPoints - ? INT64_MAX - : ((int64_t *)(pHelper->pDataCols[0]->cols[0].pData))[iter1]; - TSKEY key2 = iter2 >= rowsWritten ? INT64_MAX : ((int64_t *)(pDataCols->cols[0].pData))[iter2]; - - if (key1 < key2) { - for (int i = 0; i < pDataCols->numOfCols; i++) { - SDataCol *pDataCol = pHelper->pDataCols[1]->cols + i; - memcpy(((char *)pDataCol->pData + TYPE_BYTES[pDataCol->type] * pHelper->pDataCols[1]->numOfPoints), - ((char *)pHelper->pDataCols[0]->cols[i].pData + TYPE_BYTES[pDataCol->type] * iter1), - TYPE_BYTES[pDataCol->type]); - } - pHelper->pDataCols[1]->numOfPoints++; - iter1++; - } else if (key1 == key2) { - // TODO: think about duplicate key cases - ASSERT(false); + if (iter1 >= pHelper->pDataCols[0]->numOfPoints && iter2 >= rows3) break; + tdMergeTwoDataCols(pHelper->pDataCols[1], pHelper->pDataCols[0], &iter1, pDataCols, &iter2, pHelper->config.maxRowsPerFileBlock * 4 / 5); + ASSERT(pHelper->pDataCols[1]->numOfPoints > 0); + if (tsdbWriteBlockToFile(pHelper, &(pHelper->files.dataF), pHelper->pDataCols[1], + pHelper->pDataCols[1]->numOfPoints, &compBlock, false, true) < 0) + goto _err; + if (round == 0) { + tsdbUpdateSuperBlock(pHelper, &compBlock, blkIdx); } else { - for (int i = 0; i < pDataCols->numOfCols; i++) { - SDataCol *pDataCol = pHelper->pDataCols[1]->cols + i; - memcpy(((char *)pDataCol->pData + TYPE_BYTES[pDataCol->type] * pHelper->pDataCols[1]->numOfPoints), - ((char *)pDataCols->cols[i].pData + - TYPE_BYTES[pDataCol->type] * iter2), - TYPE_BYTES[pDataCol->type]); - } - pHelper->pDataCols[1]->numOfPoints++; - iter2++; - } - - if (pHelper->pDataCols[0]->numOfPoints >= pHelper->config.maxRowsPerFileBlock * 4 / 5) { - if (tsdbWriteBlockToFile(pHelper, &pHelper->files.dataF, pHelper->pDataCols[1], pHelper->pDataCols[1]->numOfPoints, &compBlock, false, true) < 0) goto _err; - // TODO: blkIdx here is not correct, fix it tsdbInsertSuperBlock(pHelper, &compBlock, blkIdx); - - tdResetDataCols(pHelper->pDataCols[1]); } + round++; + blkIdx++; + // TODO: the blkIdx here is not correct + + // if (iter1 >= pHelper->pDataCols[0]->numOfPoints && iter2 >= rows3) { + // if (pHelper->pDataCols[1]->numOfPoints > 0) { + // if (tsdbWriteBlockToFile(pHelper, &pHelper->files.dataF, pHelper->pDataCols[1], + // pHelper->pDataCols[1]->numOfPoints, &compBlock, false, true) < 0) + // goto _err; + // // TODO: the blkIdx here is not correct + // tsdbAddSubBlock(pHelper, &compBlock, blkIdx, pHelper->pDataCols[1]->numOfPoints); + // } + // } + + // TSKEY key1 = iter1 >= pHelper->pDataCols[0]->numOfPoints + // ? INT64_MAX + // : ((int64_t *)(pHelper->pDataCols[0]->cols[0].pData))[iter1]; + // TSKEY key2 = iter2 >= rowsWritten ? INT64_MAX : ((int64_t *)(pDataCols->cols[0].pData))[iter2]; + + // if (key1 < key2) { + // for (int i = 0; i < pDataCols->numOfCols; i++) { + // SDataCol *pDataCol = pHelper->pDataCols[1]->cols + i; + // memcpy(((char *)pDataCol->pData + TYPE_BYTES[pDataCol->type] * pHelper->pDataCols[1]->numOfPoints), + // ((char *)pHelper->pDataCols[0]->cols[i].pData + TYPE_BYTES[pDataCol->type] * iter1), + // TYPE_BYTES[pDataCol->type]); + // } + // pHelper->pDataCols[1]->numOfPoints++; + // iter1++; + // } else if (key1 == key2) { + // // TODO: think about duplicate key cases + // ASSERT(false); + // } else { + // for (int i = 0; i < pDataCols->numOfCols; i++) { + // SDataCol *pDataCol = pHelper->pDataCols[1]->cols + i; + // memcpy(((char *)pDataCol->pData + TYPE_BYTES[pDataCol->type] * pHelper->pDataCols[1]->numOfPoints), + // ((char *)pDataCols->cols[i].pData + + // TYPE_BYTES[pDataCol->type] * iter2), + // TYPE_BYTES[pDataCol->type]); + // } + // pHelper->pDataCols[1]->numOfPoints++; + // iter2++; + // } + + // if (pHelper->pDataCols[0]->numOfPoints >= pHelper->config.maxRowsPerFileBlock * 4 / 5) { + // if (tsdbWriteBlockToFile(pHelper, &pHelper->files.dataF, pHelper->pDataCols[1], pHelper->pDataCols[1]->numOfPoints, &compBlock, false, true) < 0) goto _err; + // // TODO: blkIdx here is not correct, fix it + // tsdbInsertSuperBlock(pHelper, &compBlock, blkIdx); + + // tdResetDataCols(pHelper->pDataCols[1]); + // } } } } -- GitLab