提交 db37e3cd 编写于 作者: H Hongze Cheng

TD-1438

上级 09e91f9a
......@@ -255,8 +255,6 @@ void tdFreeDataCols(SDataCols *pCols);
void tdAppendDataRowToDataCol(SDataRow row, STSchema *pSchema, SDataCols *pCols);
void tdPopDataColsPoints(SDataCols *pCols, int pointsToPop); //!!!!
int tdMergeDataCols(SDataCols *target, SDataCols *src, int rowsToMerge);
void tdMergeTwoDataCols(SDataCols *target, SDataCols *src1, int *iter1, int limit1, SDataCols *src2, int *iter2,
int limit2, int tRows);
// ----------------- K-V data row structure
/*
......
......@@ -18,6 +18,9 @@
#include "tcoding.h"
#include "wchar.h"
static void tdMergeTwoDataCols(SDataCols *target, SDataCols *src1, int *iter1, int limit1, SDataCols *src2, int *iter2,
int limit2, int tRows);
/**
* Duplicate the schema and return a new object
*/
......@@ -499,7 +502,9 @@ _err:
return -1;
}
void tdMergeTwoDataCols(SDataCols *target, SDataCols *src1, int *iter1, int limit1, SDataCols *src2, int *iter2, int limit2, int tRows) {
// src2 data has more priority than src1
static void tdMergeTwoDataCols(SDataCols *target, SDataCols *src1, int *iter1, int limit1, SDataCols *src2, int *iter2,
int limit2, int tRows) {
tdResetDataCols(target);
ASSERT(limit1 <= src1->numOfRows && limit2 <= src2->numOfRows);
......@@ -509,7 +514,7 @@ void tdMergeTwoDataCols(SDataCols *target, SDataCols *src1, int *iter1, int limi
TSKEY key1 = (*iter1 >= limit1) ? INT64_MAX : ((TSKEY *)(src1->cols[0].pData))[*iter1];
TSKEY key2 = (*iter2 >= limit2) ? INT64_MAX : ((TSKEY *)(src2->cols[0].pData))[*iter2];
if (key1 <= key2) {
if (key1 < key2) {
for (int i = 0; i < src1->numOfCols; i++) {
ASSERT(target->cols[i].type == src1->cols[i].type);
if (src1->cols[i].len > 0) {
......@@ -520,7 +525,6 @@ void tdMergeTwoDataCols(SDataCols *target, SDataCols *src1, int *iter1, int limi
target->numOfRows++;
(*iter1)++;
if (key1 == key2) (*iter2)++;
} else {
for (int i = 0; i < src2->numOfCols; i++) {
ASSERT(target->cols[i].type == src2->cols[i].type);
......@@ -532,6 +536,7 @@ void tdMergeTwoDataCols(SDataCols *target, SDataCols *src1, int *iter1, int limi
target->numOfRows++;
(*iter2)++;
if (key1 == key2) (*iter1)++;
}
}
}
......
......@@ -65,6 +65,7 @@ typedef struct {
int32_t maxRowsPerFileBlock; // maximum rows per file block
int8_t precision;
int8_t compression;
int8_t update;
} STsdbCfg;
// --------- TSDB REPOSITORY USAGE STATISTICS
......
......@@ -430,7 +430,7 @@ void tsdbUnTakeMemSnapShot(STsdbRepo* pRepo, SMemTable* pMem, SMemTable* pIMem)
void* tsdbAllocBytes(STsdbRepo* pRepo, int bytes);
int tsdbAsyncCommit(STsdbRepo* pRepo);
int tsdbLoadDataFromCache(STable* pTable, SSkipListIterator* pIter, TSKEY maxKey, int maxRowsToRead, SDataCols* pCols,
TSKEY* filterKeys, int nFilterKeys);
TSKEY* filterKeys, int nFilterKeys, bool keepDup);
static FORCE_INLINE SDataRow tsdbNextIterRow(SSkipListIterator* pIter) {
if (pIter == NULL) return NULL;
......
......@@ -512,6 +512,9 @@ static int32_t tsdbCheckAndSetDefaultCfg(STsdbCfg *pCfg) {
}
}
// update check
if (pCfg->update != 0) pCfg->update = 1;
return 0;
_err:
......@@ -923,6 +926,7 @@ static int tsdbEncodeCfg(void **buf, STsdbCfg *pCfg) {
tlen += taosEncodeVariantI32(buf, pCfg->maxRowsPerFileBlock);
tlen += taosEncodeFixedI8(buf, pCfg->precision);
tlen += taosEncodeFixedI8(buf, pCfg->compression);
tlen += taosEncodeFixedI8(buf, pCfg->update);
return tlen;
}
......@@ -939,6 +943,7 @@ static void *tsdbDecodeCfg(void *buf, STsdbCfg *pCfg) {
buf = taosDecodeVariantI32(buf, &(pCfg->maxRowsPerFileBlock));
buf = taosDecodeFixedI8(buf, &(pCfg->precision));
buf = taosDecodeFixedI8(buf, &(pCfg->compression));
buf = taosDecodeFixedI8(buf, &(pCfg->update));
return buf;
}
......
......@@ -278,7 +278,7 @@ int tsdbAsyncCommit(STsdbRepo *pRepo) {
}
int tsdbLoadDataFromCache(STable *pTable, SSkipListIterator *pIter, TSKEY maxKey, int maxRowsToRead, SDataCols *pCols,
TSKEY *filterKeys, int nFilterKeys) {
TSKEY *filterKeys, int nFilterKeys, bool keepDup) {
ASSERT(maxRowsToRead > 0 && nFilterKeys >= 0);
if (pIter == NULL) return 0;
STSchema *pSchema = NULL;
......@@ -319,6 +319,10 @@ int tsdbLoadDataFromCache(STable *pTable, SSkipListIterator *pIter, TSKEY maxKey
if (!keyFiltered) {
if (numOfRows >= maxRowsToRead) break;
numOfRows++;
}
if (!keyFiltered || keepDup) {
if (pCols) {
if (pSchema == NULL || schemaVersion(pSchema) != dataRowVersion(row)) {
pSchema = tsdbGetTableSchemaImpl(pTable, false, false, dataRowVersion(row));
......@@ -329,8 +333,7 @@ int tsdbLoadDataFromCache(STable *pTable, SSkipListIterator *pIter, TSKEY maxKey
tdAppendDataRowToDataCol(row, pSchema, pCols);
}
numOfRows++;
}
}
} while (tSkipListIterNext(pIter));
return numOfRows;
......@@ -422,8 +425,9 @@ static STableData *tsdbNewTableData(STsdbCfg *pCfg, STable *pTable) {
pTableData->keyLast = 0;
pTableData->numOfRows = 0;
pTableData->pData = tSkipListCreate(TSDB_DATA_SKIPLIST_LEVEL, TSDB_DATA_TYPE_TIMESTAMP,
TYPE_BYTES[TSDB_DATA_TYPE_TIMESTAMP], /*SL_DISCARD_DUP_KEY*/ SL_APPEND_DUP_KEY, tsdbGetTsTupleKey);
pTableData->pData =
tSkipListCreate(TSDB_DATA_SKIPLIST_LEVEL, TSDB_DATA_TYPE_TIMESTAMP, TYPE_BYTES[TSDB_DATA_TYPE_TIMESTAMP],
pCfg->update ? SL_APPEND_DUP_KEY : SL_DISCARD_DUP_KEY, tsdbGetTsTupleKey);
if (pTableData->pData == NULL) {
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
goto _err;
......
......@@ -62,7 +62,7 @@ static int tsdbWriteBlockToProperFile(SRWHelper *pHelper, SDataCols *pDataCols
static int tsdbProcessMergeCommit(SRWHelper *pHelper, SCommitIter *pCommitIter, SDataCols *pDataCols, TSKEY maxKey,
int *blkIdx);
static int tsdbLoadAndMergeFromCache(SDataCols *pDataCols, int *iter, SCommitIter *pCommitIter, SDataCols *pTarget,
TSKEY maxKey, int maxRows);
TSKEY maxKey, int maxRows, int8_t update);
// ---------------------- INTERNAL FUNCTIONS ----------------------
int tsdbInitReadHelper(SRWHelper *pHelper, STsdbRepo *pRepo) {
......@@ -1453,7 +1453,7 @@ static int tsdbProcessAppendCommit(SRWHelper *pHelper, SCommitIter *pCommitIter,
ASSERT(pCompBlock->last && pCompBlock->numOfRows < pCfg->minRowsPerFileBlock);
tdResetDataCols(pDataCols);
int rowsRead = tsdbLoadDataFromCache(pTable, pCommitIter->pIter, maxKey, defaultRowsInBlock - pCompBlock->numOfRows,
pDataCols, NULL, 0);
pDataCols, NULL, 0, pCfg->update);
ASSERT(rowsRead > 0 && rowsRead == pDataCols->numOfRows);
if (rowsRead + pCompBlock->numOfRows < pCfg->minRowsPerFileBlock &&
pCompBlock->numOfSubBlocks < TSDB_MAX_SUBBLOCKS && !TSDB_NLAST_FILE_OPENED(pHelper)) {
......@@ -1474,7 +1474,8 @@ static int tsdbProcessAppendCommit(SRWHelper *pHelper, SCommitIter *pCommitIter,
} else {
ASSERT(!pHelper->hasOldLastBlock);
tdResetDataCols(pDataCols);
int rowsRead = tsdbLoadDataFromCache(pTable, pCommitIter->pIter, maxKey, defaultRowsInBlock, pDataCols, NULL, 0);
int rowsRead =
tsdbLoadDataFromCache(pTable, pCommitIter->pIter, maxKey, defaultRowsInBlock, pDataCols, NULL, 0, pCfg->update);
ASSERT(rowsRead > 0 && rowsRead == pDataCols->numOfRows);
if (tsdbWriteBlockToProperFile(pHelper, pDataCols, &compBlock) < 0) return -1;
......@@ -1516,17 +1517,17 @@ static int tsdbProcessMergeCommit(SRWHelper *pHelper, SCommitIter *pCommitIter,
ASSERT(pDataCols0->numOfRows == pCompBlock->numOfRows);
int rows1 = defaultRowsInBlock - pCompBlock->numOfRows;
int rows2 =
tsdbLoadDataFromCache(pTable, &slIter, maxKey, rows1, NULL, pDataCols0->cols[0].pData, pDataCols0->numOfRows);
if (rows2 == 0) { // all data filtered out
int rows2 = tsdbLoadDataFromCache(pTable, &slIter, maxKey, rows1, NULL, pDataCols0->cols[0].pData,
pDataCols0->numOfRows, pCfg->update);
if (!pCfg->update && rows2 == 0) { // all data filtered out
*(pCommitIter->pIter) = slIter;
} else {
if (pCompBlock->numOfRows + rows2 < pCfg->minRowsPerFileBlock &&
pCompBlock->numOfSubBlocks < TSDB_MAX_SUBBLOCKS && !TSDB_NLAST_FILE_OPENED(pHelper)) {
tdResetDataCols(pDataCols);
int rowsRead = tsdbLoadDataFromCache(pTable, pCommitIter->pIter, maxKey, rows1, pDataCols,
pDataCols0->cols[0].pData, pDataCols0->numOfRows);
ASSERT(rowsRead == rows2 && rowsRead == pDataCols->numOfRows);
pDataCols0->cols[0].pData, pDataCols0->numOfRows, pCfg->update);
ASSERT(rowsRead == rows2 && rowsRead <= pDataCols->numOfRows && pDataCols->numOfRows > 0);
if (tsdbWriteBlockToFile(pHelper, helperLastF(pHelper), pDataCols, &compBlock, true, false) < 0) return -1;
if (tsdbAddSubBlock(pHelper, &compBlock, tblkIdx, rowsRead) < 0) return -1;
tblkIdx++;
......@@ -1535,9 +1536,8 @@ static int tsdbProcessMergeCommit(SRWHelper *pHelper, SCommitIter *pCommitIter,
int round = 0;
int dIter = 0;
while (true) {
tdResetDataCols(pDataCols);
int rowsRead =
tsdbLoadAndMergeFromCache(pDataCols0, &dIter, pCommitIter, pDataCols, maxKey, defaultRowsInBlock);
int rowsRead = tsdbLoadAndMergeFromCache(pDataCols0, &dIter, pCommitIter, pDataCols, maxKey,
defaultRowsInBlock, pCfg->update);
if (rowsRead == 0) break;
if (tsdbWriteBlockToProperFile(pHelper, pDataCols, &compBlock) < 0) return -1;
......@@ -1561,8 +1561,8 @@ static int tsdbProcessMergeCommit(SRWHelper *pHelper, SCommitIter *pCommitIter,
if (keyFirst < blkKeyFirst) {
while (true) {
tdResetDataCols(pDataCols);
int rowsRead =
tsdbLoadDataFromCache(pTable, pCommitIter->pIter, blkKeyFirst - 1, defaultRowsInBlock, pDataCols, NULL, 0);
int rowsRead = tsdbLoadDataFromCache(pTable, pCommitIter->pIter, blkKeyFirst - 1, defaultRowsInBlock, pDataCols,
NULL, 0, pCfg->update);
if (rowsRead == 0) break;
ASSERT(rowsRead == pDataCols->numOfRows);
......@@ -1580,21 +1580,21 @@ static int tsdbProcessMergeCommit(SRWHelper *pHelper, SCommitIter *pCommitIter,
slIter = *(pCommitIter->pIter);
int rows1 = (pCfg->maxRowsPerFileBlock - pCompBlock->numOfRows);
int rows2 = tsdbLoadDataFromCache(pTable, &slIter, blkKeyLast, INT_MAX, NULL, pDataCols0->cols[0].pData,
pDataCols0->numOfRows);
pDataCols0->numOfRows, pCfg->update);
if (rows2 == 0) { // all filtered out
if (!pCfg->update && rows2 == 0) { // all filtered out
*(pCommitIter->pIter) = slIter;
ASSERT(tblkIdx == 0 || (tsdbNextIterKey(pCommitIter->pIter) < 0 ||
tsdbNextIterKey(pCommitIter->pIter) > blockAtIdx(pHelper, tblkIdx - 1)->keyLast));
} else {
int rows3 = tsdbLoadDataFromCache(pTable, &slIter, keyLimit, INT_MAX, NULL, NULL, 0) + rows2;
int rows3 = tsdbLoadDataFromCache(pTable, &slIter, keyLimit, INT_MAX, NULL, NULL, 0, pCfg->update) + rows2;
if (pCompBlock->numOfSubBlocks < TSDB_MAX_SUBBLOCKS && rows1 >= rows2) {
int rows = (rows1 >= rows3) ? rows3 : rows2;
tdResetDataCols(pDataCols);
int rowsRead = tsdbLoadDataFromCache(pTable, pCommitIter->pIter, keyLimit, rows, pDataCols,
pDataCols0->cols[0].pData, pDataCols0->numOfRows);
ASSERT(rowsRead == rows && rowsRead == pDataCols->numOfRows);
pDataCols0->cols[0].pData, pDataCols0->numOfRows, pCfg->update);
ASSERT(rowsRead == rows && rowsRead <= pDataCols->numOfRows);
if (tsdbWriteBlockToFile(pHelper, helperDataF(pHelper), pDataCols, &compBlock, false, false) < 0)
return -1;
if (tsdbAddSubBlock(pHelper, &compBlock, tblkIdx, rowsRead) < 0) return -1;
......@@ -1606,12 +1606,11 @@ static int tsdbProcessMergeCommit(SRWHelper *pHelper, SCommitIter *pCommitIter,
int round = 0;
int dIter = 0;
while (true) {
int rowsRead =
tsdbLoadAndMergeFromCache(pDataCols0, &dIter, pCommitIter, pDataCols, keyLimit, defaultRowsInBlock);
int rowsRead = tsdbLoadAndMergeFromCache(pDataCols0, &dIter, pCommitIter, pDataCols, keyLimit,
defaultRowsInBlock, pCfg->update);
if (rowsRead == 0) break;
if (tsdbWriteBlockToFile(pHelper, helperDataF(pHelper), pDataCols, &compBlock, false, true) < 0)
return -1;
if (tsdbWriteBlockToFile(pHelper, helperDataF(pHelper), pDataCols, &compBlock, false, true) < 0) return -1;
if (round == 0) {
if (tsdbUpdateSuperBlock(pHelper, &compBlock, tblkIdx) < 0) return -1;
} else {
......@@ -1633,7 +1632,7 @@ static int tsdbProcessMergeCommit(SRWHelper *pHelper, SCommitIter *pCommitIter,
}
static int tsdbLoadAndMergeFromCache(SDataCols *pDataCols, int *iter, SCommitIter *pCommitIter, SDataCols *pTarget,
TSKEY maxKey, int maxRows) {
TSKEY maxKey, int maxRows, int8_t update) {
int numOfRows = 0;
TSKEY key1 = INT64_MAX;
TSKEY key2 = INT64_MAX;
......@@ -1649,14 +1648,17 @@ static int tsdbLoadAndMergeFromCache(SDataCols *pDataCols, int *iter, SCommitIte
if (key1 == INT64_MAX && key2 == INT64_MAX) break;
if (key1 <= key2) {
if ((key1 < key2) || ((!update) && (key1 == key2))) {
for (int i = 0; i < pDataCols->numOfCols; i++) {
dataColAppendVal(pTarget->cols + i, tdGetColDataOfRow(pDataCols->cols + i, *iter), pTarget->numOfRows,
pTarget->maxPoints);
}
pTarget->numOfRows++;
(*iter)++;
if (key1 == key2) tSkipListIterNext(pCommitIter->pIter);
if ((!update) && (key1 == key2)) {
tSkipListIterNext(pCommitIter->pIter);
}
} else {
if (pSchema == NULL || schemaVersion(pSchema) != dataRowVersion(row)) {
pSchema = tsdbGetTableSchemaImpl(pCommitIter->pTable, false, false, dataRowVersion(row));
......@@ -1665,6 +1667,7 @@ static int tsdbLoadAndMergeFromCache(SDataCols *pDataCols, int *iter, SCommitIte
tdAppendDataRowToDataCol(row, pSchema, pTarget);
tSkipListIterNext(pCommitIter->pIter);
if (key1 == key2) (*iter)++;
}
numOfRows++;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册