提交 17c6b9f8 编写于 作者: H Haojun Liao

fix(tsdb): fix error in set file reader.

上级 eee80077
......@@ -946,7 +946,7 @@ static int32_t tsdbDataFileDoWriteTableOldData(SDataFileWriter *writer, const TS
} else {
SBrinRecord record[1];
tBrinBlockGet(writer->ctx->brinBlock, writer->ctx->brinBlockIdx, record);
if (key->ts > record->lastKey || (key->ts == record->lastKey && key->version > record->lastKeyVer)) {
if (key->ts > record->lastKey || (key->ts == record->lastKey && key->version > record->maxVer)) {
if (writer->blockData->nRow > 0) {
code = tsdbDataFileDoWriteBlockData(writer, writer->blockData);
TSDB_CHECK_CODE(code, lino, _exit);
......
......@@ -613,19 +613,24 @@ static int32_t filesetIteratorNext(SFilesetIter* pIter, STsdbReader* pReader, bo
pReader->status.pCurrentFileset = pIter->pFilesetList->data[pIter->index];
STFileObj** pFileObj = pReader->status.pCurrentFileset->farr;
if (pFileObj[0] != NULL) {
if (pFileObj[0] != NULL || pFileObj[3] != NULL) {
SDataFileReaderConfig conf = {.tsdb = pReader->pTsdb, .szPage = pReader->pTsdb->pVnode->config.szPage};
conf.files[0].file = *pFileObj[0]->f;
conf.files[0].exist = true;
const char* filesName[4] = {0};
conf.files[1].file = *pFileObj[1]->f;
conf.files[1].exist = true;
if (pFileObj[0] != NULL) {
conf.files[0].file = *pFileObj[0]->f;
conf.files[0].exist = true;
filesName[0] = pFileObj[0]->fname;
conf.files[2].file = *pFileObj[2]->f;
conf.files[2].exist = true;
conf.files[1].file = *pFileObj[1]->f;
conf.files[1].exist = true;
filesName[1] = pFileObj[1]->fname;
const char* filesName[4] = {pFileObj[0]->fname, pFileObj[1]->fname, pFileObj[2]->fname};
conf.files[2].file = *pFileObj[2]->f;
conf.files[2].exist = true;
filesName[2] = pFileObj[2]->fname;
}
if (pFileObj[3] != NULL) {
conf.files[3].exist = true;
......@@ -1092,7 +1097,7 @@ static int32_t doLoadFileBlock(STsdbReader* pReader, SArray* pIndexList, SBlockN
}
// 2. version range check
if (pRecord->firstKeyVer > pReader->verRange.maxVer || pRecord->lastKeyVer < pReader->verRange.minVer) {
if (pRecord->minVer > pReader->verRange.maxVer || pRecord->maxVer < pReader->verRange.minVer) {
continue;
}
......@@ -1236,8 +1241,8 @@ static int32_t getEndPosInDataBlock(STsdbReader* pReader, SBlockData* pBlockData
endPos = doBinarySearchKey(pBlockData->aTSKEY, pRecord->numRow, pos, key, pReader->order);
}
if ((pReader->verRange.maxVer >= pRecord->firstKeyVer && pReader->verRange.maxVer < pRecord->lastKeyVer)||
(pReader->verRange.minVer <= pRecord->lastKeyVer && pReader->verRange.minVer > pRecord->firstKeyVer)) {
if ((pReader->verRange.maxVer >= pRecord->minVer && pReader->verRange.maxVer < pRecord->maxVer)||
(pReader->verRange.minVer <= pRecord->maxVer && pReader->verRange.minVer > pRecord->minVer)) {
int32_t i = endPos;
if (asc) {
......@@ -1399,9 +1404,9 @@ static int32_t copyBlockDataToSDataBlock(STsdbReader* pReader) {
// row index of dump info remain the initial position, let's find the appropriate start position.
if ((pDumpInfo->rowIndex == 0 && asc) || (pDumpInfo->rowIndex == pRecord->numRow - 1 && (!asc))) {
if (asc && pReader->window.skey <= pRecord->firstKey && pReader->verRange.minVer <= pRecord->firstKeyVer) {
if (asc && pReader->window.skey <= pRecord->firstKey && pReader->verRange.minVer <= pRecord->minVer) {
// pDumpInfo->rowIndex = 0;
} else if (!asc && pReader->window.ekey >= pRecord->lastKey && pReader->verRange.maxVer >= pRecord->lastKeyVer) {
} else if (!asc && pReader->window.ekey >= pRecord->lastKey && pReader->verRange.maxVer >= pRecord->maxVer) {
// pDumpInfo->rowIndex = pRecord->numRow - 1;
} else { // find the appropriate the start position in current block, and set it to be the current rowIndex
int32_t pos = asc ? pRecord->numRow - 1 : 0;
......@@ -1413,16 +1418,16 @@ static int32_t copyBlockDataToSDataBlock(STsdbReader* pReader) {
tsdbError(
"%p failed to locate the start position in current block, global index:%d, table index:%d, brange:%" PRId64
"-%" PRId64 ", minVer:%" PRId64 ", maxVer:%" PRId64 " %s",
pReader, pBlockIter->index, pBlockInfo->tbBlockIdx, pRecord->firstKey, pRecord->lastKey, pRecord->firstKeyVer,
pRecord->lastKeyVer, pReader->idStr);
pReader, pBlockIter->index, pBlockInfo->tbBlockIdx, pRecord->firstKey, pRecord->lastKey, pRecord->minVer,
pRecord->maxVer, pReader->idStr);
return TSDB_CODE_INVALID_PARA;
}
ASSERT(pReader->verRange.minVer <= pRecord->lastKeyVer && pReader->verRange.maxVer >= pRecord->firstKeyVer);
ASSERT(pReader->verRange.minVer <= pRecord->maxVer && pReader->verRange.maxVer >= pRecord->minVer);
// find the appropriate start position that satisfies the version requirement.
if ((pReader->verRange.maxVer >= pRecord->firstKeyVer && pReader->verRange.maxVer < pRecord->lastKeyVer)||
(pReader->verRange.minVer <= pRecord->lastKeyVer && pReader->verRange.minVer > pRecord->firstKeyVer)) {
if ((pReader->verRange.maxVer >= pRecord->minVer && pReader->verRange.maxVer < pRecord->maxVer)||
(pReader->verRange.minVer <= pRecord->maxVer && pReader->verRange.minVer > pRecord->minVer)) {
int32_t i = pDumpInfo->rowIndex;
if (asc) {
for(; i < pRecord->numRow; ++i) {
......@@ -1533,7 +1538,7 @@ static int32_t copyBlockDataToSDataBlock(STsdbReader* pReader) {
tsdbDebug("%p copy file block to sdatablock, global index:%d, table index:%d, brange:%" PRId64 "-%" PRId64
", rows:%d, remain:%d, minVer:%" PRId64 ", maxVer:%" PRId64 ", uid:%" PRIu64 " elapsed time:%.2f ms, %s",
pReader, pBlockIter->index, pBlockInfo->tbBlockIdx, pRecord->firstKey, pRecord->lastKey, dumpedRows,
unDumpedRows, pRecord->firstKeyVer, pRecord->lastKeyVer, pBlockInfo->uid, elapsedTime, pReader->idStr);
unDumpedRows, pRecord->minVer, pRecord->maxVer, pBlockInfo->uid, elapsedTime, pReader->idStr);
return TSDB_CODE_SUCCESS;
}
......@@ -1583,9 +1588,10 @@ static int32_t doLoadFileBlockData(STsdbReader* pReader, SDataBlockIter* pBlockI
SFileDataBlockInfo* pBlockInfo = getCurrentBlockInfo(pBlockIter);
SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo;
SBrinRecord* pRecord = &pBlockInfo->record;
SBrinRecord* pRecord = &pBlockInfo->record;
code = tsdbDataFileReadBlockData(pReader->pFileReader, pRecord, pBlockData);
// code = tsdbDataFileReadBlockDataByColumn(pReader->pFileReader, pRecord, pBlockData, pReader->pSchema, pSup->colId, pSup->numOfCols);
if (code != TSDB_CODE_SUCCESS) {
tsdbError("%p error occurs in loading file block, global index:%d, table index:%d, brange:%" PRId64 "-%" PRId64
", rows:%d, code:%s %s",
......@@ -1599,7 +1605,7 @@ static int32_t doLoadFileBlockData(STsdbReader* pReader, SDataBlockIter* pBlockI
tsdbDebug("%p load file block into buffer, global index:%d, index in table block list:%d, brange:%" PRId64 "-%" PRId64
", rows:%d, minVer:%" PRId64 ", maxVer:%" PRId64 ", elapsed time:%.2f ms, %s",
pReader, pBlockIter->index, pBlockInfo->tbBlockIdx, pRecord->firstKey, pRecord->lastKey, pRecord->numRow,
pRecord->firstKeyVer, pRecord->lastKeyVer, elapsedTime, pReader->idStr);
pRecord->minVer, pRecord->maxVer, elapsedTime, pReader->idStr);
pReader->cost.blockLoadTime += elapsedTime;
pDumpInfo->allDumped = false;
......@@ -1807,8 +1813,8 @@ static bool blockIteratorNext(SDataBlockIter* pBlockIter, const char* idStr) {
static int32_t dataBlockPartiallyRequired(STimeWindow* pWindow, SVersionRange* pVerRange, SFileDataBlockInfo* pBlock) {
return (pWindow->ekey < pBlock->record.lastKey && pWindow->ekey >= pBlock->record.firstKey) ||
(pWindow->skey > pBlock->record.firstKey && pWindow->skey <= pBlock->record.lastKey) ||
(pVerRange->minVer > pBlock->record.firstKeyVer && pVerRange->minVer <= pBlock->record.lastKeyVer) ||
(pVerRange->maxVer < pBlock->record.lastKeyVer && pVerRange->maxVer >= pBlock->record.firstKeyVer);
(pVerRange->minVer > pBlock->record.minVer && pVerRange->minVer <= pBlock->record.maxVer) ||
(pVerRange->maxVer < pBlock->record.maxVer && pVerRange->maxVer >= pBlock->record.minVer);
}
static bool getNeighborBlockOfSameTable(SFileDataBlockInfo* pBlockInfo, STableBlockScanInfo* pTableBlockScanInfo,
......@@ -1896,24 +1902,24 @@ static bool bufferDataInFileBlockGap(int32_t order, TSDBKEY key, SFileDataBlockI
static bool keyOverlapFileBlock(TSDBKEY key, SFileDataBlockInfo* pBlock, SVersionRange* pVerRange) {
return (key.ts >= pBlock->record.firstKey && key.ts <= pBlock->record.lastKey) &&
(pBlock->record.lastKeyVer >= pVerRange->minVer) && (pBlock->record.firstKeyVer <= pVerRange->maxVer);
(pBlock->record.maxVer >= pVerRange->minVer) && (pBlock->record.minVer <= pVerRange->maxVer);
}
static bool doCheckforDatablockOverlap(STableBlockScanInfo* pBlockScanInfo, const SDataBlk* pBlock,
static bool doCheckforDatablockOverlap(STableBlockScanInfo* pBlockScanInfo, const SBrinRecord* pRecord,
int32_t startIndex) {
size_t num = taosArrayGetSize(pBlockScanInfo->delSkyline);
for (int32_t i = startIndex; i < num; i += 1) {
TSDBKEY* p = taosArrayGet(pBlockScanInfo->delSkyline, i);
if (p->ts >= pBlock->minKey.ts && p->ts <= pBlock->maxKey.ts) {
if (p->version >= pBlock->minVer) {
if (p->ts >= pRecord->firstKey && p->ts <= pRecord->lastKey) {
if (p->version >= pRecord->minVer) {
return true;
}
} else if (p->ts < pBlock->minKey.ts) { // p->ts < pBlock->minKey.ts
if (p->version >= pBlock->minVer) {
} else if (p->ts < pRecord->firstKey) { // p->ts < pBlock->minKey.ts
if (p->version >= pRecord->minVer) {
if (i < num - 1) {
TSDBKEY* pnext = taosArrayGet(pBlockScanInfo->delSkyline, i + 1);
if (pnext->ts >= pBlock->minKey.ts) {
if (pnext->ts >= pRecord->firstKey) {
return true;
}
} else { // it must be the last point
......@@ -1928,7 +1934,7 @@ static bool doCheckforDatablockOverlap(STableBlockScanInfo* pBlockScanInfo, cons
return false;
}
static bool overlapWithDelSkyline(STableBlockScanInfo* pBlockScanInfo, const SDataBlk* pBlock, int32_t order) {
static bool overlapWithDelSkyline(STableBlockScanInfo* pBlockScanInfo, const SBrinRecord* pRecord, int32_t order) {
if (pBlockScanInfo->delSkyline == NULL) {
return false;
}
......@@ -1936,28 +1942,28 @@ static bool overlapWithDelSkyline(STableBlockScanInfo* pBlockScanInfo, const SDa
// ts is not overlap
TSDBKEY* pFirst = taosArrayGet(pBlockScanInfo->delSkyline, 0);
TSDBKEY* pLast = taosArrayGetLast(pBlockScanInfo->delSkyline);
if (pBlock->minKey.ts > pLast->ts || pBlock->maxKey.ts < pFirst->ts) {
if (pRecord->firstKey > pLast->ts || pRecord->lastKey < pFirst->ts) {
return false;
}
// version is not overlap
if (ASCENDING_TRAVERSE(order)) {
return doCheckforDatablockOverlap(pBlockScanInfo, pBlock, pBlockScanInfo->fileDelIndex);
return doCheckforDatablockOverlap(pBlockScanInfo, pRecord, pBlockScanInfo->fileDelIndex);
} else {
int32_t index = pBlockScanInfo->fileDelIndex;
while (1) {
TSDBKEY* p = taosArrayGet(pBlockScanInfo->delSkyline, index);
if (p->ts > pBlock->minKey.ts && index > 0) {
if (p->ts > pRecord->firstKey && index > 0) {
index -= 1;
} else { // find the first point that is smaller than the minKey.ts of dataBlock.
if (p->ts == pBlock->minKey.ts && p->version < pBlock->maxVer && index > 0) {
if (p->ts == pRecord->firstKey && p->version < pRecord->maxVer && index > 0) {
index -= 1;
}
break;
}
}
return doCheckforDatablockOverlap(pBlockScanInfo, pBlock, index);
return doCheckforDatablockOverlap(pBlockScanInfo, pRecord, index);
}
}
......@@ -1981,14 +1987,12 @@ static void getBlockToLoadInfo(SDataBlockToLoadInfo* pInfo, SFileDataBlockInfo*
// overlap with neighbor
if (hasNeighbor) {
// pInfo->overlapWithNeighborBlock = overlapWithNeighborBlock(pBlock, &bIndex, pReader->order);
pInfo->overlapWithNeighborBlock = overlapWithNeighborBlock2(pBlockInfo, &rec, pReader->order);
}
// todo:
// has duplicated ts of different version in this block
pInfo->hasDupTs = 0;//(pBlock->nSubBlock == 1) ? pBlock->hasDup : true;
pInfo->overlapWithDelInfo = false;//overlapWithDelSkyline(pScanInfo, pBlockInfo, pReader->order);
pInfo->hasDupTs = (pBlockInfo->record.numRow > pBlockInfo->record.count);
pInfo->overlapWithDelInfo = overlapWithDelSkyline(pScanInfo, &pBlockInfo->record, pReader->order);
if (hasDataInLastBlock(pLastBlockReader)) {
int64_t tsLast = getCurrentKeyInLastBlock(pLastBlockReader);
......@@ -3307,9 +3311,11 @@ static int32_t moveToNextFile(STsdbReader* pReader, SBlockNumber* pBlockNum, SAr
if (pReader->status.pCurrentFileset != NULL) {
STFileObj* pTombFileObj = pReader->status.pCurrentFileset->farr[3];
if (pTombFileObj != NULL) {
const TTombBlkArray* pBlkArray = NULL;
int32_t code = tsdbDataFileReadTombBlk(pReader->pFileReader, &pBlkArray);
int32_t code = tsdbDataFileReadTombBlk(pReader->pFileReader, &pBlkArray);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
......@@ -3327,6 +3333,9 @@ static int32_t moveToNextFile(STsdbReader* pReader, SBlockNumber* pBlockNum, SAr
uint64_t uid = pReader->status.uidList.tableUidList[j];
STableBlockScanInfo* pScanInfo = getTableBlockScanInfo(pReader->status.pTableMap, uid, pReader->idStr);
if (pScanInfo->pDelData == NULL) {
pScanInfo->pDelData = taosArrayInit(4, sizeof(SDelData));
}
STombRecord record = {0};
for (int32_t k = 0; k < block.suid->size; ++k) {
......@@ -3337,6 +3346,9 @@ static int32_t moveToNextFile(STsdbReader* pReader, SBlockNumber* pBlockNum, SAr
j += 1;
uid = pReader->status.uidList.tableUidList[j];
pScanInfo = getTableBlockScanInfo(pReader->status.pTableMap, uid, pReader->idStr);
if (pScanInfo->pDelData == NULL) {
pScanInfo->pDelData = taosArrayInit(4, sizeof(SDelData));
}
}
if (record.uid < uid) {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册