提交 5023ce7d 编写于 作者: H Hongze Cheng

TD-1438

上级 09e91f9a
...@@ -255,8 +255,6 @@ void tdFreeDataCols(SDataCols *pCols); ...@@ -255,8 +255,6 @@ void tdFreeDataCols(SDataCols *pCols);
void tdAppendDataRowToDataCol(SDataRow row, STSchema *pSchema, SDataCols *pCols); void tdAppendDataRowToDataCol(SDataRow row, STSchema *pSchema, SDataCols *pCols);
void tdPopDataColsPoints(SDataCols *pCols, int pointsToPop); //!!!! void tdPopDataColsPoints(SDataCols *pCols, int pointsToPop); //!!!!
int tdMergeDataCols(SDataCols *target, SDataCols *src, int rowsToMerge); int tdMergeDataCols(SDataCols *target, SDataCols *src, int rowsToMerge);
void tdMergeTwoDataCols(SDataCols *target, SDataCols *src1, int *iter1, int limit1, SDataCols *src2, int *iter2,
int limit2, int tRows);
// ----------------- K-V data row structure // ----------------- K-V data row structure
/* /*
......
...@@ -18,6 +18,9 @@ ...@@ -18,6 +18,9 @@
#include "tcoding.h" #include "tcoding.h"
#include "wchar.h" #include "wchar.h"
static void tdMergeTwoDataCols(SDataCols *target, SDataCols *src1, int *iter1, int limit1, SDataCols *src2, int *iter2,
int limit2, int tRows);
/** /**
* Duplicate the schema and return a new object * Duplicate the schema and return a new object
*/ */
...@@ -499,7 +502,9 @@ _err: ...@@ -499,7 +502,9 @@ _err:
return -1; return -1;
} }
void tdMergeTwoDataCols(SDataCols *target, SDataCols *src1, int *iter1, int limit1, SDataCols *src2, int *iter2, int limit2, int tRows) { // src2 data has more priority than src1
static void tdMergeTwoDataCols(SDataCols *target, SDataCols *src1, int *iter1, int limit1, SDataCols *src2, int *iter2,
int limit2, int tRows) {
tdResetDataCols(target); tdResetDataCols(target);
ASSERT(limit1 <= src1->numOfRows && limit2 <= src2->numOfRows); ASSERT(limit1 <= src1->numOfRows && limit2 <= src2->numOfRows);
...@@ -509,7 +514,7 @@ void tdMergeTwoDataCols(SDataCols *target, SDataCols *src1, int *iter1, int limi ...@@ -509,7 +514,7 @@ void tdMergeTwoDataCols(SDataCols *target, SDataCols *src1, int *iter1, int limi
TSKEY key1 = (*iter1 >= limit1) ? INT64_MAX : ((TSKEY *)(src1->cols[0].pData))[*iter1]; TSKEY key1 = (*iter1 >= limit1) ? INT64_MAX : ((TSKEY *)(src1->cols[0].pData))[*iter1];
TSKEY key2 = (*iter2 >= limit2) ? INT64_MAX : ((TSKEY *)(src2->cols[0].pData))[*iter2]; TSKEY key2 = (*iter2 >= limit2) ? INT64_MAX : ((TSKEY *)(src2->cols[0].pData))[*iter2];
if (key1 <= key2) { if (key1 < key2) {
for (int i = 0; i < src1->numOfCols; i++) { for (int i = 0; i < src1->numOfCols; i++) {
ASSERT(target->cols[i].type == src1->cols[i].type); ASSERT(target->cols[i].type == src1->cols[i].type);
if (src1->cols[i].len > 0) { if (src1->cols[i].len > 0) {
...@@ -520,7 +525,6 @@ void tdMergeTwoDataCols(SDataCols *target, SDataCols *src1, int *iter1, int limi ...@@ -520,7 +525,6 @@ void tdMergeTwoDataCols(SDataCols *target, SDataCols *src1, int *iter1, int limi
target->numOfRows++; target->numOfRows++;
(*iter1)++; (*iter1)++;
if (key1 == key2) (*iter2)++;
} else { } else {
for (int i = 0; i < src2->numOfCols; i++) { for (int i = 0; i < src2->numOfCols; i++) {
ASSERT(target->cols[i].type == src2->cols[i].type); ASSERT(target->cols[i].type == src2->cols[i].type);
...@@ -532,6 +536,7 @@ void tdMergeTwoDataCols(SDataCols *target, SDataCols *src1, int *iter1, int limi ...@@ -532,6 +536,7 @@ void tdMergeTwoDataCols(SDataCols *target, SDataCols *src1, int *iter1, int limi
target->numOfRows++; target->numOfRows++;
(*iter2)++; (*iter2)++;
if (key1 == key2) (*iter1)++;
} }
} }
} }
......
...@@ -65,6 +65,7 @@ typedef struct { ...@@ -65,6 +65,7 @@ typedef struct {
int32_t maxRowsPerFileBlock; // maximum rows per file block int32_t maxRowsPerFileBlock; // maximum rows per file block
int8_t precision; int8_t precision;
int8_t compression; int8_t compression;
int8_t update;
} STsdbCfg; } STsdbCfg;
// --------- TSDB REPOSITORY USAGE STATISTICS // --------- TSDB REPOSITORY USAGE STATISTICS
......
...@@ -430,7 +430,7 @@ void tsdbUnTakeMemSnapShot(STsdbRepo* pRepo, SMemTable* pMem, SMemTable* pIMem) ...@@ -430,7 +430,7 @@ void tsdbUnTakeMemSnapShot(STsdbRepo* pRepo, SMemTable* pMem, SMemTable* pIMem)
void* tsdbAllocBytes(STsdbRepo* pRepo, int bytes); void* tsdbAllocBytes(STsdbRepo* pRepo, int bytes);
int tsdbAsyncCommit(STsdbRepo* pRepo); int tsdbAsyncCommit(STsdbRepo* pRepo);
int tsdbLoadDataFromCache(STable* pTable, SSkipListIterator* pIter, TSKEY maxKey, int maxRowsToRead, SDataCols* pCols, int tsdbLoadDataFromCache(STable* pTable, SSkipListIterator* pIter, TSKEY maxKey, int maxRowsToRead, SDataCols* pCols,
TSKEY* filterKeys, int nFilterKeys); TSKEY* filterKeys, int nFilterKeys, bool keepDup);
static FORCE_INLINE SDataRow tsdbNextIterRow(SSkipListIterator* pIter) { static FORCE_INLINE SDataRow tsdbNextIterRow(SSkipListIterator* pIter) {
if (pIter == NULL) return NULL; if (pIter == NULL) return NULL;
......
...@@ -512,6 +512,9 @@ static int32_t tsdbCheckAndSetDefaultCfg(STsdbCfg *pCfg) { ...@@ -512,6 +512,9 @@ static int32_t tsdbCheckAndSetDefaultCfg(STsdbCfg *pCfg) {
} }
} }
// update check
if (pCfg->update != 0) pCfg->update = 1;
return 0; return 0;
_err: _err:
...@@ -923,6 +926,7 @@ static int tsdbEncodeCfg(void **buf, STsdbCfg *pCfg) { ...@@ -923,6 +926,7 @@ static int tsdbEncodeCfg(void **buf, STsdbCfg *pCfg) {
tlen += taosEncodeVariantI32(buf, pCfg->maxRowsPerFileBlock); tlen += taosEncodeVariantI32(buf, pCfg->maxRowsPerFileBlock);
tlen += taosEncodeFixedI8(buf, pCfg->precision); tlen += taosEncodeFixedI8(buf, pCfg->precision);
tlen += taosEncodeFixedI8(buf, pCfg->compression); tlen += taosEncodeFixedI8(buf, pCfg->compression);
tlen += taosEncodeFixedI8(buf, pCfg->update);
return tlen; return tlen;
} }
...@@ -939,6 +943,7 @@ static void *tsdbDecodeCfg(void *buf, STsdbCfg *pCfg) { ...@@ -939,6 +943,7 @@ static void *tsdbDecodeCfg(void *buf, STsdbCfg *pCfg) {
buf = taosDecodeVariantI32(buf, &(pCfg->maxRowsPerFileBlock)); buf = taosDecodeVariantI32(buf, &(pCfg->maxRowsPerFileBlock));
buf = taosDecodeFixedI8(buf, &(pCfg->precision)); buf = taosDecodeFixedI8(buf, &(pCfg->precision));
buf = taosDecodeFixedI8(buf, &(pCfg->compression)); buf = taosDecodeFixedI8(buf, &(pCfg->compression));
buf = taosDecodeFixedI8(buf, &(pCfg->update));
return buf; return buf;
} }
......
...@@ -278,7 +278,7 @@ int tsdbAsyncCommit(STsdbRepo *pRepo) { ...@@ -278,7 +278,7 @@ int tsdbAsyncCommit(STsdbRepo *pRepo) {
} }
int tsdbLoadDataFromCache(STable *pTable, SSkipListIterator *pIter, TSKEY maxKey, int maxRowsToRead, SDataCols *pCols, int tsdbLoadDataFromCache(STable *pTable, SSkipListIterator *pIter, TSKEY maxKey, int maxRowsToRead, SDataCols *pCols,
TSKEY *filterKeys, int nFilterKeys) { TSKEY *filterKeys, int nFilterKeys, bool keepDup) {
ASSERT(maxRowsToRead > 0 && nFilterKeys >= 0); ASSERT(maxRowsToRead > 0 && nFilterKeys >= 0);
if (pIter == NULL) return 0; if (pIter == NULL) return 0;
STSchema *pSchema = NULL; STSchema *pSchema = NULL;
...@@ -319,6 +319,10 @@ int tsdbLoadDataFromCache(STable *pTable, SSkipListIterator *pIter, TSKEY maxKey ...@@ -319,6 +319,10 @@ int tsdbLoadDataFromCache(STable *pTable, SSkipListIterator *pIter, TSKEY maxKey
if (!keyFiltered) { if (!keyFiltered) {
if (numOfRows >= maxRowsToRead) break; if (numOfRows >= maxRowsToRead) break;
numOfRows++;
}
if (!keyFiltered || keepDup) {
if (pCols) { if (pCols) {
if (pSchema == NULL || schemaVersion(pSchema) != dataRowVersion(row)) { if (pSchema == NULL || schemaVersion(pSchema) != dataRowVersion(row)) {
pSchema = tsdbGetTableSchemaImpl(pTable, false, false, dataRowVersion(row)); pSchema = tsdbGetTableSchemaImpl(pTable, false, false, dataRowVersion(row));
...@@ -329,8 +333,7 @@ int tsdbLoadDataFromCache(STable *pTable, SSkipListIterator *pIter, TSKEY maxKey ...@@ -329,8 +333,7 @@ int tsdbLoadDataFromCache(STable *pTable, SSkipListIterator *pIter, TSKEY maxKey
tdAppendDataRowToDataCol(row, pSchema, pCols); tdAppendDataRowToDataCol(row, pSchema, pCols);
} }
numOfRows++; }
}
} while (tSkipListIterNext(pIter)); } while (tSkipListIterNext(pIter));
return numOfRows; return numOfRows;
...@@ -422,8 +425,9 @@ static STableData *tsdbNewTableData(STsdbCfg *pCfg, STable *pTable) { ...@@ -422,8 +425,9 @@ static STableData *tsdbNewTableData(STsdbCfg *pCfg, STable *pTable) {
pTableData->keyLast = 0; pTableData->keyLast = 0;
pTableData->numOfRows = 0; pTableData->numOfRows = 0;
pTableData->pData = tSkipListCreate(TSDB_DATA_SKIPLIST_LEVEL, TSDB_DATA_TYPE_TIMESTAMP, pTableData->pData =
TYPE_BYTES[TSDB_DATA_TYPE_TIMESTAMP], /*SL_DISCARD_DUP_KEY*/ SL_APPEND_DUP_KEY, tsdbGetTsTupleKey); tSkipListCreate(TSDB_DATA_SKIPLIST_LEVEL, TSDB_DATA_TYPE_TIMESTAMP, TYPE_BYTES[TSDB_DATA_TYPE_TIMESTAMP],
pCfg->update ? SL_APPEND_DUP_KEY : SL_DISCARD_DUP_KEY, tsdbGetTsTupleKey);
if (pTableData->pData == NULL) { if (pTableData->pData == NULL) {
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
goto _err; goto _err;
......
...@@ -62,7 +62,7 @@ static int tsdbWriteBlockToProperFile(SRWHelper *pHelper, SDataCols *pDataCols ...@@ -62,7 +62,7 @@ static int tsdbWriteBlockToProperFile(SRWHelper *pHelper, SDataCols *pDataCols
static int tsdbProcessMergeCommit(SRWHelper *pHelper, SCommitIter *pCommitIter, SDataCols *pDataCols, TSKEY maxKey, static int tsdbProcessMergeCommit(SRWHelper *pHelper, SCommitIter *pCommitIter, SDataCols *pDataCols, TSKEY maxKey,
int *blkIdx); int *blkIdx);
static int tsdbLoadAndMergeFromCache(SDataCols *pDataCols, int *iter, SCommitIter *pCommitIter, SDataCols *pTarget, static int tsdbLoadAndMergeFromCache(SDataCols *pDataCols, int *iter, SCommitIter *pCommitIter, SDataCols *pTarget,
TSKEY maxKey, int maxRows); TSKEY maxKey, int maxRows, int8_t update);
// ---------------------- INTERNAL FUNCTIONS ---------------------- // ---------------------- INTERNAL FUNCTIONS ----------------------
int tsdbInitReadHelper(SRWHelper *pHelper, STsdbRepo *pRepo) { int tsdbInitReadHelper(SRWHelper *pHelper, STsdbRepo *pRepo) {
...@@ -1453,7 +1453,7 @@ static int tsdbProcessAppendCommit(SRWHelper *pHelper, SCommitIter *pCommitIter, ...@@ -1453,7 +1453,7 @@ static int tsdbProcessAppendCommit(SRWHelper *pHelper, SCommitIter *pCommitIter,
ASSERT(pCompBlock->last && pCompBlock->numOfRows < pCfg->minRowsPerFileBlock); ASSERT(pCompBlock->last && pCompBlock->numOfRows < pCfg->minRowsPerFileBlock);
tdResetDataCols(pDataCols); tdResetDataCols(pDataCols);
int rowsRead = tsdbLoadDataFromCache(pTable, pCommitIter->pIter, maxKey, defaultRowsInBlock - pCompBlock->numOfRows, int rowsRead = tsdbLoadDataFromCache(pTable, pCommitIter->pIter, maxKey, defaultRowsInBlock - pCompBlock->numOfRows,
pDataCols, NULL, 0); pDataCols, NULL, 0, pCfg->update);
ASSERT(rowsRead > 0 && rowsRead == pDataCols->numOfRows); ASSERT(rowsRead > 0 && rowsRead == pDataCols->numOfRows);
if (rowsRead + pCompBlock->numOfRows < pCfg->minRowsPerFileBlock && if (rowsRead + pCompBlock->numOfRows < pCfg->minRowsPerFileBlock &&
pCompBlock->numOfSubBlocks < TSDB_MAX_SUBBLOCKS && !TSDB_NLAST_FILE_OPENED(pHelper)) { pCompBlock->numOfSubBlocks < TSDB_MAX_SUBBLOCKS && !TSDB_NLAST_FILE_OPENED(pHelper)) {
...@@ -1474,7 +1474,8 @@ static int tsdbProcessAppendCommit(SRWHelper *pHelper, SCommitIter *pCommitIter, ...@@ -1474,7 +1474,8 @@ static int tsdbProcessAppendCommit(SRWHelper *pHelper, SCommitIter *pCommitIter,
} else { } else {
ASSERT(!pHelper->hasOldLastBlock); ASSERT(!pHelper->hasOldLastBlock);
tdResetDataCols(pDataCols); tdResetDataCols(pDataCols);
int rowsRead = tsdbLoadDataFromCache(pTable, pCommitIter->pIter, maxKey, defaultRowsInBlock, pDataCols, NULL, 0); int rowsRead =
tsdbLoadDataFromCache(pTable, pCommitIter->pIter, maxKey, defaultRowsInBlock, pDataCols, NULL, 0, pCfg->update);
ASSERT(rowsRead > 0 && rowsRead == pDataCols->numOfRows); ASSERT(rowsRead > 0 && rowsRead == pDataCols->numOfRows);
if (tsdbWriteBlockToProperFile(pHelper, pDataCols, &compBlock) < 0) return -1; if (tsdbWriteBlockToProperFile(pHelper, pDataCols, &compBlock) < 0) return -1;
...@@ -1516,17 +1517,17 @@ static int tsdbProcessMergeCommit(SRWHelper *pHelper, SCommitIter *pCommitIter, ...@@ -1516,17 +1517,17 @@ static int tsdbProcessMergeCommit(SRWHelper *pHelper, SCommitIter *pCommitIter,
ASSERT(pDataCols0->numOfRows == pCompBlock->numOfRows); ASSERT(pDataCols0->numOfRows == pCompBlock->numOfRows);
int rows1 = defaultRowsInBlock - pCompBlock->numOfRows; int rows1 = defaultRowsInBlock - pCompBlock->numOfRows;
int rows2 = int rows2 = tsdbLoadDataFromCache(pTable, &slIter, maxKey, rows1, NULL, pDataCols0->cols[0].pData,
tsdbLoadDataFromCache(pTable, &slIter, maxKey, rows1, NULL, pDataCols0->cols[0].pData, pDataCols0->numOfRows); pDataCols0->numOfRows, pCfg->update);
if (rows2 == 0) { // all data filtered out if (!pCfg->update && rows2 == 0) { // all data filtered out
*(pCommitIter->pIter) = slIter; *(pCommitIter->pIter) = slIter;
} else { } else {
if (pCompBlock->numOfRows + rows2 < pCfg->minRowsPerFileBlock && if (pCompBlock->numOfRows + rows2 < pCfg->minRowsPerFileBlock &&
pCompBlock->numOfSubBlocks < TSDB_MAX_SUBBLOCKS && !TSDB_NLAST_FILE_OPENED(pHelper)) { pCompBlock->numOfSubBlocks < TSDB_MAX_SUBBLOCKS && !TSDB_NLAST_FILE_OPENED(pHelper)) {
tdResetDataCols(pDataCols); tdResetDataCols(pDataCols);
int rowsRead = tsdbLoadDataFromCache(pTable, pCommitIter->pIter, maxKey, rows1, pDataCols, int rowsRead = tsdbLoadDataFromCache(pTable, pCommitIter->pIter, maxKey, rows1, pDataCols,
pDataCols0->cols[0].pData, pDataCols0->numOfRows); pDataCols0->cols[0].pData, pDataCols0->numOfRows, pCfg->update);
ASSERT(rowsRead == rows2 && rowsRead == pDataCols->numOfRows); ASSERT(rowsRead == rows2 && rowsRead <= pDataCols->numOfRows && pDataCols->numOfRows > 0);
if (tsdbWriteBlockToFile(pHelper, helperLastF(pHelper), pDataCols, &compBlock, true, false) < 0) return -1; if (tsdbWriteBlockToFile(pHelper, helperLastF(pHelper), pDataCols, &compBlock, true, false) < 0) return -1;
if (tsdbAddSubBlock(pHelper, &compBlock, tblkIdx, rowsRead) < 0) return -1; if (tsdbAddSubBlock(pHelper, &compBlock, tblkIdx, rowsRead) < 0) return -1;
tblkIdx++; tblkIdx++;
...@@ -1535,9 +1536,8 @@ static int tsdbProcessMergeCommit(SRWHelper *pHelper, SCommitIter *pCommitIter, ...@@ -1535,9 +1536,8 @@ static int tsdbProcessMergeCommit(SRWHelper *pHelper, SCommitIter *pCommitIter,
int round = 0; int round = 0;
int dIter = 0; int dIter = 0;
while (true) { while (true) {
tdResetDataCols(pDataCols); int rowsRead = tsdbLoadAndMergeFromCache(pDataCols0, &dIter, pCommitIter, pDataCols, maxKey,
int rowsRead = defaultRowsInBlock, pCfg->update);
tsdbLoadAndMergeFromCache(pDataCols0, &dIter, pCommitIter, pDataCols, maxKey, defaultRowsInBlock);
if (rowsRead == 0) break; if (rowsRead == 0) break;
if (tsdbWriteBlockToProperFile(pHelper, pDataCols, &compBlock) < 0) return -1; if (tsdbWriteBlockToProperFile(pHelper, pDataCols, &compBlock) < 0) return -1;
...@@ -1561,8 +1561,8 @@ static int tsdbProcessMergeCommit(SRWHelper *pHelper, SCommitIter *pCommitIter, ...@@ -1561,8 +1561,8 @@ static int tsdbProcessMergeCommit(SRWHelper *pHelper, SCommitIter *pCommitIter,
if (keyFirst < blkKeyFirst) { if (keyFirst < blkKeyFirst) {
while (true) { while (true) {
tdResetDataCols(pDataCols); tdResetDataCols(pDataCols);
int rowsRead = int rowsRead = tsdbLoadDataFromCache(pTable, pCommitIter->pIter, blkKeyFirst - 1, defaultRowsInBlock, pDataCols,
tsdbLoadDataFromCache(pTable, pCommitIter->pIter, blkKeyFirst - 1, defaultRowsInBlock, pDataCols, NULL, 0); NULL, 0, pCfg->update);
if (rowsRead == 0) break; if (rowsRead == 0) break;
ASSERT(rowsRead == pDataCols->numOfRows); ASSERT(rowsRead == pDataCols->numOfRows);
...@@ -1580,21 +1580,21 @@ static int tsdbProcessMergeCommit(SRWHelper *pHelper, SCommitIter *pCommitIter, ...@@ -1580,21 +1580,21 @@ static int tsdbProcessMergeCommit(SRWHelper *pHelper, SCommitIter *pCommitIter,
slIter = *(pCommitIter->pIter); slIter = *(pCommitIter->pIter);
int rows1 = (pCfg->maxRowsPerFileBlock - pCompBlock->numOfRows); int rows1 = (pCfg->maxRowsPerFileBlock - pCompBlock->numOfRows);
int rows2 = tsdbLoadDataFromCache(pTable, &slIter, blkKeyLast, INT_MAX, NULL, pDataCols0->cols[0].pData, int rows2 = tsdbLoadDataFromCache(pTable, &slIter, blkKeyLast, INT_MAX, NULL, pDataCols0->cols[0].pData,
pDataCols0->numOfRows); pDataCols0->numOfRows, pCfg->update);
if (rows2 == 0) { // all filtered out if (!pCfg->update && rows2 == 0) { // all filtered out
*(pCommitIter->pIter) = slIter; *(pCommitIter->pIter) = slIter;
ASSERT(tblkIdx == 0 || (tsdbNextIterKey(pCommitIter->pIter) < 0 || ASSERT(tblkIdx == 0 || (tsdbNextIterKey(pCommitIter->pIter) < 0 ||
tsdbNextIterKey(pCommitIter->pIter) > blockAtIdx(pHelper, tblkIdx - 1)->keyLast)); tsdbNextIterKey(pCommitIter->pIter) > blockAtIdx(pHelper, tblkIdx - 1)->keyLast));
} else { } else {
int rows3 = tsdbLoadDataFromCache(pTable, &slIter, keyLimit, INT_MAX, NULL, NULL, 0) + rows2; int rows3 = tsdbLoadDataFromCache(pTable, &slIter, keyLimit, INT_MAX, NULL, NULL, 0, pCfg->update) + rows2;
if (pCompBlock->numOfSubBlocks < TSDB_MAX_SUBBLOCKS && rows1 >= rows2) { if (pCompBlock->numOfSubBlocks < TSDB_MAX_SUBBLOCKS && rows1 >= rows2) {
int rows = (rows1 >= rows3) ? rows3 : rows2; int rows = (rows1 >= rows3) ? rows3 : rows2;
tdResetDataCols(pDataCols); tdResetDataCols(pDataCols);
int rowsRead = tsdbLoadDataFromCache(pTable, pCommitIter->pIter, keyLimit, rows, pDataCols, int rowsRead = tsdbLoadDataFromCache(pTable, pCommitIter->pIter, keyLimit, rows, pDataCols,
pDataCols0->cols[0].pData, pDataCols0->numOfRows); pDataCols0->cols[0].pData, pDataCols0->numOfRows, pCfg->update);
ASSERT(rowsRead == rows && rowsRead == pDataCols->numOfRows); ASSERT(rowsRead == rows && rowsRead <= pDataCols->numOfRows);
if (tsdbWriteBlockToFile(pHelper, helperDataF(pHelper), pDataCols, &compBlock, false, false) < 0) if (tsdbWriteBlockToFile(pHelper, helperDataF(pHelper), pDataCols, &compBlock, false, false) < 0)
return -1; return -1;
if (tsdbAddSubBlock(pHelper, &compBlock, tblkIdx, rowsRead) < 0) return -1; if (tsdbAddSubBlock(pHelper, &compBlock, tblkIdx, rowsRead) < 0) return -1;
...@@ -1606,12 +1606,11 @@ static int tsdbProcessMergeCommit(SRWHelper *pHelper, SCommitIter *pCommitIter, ...@@ -1606,12 +1606,11 @@ static int tsdbProcessMergeCommit(SRWHelper *pHelper, SCommitIter *pCommitIter,
int round = 0; int round = 0;
int dIter = 0; int dIter = 0;
while (true) { while (true) {
int rowsRead = int rowsRead = tsdbLoadAndMergeFromCache(pDataCols0, &dIter, pCommitIter, pDataCols, keyLimit,
tsdbLoadAndMergeFromCache(pDataCols0, &dIter, pCommitIter, pDataCols, keyLimit, defaultRowsInBlock); defaultRowsInBlock, pCfg->update);
if (rowsRead == 0) break; if (rowsRead == 0) break;
if (tsdbWriteBlockToFile(pHelper, helperDataF(pHelper), pDataCols, &compBlock, false, true) < 0) if (tsdbWriteBlockToFile(pHelper, helperDataF(pHelper), pDataCols, &compBlock, false, true) < 0) return -1;
return -1;
if (round == 0) { if (round == 0) {
if (tsdbUpdateSuperBlock(pHelper, &compBlock, tblkIdx) < 0) return -1; if (tsdbUpdateSuperBlock(pHelper, &compBlock, tblkIdx) < 0) return -1;
} else { } else {
...@@ -1633,7 +1632,7 @@ static int tsdbProcessMergeCommit(SRWHelper *pHelper, SCommitIter *pCommitIter, ...@@ -1633,7 +1632,7 @@ static int tsdbProcessMergeCommit(SRWHelper *pHelper, SCommitIter *pCommitIter,
} }
static int tsdbLoadAndMergeFromCache(SDataCols *pDataCols, int *iter, SCommitIter *pCommitIter, SDataCols *pTarget, static int tsdbLoadAndMergeFromCache(SDataCols *pDataCols, int *iter, SCommitIter *pCommitIter, SDataCols *pTarget,
TSKEY maxKey, int maxRows) { TSKEY maxKey, int maxRows, int8_t update) {
int numOfRows = 0; int numOfRows = 0;
TSKEY key1 = INT64_MAX; TSKEY key1 = INT64_MAX;
TSKEY key2 = INT64_MAX; TSKEY key2 = INT64_MAX;
...@@ -1649,14 +1648,17 @@ static int tsdbLoadAndMergeFromCache(SDataCols *pDataCols, int *iter, SCommitIte ...@@ -1649,14 +1648,17 @@ static int tsdbLoadAndMergeFromCache(SDataCols *pDataCols, int *iter, SCommitIte
if (key1 == INT64_MAX && key2 == INT64_MAX) break; if (key1 == INT64_MAX && key2 == INT64_MAX) break;
if (key1 <= key2) { if ((key1 < key2) || ((!update) && (key1 == key2))) {
for (int i = 0; i < pDataCols->numOfCols; i++) { for (int i = 0; i < pDataCols->numOfCols; i++) {
dataColAppendVal(pTarget->cols + i, tdGetColDataOfRow(pDataCols->cols + i, *iter), pTarget->numOfRows, dataColAppendVal(pTarget->cols + i, tdGetColDataOfRow(pDataCols->cols + i, *iter), pTarget->numOfRows,
pTarget->maxPoints); pTarget->maxPoints);
} }
pTarget->numOfRows++; pTarget->numOfRows++;
(*iter)++; (*iter)++;
if (key1 == key2) tSkipListIterNext(pCommitIter->pIter);
if ((!update) && (key1 == key2)) {
tSkipListIterNext(pCommitIter->pIter);
}
} else { } else {
if (pSchema == NULL || schemaVersion(pSchema) != dataRowVersion(row)) { if (pSchema == NULL || schemaVersion(pSchema) != dataRowVersion(row)) {
pSchema = tsdbGetTableSchemaImpl(pCommitIter->pTable, false, false, dataRowVersion(row)); pSchema = tsdbGetTableSchemaImpl(pCommitIter->pTable, false, false, dataRowVersion(row));
...@@ -1665,6 +1667,7 @@ static int tsdbLoadAndMergeFromCache(SDataCols *pDataCols, int *iter, SCommitIte ...@@ -1665,6 +1667,7 @@ static int tsdbLoadAndMergeFromCache(SDataCols *pDataCols, int *iter, SCommitIte
tdAppendDataRowToDataCol(row, pSchema, pTarget); tdAppendDataRowToDataCol(row, pSchema, pTarget);
tSkipListIterNext(pCommitIter->pIter); tSkipListIterNext(pCommitIter->pIter);
if (key1 == key2) (*iter)++;
} }
numOfRows++; numOfRows++;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册