提交 84e2db4d 编写于 作者: H Hongze Cheng

add more code

上级 1506b8a9
...@@ -1356,6 +1356,7 @@ static int tsdbProcessMergeCommit(SRWHelper *pHelper, SCommitIter *pCommitIter, ...@@ -1356,6 +1356,7 @@ static int tsdbProcessMergeCommit(SRWHelper *pHelper, SCommitIter *pCommitIter,
SCompBlock *pCompBlock = taosbsearch((void *)(&keyFirst), (void *)blockAtIdx(pHelper, *blkIdx), SCompBlock *pCompBlock = taosbsearch((void *)(&keyFirst), (void *)blockAtIdx(pHelper, *blkIdx),
pIdx->numOfBlocks - *blkIdx, sizeof(SCompBlock), compareKeyBlock, TD_GE); pIdx->numOfBlocks - *blkIdx, sizeof(SCompBlock), compareKeyBlock, TD_GE);
ASSERT(pCompBlock != NULL); ASSERT(pCompBlock != NULL);
int tblkIdx = TSDB_GET_COMPBLOCK_IDX(pHelper, pCompBlock);
if (pCompBlock->last) { if (pCompBlock->last) {
ASSERT(pCompBlock->numOfRows < pCfg->minRowsPerFileBlock); ASSERT(pCompBlock->numOfRows < pCfg->minRowsPerFileBlock);
...@@ -1365,17 +1366,20 @@ static int tsdbProcessMergeCommit(SRWHelper *pHelper, SCommitIter *pCommitIter, ...@@ -1365,17 +1366,20 @@ static int tsdbProcessMergeCommit(SRWHelper *pHelper, SCommitIter *pCommitIter,
ASSERT(pDataCols0->numOfRows == pCompBlock->numOfRows); ASSERT(pDataCols0->numOfRows == pCompBlock->numOfRows);
int rows1 = defaultRowsInBlock - pCompBlock->numOfRows; int rows1 = defaultRowsInBlock - pCompBlock->numOfRows;
int rows2 = tsdbLoadDataFromCache(pTable, &slIter, maxKey, rows1, NULL, pDataCols0->cols[0].pData, pDataCols0->numOfRows); int rows2 =
tsdbLoadDataFromCache(pTable, &slIter, maxKey, rows1, NULL, pDataCols0->cols[0].pData, pDataCols0->numOfRows);
if (rows2 == 0) { // all data filtered out if (rows2 == 0) { // all data filtered out
*(pCommitIter->pIter) = slIter; *(pCommitIter->pIter) = slIter;
} else { } else {
if (rows1 + rows2 < pCfg->minRowsPerFileBlock && pCompBlock->numOfSubBlocks < TSDB_MAX_SUBBLOCKS && !TSDB_NLAST_FILE_OPENED(pHelper)) { if (rows1 + rows2 < pCfg->minRowsPerFileBlock && pCompBlock->numOfSubBlocks < TSDB_MAX_SUBBLOCKS &&
!TSDB_NLAST_FILE_OPENED(pHelper)) {
tdResetDataCols(pDataCols); tdResetDataCols(pDataCols);
int rowsRead = tsdbLoadDataFromCache(pTable, pCommitIter->pIter, maxKey, rows1, pDataCols, int rowsRead = tsdbLoadDataFromCache(pTable, pCommitIter->pIter, maxKey, rows1, pDataCols,
pDataCols0->cols[0].pData, pDataCols0->numOfRows); pDataCols0->cols[0].pData, pDataCols0->numOfRows);
ASSERT(rowsRead == rows2 && rowsRead == pDataCols->numOfRows); ASSERT(rowsRead == rows2 && rowsRead == pDataCols->numOfRows);
if (tsdbWriteBlockToFile(pHelper, &(pHelper->files.lastF), pDataCols, &compBlock, true, false) < 0) return -1; if (tsdbWriteBlockToFile(pHelper, &(pHelper->files.lastF), pDataCols, &compBlock, true, false) < 0) return -1;
if (tsdbAddSubBlock(pHelper, &compBlock, pIdx->numOfBlocks - 1, rowsRead) < 0) return -1; if (tsdbAddSubBlock(pHelper, &compBlock, pIdx->numOfBlocks - 1, rowsRead) < 0) return -1;
tblkIdx++;
} else { } else {
if (tsdbLoadBlockData(pHelper, pCompBlock, NULL) < 0) return -1; if (tsdbLoadBlockData(pHelper, pCompBlock, NULL) < 0) return -1;
int round = 0; int round = 0;
...@@ -1392,18 +1396,18 @@ static int tsdbProcessMergeCommit(SRWHelper *pHelper, SCommitIter *pCommitIter, ...@@ -1392,18 +1396,18 @@ static int tsdbProcessMergeCommit(SRWHelper *pHelper, SCommitIter *pCommitIter,
if (tsdbInsertSuperBlock(pHelper, &compBlock, pIdx->numOfBlocks) < 0) return -1; if (tsdbInsertSuperBlock(pHelper, &compBlock, pIdx->numOfBlocks) < 0) return -1;
} }
tblkIdx++;
round++; round++;
} }
} }
} }
*blkIdx = pIdx->numOfBlocks;
} else { } else {
int tblkIdx = TSDB_GET_COMPBLOCK_IDX(pHelper, pCompBlock);
TSKEY keyLimit = (tblkIdx == pIdx->numOfBlocks - 1) ? maxKey : (pCompBlock[1].keyFirst - 1); TSKEY keyLimit = (tblkIdx == pIdx->numOfBlocks - 1) ? maxKey : (pCompBlock[1].keyFirst - 1);
if (keyFirst < pCompBlock->keyFirst) { if (keyFirst < pCompBlock->keyFirst) {
while (true) { while (true) {
tdResetDataCols(pDataCols); tdResetDataCols(pDataCols);
int rowsRead = tsdbLoadDataFromCache(pTable, pCommitIter->pIter, keyLimit, defaultRowsInBlock, pDataCols, NULL, 0); int rowsRead =
tsdbLoadDataFromCache(pTable, pCommitIter->pIter, keyLimit, defaultRowsInBlock, pDataCols, NULL, 0);
if (rowsRead == 0) break; if (rowsRead == 0) break;
ASSERT(rowsRead == pDataCols->numOfRows); ASSERT(rowsRead == pDataCols->numOfRows);
...@@ -1411,16 +1415,16 @@ static int tsdbProcessMergeCommit(SRWHelper *pHelper, SCommitIter *pCommitIter, ...@@ -1411,16 +1415,16 @@ static int tsdbProcessMergeCommit(SRWHelper *pHelper, SCommitIter *pCommitIter,
if (tsdbInsertSuperBlock(pHelper, &compBlock, tblkIdx) < 0) return -1; if (tsdbInsertSuperBlock(pHelper, &compBlock, tblkIdx) < 0) return -1;
tblkIdx++; tblkIdx++;
} }
*blkIdx = tblkIdx;
} else { } else {
ASSERT(keyFirst <= pCompBlock->keyLast); ASSERT(keyFirst <= pCompBlock->keyLast);
int16_t colId =0; int16_t colId = 0;
if (tsdbLoadBlockDataCols(pHelper, pCompBlock, NULL, &colId, 1) < 0) return -1; if (tsdbLoadBlockDataCols(pHelper, pCompBlock, NULL, &colId, 1) < 0) return -1;
ASSERT(pHelper->pDataCols[0]->numOfRows == pCompBlock->numOfRows); ASSERT(pHelper->pDataCols[0]->numOfRows == pCompBlock->numOfRows);
slIter = *(pCommitIter->pIter); slIter = *(pCommitIter->pIter);
int rows1 = (pCfg->maxRowsPerFileBlock - pCompBlock->numOfRows); int rows1 = (pCfg->maxRowsPerFileBlock - pCompBlock->numOfRows);
int rows2 = tsdbLoadDataFromCache(pTable, &slIter, pCompBlock->keyLast, INT_MAX, NULL, pDataCols0->cols[0].pData, pDataCols0->numOfRows); int rows2 = tsdbLoadDataFromCache(pTable, &slIter, pCompBlock->keyLast, INT_MAX, NULL, pDataCols0->cols[0].pData,
pDataCols0->numOfRows);
if (rows2 == 0) { // all filtered out if (rows2 == 0) { // all filtered out
*(pCommitIter->pIter) = slIter; *(pCommitIter->pIter) = slIter;
...@@ -1434,33 +1438,36 @@ static int tsdbProcessMergeCommit(SRWHelper *pHelper, SCommitIter *pCommitIter, ...@@ -1434,33 +1438,36 @@ static int tsdbProcessMergeCommit(SRWHelper *pHelper, SCommitIter *pCommitIter,
int rowsRead = tsdbLoadDataFromCache(pTable, pCommitIter->pIter, keyLimit, rows, pDataCols, int rowsRead = tsdbLoadDataFromCache(pTable, pCommitIter->pIter, keyLimit, rows, pDataCols,
pDataCols0->cols[0].pData, pDataCols0->numOfRows); pDataCols0->cols[0].pData, pDataCols0->numOfRows);
ASSERT(rowsRead == rows && rowsRead == pDataCols->numOfRows); ASSERT(rowsRead == rows && rowsRead == pDataCols->numOfRows);
if (tsdbWriteBlockToFile(pHelper, &(pHelper->files.dataF), pDataCols, &compBlock, false, false) < 0) return -1; if (tsdbWriteBlockToFile(pHelper, &(pHelper->files.dataF), pDataCols, &compBlock, false, false) < 0)
return -1;
if (tsdbAddSubBlock(pHelper, &compBlock, tblkIdx, rowsRead) < 0) return -1; if (tsdbAddSubBlock(pHelper, &compBlock, tblkIdx, rowsRead) < 0) return -1;
*blkIdx = tblkIdx + 1; tblkIdx++;
} else { } else {
if (tsdbLoadBlockData(pHelper, pCompBlock, NULL) < 0) return -1; if (tsdbLoadBlockData(pHelper, pCompBlock, NULL) < 0) return -1;
int round = 0; int round = 0;
int dIter = 0; int dIter = 0;
while (true) { while (true) {
int rowsRead = tsdbLoadAndMergeFromCache(pDataCols0, &dIter, pCommitIter, pDataCols, keyLimit, defaultRowsInBlock); int rowsRead =
tsdbLoadAndMergeFromCache(pDataCols0, &dIter, pCommitIter, pDataCols, keyLimit, defaultRowsInBlock);
if (rowsRead == 0) break; if (rowsRead == 0) break;
if (tsdbWriteBlockToFile(pHelper, &(pHelper->files.dataF), pDataCols, &compBlock, false, true) < 0) if (tsdbWriteBlockToFile(pHelper, &(pHelper->files.dataF), pDataCols, &compBlock, false, true) < 0)
return -1; return -1;
if (round == 0) { if (round == 0) {
if (tsdbUpdateSuperBlock(pHelper, &compBlock, 0 /*TODO*/) < 0) return -1; if (tsdbUpdateSuperBlock(pHelper, &compBlock, tblkIdx) < 0) return -1;
} else { } else {
if (tsdbInsertSuperBlock(pHelper, &compBlock, 0 /*TODO */) < 0) return -1; if (tsdbInsertSuperBlock(pHelper, &compBlock, tblkIdx) < 0) return -1;
} }
round++; round++;
tblkIdx++;
} }
} }
} }
} }
} }
*blkIdx = tblkIdx;
return 0; return 0;
} }
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册