diff --git a/src/common/inc/tdataformat.h b/src/common/inc/tdataformat.h index cb39b990e53fe54ffe20a1136a8a12d2f2d86875..3a84ce8f6f3ad369e9f6972d79d3356451c4ef25 100644 --- a/src/common/inc/tdataformat.h +++ b/src/common/inc/tdataformat.h @@ -251,7 +251,7 @@ void tdFreeDataCols(SDataCols *pCols); void tdAppendDataRowToDataCol(SDataRow row, STSchema *pSchema, SDataCols *pCols); void tdPopDataColsPoints(SDataCols *pCols, int pointsToPop); //!!!! int tdMergeDataCols(SDataCols *target, SDataCols *src, int rowsToMerge); -void tdMergeTwoDataCols(SDataCols *target, SDataCols *src1, int *iter1, SDataCols *src2, int *iter2, int tRows); +void tdMergeTwoDataCols(SDataCols *target, SDataCols *src1, int *iter1, int limit1, SDataCols *src2, int *iter2, int limit2, int tRows); // ----------------- K-V data row structure /* diff --git a/src/common/src/tdataformat.c b/src/common/src/tdataformat.c index 5cd3b79aa1d1d5b088473cd8cdc3f8a0ec801ec5..7880a4b3021d71872f194f7bd5512d03ee940363 100644 --- a/src/common/src/tdataformat.c +++ b/src/common/src/tdataformat.c @@ -450,7 +450,8 @@ int tdMergeDataCols(SDataCols *target, SDataCols *source, int rowsToMerge) { int iter1 = 0; int iter2 = 0; - tdMergeTwoDataCols(target, pTarget, &iter1, source, &iter2, pTarget->numOfRows + rowsToMerge); + tdMergeTwoDataCols(target, pTarget, &iter1, pTarget->numOfRows, source, &iter2, source->numOfRows, + pTarget->numOfRows + rowsToMerge); } tdFreeDataCols(pTarget); @@ -461,15 +462,15 @@ _err: return -1; } -void tdMergeTwoDataCols(SDataCols *target, SDataCols *src1, int *iter1, SDataCols *src2, int *iter2, int tRows) { - // TODO: add resolve duplicate key here +void tdMergeTwoDataCols(SDataCols *target, SDataCols *src1, int *iter1, int limit1, SDataCols *src2, int *iter2, int limit2, int tRows) { tdResetDataCols(target); + ASSERT(limit1 <= src1->numOfRows && limit2 <= src2->numOfRows); while (target->numOfRows < tRows) { - if (*iter1 >= src1->numOfRows && *iter2 >= src2->numOfRows) break; + if (*iter1 >= limit1 && *iter2 >= limit2) break; - TSKEY key1 = (*iter1 >= src1->numOfRows) ? INT64_MAX : ((TSKEY *)(src1->cols[0].pData))[*iter1]; - TSKEY key2 = (*iter2 >= src2->numOfRows) ? INT64_MAX : ((TSKEY *)(src2->cols[0].pData))[*iter2]; + TSKEY key1 = (*iter1 >= limit1) ? INT64_MAX : ((TSKEY *)(src1->cols[0].pData))[*iter1]; + TSKEY key2 = (*iter2 >= limit2) ? INT64_MAX : ((TSKEY *)(src2->cols[0].pData))[*iter2]; if (key1 <= key2) { for (int i = 0; i < src1->numOfCols; i++) { diff --git a/src/tsdb/src/tsdbRWHelper.c b/src/tsdb/src/tsdbRWHelper.c index fd317e9d0cebf016652952d9eb370d5fbad95143..eab70b591349cacfc382c206e393a4846283a559 100644 --- a/src/tsdb/src/tsdbRWHelper.c +++ b/src/tsdb/src/tsdbRWHelper.c @@ -977,7 +977,8 @@ static int tsdbMergeDataWithBlock(SRWHelper *pHelper, int blkIdx, SDataCols *pDa // tdResetDataCols(pHelper->pDataCols[1]); while (true) { if (iter1 >= pHelper->pDataCols[0]->numOfRows && iter2 >= rows3) break; - tdMergeTwoDataCols(pHelper->pDataCols[1], pHelper->pDataCols[0], &iter1, pDataCols, &iter2, pHelper->config.maxRowsPerFileBlock * 4 / 5); + tdMergeTwoDataCols(pHelper->pDataCols[1], pHelper->pDataCols[0], &iter1, pHelper->pDataCols[0]->numOfRows, + pDataCols, &iter2, rowsWritten, pHelper->config.maxRowsPerFileBlock * 4 / 5); ASSERT(pHelper->pDataCols[1]->numOfRows > 0); if (tsdbWriteBlockToFile(pHelper, &(pHelper->files.dataF), pHelper->pDataCols[1], pHelper->pDataCols[1]->numOfRows, &compBlock, false, true) < 0) @@ -989,54 +990,6 @@ static int tsdbMergeDataWithBlock(SRWHelper *pHelper, int blkIdx, SDataCols *pDa } round++; blkIdx++; - // TODO: the blkIdx here is not correct - - // if (iter1 >= pHelper->pDataCols[0]->numOfRows && iter2 >= rows3) { - // if (pHelper->pDataCols[1]->numOfRows > 0) { - // if (tsdbWriteBlockToFile(pHelper, &pHelper->files.dataF, pHelper->pDataCols[1], - // pHelper->pDataCols[1]->numOfRows, &compBlock, false, true) < 0) - // goto _err; - // // TODO: the blkIdx here is not correct - // tsdbAddSubBlock(pHelper, &compBlock, blkIdx, pHelper->pDataCols[1]->numOfRows); - // } - // } - - // TSKEY key1 = iter1 >= pHelper->pDataCols[0]->numOfRows - // ? INT64_MAX - // : ((int64_t *)(pHelper->pDataCols[0]->cols[0].pData))[iter1]; - // TSKEY key2 = iter2 >= rowsWritten ? INT64_MAX : ((int64_t *)(pDataCols->cols[0].pData))[iter2]; - - // if (key1 < key2) { - // for (int i = 0; i < pDataCols->numOfCols; i++) { - // SDataCol *pDataCol = pHelper->pDataCols[1]->cols + i; - // memcpy(((char *)pDataCol->pData + TYPE_BYTES[pDataCol->type] * pHelper->pDataCols[1]->numOfRows), - // ((char *)pHelper->pDataCols[0]->cols[i].pData + TYPE_BYTES[pDataCol->type] * iter1), - // TYPE_BYTES[pDataCol->type]); - // } - // pHelper->pDataCols[1]->numOfRows++; - // iter1++; - // } else if (key1 == key2) { - // // TODO: think about duplicate key cases - // ASSERT(false); - // } else { - // for (int i = 0; i < pDataCols->numOfCols; i++) { - // SDataCol *pDataCol = pHelper->pDataCols[1]->cols + i; - // memcpy(((char *)pDataCol->pData + TYPE_BYTES[pDataCol->type] * pHelper->pDataCols[1]->numOfRows), - // ((char *)pDataCols->cols[i].pData + - // TYPE_BYTES[pDataCol->type] * iter2), - // TYPE_BYTES[pDataCol->type]); - // } - // pHelper->pDataCols[1]->numOfRows++; - // iter2++; - // } - - // if (pHelper->pDataCols[0]->numOfRows >= pHelper->config.maxRowsPerFileBlock * 4 / 5) { - // if (tsdbWriteBlockToFile(pHelper, &pHelper->files.dataF, pHelper->pDataCols[1], pHelper->pDataCols[1]->numOfRows, &compBlock, false, true) < 0) goto _err; - // // TODO: blkIdx here is not correct, fix it - // tsdbInsertSuperBlock(pHelper, &compBlock, blkIdx); - - // tdResetDataCols(pHelper->pDataCols[1]); - // } } } }