From 5023ce7d36e676270f8b871a389ba4b46b3d9d21 Mon Sep 17 00:00:00 2001 From: Hongze Cheng Date: Wed, 16 Sep 2020 15:56:40 +0800 Subject: [PATCH] TD-1438 --- src/common/inc/tdataformat.h | 2 -- src/common/src/tdataformat.c | 11 ++++++-- src/inc/tsdb.h | 1 + src/tsdb/inc/tsdbMain.h | 2 +- src/tsdb/src/tsdbMain.c | 5 ++++ src/tsdb/src/tsdbMemTable.c | 14 ++++++---- src/tsdb/src/tsdbRWHelper.c | 53 +++++++++++++++++++----------------- 7 files changed, 52 insertions(+), 36 deletions(-) diff --git a/src/common/inc/tdataformat.h b/src/common/inc/tdataformat.h index cc4afeb3f8..aa39d5b0b2 100644 --- a/src/common/inc/tdataformat.h +++ b/src/common/inc/tdataformat.h @@ -255,8 +255,6 @@ void tdFreeDataCols(SDataCols *pCols); void tdAppendDataRowToDataCol(SDataRow row, STSchema *pSchema, SDataCols *pCols); void tdPopDataColsPoints(SDataCols *pCols, int pointsToPop); //!!!! int tdMergeDataCols(SDataCols *target, SDataCols *src, int rowsToMerge); -void tdMergeTwoDataCols(SDataCols *target, SDataCols *src1, int *iter1, int limit1, SDataCols *src2, int *iter2, - int limit2, int tRows); // ----------------- K-V data row structure /* diff --git a/src/common/src/tdataformat.c b/src/common/src/tdataformat.c index e7f40442a0..0c446b6d4f 100644 --- a/src/common/src/tdataformat.c +++ b/src/common/src/tdataformat.c @@ -18,6 +18,9 @@ #include "tcoding.h" #include "wchar.h" +static void tdMergeTwoDataCols(SDataCols *target, SDataCols *src1, int *iter1, int limit1, SDataCols *src2, int *iter2, + int limit2, int tRows); + /** * Duplicate the schema and return a new object */ @@ -499,7 +502,9 @@ _err: return -1; } -void tdMergeTwoDataCols(SDataCols *target, SDataCols *src1, int *iter1, int limit1, SDataCols *src2, int *iter2, int limit2, int tRows) { +// src2 data has more priority than src1 +static void tdMergeTwoDataCols(SDataCols *target, SDataCols *src1, int *iter1, int limit1, SDataCols *src2, int *iter2, + int limit2, int tRows) { tdResetDataCols(target); ASSERT(limit1 <= src1->numOfRows && limit2 <= src2->numOfRows); @@ -509,7 +514,7 @@ void tdMergeTwoDataCols(SDataCols *target, SDataCols *src1, int *iter1, int limi TSKEY key1 = (*iter1 >= limit1) ? INT64_MAX : ((TSKEY *)(src1->cols[0].pData))[*iter1]; TSKEY key2 = (*iter2 >= limit2) ? INT64_MAX : ((TSKEY *)(src2->cols[0].pData))[*iter2]; - if (key1 <= key2) { + if (key1 < key2) { for (int i = 0; i < src1->numOfCols; i++) { ASSERT(target->cols[i].type == src1->cols[i].type); if (src1->cols[i].len > 0) { @@ -520,7 +525,6 @@ void tdMergeTwoDataCols(SDataCols *target, SDataCols *src1, int *iter1, int limi target->numOfRows++; (*iter1)++; - if (key1 == key2) (*iter2)++; } else { for (int i = 0; i < src2->numOfCols; i++) { ASSERT(target->cols[i].type == src2->cols[i].type); @@ -532,6 +536,7 @@ void tdMergeTwoDataCols(SDataCols *target, SDataCols *src1, int *iter1, int limi target->numOfRows++; (*iter2)++; + if (key1 == key2) (*iter1)++; } } } diff --git a/src/inc/tsdb.h b/src/inc/tsdb.h index 85f9b3bdc7..fe03277763 100644 --- a/src/inc/tsdb.h +++ b/src/inc/tsdb.h @@ -65,6 +65,7 @@ typedef struct { int32_t maxRowsPerFileBlock; // maximum rows per file block int8_t precision; int8_t compression; + int8_t update; } STsdbCfg; // --------- TSDB REPOSITORY USAGE STATISTICS diff --git a/src/tsdb/inc/tsdbMain.h b/src/tsdb/inc/tsdbMain.h index 256b8189f8..95b7010038 100644 --- a/src/tsdb/inc/tsdbMain.h +++ b/src/tsdb/inc/tsdbMain.h @@ -430,7 +430,7 @@ void tsdbUnTakeMemSnapShot(STsdbRepo* pRepo, SMemTable* pMem, SMemTable* pIMem) void* tsdbAllocBytes(STsdbRepo* pRepo, int bytes); int tsdbAsyncCommit(STsdbRepo* pRepo); int tsdbLoadDataFromCache(STable* pTable, SSkipListIterator* pIter, TSKEY maxKey, int maxRowsToRead, SDataCols* pCols, - TSKEY* filterKeys, int nFilterKeys); + TSKEY* filterKeys, int nFilterKeys, bool keepDup); static FORCE_INLINE SDataRow tsdbNextIterRow(SSkipListIterator* pIter) { if (pIter == NULL) return NULL; diff --git a/src/tsdb/src/tsdbMain.c b/src/tsdb/src/tsdbMain.c index a1e6376304..494f17807e 100644 --- a/src/tsdb/src/tsdbMain.c +++ b/src/tsdb/src/tsdbMain.c @@ -512,6 +512,9 @@ static int32_t tsdbCheckAndSetDefaultCfg(STsdbCfg *pCfg) { } } + // update check + if (pCfg->update != 0) pCfg->update = 1; + return 0; _err: @@ -923,6 +926,7 @@ static int tsdbEncodeCfg(void **buf, STsdbCfg *pCfg) { tlen += taosEncodeVariantI32(buf, pCfg->maxRowsPerFileBlock); tlen += taosEncodeFixedI8(buf, pCfg->precision); tlen += taosEncodeFixedI8(buf, pCfg->compression); + tlen += taosEncodeFixedI8(buf, pCfg->update); return tlen; } @@ -939,6 +943,7 @@ static void *tsdbDecodeCfg(void *buf, STsdbCfg *pCfg) { buf = taosDecodeVariantI32(buf, &(pCfg->maxRowsPerFileBlock)); buf = taosDecodeFixedI8(buf, &(pCfg->precision)); buf = taosDecodeFixedI8(buf, &(pCfg->compression)); + buf = taosDecodeFixedI8(buf, &(pCfg->update)); return buf; } diff --git a/src/tsdb/src/tsdbMemTable.c b/src/tsdb/src/tsdbMemTable.c index 0b7cc4dca4..c58460e18f 100644 --- a/src/tsdb/src/tsdbMemTable.c +++ b/src/tsdb/src/tsdbMemTable.c @@ -278,7 +278,7 @@ int tsdbAsyncCommit(STsdbRepo *pRepo) { } int tsdbLoadDataFromCache(STable *pTable, SSkipListIterator *pIter, TSKEY maxKey, int maxRowsToRead, SDataCols *pCols, - TSKEY *filterKeys, int nFilterKeys) { + TSKEY *filterKeys, int nFilterKeys, bool keepDup) { ASSERT(maxRowsToRead > 0 && nFilterKeys >= 0); if (pIter == NULL) return 0; STSchema *pSchema = NULL; @@ -319,6 +319,10 @@ int tsdbLoadDataFromCache(STable *pTable, SSkipListIterator *pIter, TSKEY maxKey if (!keyFiltered) { if (numOfRows >= maxRowsToRead) break; + numOfRows++; + } + + if (!keyFiltered || keepDup) { if (pCols) { if (pSchema == NULL || schemaVersion(pSchema) != dataRowVersion(row)) { pSchema = tsdbGetTableSchemaImpl(pTable, false, false, dataRowVersion(row)); @@ -329,8 +333,7 @@ int tsdbLoadDataFromCache(STable *pTable, SSkipListIterator *pIter, TSKEY maxKey tdAppendDataRowToDataCol(row, pSchema, pCols); } - numOfRows++; - } + } } while (tSkipListIterNext(pIter)); return numOfRows; @@ -422,8 +425,9 @@ static STableData *tsdbNewTableData(STsdbCfg *pCfg, STable *pTable) { pTableData->keyLast = 0; pTableData->numOfRows = 0; - pTableData->pData = tSkipListCreate(TSDB_DATA_SKIPLIST_LEVEL, TSDB_DATA_TYPE_TIMESTAMP, - TYPE_BYTES[TSDB_DATA_TYPE_TIMESTAMP], /*SL_DISCARD_DUP_KEY*/ SL_APPEND_DUP_KEY, tsdbGetTsTupleKey); + pTableData->pData = + tSkipListCreate(TSDB_DATA_SKIPLIST_LEVEL, TSDB_DATA_TYPE_TIMESTAMP, TYPE_BYTES[TSDB_DATA_TYPE_TIMESTAMP], + pCfg->update ? SL_APPEND_DUP_KEY : SL_DISCARD_DUP_KEY, tsdbGetTsTupleKey); if (pTableData->pData == NULL) { terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; goto _err; diff --git a/src/tsdb/src/tsdbRWHelper.c b/src/tsdb/src/tsdbRWHelper.c index bb0f00ef53..f0017d0a14 100644 --- a/src/tsdb/src/tsdbRWHelper.c +++ b/src/tsdb/src/tsdbRWHelper.c @@ -62,7 +62,7 @@ static int tsdbWriteBlockToProperFile(SRWHelper *pHelper, SDataCols *pDataCols static int tsdbProcessMergeCommit(SRWHelper *pHelper, SCommitIter *pCommitIter, SDataCols *pDataCols, TSKEY maxKey, int *blkIdx); static int tsdbLoadAndMergeFromCache(SDataCols *pDataCols, int *iter, SCommitIter *pCommitIter, SDataCols *pTarget, - TSKEY maxKey, int maxRows); + TSKEY maxKey, int maxRows, int8_t update); // ---------------------- INTERNAL FUNCTIONS ---------------------- int tsdbInitReadHelper(SRWHelper *pHelper, STsdbRepo *pRepo) { @@ -1453,7 +1453,7 @@ static int tsdbProcessAppendCommit(SRWHelper *pHelper, SCommitIter *pCommitIter, ASSERT(pCompBlock->last && pCompBlock->numOfRows < pCfg->minRowsPerFileBlock); tdResetDataCols(pDataCols); int rowsRead = tsdbLoadDataFromCache(pTable, pCommitIter->pIter, maxKey, defaultRowsInBlock - pCompBlock->numOfRows, - pDataCols, NULL, 0); + pDataCols, NULL, 0, pCfg->update); ASSERT(rowsRead > 0 && rowsRead == pDataCols->numOfRows); if (rowsRead + pCompBlock->numOfRows < pCfg->minRowsPerFileBlock && pCompBlock->numOfSubBlocks < TSDB_MAX_SUBBLOCKS && !TSDB_NLAST_FILE_OPENED(pHelper)) { @@ -1474,7 +1474,8 @@ static int tsdbProcessAppendCommit(SRWHelper *pHelper, SCommitIter *pCommitIter, } else { ASSERT(!pHelper->hasOldLastBlock); tdResetDataCols(pDataCols); - int rowsRead = tsdbLoadDataFromCache(pTable, pCommitIter->pIter, maxKey, defaultRowsInBlock, pDataCols, NULL, 0); + int rowsRead = + tsdbLoadDataFromCache(pTable, pCommitIter->pIter, maxKey, defaultRowsInBlock, pDataCols, NULL, 0, pCfg->update); ASSERT(rowsRead > 0 && rowsRead == pDataCols->numOfRows); if (tsdbWriteBlockToProperFile(pHelper, pDataCols, &compBlock) < 0) return -1; @@ -1516,17 +1517,17 @@ static int tsdbProcessMergeCommit(SRWHelper *pHelper, SCommitIter *pCommitIter, ASSERT(pDataCols0->numOfRows == pCompBlock->numOfRows); int rows1 = defaultRowsInBlock - pCompBlock->numOfRows; - int rows2 = - tsdbLoadDataFromCache(pTable, &slIter, maxKey, rows1, NULL, pDataCols0->cols[0].pData, pDataCols0->numOfRows); - if (rows2 == 0) { // all data filtered out + int rows2 = tsdbLoadDataFromCache(pTable, &slIter, maxKey, rows1, NULL, pDataCols0->cols[0].pData, + pDataCols0->numOfRows, pCfg->update); + if (!pCfg->update && rows2 == 0) { // all data filtered out *(pCommitIter->pIter) = slIter; } else { if (pCompBlock->numOfRows + rows2 < pCfg->minRowsPerFileBlock && pCompBlock->numOfSubBlocks < TSDB_MAX_SUBBLOCKS && !TSDB_NLAST_FILE_OPENED(pHelper)) { tdResetDataCols(pDataCols); int rowsRead = tsdbLoadDataFromCache(pTable, pCommitIter->pIter, maxKey, rows1, pDataCols, - pDataCols0->cols[0].pData, pDataCols0->numOfRows); - ASSERT(rowsRead == rows2 && rowsRead == pDataCols->numOfRows); + pDataCols0->cols[0].pData, pDataCols0->numOfRows, pCfg->update); + ASSERT(rowsRead == rows2 && rowsRead <= pDataCols->numOfRows && pDataCols->numOfRows > 0); if (tsdbWriteBlockToFile(pHelper, helperLastF(pHelper), pDataCols, &compBlock, true, false) < 0) return -1; if (tsdbAddSubBlock(pHelper, &compBlock, tblkIdx, rowsRead) < 0) return -1; tblkIdx++; @@ -1535,9 +1536,8 @@ static int tsdbProcessMergeCommit(SRWHelper *pHelper, SCommitIter *pCommitIter, int round = 0; int dIter = 0; while (true) { - tdResetDataCols(pDataCols); - int rowsRead = - tsdbLoadAndMergeFromCache(pDataCols0, &dIter, pCommitIter, pDataCols, maxKey, defaultRowsInBlock); + int rowsRead = tsdbLoadAndMergeFromCache(pDataCols0, &dIter, pCommitIter, pDataCols, maxKey, + defaultRowsInBlock, pCfg->update); if (rowsRead == 0) break; if (tsdbWriteBlockToProperFile(pHelper, pDataCols, &compBlock) < 0) return -1; @@ -1561,8 +1561,8 @@ static int tsdbProcessMergeCommit(SRWHelper *pHelper, SCommitIter *pCommitIter, if (keyFirst < blkKeyFirst) { while (true) { tdResetDataCols(pDataCols); - int rowsRead = - tsdbLoadDataFromCache(pTable, pCommitIter->pIter, blkKeyFirst - 1, defaultRowsInBlock, pDataCols, NULL, 0); + int rowsRead = tsdbLoadDataFromCache(pTable, pCommitIter->pIter, blkKeyFirst - 1, defaultRowsInBlock, pDataCols, + NULL, 0, pCfg->update); if (rowsRead == 0) break; ASSERT(rowsRead == pDataCols->numOfRows); @@ -1580,21 +1580,21 @@ static int tsdbProcessMergeCommit(SRWHelper *pHelper, SCommitIter *pCommitIter, slIter = *(pCommitIter->pIter); int rows1 = (pCfg->maxRowsPerFileBlock - pCompBlock->numOfRows); int rows2 = tsdbLoadDataFromCache(pTable, &slIter, blkKeyLast, INT_MAX, NULL, pDataCols0->cols[0].pData, - pDataCols0->numOfRows); + pDataCols0->numOfRows, pCfg->update); - if (rows2 == 0) { // all filtered out + if (!pCfg->update && rows2 == 0) { // all filtered out *(pCommitIter->pIter) = slIter; ASSERT(tblkIdx == 0 || (tsdbNextIterKey(pCommitIter->pIter) < 0 || tsdbNextIterKey(pCommitIter->pIter) > blockAtIdx(pHelper, tblkIdx - 1)->keyLast)); } else { - int rows3 = tsdbLoadDataFromCache(pTable, &slIter, keyLimit, INT_MAX, NULL, NULL, 0) + rows2; + int rows3 = tsdbLoadDataFromCache(pTable, &slIter, keyLimit, INT_MAX, NULL, NULL, 0, pCfg->update) + rows2; if (pCompBlock->numOfSubBlocks < TSDB_MAX_SUBBLOCKS && rows1 >= rows2) { int rows = (rows1 >= rows3) ? rows3 : rows2; tdResetDataCols(pDataCols); int rowsRead = tsdbLoadDataFromCache(pTable, pCommitIter->pIter, keyLimit, rows, pDataCols, - pDataCols0->cols[0].pData, pDataCols0->numOfRows); - ASSERT(rowsRead == rows && rowsRead == pDataCols->numOfRows); + pDataCols0->cols[0].pData, pDataCols0->numOfRows, pCfg->update); + ASSERT(rowsRead == rows && rowsRead <= pDataCols->numOfRows); if (tsdbWriteBlockToFile(pHelper, helperDataF(pHelper), pDataCols, &compBlock, false, false) < 0) return -1; if (tsdbAddSubBlock(pHelper, &compBlock, tblkIdx, rowsRead) < 0) return -1; @@ -1606,12 +1606,11 @@ static int tsdbProcessMergeCommit(SRWHelper *pHelper, SCommitIter *pCommitIter, int round = 0; int dIter = 0; while (true) { - int rowsRead = - tsdbLoadAndMergeFromCache(pDataCols0, &dIter, pCommitIter, pDataCols, keyLimit, defaultRowsInBlock); + int rowsRead = tsdbLoadAndMergeFromCache(pDataCols0, &dIter, pCommitIter, pDataCols, keyLimit, + defaultRowsInBlock, pCfg->update); if (rowsRead == 0) break; - if (tsdbWriteBlockToFile(pHelper, helperDataF(pHelper), pDataCols, &compBlock, false, true) < 0) - return -1; + if (tsdbWriteBlockToFile(pHelper, helperDataF(pHelper), pDataCols, &compBlock, false, true) < 0) return -1; if (round == 0) { if (tsdbUpdateSuperBlock(pHelper, &compBlock, tblkIdx) < 0) return -1; } else { @@ -1633,7 +1632,7 @@ static int tsdbProcessMergeCommit(SRWHelper *pHelper, SCommitIter *pCommitIter, } static int tsdbLoadAndMergeFromCache(SDataCols *pDataCols, int *iter, SCommitIter *pCommitIter, SDataCols *pTarget, - TSKEY maxKey, int maxRows) { + TSKEY maxKey, int maxRows, int8_t update) { int numOfRows = 0; TSKEY key1 = INT64_MAX; TSKEY key2 = INT64_MAX; @@ -1649,14 +1648,17 @@ static int tsdbLoadAndMergeFromCache(SDataCols *pDataCols, int *iter, SCommitIte if (key1 == INT64_MAX && key2 == INT64_MAX) break; - if (key1 <= key2) { + if ((key1 < key2) || ((!update) && (key1 == key2))) { for (int i = 0; i < pDataCols->numOfCols; i++) { dataColAppendVal(pTarget->cols + i, tdGetColDataOfRow(pDataCols->cols + i, *iter), pTarget->numOfRows, pTarget->maxPoints); } pTarget->numOfRows++; (*iter)++; - if (key1 == key2) tSkipListIterNext(pCommitIter->pIter); + + if ((!update) && (key1 == key2)) { + tSkipListIterNext(pCommitIter->pIter); + } } else { if (pSchema == NULL || schemaVersion(pSchema) != dataRowVersion(row)) { pSchema = tsdbGetTableSchemaImpl(pCommitIter->pTable, false, false, dataRowVersion(row)); @@ -1665,6 +1667,7 @@ static int tsdbLoadAndMergeFromCache(SDataCols *pDataCols, int *iter, SCommitIte tdAppendDataRowToDataCol(row, pSchema, pTarget); tSkipListIterNext(pCommitIter->pIter); + if (key1 == key2) (*iter)++; } numOfRows++; -- GitLab