diff --git a/source/dnode/vnode/src/tsdb/tsdbDataFileRW.c b/source/dnode/vnode/src/tsdb/tsdbDataFileRW.c index 0a42d2805a55d7f0d543c6f4ba08637cba13b4c4..53db3b49cd27e93163c2b7546e01ba9a8689b83c 100644 --- a/source/dnode/vnode/src/tsdb/tsdbDataFileRW.c +++ b/source/dnode/vnode/src/tsdb/tsdbDataFileRW.c @@ -946,7 +946,7 @@ static int32_t tsdbDataFileDoWriteTableOldData(SDataFileWriter *writer, const TS } else { SBrinRecord record[1]; tBrinBlockGet(writer->ctx->brinBlock, writer->ctx->brinBlockIdx, record); - if (key->ts > record->lastKey || (key->ts == record->lastKey && key->version > record->lastKeyVer)) { + if (key->ts > record->lastKey || (key->ts == record->lastKey && key->version > record->maxVer)) { if (writer->blockData->nRow > 0) { code = tsdbDataFileDoWriteBlockData(writer, writer->blockData); TSDB_CHECK_CODE(code, lino, _exit); diff --git a/source/dnode/vnode/src/tsdb/tsdbRead2.c b/source/dnode/vnode/src/tsdb/tsdbRead2.c index 62d8e38666a24db04458fc12e7dd192e590aace7..d9c9572b0e567ed60b7585597e963c87b4c0e707 100644 --- a/source/dnode/vnode/src/tsdb/tsdbRead2.c +++ b/source/dnode/vnode/src/tsdb/tsdbRead2.c @@ -613,19 +613,24 @@ static int32_t filesetIteratorNext(SFilesetIter* pIter, STsdbReader* pReader, bo pReader->status.pCurrentFileset = pIter->pFilesetList->data[pIter->index]; STFileObj** pFileObj = pReader->status.pCurrentFileset->farr; - if (pFileObj[0] != NULL) { + if (pFileObj[0] != NULL || pFileObj[3] != NULL) { SDataFileReaderConfig conf = {.tsdb = pReader->pTsdb, .szPage = pReader->pTsdb->pVnode->config.szPage}; - conf.files[0].file = *pFileObj[0]->f; - conf.files[0].exist = true; + const char* filesName[4] = {0}; - conf.files[1].file = *pFileObj[1]->f; - conf.files[1].exist = true; + if (pFileObj[0] != NULL) { + conf.files[0].file = *pFileObj[0]->f; + conf.files[0].exist = true; + filesName[0] = pFileObj[0]->fname; - conf.files[2].file = *pFileObj[2]->f; - conf.files[2].exist = true; + conf.files[1].file = *pFileObj[1]->f; + conf.files[1].exist = true; + filesName[1] = pFileObj[1]->fname; - const char* filesName[4] = {pFileObj[0]->fname, pFileObj[1]->fname, pFileObj[2]->fname}; + conf.files[2].file = *pFileObj[2]->f; + conf.files[2].exist = true; + filesName[2] = pFileObj[2]->fname; + } if (pFileObj[3] != NULL) { conf.files[3].exist = true; @@ -1092,7 +1097,7 @@ static int32_t doLoadFileBlock(STsdbReader* pReader, SArray* pIndexList, SBlockN } // 2. version range check - if (pRecord->firstKeyVer > pReader->verRange.maxVer || pRecord->lastKeyVer < pReader->verRange.minVer) { + if (pRecord->minVer > pReader->verRange.maxVer || pRecord->maxVer < pReader->verRange.minVer) { continue; } @@ -1236,8 +1241,8 @@ static int32_t getEndPosInDataBlock(STsdbReader* pReader, SBlockData* pBlockData endPos = doBinarySearchKey(pBlockData->aTSKEY, pRecord->numRow, pos, key, pReader->order); } - if ((pReader->verRange.maxVer >= pRecord->firstKeyVer && pReader->verRange.maxVer < pRecord->lastKeyVer)|| - (pReader->verRange.minVer <= pRecord->lastKeyVer && pReader->verRange.minVer > pRecord->firstKeyVer)) { + if ((pReader->verRange.maxVer >= pRecord->minVer && pReader->verRange.maxVer < pRecord->maxVer)|| + (pReader->verRange.minVer <= pRecord->maxVer && pReader->verRange.minVer > pRecord->minVer)) { int32_t i = endPos; if (asc) { @@ -1399,9 +1404,9 @@ static int32_t copyBlockDataToSDataBlock(STsdbReader* pReader) { // row index of dump info remain the initial position, let's find the appropriate start position. if ((pDumpInfo->rowIndex == 0 && asc) || (pDumpInfo->rowIndex == pRecord->numRow - 1 && (!asc))) { - if (asc && pReader->window.skey <= pRecord->firstKey && pReader->verRange.minVer <= pRecord->firstKeyVer) { + if (asc && pReader->window.skey <= pRecord->firstKey && pReader->verRange.minVer <= pRecord->minVer) { // pDumpInfo->rowIndex = 0; - } else if (!asc && pReader->window.ekey >= pRecord->lastKey && pReader->verRange.maxVer >= pRecord->lastKeyVer) { + } else if (!asc && pReader->window.ekey >= pRecord->lastKey && pReader->verRange.maxVer >= pRecord->maxVer) { // pDumpInfo->rowIndex = pRecord->numRow - 1; } else { // find the appropriate the start position in current block, and set it to be the current rowIndex int32_t pos = asc ? pRecord->numRow - 1 : 0; @@ -1413,16 +1418,16 @@ static int32_t copyBlockDataToSDataBlock(STsdbReader* pReader) { tsdbError( "%p failed to locate the start position in current block, global index:%d, table index:%d, brange:%" PRId64 "-%" PRId64 ", minVer:%" PRId64 ", maxVer:%" PRId64 " %s", - pReader, pBlockIter->index, pBlockInfo->tbBlockIdx, pRecord->firstKey, pRecord->lastKey, pRecord->firstKeyVer, - pRecord->lastKeyVer, pReader->idStr); + pReader, pBlockIter->index, pBlockInfo->tbBlockIdx, pRecord->firstKey, pRecord->lastKey, pRecord->minVer, + pRecord->maxVer, pReader->idStr); return TSDB_CODE_INVALID_PARA; } - ASSERT(pReader->verRange.minVer <= pRecord->lastKeyVer && pReader->verRange.maxVer >= pRecord->firstKeyVer); + ASSERT(pReader->verRange.minVer <= pRecord->maxVer && pReader->verRange.maxVer >= pRecord->minVer); // find the appropriate start position that satisfies the version requirement. - if ((pReader->verRange.maxVer >= pRecord->firstKeyVer && pReader->verRange.maxVer < pRecord->lastKeyVer)|| - (pReader->verRange.minVer <= pRecord->lastKeyVer && pReader->verRange.minVer > pRecord->firstKeyVer)) { + if ((pReader->verRange.maxVer >= pRecord->minVer && pReader->verRange.maxVer < pRecord->maxVer)|| + (pReader->verRange.minVer <= pRecord->maxVer && pReader->verRange.minVer > pRecord->minVer)) { int32_t i = pDumpInfo->rowIndex; if (asc) { for(; i < pRecord->numRow; ++i) { @@ -1533,7 +1538,7 @@ static int32_t copyBlockDataToSDataBlock(STsdbReader* pReader) { tsdbDebug("%p copy file block to sdatablock, global index:%d, table index:%d, brange:%" PRId64 "-%" PRId64 ", rows:%d, remain:%d, minVer:%" PRId64 ", maxVer:%" PRId64 ", uid:%" PRIu64 " elapsed time:%.2f ms, %s", pReader, pBlockIter->index, pBlockInfo->tbBlockIdx, pRecord->firstKey, pRecord->lastKey, dumpedRows, - unDumpedRows, pRecord->firstKeyVer, pRecord->lastKeyVer, pBlockInfo->uid, elapsedTime, pReader->idStr); + unDumpedRows, pRecord->minVer, pRecord->maxVer, pBlockInfo->uid, elapsedTime, pReader->idStr); return TSDB_CODE_SUCCESS; } @@ -1583,9 +1588,10 @@ static int32_t doLoadFileBlockData(STsdbReader* pReader, SDataBlockIter* pBlockI SFileDataBlockInfo* pBlockInfo = getCurrentBlockInfo(pBlockIter); SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo; - SBrinRecord* pRecord = &pBlockInfo->record; + SBrinRecord* pRecord = &pBlockInfo->record; code = tsdbDataFileReadBlockData(pReader->pFileReader, pRecord, pBlockData); +// code = tsdbDataFileReadBlockDataByColumn(pReader->pFileReader, pRecord, pBlockData, pReader->pSchema, pSup->colId, pSup->numOfCols); if (code != TSDB_CODE_SUCCESS) { tsdbError("%p error occurs in loading file block, global index:%d, table index:%d, brange:%" PRId64 "-%" PRId64 ", rows:%d, code:%s %s", @@ -1599,7 +1605,7 @@ static int32_t doLoadFileBlockData(STsdbReader* pReader, SDataBlockIter* pBlockI tsdbDebug("%p load file block into buffer, global index:%d, index in table block list:%d, brange:%" PRId64 "-%" PRId64 ", rows:%d, minVer:%" PRId64 ", maxVer:%" PRId64 ", elapsed time:%.2f ms, %s", pReader, pBlockIter->index, pBlockInfo->tbBlockIdx, pRecord->firstKey, pRecord->lastKey, pRecord->numRow, - pRecord->firstKeyVer, pRecord->lastKeyVer, elapsedTime, pReader->idStr); + pRecord->minVer, pRecord->maxVer, elapsedTime, pReader->idStr); pReader->cost.blockLoadTime += elapsedTime; pDumpInfo->allDumped = false; @@ -1807,8 +1813,8 @@ static bool blockIteratorNext(SDataBlockIter* pBlockIter, const char* idStr) { static int32_t dataBlockPartiallyRequired(STimeWindow* pWindow, SVersionRange* pVerRange, SFileDataBlockInfo* pBlock) { return (pWindow->ekey < pBlock->record.lastKey && pWindow->ekey >= pBlock->record.firstKey) || (pWindow->skey > pBlock->record.firstKey && pWindow->skey <= pBlock->record.lastKey) || - (pVerRange->minVer > pBlock->record.firstKeyVer && pVerRange->minVer <= pBlock->record.lastKeyVer) || - (pVerRange->maxVer < pBlock->record.lastKeyVer && pVerRange->maxVer >= pBlock->record.firstKeyVer); + (pVerRange->minVer > pBlock->record.minVer && pVerRange->minVer <= pBlock->record.maxVer) || + (pVerRange->maxVer < pBlock->record.maxVer && pVerRange->maxVer >= pBlock->record.minVer); } static bool getNeighborBlockOfSameTable(SFileDataBlockInfo* pBlockInfo, STableBlockScanInfo* pTableBlockScanInfo, @@ -1896,24 +1902,24 @@ static bool bufferDataInFileBlockGap(int32_t order, TSDBKEY key, SFileDataBlockI static bool keyOverlapFileBlock(TSDBKEY key, SFileDataBlockInfo* pBlock, SVersionRange* pVerRange) { return (key.ts >= pBlock->record.firstKey && key.ts <= pBlock->record.lastKey) && - (pBlock->record.lastKeyVer >= pVerRange->minVer) && (pBlock->record.firstKeyVer <= pVerRange->maxVer); + (pBlock->record.maxVer >= pVerRange->minVer) && (pBlock->record.minVer <= pVerRange->maxVer); } -static bool doCheckforDatablockOverlap(STableBlockScanInfo* pBlockScanInfo, const SDataBlk* pBlock, +static bool doCheckforDatablockOverlap(STableBlockScanInfo* pBlockScanInfo, const SBrinRecord* pRecord, int32_t startIndex) { size_t num = taosArrayGetSize(pBlockScanInfo->delSkyline); for (int32_t i = startIndex; i < num; i += 1) { TSDBKEY* p = taosArrayGet(pBlockScanInfo->delSkyline, i); - if (p->ts >= pBlock->minKey.ts && p->ts <= pBlock->maxKey.ts) { - if (p->version >= pBlock->minVer) { + if (p->ts >= pRecord->firstKey && p->ts <= pRecord->lastKey) { + if (p->version >= pRecord->minVer) { return true; } - } else if (p->ts < pBlock->minKey.ts) { // p->ts < pBlock->minKey.ts - if (p->version >= pBlock->minVer) { + } else if (p->ts < pRecord->firstKey) { // p->ts < pBlock->minKey.ts + if (p->version >= pRecord->minVer) { if (i < num - 1) { TSDBKEY* pnext = taosArrayGet(pBlockScanInfo->delSkyline, i + 1); - if (pnext->ts >= pBlock->minKey.ts) { + if (pnext->ts >= pRecord->firstKey) { return true; } } else { // it must be the last point @@ -1928,7 +1934,7 @@ static bool doCheckforDatablockOverlap(STableBlockScanInfo* pBlockScanInfo, cons return false; } -static bool overlapWithDelSkyline(STableBlockScanInfo* pBlockScanInfo, const SDataBlk* pBlock, int32_t order) { +static bool overlapWithDelSkyline(STableBlockScanInfo* pBlockScanInfo, const SBrinRecord* pRecord, int32_t order) { if (pBlockScanInfo->delSkyline == NULL) { return false; } @@ -1936,28 +1942,28 @@ static bool overlapWithDelSkyline(STableBlockScanInfo* pBlockScanInfo, const SDa // ts is not overlap TSDBKEY* pFirst = taosArrayGet(pBlockScanInfo->delSkyline, 0); TSDBKEY* pLast = taosArrayGetLast(pBlockScanInfo->delSkyline); - if (pBlock->minKey.ts > pLast->ts || pBlock->maxKey.ts < pFirst->ts) { + if (pRecord->firstKey > pLast->ts || pRecord->lastKey < pFirst->ts) { return false; } // version is not overlap if (ASCENDING_TRAVERSE(order)) { - return doCheckforDatablockOverlap(pBlockScanInfo, pBlock, pBlockScanInfo->fileDelIndex); + return doCheckforDatablockOverlap(pBlockScanInfo, pRecord, pBlockScanInfo->fileDelIndex); } else { int32_t index = pBlockScanInfo->fileDelIndex; while (1) { TSDBKEY* p = taosArrayGet(pBlockScanInfo->delSkyline, index); - if (p->ts > pBlock->minKey.ts && index > 0) { + if (p->ts > pRecord->firstKey && index > 0) { index -= 1; } else { // find the first point that is smaller than the minKey.ts of dataBlock. - if (p->ts == pBlock->minKey.ts && p->version < pBlock->maxVer && index > 0) { + if (p->ts == pRecord->firstKey && p->version < pRecord->maxVer && index > 0) { index -= 1; } break; } } - return doCheckforDatablockOverlap(pBlockScanInfo, pBlock, index); + return doCheckforDatablockOverlap(pBlockScanInfo, pRecord, index); } } @@ -1981,14 +1987,12 @@ static void getBlockToLoadInfo(SDataBlockToLoadInfo* pInfo, SFileDataBlockInfo* // overlap with neighbor if (hasNeighbor) { -// pInfo->overlapWithNeighborBlock = overlapWithNeighborBlock(pBlock, &bIndex, pReader->order); pInfo->overlapWithNeighborBlock = overlapWithNeighborBlock2(pBlockInfo, &rec, pReader->order); } - // todo: // has duplicated ts of different version in this block - pInfo->hasDupTs = 0;//(pBlock->nSubBlock == 1) ? pBlock->hasDup : true; - pInfo->overlapWithDelInfo = false;//overlapWithDelSkyline(pScanInfo, pBlockInfo, pReader->order); + pInfo->hasDupTs = (pBlockInfo->record.numRow > pBlockInfo->record.count); + pInfo->overlapWithDelInfo = overlapWithDelSkyline(pScanInfo, &pBlockInfo->record, pReader->order); if (hasDataInLastBlock(pLastBlockReader)) { int64_t tsLast = getCurrentKeyInLastBlock(pLastBlockReader); @@ -3307,9 +3311,11 @@ static int32_t moveToNextFile(STsdbReader* pReader, SBlockNumber* pBlockNum, SAr if (pReader->status.pCurrentFileset != NULL) { STFileObj* pTombFileObj = pReader->status.pCurrentFileset->farr[3]; + if (pTombFileObj != NULL) { const TTombBlkArray* pBlkArray = NULL; - int32_t code = tsdbDataFileReadTombBlk(pReader->pFileReader, &pBlkArray); + + int32_t code = tsdbDataFileReadTombBlk(pReader->pFileReader, &pBlkArray); if (code != TSDB_CODE_SUCCESS) { return code; } @@ -3327,6 +3333,9 @@ static int32_t moveToNextFile(STsdbReader* pReader, SBlockNumber* pBlockNum, SAr uint64_t uid = pReader->status.uidList.tableUidList[j]; STableBlockScanInfo* pScanInfo = getTableBlockScanInfo(pReader->status.pTableMap, uid, pReader->idStr); + if (pScanInfo->pDelData == NULL) { + pScanInfo->pDelData = taosArrayInit(4, sizeof(SDelData)); + } STombRecord record = {0}; for (int32_t k = 0; k < block.suid->size; ++k) { @@ -3337,6 +3346,9 @@ static int32_t moveToNextFile(STsdbReader* pReader, SBlockNumber* pBlockNum, SAr j += 1; uid = pReader->status.uidList.tableUidList[j]; pScanInfo = getTableBlockScanInfo(pReader->status.pTableMap, uid, pReader->idStr); + if (pScanInfo->pDelData == NULL) { + pScanInfo->pDelData = taosArrayInit(4, sizeof(SDelData)); + } } if (record.uid < uid) {