未验证 提交 b0f7c114 编写于 作者: S Shengliang Guan 提交者: GitHub

Merge pull request #2745 from taosdata/feature/2.0tsdb

Feature/2.0tsdb
...@@ -264,13 +264,11 @@ int tsdbLoadDataFromCache(STable *pTable, SSkipListIterator *pIter, TSKEY maxKey ...@@ -264,13 +264,11 @@ int tsdbLoadDataFromCache(STable *pTable, SSkipListIterator *pIter, TSKEY maxKey
} }
do { do {
if (numOfRows >= maxRowsToRead) break;
SDataRow row = tsdbNextIterRow(pIter); SDataRow row = tsdbNextIterRow(pIter);
if (row == NULL) break; if (row == NULL) break;
keyNext = dataRowKey(row); keyNext = dataRowKey(row);
if (keyNext < 0 || keyNext > maxKey) break; if (keyNext > maxKey) break;
bool keyFiltered = false; bool keyFiltered = false;
if (nFilterKeys != 0) { if (nFilterKeys != 0) {
...@@ -289,6 +287,7 @@ int tsdbLoadDataFromCache(STable *pTable, SSkipListIterator *pIter, TSKEY maxKey ...@@ -289,6 +287,7 @@ int tsdbLoadDataFromCache(STable *pTable, SSkipListIterator *pIter, TSKEY maxKey
} }
if (!keyFiltered) { if (!keyFiltered) {
if (numOfRows >= maxRowsToRead) break;
if (pCols) { if (pCols) {
if (pSchema == NULL || schemaVersion(pSchema) != dataRowVersion(row)) { if (pSchema == NULL || schemaVersion(pSchema) != dataRowVersion(row)) {
pSchema = tsdbGetTableSchemaImpl(pTable, false, false, dataRowVersion(row)); pSchema = tsdbGetTableSchemaImpl(pTable, false, false, dataRowVersion(row));
......
...@@ -1477,6 +1477,11 @@ static int tsdbProcessAppendCommit(SRWHelper *pHelper, SCommitIter *pCommitIter, ...@@ -1477,6 +1477,11 @@ static int tsdbProcessAppendCommit(SRWHelper *pHelper, SCommitIter *pCommitIter,
if (tsdbInsertSuperBlock(pHelper, &compBlock, pIdx->numOfBlocks) < 0) return -1; if (tsdbInsertSuperBlock(pHelper, &compBlock, pIdx->numOfBlocks) < 0) return -1;
} }
#ifndef NDEBUG
TSKEY keyNext = tsdbNextIterKey(pCommitIter->pIter);
ASSERT(keyNext < 0 || keyNext > pIdx->maxKey);
#endif
return 0; return 0;
} }
...@@ -1561,11 +1566,12 @@ static int tsdbProcessMergeCommit(SRWHelper *pHelper, SCommitIter *pCommitIter, ...@@ -1561,11 +1566,12 @@ static int tsdbProcessMergeCommit(SRWHelper *pHelper, SCommitIter *pCommitIter,
if (tsdbInsertSuperBlock(pHelper, &compBlock, tblkIdx) < 0) return -1; if (tsdbInsertSuperBlock(pHelper, &compBlock, tblkIdx) < 0) return -1;
tblkIdx++; tblkIdx++;
} }
ASSERT(tblkIdx == 0 || (tsdbNextIterKey(pCommitIter->pIter) < 0 ||
tsdbNextIterKey(pCommitIter->pIter) > blockAtIdx(pHelper, tblkIdx - 1)->keyLast));
} else { } else {
ASSERT(keyFirst <= blkKeyLast); ASSERT(keyFirst <= blkKeyLast);
int16_t colId = 0; int16_t colId = 0;
if (tsdbLoadBlockDataCols(pHelper, pCompBlock, NULL, &colId, 1) < 0) return -1; if (tsdbLoadBlockDataCols(pHelper, pCompBlock, NULL, &colId, 1) < 0) return -1;
ASSERT(pDataCols0->numOfRows == pCompBlock->numOfRows);
slIter = *(pCommitIter->pIter); slIter = *(pCommitIter->pIter);
int rows1 = (pCfg->maxRowsPerFileBlock - pCompBlock->numOfRows); int rows1 = (pCfg->maxRowsPerFileBlock - pCompBlock->numOfRows);
...@@ -1574,9 +1580,10 @@ static int tsdbProcessMergeCommit(SRWHelper *pHelper, SCommitIter *pCommitIter, ...@@ -1574,9 +1580,10 @@ static int tsdbProcessMergeCommit(SRWHelper *pHelper, SCommitIter *pCommitIter,
if (rows2 == 0) { // all filtered out if (rows2 == 0) { // all filtered out
*(pCommitIter->pIter) = slIter; *(pCommitIter->pIter) = slIter;
ASSERT(tblkIdx == 0 || (tsdbNextIterKey(pCommitIter->pIter) < 0 ||
tsdbNextIterKey(pCommitIter->pIter) > blockAtIdx(pHelper, tblkIdx - 1)->keyLast));
} else { } else {
int rows3 = tsdbLoadDataFromCache(pTable, &slIter, keyLimit, INT_MAX, NULL, NULL, 0) + rows2; int rows3 = tsdbLoadDataFromCache(pTable, &slIter, keyLimit, INT_MAX, NULL, NULL, 0) + rows2;
ASSERT(rows3 >= rows2);
if (pCompBlock->numOfSubBlocks < TSDB_MAX_SUBBLOCKS && rows1 >= rows2) { if (pCompBlock->numOfSubBlocks < TSDB_MAX_SUBBLOCKS && rows1 >= rows2) {
int rows = (rows1 >= rows3) ? rows3 : rows2; int rows = (rows1 >= rows3) ? rows3 : rows2;
...@@ -1588,6 +1595,8 @@ static int tsdbProcessMergeCommit(SRWHelper *pHelper, SCommitIter *pCommitIter, ...@@ -1588,6 +1595,8 @@ static int tsdbProcessMergeCommit(SRWHelper *pHelper, SCommitIter *pCommitIter,
return -1; return -1;
if (tsdbAddSubBlock(pHelper, &compBlock, tblkIdx, rowsRead) < 0) return -1; if (tsdbAddSubBlock(pHelper, &compBlock, tblkIdx, rowsRead) < 0) return -1;
tblkIdx++; tblkIdx++;
ASSERT(tblkIdx == 0 || (tsdbNextIterKey(pCommitIter->pIter) < 0 ||
tsdbNextIterKey(pCommitIter->pIter) > blockAtIdx(pHelper, tblkIdx - 1)->keyLast));
} else { } else {
if (tsdbLoadBlockData(pHelper, pCompBlock, NULL) < 0) return -1; if (tsdbLoadBlockData(pHelper, pCompBlock, NULL) < 0) return -1;
int round = 0; int round = 0;
...@@ -1608,6 +1617,8 @@ static int tsdbProcessMergeCommit(SRWHelper *pHelper, SCommitIter *pCommitIter, ...@@ -1608,6 +1617,8 @@ static int tsdbProcessMergeCommit(SRWHelper *pHelper, SCommitIter *pCommitIter,
round++; round++;
tblkIdx++; tblkIdx++;
} }
ASSERT(tblkIdx == 0 || (tsdbNextIterKey(pCommitIter->pIter) < 0 ||
tsdbNextIterKey(pCommitIter->pIter) > blockAtIdx(pHelper, tblkIdx - 1)->keyLast));
} }
} }
} }
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册