diff --git a/source/dnode/vnode/src/tsdb/tsdbRead2.c b/source/dnode/vnode/src/tsdb/tsdbRead2.c index 45066531a815a01f403b6b96d0ad3d5017e919a6..fcb9d0b4f44b5da018c7374b6d0533513221feed 100644 --- a/source/dnode/vnode/src/tsdb/tsdbRead2.c +++ b/source/dnode/vnode/src/tsdb/tsdbRead2.c @@ -132,8 +132,8 @@ typedef struct SFilesetIter { typedef struct SFileDataBlockInfo { // index position in STableBlockScanInfo in order to check whether neighbor block overlaps with it - uint64_t uid; - int32_t tbBlockIdx; + uint64_t uid; + int32_t tbBlockIdx; SBrinRecord record; } SFileDataBlockInfo; @@ -161,7 +161,6 @@ typedef struct STableUidList { typedef struct SReaderStatus { bool loadFromFile; // check file stage bool composedDataBlock; // the returned data block is a composed block or not -// bool mapDataCleaned; // mapData has been cleaned up alreay or not SSHashObj* pTableMap; // SHash STableBlockScanInfo** pTableIter; // table iterator used in building in-memory buffer data blocks. STableUidList uidList; // check tables in uid order, to avoid the repeatly load of blocks in STT. @@ -173,6 +172,7 @@ typedef struct SReaderStatus { SArray* pLDataIterArray; SRowMerger merger; SColumnInfoData* pPrimaryTsCol; // primary time stamp output col info data + TFileSetArray* pfSetArray; } SReaderStatus; typedef struct SBlockInfoBuf { @@ -182,15 +182,15 @@ typedef struct SBlockInfoBuf { int32_t numOfTables; } SBlockInfoBuf; -typedef struct STsdbReaderAttr { +typedef struct STsdbReaderInfo { + uint64_t suid; STSchema* pSchema; EReadMode readMode; uint64_t rowsNum; STimeWindow window; - bool freeBlock; SVersionRange verRange; int16_t order; -} STsdbReaderAttr; +} STsdbReaderInfo; typedef struct SResultBlockInfo { SSDataBlock* pResBlock; @@ -200,32 +200,24 @@ typedef struct SResultBlockInfo { struct STsdbReader { STsdb* pTsdb; - SVersionRange verRange; + STsdbReaderInfo info; TdThreadMutex readerMutex; EReaderStatus flag; int32_t code; - uint64_t suid; - int16_t order; - EReadMode readMode; uint64_t rowsNum; - STimeWindow window; // the primary query time window that applies to all queries SResultBlockInfo resBlockInfo; SReaderStatus status; char* idStr; // query info handle, for debug purpose int32_t type; // query type: 1. retrieve all data blocks, 2. retrieve direct prev|next rows SBlockLoadSuppInfo suppInfo; STsdbReadSnap* pReadSnap; - SCostSummary cost; + SCostSummary cost; SHashObj** pIgnoreTables; - STSchema* pSchema; // the newest version schema SSHashObj* pSchemaMap; // keep the retrieved schema info, to avoid the overhead by repeatly load schema SDataFileReader* pFileReader; // the file reader - SDelFReader* pDelFReader; // the del file reader, todo remove it - SArray* pDelIdx; // del file block index; SBlockInfoBuf blockInfoBuf; EContentData step; STsdbReader* innerReader[2]; - TFileSetArray* pfSetArray; }; static SFileDataBlockInfo* getCurrentBlockInfo(SDataBlockIter* pBlockIter); @@ -437,12 +429,12 @@ static SSHashObj* createDataBlockScanInfo(STsdbReader* pTsdbReader, SBlockInfoBu pScanInfo->uid = idList[j].uid; pUidList->tableUidList[j] = idList[j].uid; - if (ASCENDING_TRAVERSE(pTsdbReader->order)) { - int64_t skey = pTsdbReader->window.skey; + if (ASCENDING_TRAVERSE(pTsdbReader->info.order)) { + int64_t skey = pTsdbReader->info.window.skey; pScanInfo->lastKey = (skey > INT64_MIN) ? (skey - 1) : skey; pScanInfo->lastKeyInStt = skey; } else { - int64_t ekey = pTsdbReader->window.ekey; + int64_t ekey = pTsdbReader->info.window.ekey; pScanInfo->lastKey = (ekey < INT64_MAX) ? (ekey + 1) : ekey; pScanInfo->lastKeyInStt = ekey; } @@ -539,8 +531,8 @@ static STimeWindow updateQueryTimeWindow(STsdb* pTsdb, STimeWindow* pWindow) { static int32_t initFilesetIterator(SFilesetIter* pIter, TFileSetArray* pFileSetArray, STsdbReader* pReader) { size_t numOfFileset = TARRAY2_SIZE(pFileSetArray); - pIter->index = ASCENDING_TRAVERSE(pReader->order) ? -1 : numOfFileset; - pIter->order = pReader->order; + pIter->index = ASCENDING_TRAVERSE(pReader->info.order) ? -1 : numOfFileset; + pIter->order = pReader->info.order; pIter->pFilesetList = pFileSetArray; pIter->numOfFiles = numOfFileset; @@ -554,9 +546,9 @@ static int32_t initFilesetIterator(SFilesetIter* pIter, TFileSetArray* pFileSetA } SLastBlockReader* pLReader = pIter->pLastBlockReader; - pLReader->order = pReader->order; - pLReader->window = pReader->window; - pLReader->verRange = pReader->verRange; + pLReader->order = pReader->info.order; + pLReader->window = pReader->info.window; + pLReader->verRange = pReader->info.verRange; pLReader->uid = 0; tMergeTreeClose(&pLReader->mergeTree); @@ -631,14 +623,14 @@ static int32_t filesetIteratorNext(SFilesetIter* pIter, STsdbReader* pReader, bo tsdbFidKeyRange(fid, pReader->pTsdb->keepCfg.days, pReader->pTsdb->keepCfg.precision, &win.skey, &win.ekey); // current file are no longer overlapped with query time window, ignore remain files - if ((asc && win.skey > pReader->window.ekey) || (!asc && win.ekey < pReader->window.skey)) { + if ((asc && win.skey > pReader->info.window.ekey) || (!asc && win.ekey < pReader->info.window.skey)) { tsdbDebug("%p remain files are not qualified for qrange:%" PRId64 "-%" PRId64 ", ignore, %s", pReader, - pReader->window.skey, pReader->window.ekey, pReader->idStr); + pReader->info.window.skey, pReader->info.window.ekey, pReader->idStr); *hasNext = false; return TSDB_CODE_SUCCESS; } - if ((asc && (win.ekey < pReader->window.skey)) || ((!asc) && (win.skey > pReader->window.ekey))) { + if ((asc && (win.ekey < pReader->info.window.skey)) || ((!asc) && (win.skey > pReader->info.window.ekey))) { pIter->index += step; if ((asc && pIter->index >= pIter->numOfFiles) || ((!asc) && pIter->index < 0)) { *hasNext = false; @@ -647,8 +639,8 @@ static int32_t filesetIteratorNext(SFilesetIter* pIter, STsdbReader* pReader, bo continue; } - tsdbDebug("%p file found fid:%d for qrange:%" PRId64 "-%" PRId64 ", %s", pReader, fid, pReader->window.skey, - pReader->window.ekey, pReader->idStr); + tsdbDebug("%p file found fid:%d for qrange:%" PRId64 "-%" PRId64 ", %s", pReader, fid, pReader->info.window.skey, + pReader->info.window.ekey, pReader->idStr); *hasNext = true; return TSDB_CODE_SUCCESS; } @@ -792,13 +784,13 @@ static int32_t tsdbReaderCreate(SVnode* pVnode, SQueryTableDataCond* pCond, void initReaderStatus(&pReader->status); pReader->pTsdb = getTsdbByRetentions(pVnode, pCond->twindows.skey, pVnode->config.tsdbCfg.retentions, idstr, &level); - pReader->suid = pCond->suid; - pReader->order = pCond->order; + pReader->info.suid = pCond->suid; + pReader->info.order = pCond->order; pReader->idStr = (idstr != NULL) ? taosStrdup(idstr) : NULL; - pReader->verRange = getQueryVerRange(pVnode, pCond, level); + pReader->info.verRange = getQueryVerRange(pVnode, pCond, level); pReader->type = pCond->type; - pReader->window = updateQueryTimeWindow(pReader->pTsdb, &pCond->twindows); + pReader->info.window = updateQueryTimeWindow(pReader->pTsdb, &pCond->twindows); pReader->blockInfoBuf.numPerBucket = 1000; // 1000 tables per bucket code = initResBlockInfo(&pReader->resBlockInfo, capacity, pResBlock, pCond); @@ -887,22 +879,22 @@ static int32_t doLoadBlockIndex(STsdbReader* pReader, SDataFileReader* pFileRead while (i < TARRAY2_SIZE(pBlkArray)) { pBrinBlk = &pBlkArray->data[i]; - if (pBrinBlk->maxTbid.suid < pReader->suid) { + if (pBrinBlk->maxTbid.suid < pReader->info.suid) { i += 1; continue; } - if (pBrinBlk->minTbid.suid > pReader->suid) { // not include the queried table/super table, quit the loop + if (pBrinBlk->minTbid.suid > pReader->info.suid) { // not include the queried table/super table, quit the loop break; } - ASSERT(pBrinBlk->minTbid.suid <= pReader->suid && pBrinBlk->maxTbid.suid >= pReader->suid); - if (pBrinBlk->maxTbid.suid == pReader->suid && pBrinBlk->maxTbid.uid < pList->tableUidList[0]) { + ASSERT(pBrinBlk->minTbid.suid <= pReader->info.suid && pBrinBlk->maxTbid.suid >= pReader->info.suid); + if (pBrinBlk->maxTbid.suid == pReader->info.suid && pBrinBlk->maxTbid.uid < pList->tableUidList[0]) { i += 1; continue; } - if (pBrinBlk->minTbid.suid == pReader->suid && pBrinBlk->minTbid.uid > pList->tableUidList[numOfTables - 1]) { + if (pBrinBlk->minTbid.suid == pReader->info.suid && pBrinBlk->minTbid.uid > pList->tableUidList[numOfTables - 1]) { break; } @@ -952,13 +944,13 @@ static void cleanupTableScanInfo(SReaderStatus* pStatus) { } typedef struct SBrinRecordIter { - SArray* pBrinBlockList; - SBrinBlk* pCurrentBlk; - int32_t blockIndex; - int32_t recordIndex; + SArray* pBrinBlockList; + SBrinBlk* pCurrentBlk; + int32_t blockIndex; + int32_t recordIndex; SDataFileReader* pReader; - SBrinBlock block; - SBrinRecord record; + SBrinBlock block; + SBrinRecord record; } SBrinRecordIter; void initBrinRecordIter(SBrinRecordIter* pIter, SDataFileReader* pReader, SArray* pList) { @@ -1003,32 +995,28 @@ static int32_t doLoadFileBlock(STsdbReader* pReader, SArray* pIndexList, SBlockN int32_t k = 0; int32_t numOfTables = tSimpleHashGetSize(pReader->status.pTableMap); - int32_t step = ASCENDING_TRAVERSE(pReader->order) ? 1 : -1; - STimeWindow w = pReader->window; + int32_t step = ASCENDING_TRAVERSE(pReader->info.order) ? 1 : -1; + STimeWindow w = pReader->info.window; SBrinRecord* pRecord = NULL; SBrinRecordIter iter = {0}; initBrinRecordIter(&iter, pReader->pFileReader, pIndexList); - while (k < numOfTables) { - pRecord = getNextBrinRecord(&iter); - - uint64_t uid = pReader->status.uidList.tableUidList[k]; - if (pRecord == NULL || pRecord->suid > pReader->suid) { + while (((pRecord = getNextBrinRecord(&iter)) != NULL)) { + if (pRecord->suid > pReader->info.suid) { break; } - if (pRecord->suid < pReader->suid) { + uint64_t uid = pReader->status.uidList.tableUidList[k]; + if (pRecord->suid < pReader->info.suid) { continue; } - ASSERT(pRecord->suid == pReader->suid); - if (pRecord->uid < uid) { - continue; - } + if (uid < pRecord->uid) { // forward the table uid index + while (pReader->status.uidList.tableUidList[k] < pRecord->uid && k < numOfTables) { + k += 1; + } - while (pRecord->uid > uid && k < numOfTables) { - k += 1; if (k >= numOfTables) { break; } @@ -1036,18 +1024,14 @@ static int32_t doLoadFileBlock(STsdbReader* pReader, SArray* pIndexList, SBlockN uid = pReader->status.uidList.tableUidList[k]; } - if (k >= numOfTables) { - break; - } - if (pRecord->uid < uid) { continue; } - ASSERT(pRecord->suid == pReader->suid && uid == pRecord->uid); + ASSERT(pRecord->suid == pReader->info.suid && uid == pRecord->uid); STableBlockScanInfo* pScanInfo = getTableBlockScanInfo(pReader->status.pTableMap, uid, pReader->idStr); - if (ASCENDING_TRAVERSE(pReader->order)) { + if (ASCENDING_TRAVERSE(pReader->info.order)) { w.skey = pScanInfo->lastKey + step; } else { w.ekey = pScanInfo->lastKey + step; @@ -1064,7 +1048,7 @@ static int32_t doLoadFileBlock(STsdbReader* pReader, SArray* pIndexList, SBlockN } // 2. version range check - if (pRecord->minVer > pReader->verRange.maxVer || pRecord->maxVer < pReader->verRange.minVer) { + if (pRecord->minVer > pReader->info.verRange.maxVer || pRecord->maxVer < pReader->info.verRange.minVer) { continue; } @@ -1201,30 +1185,30 @@ static int doBinarySearchKey(TSKEY* keyList, int num, int pos, TSKEY key, int or static int32_t getEndPosInDataBlock(STsdbReader* pReader, SBlockData* pBlockData, SBrinRecord* pRecord, int32_t pos) { // NOTE: reverse the order to find the end position in data block int32_t endPos = -1; - bool asc = ASCENDING_TRAVERSE(pReader->order); + bool asc = ASCENDING_TRAVERSE(pReader->info.order); - if (asc && pReader->window.ekey >= pRecord->lastKey) { + if (asc && pReader->info.window.ekey >= pRecord->lastKey) { endPos = pRecord->numRow - 1; - } else if (!asc && pReader->window.skey <= pRecord->firstKey) { + } else if (!asc && pReader->info.window.skey <= pRecord->firstKey) { endPos = 0; } else { - int64_t key = asc ? pReader->window.ekey : pReader->window.skey; - endPos = doBinarySearchKey(pBlockData->aTSKEY, pRecord->numRow, pos, key, pReader->order); + int64_t key = asc ? pReader->info.window.ekey : pReader->info.window.skey; + endPos = doBinarySearchKey(pBlockData->aTSKEY, pRecord->numRow, pos, key, pReader->info.order); } - if ((pReader->verRange.maxVer >= pRecord->minVer && pReader->verRange.maxVer < pRecord->maxVer)|| - (pReader->verRange.minVer <= pRecord->maxVer && pReader->verRange.minVer > pRecord->minVer)) { + if ((pReader->info.verRange.maxVer >= pRecord->minVer && pReader->info.verRange.maxVer < pRecord->maxVer)|| + (pReader->info.verRange.minVer <= pRecord->maxVer && pReader->info.verRange.minVer > pRecord->minVer)) { int32_t i = endPos; if (asc) { for(; i >= 0; --i) { - if (pBlockData->aVersion[i] <= pReader->verRange.maxVer) { + if (pBlockData->aVersion[i] <= pReader->info.verRange.maxVer) { break; } } } else { for(; i < pRecord->numRow; ++i) { - if (pBlockData->aVersion[i] >= pReader->verRange.minVer) { + if (pBlockData->aVersion[i] >= pReader->info.verRange.minVer) { break; } } @@ -1360,7 +1344,7 @@ static int32_t copyBlockDataToSDataBlock(STsdbReader* pReader) { SColVal cv = {0}; int64_t st = taosGetTimestampUs(); - bool asc = ASCENDING_TRAVERSE(pReader->order); + bool asc = ASCENDING_TRAVERSE(pReader->info.order); int32_t step = asc ? 1 : -1; SBrinRecord* pRecord = &pBlockInfo->record; @@ -1375,14 +1359,14 @@ static int32_t copyBlockDataToSDataBlock(STsdbReader* pReader) { // row index of dump info remain the initial position, let's find the appropriate start position. if ((pDumpInfo->rowIndex == 0 && asc) || (pDumpInfo->rowIndex == pRecord->numRow - 1 && (!asc))) { - if (asc && pReader->window.skey <= pRecord->firstKey && pReader->verRange.minVer <= pRecord->minVer) { + if (asc && pReader->info.window.skey <= pRecord->firstKey && pReader->info.verRange.minVer <= pRecord->minVer) { // pDumpInfo->rowIndex = 0; - } else if (!asc && pReader->window.ekey >= pRecord->lastKey && pReader->verRange.maxVer >= pRecord->maxVer) { + } else if (!asc && pReader->info.window.ekey >= pRecord->lastKey && pReader->info.verRange.maxVer >= pRecord->maxVer) { // pDumpInfo->rowIndex = pRecord->numRow - 1; } else { // find the appropriate the start position in current block, and set it to be the current rowIndex int32_t pos = asc ? pRecord->numRow - 1 : 0; int32_t order = asc ? TSDB_ORDER_DESC : TSDB_ORDER_ASC; - int64_t key = asc ? pReader->window.skey : pReader->window.ekey; + int64_t key = asc ? pReader->info.window.skey : pReader->info.window.ekey; pDumpInfo->rowIndex = doBinarySearchKey(pBlockData->aTSKEY, pRecord->numRow, pos, key, order); if (pDumpInfo->rowIndex < 0) { @@ -1394,21 +1378,21 @@ static int32_t copyBlockDataToSDataBlock(STsdbReader* pReader) { return TSDB_CODE_INVALID_PARA; } - ASSERT(pReader->verRange.minVer <= pRecord->maxVer && pReader->verRange.maxVer >= pRecord->minVer); + ASSERT(pReader->info.verRange.minVer <= pRecord->maxVer && pReader->info.verRange.maxVer >= pRecord->minVer); // find the appropriate start position that satisfies the version requirement. - if ((pReader->verRange.maxVer >= pRecord->minVer && pReader->verRange.maxVer < pRecord->maxVer)|| - (pReader->verRange.minVer <= pRecord->maxVer && pReader->verRange.minVer > pRecord->minVer)) { + if ((pReader->info.verRange.maxVer >= pRecord->minVer && pReader->info.verRange.maxVer < pRecord->maxVer)|| + (pReader->info.verRange.minVer <= pRecord->maxVer && pReader->info.verRange.minVer > pRecord->minVer)) { int32_t i = pDumpInfo->rowIndex; if (asc) { for(; i < pRecord->numRow; ++i) { - if (pBlockData->aVersion[i] >= pReader->verRange.minVer) { + if (pBlockData->aVersion[i] >= pReader->info.verRange.minVer) { break; } } } else { for(; i >= 0; --i) { - if (pBlockData->aVersion[i] <= pReader->verRange.maxVer) { + if (pBlockData->aVersion[i] <= pReader->info.verRange.maxVer) { break; } } @@ -1422,7 +1406,7 @@ static int32_t copyBlockDataToSDataBlock(STsdbReader* pReader) { // time window check int32_t endIndex = getEndPosInDataBlock(pReader, pBlockData, pRecord, pDumpInfo->rowIndex); if (endIndex == -1) { - setBlockAllDumped(pDumpInfo, pReader->window.ekey, pReader->order); + setBlockAllDumped(pDumpInfo, pReader->info.window.ekey, pReader->info.order); return TSDB_CODE_SUCCESS; } @@ -1431,7 +1415,7 @@ static int32_t copyBlockDataToSDataBlock(STsdbReader* pReader) { if (dumpedRows > pReader->resBlockInfo.capacity) { // output buffer check dumpedRows = pReader->resBlockInfo.capacity; } else if (dumpedRows <= 0) { // no qualified rows in current data block, abort directly. - setBlockAllDumped(pDumpInfo, pReader->window.ekey, pReader->order); + setBlockAllDumped(pDumpInfo, pReader->info.window.ekey, pReader->info.order); return TSDB_CODE_SUCCESS; } @@ -1494,12 +1478,12 @@ static int32_t copyBlockDataToSDataBlock(STsdbReader* pReader) { // check if current block are all handled if (pDumpInfo->rowIndex >= 0 && pDumpInfo->rowIndex < pRecord->numRow) { int64_t ts = pBlockData->aTSKEY[pDumpInfo->rowIndex]; - if (outOfTimeWindow(ts, &pReader->window)) { // the remain data has out of query time window, ignore current block - setBlockAllDumped(pDumpInfo, ts, pReader->order); + if (outOfTimeWindow(ts, &pReader->info.window)) { // the remain data has out of query time window, ignore current block + setBlockAllDumped(pDumpInfo, ts, pReader->info.order); } } else { int64_t ts = asc ? pRecord->lastKey : pRecord->firstKey; - setBlockAllDumped(pDumpInfo, ts, pReader->order); + setBlockAllDumped(pDumpInfo, ts, pReader->info.order); } double elapsedTime = (taosGetTimestampUs() - st) / 1000.0; @@ -1515,34 +1499,34 @@ static int32_t copyBlockDataToSDataBlock(STsdbReader* pReader) { } static FORCE_INLINE STSchema* getTableSchemaImpl(STsdbReader* pReader, uint64_t uid) { - ASSERT(pReader->pSchema == NULL); + ASSERT(pReader->info.pSchema == NULL); - int32_t code = metaGetTbTSchemaEx(pReader->pTsdb->pVnode->pMeta, pReader->suid, uid, -1, &pReader->pSchema); - if (code != TSDB_CODE_SUCCESS || pReader->pSchema == NULL) { + int32_t code = metaGetTbTSchemaEx(pReader->pTsdb->pVnode->pMeta, pReader->info.suid, uid, -1, &pReader->info.pSchema); + if (code != TSDB_CODE_SUCCESS || pReader->info.pSchema == NULL) { terrno = code; tsdbError("failed to get table schema, uid:%" PRIu64 ", it may have been dropped, ver:-1, %s", uid, pReader->idStr); return NULL; } - code = tsdbRowMergerInit(&pReader->status.merger, pReader->pSchema); + code = tsdbRowMergerInit(&pReader->status.merger, pReader->info.pSchema); if (code != TSDB_CODE_SUCCESS) { terrno = code; tsdbError("failed to init merger, code:%s, %s", tstrerror(code), pReader->idStr); return NULL; } - return pReader->pSchema; + return pReader->info.pSchema; } static int32_t doLoadFileBlockData(STsdbReader* pReader, SDataBlockIter* pBlockIter, SBlockData* pBlockData, uint64_t uid) { int32_t code = 0; - STSchema* pSchema = pReader->pSchema; + STSchema* pSchema = pReader->info.pSchema; int64_t st = taosGetTimestampUs(); tBlockDataReset(pBlockData); - if (pReader->pSchema == NULL) { + if (pReader->info.pSchema == NULL) { pSchema = getTableSchemaImpl(pReader, uid); if (pSchema == NULL) { tsdbDebug("%p table uid:%" PRIu64 " has been dropped, no data existed, %s", pReader, uid, pReader->idStr); @@ -1648,7 +1632,7 @@ static int32_t doSetCurrentBlock(SDataBlockIter* pBlockIter, const char* idStr) } static int32_t initBlockIterator(STsdbReader* pReader, SDataBlockIter* pBlockIter, int32_t numOfBlocks, SArray* pTableList) { - bool asc = ASCENDING_TRAVERSE(pReader->order); + bool asc = ASCENDING_TRAVERSE(pReader->info.order); SBlockOrderSupporter sup = {0}; pBlockIter->numOfBlocks = numOfBlocks; @@ -1948,16 +1932,16 @@ static void getBlockToLoadInfo(SDataBlockToLoadInfo* pInfo, SFileDataBlockInfo* int32_t neighborIndex = 0; SBrinRecord rec = {0}; - bool hasNeighbor = getNeighborBlockOfSameTable(pBlockInfo, pScanInfo, &neighborIndex, pReader->order, &rec); + bool hasNeighbor = getNeighborBlockOfSameTable(pBlockInfo, pScanInfo, &neighborIndex, pReader->info.order, &rec); // overlap with neighbor if (hasNeighbor) { - pInfo->overlapWithNeighborBlock = overlapWithNeighborBlock2(pBlockInfo, &rec, pReader->order); + pInfo->overlapWithNeighborBlock = overlapWithNeighborBlock2(pBlockInfo, &rec, pReader->info.order); } // has duplicated ts of different version in this block pInfo->hasDupTs = (pBlockInfo->record.numRow > pBlockInfo->record.count); - pInfo->overlapWithDelInfo = overlapWithDelSkyline(pScanInfo, &pBlockInfo->record, pReader->order); + pInfo->overlapWithDelInfo = overlapWithDelSkyline(pScanInfo, &pBlockInfo->record, pReader->info.order); if (hasDataInLastBlock(pLastBlockReader)) { int64_t tsLast = getCurrentKeyInLastBlock(pLastBlockReader); @@ -1965,8 +1949,8 @@ static void getBlockToLoadInfo(SDataBlockToLoadInfo* pInfo, SFileDataBlockInfo* } pInfo->moreThanCapcity = pBlockInfo->record.numRow > pReader->resBlockInfo.capacity; - pInfo->partiallyRequired = dataBlockPartiallyRequired(&pReader->window, &pReader->verRange, pBlockInfo); - pInfo->overlapWithKeyInBuf = keyOverlapFileBlock(keyInBuf, pBlockInfo, &pReader->verRange); + pInfo->partiallyRequired = dataBlockPartiallyRequired(&pReader->info.window, &pReader->info.verRange, pBlockInfo); + pInfo->overlapWithKeyInBuf = keyOverlapFileBlock(keyInBuf, pBlockInfo, &pReader->info.verRange); } // 1. the version of all rows should be less than the endVersion @@ -2039,9 +2023,9 @@ static bool tryCopyDistinctRowFromFileBlock(STsdbReader* pReader, SBlockData* pB int32_t code = TSDB_CODE_SUCCESS; *copied = false; - bool asc = (pReader->order == TSDB_ORDER_ASC); + bool asc = (pReader->info.order == TSDB_ORDER_ASC); if ((pDumpInfo->rowIndex < pDumpInfo->totalRows - 1 && asc) || (pDumpInfo->rowIndex > 0 && (!asc))) { - int32_t step = pReader->order == TSDB_ORDER_ASC ? 1 : -1; + int32_t step = pReader->info.order == TSDB_ORDER_ASC ? 1 : -1; int64_t nextKey = pBlockData->aTSKEY[pDumpInfo->rowIndex + step]; if (nextKey != key) { // merge is not needed @@ -2088,7 +2072,7 @@ static bool tryCopyDistinctRowFromSttBlock(TSDBROW* fRow, SLastBlockReader* pLas *copied = false; - bool hasVal = nextRowFromLastBlocks(pLastBlockReader, pScanInfo, &pReader->verRange); + bool hasVal = nextRowFromLastBlocks(pLastBlockReader, pScanInfo, &pReader->info.verRange); if (hasVal) { int64_t next1 = getCurrentKeyInLastBlock(pLastBlockReader); if (next1 != ts) { @@ -2114,16 +2098,16 @@ static bool tryCopyDistinctRowFromSttBlock(TSDBROW* fRow, SLastBlockReader* pLas } static FORCE_INLINE STSchema* doGetSchemaForTSRow(int32_t sversion, STsdbReader* pReader, uint64_t uid) { - // always set the newest schema version in pReader->pSchema - if (pReader->pSchema == NULL) { + // always set the newest schema version in pReader->info.pSchema + if (pReader->info.pSchema == NULL) { STSchema* ps = getTableSchemaImpl(pReader, uid); if (ps == NULL) { return NULL; } } - if (pReader->pSchema && sversion == pReader->pSchema->version) { - return pReader->pSchema; + if (pReader->info.pSchema && sversion == pReader->info.pSchema->version) { + return pReader->info.pSchema; } void** p = tSimpleHashGet(pReader->pSchemaMap, &sversion, sizeof(sversion)); @@ -2132,7 +2116,7 @@ static FORCE_INLINE STSchema* doGetSchemaForTSRow(int32_t sversion, STsdbReader* } STSchema* ptr = NULL; - int32_t code = metaGetTbTSchemaEx(pReader->pTsdb->pVnode->pMeta, pReader->suid, uid, sversion, &ptr); + int32_t code = metaGetTbTSchemaEx(pReader->pTsdb->pVnode->pMeta, pReader->info.suid, uid, sversion, &ptr); if (code != TSDB_CODE_SUCCESS) { terrno = code; return NULL; @@ -2161,9 +2145,9 @@ static int32_t doMergeBufAndFileRows(STsdbReader* pReader, STableBlockScanInfo* TSDBKEY k = TSDBROW_KEY(pRow); TSDBROW fRow = tsdbRowFromBlockData(pBlockData, pDumpInfo->rowIndex); - // merge is not initialized yet, due to the fact that the pReader->pSchema is not initialized + // merge is not initialized yet, due to the fact that the pReader->info.pSchema is not initialized if (pMerger->pArray == NULL) { - ASSERT(pReader->pSchema == NULL); + ASSERT(pReader->info.pSchema == NULL); STSchema* ps = getTableSchemaImpl(pReader, pBlockScanInfo->uid); if (ps == NULL) { return terrno; @@ -2171,7 +2155,7 @@ static int32_t doMergeBufAndFileRows(STsdbReader* pReader, STableBlockScanInfo* } int64_t minKey = 0; - if (pReader->order == TSDB_ORDER_ASC) { + if (pReader->info.order == TSDB_ORDER_ASC) { minKey = INT64_MAX; // chosen the minimum value if (minKey > tsLast && hasDataInLastBlock(pLastBlockReader)) { minKey = tsLast; @@ -2204,10 +2188,10 @@ static int32_t doMergeBufAndFileRows(STsdbReader* pReader, STableBlockScanInfo* // ASC: file block ---> last block -----> imem -----> mem // DESC: mem -----> imem -----> last block -----> file block - if (pReader->order == TSDB_ORDER_ASC) { + if (pReader->info.order == TSDB_ORDER_ASC) { if (minKey == key) { init = true; - int32_t code = tsdbRowMergerAdd(pMerger, &fRow, pReader->pSchema); + int32_t code = tsdbRowMergerAdd(pMerger, &fRow, pReader->info.pSchema); if (code != TSDB_CODE_SUCCESS) { return code; } @@ -2220,12 +2204,12 @@ static int32_t doMergeBufAndFileRows(STsdbReader* pReader, STableBlockScanInfo* tsdbRowMergerAdd(pMerger, fRow1, NULL); } else { init = true; - int32_t code = tsdbRowMergerAdd(pMerger, fRow1, pReader->pSchema); + int32_t code = tsdbRowMergerAdd(pMerger, fRow1, pReader->info.pSchema); if (code != TSDB_CODE_SUCCESS) { return code; } } - doMergeRowsInLastBlock(pLastBlockReader, pBlockScanInfo, tsLast, pMerger, &pReader->verRange, pReader->idStr); + doMergeRowsInLastBlock(pLastBlockReader, pBlockScanInfo, tsLast, pMerger, &pReader->info.verRange, pReader->idStr); } if (minKey == k.ts) { @@ -2272,12 +2256,12 @@ static int32_t doMergeBufAndFileRows(STsdbReader* pReader, STableBlockScanInfo* tsdbRowMergerAdd(pMerger, fRow1, NULL); } else { init = true; - int32_t code = tsdbRowMergerAdd(pMerger, fRow1, pReader->pSchema); + int32_t code = tsdbRowMergerAdd(pMerger, fRow1, pReader->info.pSchema); if (code != TSDB_CODE_SUCCESS) { return code; } } - doMergeRowsInLastBlock(pLastBlockReader, pBlockScanInfo, tsLast, pMerger, &pReader->verRange, pReader->idStr); + doMergeRowsInLastBlock(pLastBlockReader, pBlockScanInfo, tsLast, pMerger, &pReader->info.verRange, pReader->idStr); } if (minKey == key) { @@ -2285,7 +2269,7 @@ static int32_t doMergeBufAndFileRows(STsdbReader* pReader, STableBlockScanInfo* tsdbRowMergerAdd(pMerger, &fRow, NULL); } else { init = true; - int32_t code = tsdbRowMergerAdd(pMerger, &fRow, pReader->pSchema); + int32_t code = tsdbRowMergerAdd(pMerger, &fRow, pReader->info.pSchema); if (code != TSDB_CODE_SUCCESS) { return code; } @@ -2335,14 +2319,14 @@ static int32_t doMergeFileBlockAndLastBlock(SLastBlockReader* pLastBlockReader, pBlockScanInfo->lastKey = tsLastBlock; return TSDB_CODE_SUCCESS; } else { - code = tsdbRowMergerAdd(pMerger, &fRow, pReader->pSchema); + code = tsdbRowMergerAdd(pMerger, &fRow, pReader->info.pSchema); if (code != TSDB_CODE_SUCCESS) { return code; } TSDBROW* pRow1 = tMergeTreeGetRow(&pLastBlockReader->mergeTree); tsdbRowMergerAdd(pMerger, pRow1, NULL); - doMergeRowsInLastBlock(pLastBlockReader, pBlockScanInfo, tsLastBlock, pMerger, &pReader->verRange, pReader->idStr); + doMergeRowsInLastBlock(pLastBlockReader, pBlockScanInfo, tsLastBlock, pMerger, &pReader->info.verRange, pReader->idStr); code = tsdbRowMergerGetRow(pMerger, &pTSRow); if (code != TSDB_CODE_SUCCESS) { @@ -2359,12 +2343,12 @@ static int32_t doMergeFileBlockAndLastBlock(SLastBlockReader* pLastBlockReader, } } } else { // not merge block data - code = tsdbRowMergerAdd(pMerger, &fRow, pReader->pSchema); + code = tsdbRowMergerAdd(pMerger, &fRow, pReader->info.pSchema); if (code != TSDB_CODE_SUCCESS) { return code; } - doMergeRowsInLastBlock(pLastBlockReader, pBlockScanInfo, tsLastBlock, pMerger, &pReader->verRange, pReader->idStr); + doMergeRowsInLastBlock(pLastBlockReader, pBlockScanInfo, tsLastBlock, pMerger, &pReader->info.verRange, pReader->idStr); // merge with block data if ts == key if (tsLastBlock == pBlockData->aTSKEY[pDumpInfo->rowIndex]) { @@ -2394,9 +2378,9 @@ static int32_t mergeFileBlockAndLastBlock(STsdbReader* pReader, SLastBlockReader SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo; SRowMerger* pMerger = &pReader->status.merger; - // merge is not initialized yet, due to the fact that the pReader->pSchema is not initialized + // merge is not initialized yet, due to the fact that the pReader->info.pSchema is not initialized if (pMerger->pArray == NULL) { - ASSERT(pReader->pSchema == NULL); + ASSERT(pReader->info.pSchema == NULL); STSchema* ps = getTableSchemaImpl(pReader, pBlockScanInfo->uid); if (ps == NULL) { return terrno; @@ -2413,12 +2397,12 @@ static int32_t mergeFileBlockAndLastBlock(STsdbReader* pReader, SLastBlockReader TSDBROW fRow = tsdbRowFromBlockData(pBlockData, pDumpInfo->rowIndex); int64_t ts = getCurrentKeyInLastBlock(pLastBlockReader); - if (ASCENDING_TRAVERSE(pReader->order)) { + if (ASCENDING_TRAVERSE(pReader->info.order)) { if (key < ts) { // imem, mem are all empty, file blocks (data blocks and last block) exist return mergeRowsInFileBlocks(pBlockData, pBlockScanInfo, key, pReader); } else if (key == ts) { SRow* pTSRow = NULL; - int32_t code = tsdbRowMergerAdd(pMerger, &fRow, pReader->pSchema); + int32_t code = tsdbRowMergerAdd(pMerger, &fRow, pReader->info.pSchema); if (code != TSDB_CODE_SUCCESS) { return code; } @@ -2428,7 +2412,7 @@ static int32_t mergeFileBlockAndLastBlock(STsdbReader* pReader, SLastBlockReader TSDBROW* pRow1 = tMergeTreeGetRow(&pLastBlockReader->mergeTree); tsdbRowMergerAdd(pMerger, pRow1, NULL); - doMergeRowsInLastBlock(pLastBlockReader, pBlockScanInfo, ts, pMerger, &pReader->verRange, pReader->idStr); + doMergeRowsInLastBlock(pLastBlockReader, pBlockScanInfo, ts, pMerger, &pReader->info.verRange, pReader->idStr); code = tsdbRowMergerGetRow(pMerger, &pTSRow); if (code != TSDB_CODE_SUCCESS) { @@ -2481,9 +2465,9 @@ static int32_t doMergeMultiLevelRows(STsdbReader* pReader, STableBlockScanInfo* return code; } - // merge is not initialized yet, due to the fact that the pReader->pSchema is not initialized + // merge is not initialized yet, due to the fact that the pReader->info.pSchema is not initialized if (pMerger->pArray == NULL) { - ASSERT(pReader->pSchema == NULL); + ASSERT(pReader->info.pSchema == NULL); STSchema* ps = getTableSchemaImpl(pReader, pBlockScanInfo->uid); if (ps == NULL) { return terrno; @@ -2491,7 +2475,7 @@ static int32_t doMergeMultiLevelRows(STsdbReader* pReader, STableBlockScanInfo* } int64_t minKey = 0; - if (ASCENDING_TRAVERSE(pReader->order)) { + if (ASCENDING_TRAVERSE(pReader->info.order)) { minKey = INT64_MAX; // let's find the minimum if (minKey > k.ts) { minKey = k.ts; @@ -2531,11 +2515,11 @@ static int32_t doMergeMultiLevelRows(STsdbReader* pReader, STableBlockScanInfo* // ASC: file block -----> last block -----> imem -----> mem // DESC: mem -----> imem -----> last block -----> file block - if (ASCENDING_TRAVERSE(pReader->order)) { + if (ASCENDING_TRAVERSE(pReader->info.order)) { if (minKey == key) { init = true; TSDBROW fRow = tsdbRowFromBlockData(pBlockData, pDumpInfo->rowIndex); - code = tsdbRowMergerAdd(pMerger, &fRow, pReader->pSchema); + code = tsdbRowMergerAdd(pMerger, &fRow, pReader->info.pSchema); if (code != TSDB_CODE_SUCCESS) { return code; } @@ -2549,13 +2533,13 @@ static int32_t doMergeMultiLevelRows(STsdbReader* pReader, STableBlockScanInfo* tsdbRowMergerAdd(pMerger, pRow1, NULL); } else { init = true; - code = tsdbRowMergerAdd(pMerger, pRow1, pReader->pSchema); + code = tsdbRowMergerAdd(pMerger, pRow1, pReader->info.pSchema); if (code != TSDB_CODE_SUCCESS) { return code; } } - doMergeRowsInLastBlock(pLastBlockReader, pBlockScanInfo, tsLast, pMerger, &pReader->verRange, pReader->idStr); + doMergeRowsInLastBlock(pLastBlockReader, pBlockScanInfo, tsLast, pMerger, &pReader->info.verRange, pReader->idStr); } if (minKey == ik.ts) { @@ -2626,18 +2610,18 @@ static int32_t doMergeMultiLevelRows(STsdbReader* pReader, STableBlockScanInfo* tsdbRowMergerAdd(pMerger, pRow1, NULL); } else { init = true; - code = tsdbRowMergerAdd(pMerger, pRow1, pReader->pSchema); + code = tsdbRowMergerAdd(pMerger, pRow1, pReader->info.pSchema); if (code != TSDB_CODE_SUCCESS) { return code; } } - doMergeRowsInLastBlock(pLastBlockReader, pBlockScanInfo, tsLast, pMerger, &pReader->verRange, pReader->idStr); + doMergeRowsInLastBlock(pLastBlockReader, pBlockScanInfo, tsLast, pMerger, &pReader->info.verRange, pReader->idStr); } if (minKey == key) { TSDBROW fRow = tsdbRowFromBlockData(pBlockData, pDumpInfo->rowIndex); if (!init) { - code = tsdbRowMergerAdd(pMerger, &fRow, pReader->pSchema); + code = tsdbRowMergerAdd(pMerger, &fRow, pReader->info.pSchema); if (code != TSDB_CODE_SUCCESS) { return code; } @@ -2663,11 +2647,11 @@ static int32_t doMergeMultiLevelRows(STsdbReader* pReader, STableBlockScanInfo* int32_t doInitMemDataIter(STsdbReader* pReader, STbData** pData, STableBlockScanInfo* pBlockScanInfo, TSDBKEY* pKey, SMemTable* pMem, SIterInfo* pIter, const char* type) { int32_t code = TSDB_CODE_SUCCESS; - int32_t backward = (!ASCENDING_TRAVERSE(pReader->order)); + int32_t backward = (!ASCENDING_TRAVERSE(pReader->info.order)); pIter->hasVal = false; if (pMem != NULL) { - *pData = tsdbGetTbDataFromMemTable(pMem, pReader->suid, pBlockScanInfo->uid); + *pData = tsdbGetTbDataFromMemTable(pMem, pReader->info.suid, pBlockScanInfo->uid); if ((*pData) != NULL) { code = tsdbTbDataIterCreate((*pData), pKey, backward, &pIter->iter); @@ -2676,7 +2660,7 @@ int32_t doInitMemDataIter(STsdbReader* pReader, STbData** pData, STableBlockScan tsdbDebug("%p uid:%" PRIu64 ", check data in %s from skey:%" PRId64 ", order:%d, ts range in buf:%" PRId64 "-%" PRId64 " %s", - pReader, pBlockScanInfo->uid, type, pKey->ts, pReader->order, (*pData)->minKey, (*pData)->maxKey, + pReader, pBlockScanInfo->uid, type, pKey->ts, pReader->info.order, (*pData)->minKey, (*pData)->maxKey, pReader->idStr); } else { tsdbError("%p uid:%" PRIu64 ", failed to create iterator for %s, code:%s, %s", pReader, pBlockScanInfo->uid, @@ -2726,10 +2710,10 @@ static int32_t initMemDataIterator(STableBlockScanInfo* pBlockScanInfo, STsdbRea STbData* d = NULL; TSDBKEY startKey = {0}; - if (ASCENDING_TRAVERSE(pReader->order)) { - startKey = (TSDBKEY){.ts = pBlockScanInfo->lastKey + 1, .version = pReader->verRange.minVer}; + if (ASCENDING_TRAVERSE(pReader->info.order)) { + startKey = (TSDBKEY){.ts = pBlockScanInfo->lastKey + 1, .version = pReader->info.verRange.minVer}; } else { - startKey = (TSDBKEY){.ts = pBlockScanInfo->lastKey - 1, .version = pReader->verRange.maxVer}; + startKey = (TSDBKEY){.ts = pBlockScanInfo->lastKey - 1, .version = pReader->info.verRange.maxVer}; } int32_t code = doInitMemDataIter(pReader, &d, pBlockScanInfo, &startKey, pReader->pReadSnap->pMem, @@ -2745,7 +2729,7 @@ static int32_t initMemDataIterator(STableBlockScanInfo* pBlockScanInfo, STsdbRea return code; } - doLoadMemTombData(pBlockScanInfo, d, di, pReader->verRange.maxVer); + doLoadMemTombData(pBlockScanInfo, d, di, pReader->info.verRange.maxVer); pBlockScanInfo->iterInit = true; return TSDB_CODE_SUCCESS; @@ -2763,17 +2747,17 @@ static bool isValidFileBlockRow(SBlockData* pBlockData, SFileBlockDumpInfo* pDum // check for version and time range int64_t ver = pBlockData->aVersion[pDumpInfo->rowIndex]; - if (ver > pReader->verRange.maxVer || ver < pReader->verRange.minVer) { + if (ver > pReader->info.verRange.maxVer || ver < pReader->info.verRange.minVer) { return false; } int64_t ts = pBlockData->aTSKEY[pDumpInfo->rowIndex]; - if (ts > pReader->window.ekey || ts < pReader->window.skey) { + if (ts > pReader->info.window.ekey || ts < pReader->info.window.skey) { return false; } - if (hasBeenDropped(pBlockScanInfo->delSkyline, &pBlockScanInfo->fileDelIndex, ts, ver, pReader->order, - &pReader->verRange)) { + if (hasBeenDropped(pBlockScanInfo->delSkyline, &pBlockScanInfo->fileDelIndex, ts, ver, pReader->info.order, + &pReader->info.verRange)) { return false; } @@ -2809,8 +2793,8 @@ static int32_t checkTombBlockRecords(SArray* pData, STombBlock* pBlock, uint64_t return TSDB_CODE_SUCCESS; } -static int32_t loadTombRecordInfoFromSttFiles(SArray* pLDataIterList, uint64_t suid, - STableBlockScanInfo* pBlockScanInfo, uint64_t maxVer) { +static int32_t loadTombRecordsFromSttFiles(SArray* pLDataIterList, uint64_t suid, STableBlockScanInfo* pBlockScanInfo, + uint64_t maxVer) { int32_t size = taosArrayGetSize(pLDataIterList); if (size <= 0) { return TSDB_CODE_SUCCESS; @@ -2883,41 +2867,44 @@ static int32_t loadTombRecordsFromDataFiles(STsdbReader* pReader, int32_t numOfT return code; } - if (record.suid < pReader->suid) { + if (record.suid < pReader->info.suid) { continue; } - if (record.suid > pReader->suid) { + if (record.suid > pReader->info.suid) { tTombBlockDestroy(&block); return TSDB_CODE_SUCCESS; } - ASSERT(record.suid == pReader->suid); + bool newTable = false; + if (uid < record.uid) { + while (pReader->status.uidList.tableUidList[j] < record.uid && j < numOfTables) { + j += 1; + newTable = true; + } + + if (j >= numOfTables) { + tTombBlockDestroy(&block); + break; + } + + uid = pReader->status.uidList.tableUidList[j]; + } + if (record.uid < uid) { continue; } - bool newTable = false; - while (uid < record.uid && j < (numOfTables - 1)) { - j += 1; - uid = pReader->status.uidList.tableUidList[j]; - newTable = true; - } + ASSERT(record.suid == pReader->info.suid && uid == record.uid); - if (uid != record.uid) { - tTombBlockDestroy(&block); - return TSDB_CODE_SUCCESS; - } else { - if (newTable) { - pScanInfo = getTableBlockScanInfo(pReader->status.pTableMap, uid, pReader->idStr); - if (pScanInfo->pfileDelData == NULL) { - pScanInfo->pfileDelData = taosArrayInit(4, sizeof(SDelData)); - } + if (newTable) { + pScanInfo = getTableBlockScanInfo(pReader->status.pTableMap, uid, pReader->idStr); + if (pScanInfo->pfileDelData == NULL) { + pScanInfo->pfileDelData = taosArrayInit(4, sizeof(SDelData)); } } - ASSERT(record.uid == uid); - if (record.version <= pReader->verRange.maxVer) { + if (record.version <= pReader->info.verRange.maxVer) { SDelData delData = {.version = record.version, .sKey = record.skey, .eKey = record.ekey}; taosArrayPush(pScanInfo->pfileDelData, &delData); } @@ -2954,22 +2941,22 @@ static bool initLastBlockReader(SLastBlockReader* pLBlockReader, STableBlockScan pScanInfo->uid, pReader->idStr); int32_t code = tMergeTreeOpen2(&pLBlockReader->mergeTree, (pLBlockReader->order == TSDB_ORDER_DESC), pReader->pTsdb, - pReader->suid, pScanInfo->uid, &w, &pLBlockReader->verRange, pReader->idStr, false, - pReader->status.pLDataIterArray, pReader->status.pCurrentFileset, pReader->pSchema, + pReader->info.suid, pScanInfo->uid, &w, &pLBlockReader->verRange, pReader->idStr, false, + pReader->status.pLDataIterArray, pReader->status.pCurrentFileset, pReader->info.pSchema, pReader->suppInfo.colId, pReader->suppInfo.numOfCols); if (code != TSDB_CODE_SUCCESS) { return false; } - code = loadTombRecordInfoFromSttFiles(pReader->status.pLDataIterArray, pReader->suid, pScanInfo, pReader->verRange.maxVer); + code = loadTombRecordsFromSttFiles(pReader->status.pLDataIterArray, pReader->info.suid, pScanInfo, pReader->info.verRange.maxVer); if (code != TSDB_CODE_SUCCESS) { return false; } initMemDataIterator(pScanInfo, pReader); - initDelSkylineIterator(pScanInfo, pReader->order, &pReader->cost); - code = nextRowFromLastBlocks(pLBlockReader, pScanInfo, &pReader->verRange); + initDelSkylineIterator(pScanInfo, pReader->info.order, &pReader->cost); + code = nextRowFromLastBlocks(pLBlockReader, pScanInfo, &pReader->info.verRange); int64_t el = taosGetTimestampUs() - st; pReader->cost.initLastBlockReader += (el / 1000.0); @@ -2998,9 +2985,9 @@ int32_t mergeRowsInFileBlocks(SBlockData* pBlockData, STableBlockScanInfo* pBloc return code; } - // merge is not initialized yet, due to the fact that the pReader->pSchema is not initialized + // merge is not initialized yet, due to the fact that the pReader->info.pSchema is not initialized if (pMerger->pArray == NULL) { - ASSERT(pReader->pSchema == NULL); + ASSERT(pReader->info.pSchema == NULL); STSchema* ps = getTableSchemaImpl(pReader, pBlockScanInfo->uid); if (ps == NULL) { return terrno; @@ -3014,7 +3001,7 @@ int32_t mergeRowsInFileBlocks(SBlockData* pBlockData, STableBlockScanInfo* pBloc TSDBROW fRow = tsdbRowFromBlockData(pBlockData, pDumpInfo->rowIndex); SRow* pTSRow = NULL; - code = tsdbRowMergerAdd(pMerger, &fRow, pReader->pSchema); + code = tsdbRowMergerAdd(pMerger, &fRow, pReader->info.pSchema); if (code != TSDB_CODE_SUCCESS) { return code; } @@ -3069,18 +3056,18 @@ static int32_t buildComposedDataBlockImpl(STsdbReader* pReader, STableBlockScanI static int32_t loadNeighborIfOverlap(SFileDataBlockInfo* pBlockInfo, STableBlockScanInfo* pBlockScanInfo, STsdbReader* pReader, bool* loadNeighbor) { int32_t code = TSDB_CODE_SUCCESS; - int32_t step = ASCENDING_TRAVERSE(pReader->order) ? 1 : -1; + int32_t step = ASCENDING_TRAVERSE(pReader->info.order) ? 1 : -1; int32_t nextIndex = -1; *loadNeighbor = false; SBrinRecord rec = {0}; - bool hasNeighbor = getNeighborBlockOfSameTable(pBlockInfo, pBlockScanInfo, &nextIndex, pReader->order, &rec); + bool hasNeighbor = getNeighborBlockOfSameTable(pBlockInfo, pBlockScanInfo, &nextIndex, pReader->info.order, &rec); if (!hasNeighbor) { // do nothing return code; } - if (overlapWithNeighborBlock2(pBlockInfo, &rec, pReader->order)) { // load next block + if (overlapWithNeighborBlock2(pBlockInfo, &rec, pReader->info.order)) { // load next block SReaderStatus* pStatus = &pReader->status; SDataBlockIter* pBlockIter = &pStatus->blockIter; @@ -3126,7 +3113,7 @@ static int32_t buildComposedDataBlock(STsdbReader* pReader) { SFileDataBlockInfo* pBlockInfo = getCurrentBlockInfo(&pReader->status.blockIter); SLastBlockReader* pLastBlockReader = pReader->status.fileIter.pLastBlockReader; - bool asc = ASCENDING_TRAVERSE(pReader->order); + bool asc = ASCENDING_TRAVERSE(pReader->info.order); int64_t st = taosGetTimestampUs(); int32_t step = asc ? 1 : -1; double el = 0; @@ -3137,7 +3124,7 @@ static int32_t buildComposedDataBlock(STsdbReader* pReader) { STableBlockScanInfo* pBlockScanInfo = NULL; if (pBlockInfo != NULL) { if (pReader->pIgnoreTables && taosHashGet(*pReader->pIgnoreTables, &pBlockInfo->uid, sizeof(pBlockInfo->uid))) { - setBlockAllDumped(pDumpInfo, pRecord->lastKey, pReader->order); + setBlockAllDumped(pDumpInfo, pRecord->lastKey, pReader->info.order); return code; } @@ -3167,7 +3154,7 @@ static int32_t buildComposedDataBlock(STsdbReader* pReader) { ASSERT(0); pBlockScanInfo = *pReader->status.pTableIter; if (pReader->pIgnoreTables && taosHashGet(*pReader->pIgnoreTables, &pBlockScanInfo->uid, sizeof(pBlockScanInfo->uid))) { -// setBlockAllDumped(pDumpInfo, pBlock->maxKey.ts, pReader->order); +// setBlockAllDumped(pDumpInfo, pBlock->maxKey.ts, pReader->info.order); return code; } } @@ -3194,7 +3181,7 @@ static int32_t buildComposedDataBlock(STsdbReader* pReader) { bool loadNeighbor = false; code = loadNeighborIfOverlap(pBlockInfo, pBlockScanInfo, pReader, &loadNeighbor); if ((!loadNeighbor) || (code != 0)) { - setBlockAllDumped(pDumpInfo, pRecord->lastKey, pReader->order); + setBlockAllDumped(pDumpInfo, pRecord->lastKey, pReader->info.order); break; } } @@ -3214,7 +3201,7 @@ static int32_t buildComposedDataBlock(STsdbReader* pReader) { // currently loaded file data block is consumed if ((pBlockData->nRow > 0) && (pDumpInfo->rowIndex >= pBlockData->nRow || pDumpInfo->rowIndex < 0)) { // pBlock = getCurrentBlock(&pReader->status.blockIter); - setBlockAllDumped(pDumpInfo, pRecord->lastKey, pReader->order); + setBlockAllDumped(pDumpInfo, pRecord->lastKey, pReader->info.order); break; } @@ -3287,7 +3274,7 @@ int32_t initDelSkylineIterator(STableBlockScanInfo* pBlockScanInfo, int32_t orde } TSDBKEY getCurrentKeyInBuf(STableBlockScanInfo* pScanInfo, STsdbReader* pReader) { - bool asc = ASCENDING_TRAVERSE(pReader->order); + bool asc = ASCENDING_TRAVERSE(pReader->info.order); TSDBKEY key = {.ts = TSKEY_INITIAL_VAL}, ikey = {.ts = TSKEY_INITIAL_VAL}; bool hasKey = false, hasIKey = false; @@ -3484,7 +3471,7 @@ static int32_t doBuildDataBlock(STsdbReader* pReader) { SLastBlockReader* pLastBlockReader = pReader->status.fileIter.pLastBlockReader; if (pReader->pIgnoreTables && taosHashGet(*pReader->pIgnoreTables, &pBlockInfo->uid, sizeof(pBlockInfo->uid))) { - setBlockAllDumped(&pStatus->fBlockDumpInfo, pBlockInfo->record.lastKey, pReader->order); + setBlockAllDumped(&pStatus->fBlockDumpInfo, pBlockInfo->record.lastKey, pReader->info.order); return code; } @@ -3508,13 +3495,13 @@ static int32_t doBuildDataBlock(STsdbReader* pReader) { // build composed data block code = buildComposedDataBlock(pReader); - } else if (bufferDataInFileBlockGap(pReader->order, keyInBuf, pBlockInfo)) { + } else if (bufferDataInFileBlockGap(pReader->info.order, keyInBuf, pBlockInfo)) { // data in memory that are earlier than current file block // rows in buffer should be less than the file block in asc, greater than file block in desc - int64_t endKey = (ASCENDING_TRAVERSE(pReader->order)) ? pBlockInfo->record.firstKey : pBlockInfo->record.lastKey; + int64_t endKey = (ASCENDING_TRAVERSE(pReader->info.order)) ? pBlockInfo->record.firstKey : pBlockInfo->record.lastKey; code = buildDataBlockFromBuf(pReader, pScanInfo, endKey); } else { - if (hasDataInLastBlock(pLastBlockReader) && !ASCENDING_TRAVERSE(pReader->order)) { + if (hasDataInLastBlock(pLastBlockReader) && !ASCENDING_TRAVERSE(pReader->info.order)) { // only return the rows in last block int64_t tsLast = getCurrentKeyInLastBlock(pLastBlockReader); ASSERT(tsLast >= pBlockInfo->record.lastKey); @@ -3561,10 +3548,10 @@ static int32_t doBuildDataBlock(STsdbReader* pReader) { pInfo->dataLoad = 0; pInfo->window = (STimeWindow){.skey = pBlockInfo->record.firstKey, .ekey = pBlockInfo->record.lastKey}; setComposedBlockFlag(pReader, false); - setBlockAllDumped(&pStatus->fBlockDumpInfo, pBlockInfo->record.lastKey, pReader->order); + setBlockAllDumped(&pStatus->fBlockDumpInfo, pBlockInfo->record.lastKey, pReader->info.order); // update the last key for the corresponding table - pScanInfo->lastKey = ASCENDING_TRAVERSE(pReader->order) ? pInfo->window.ekey : pInfo->window.skey; + pScanInfo->lastKey = ASCENDING_TRAVERSE(pReader->info.order) ? pInfo->window.ekey : pInfo->window.skey; tsdbDebug("%p uid:%" PRIu64 " clean file block retrieved from file, global index:%d, " "table index:%d, rows:%d, brange:%" PRId64 "-%" PRId64 ", %s", pReader, pScanInfo->uid, pBlockIter->index, pBlockInfo->tbBlockIdx, pBlockInfo->record.numRow, @@ -3595,7 +3582,7 @@ static int32_t doSumFileBlockRows(STsdbReader* pReader, SDataFReader* pFileReade SBlockIdx* pBlockIdx = NULL; for (int32_t i = 0; i < num; ++i) { pBlockIdx = (SBlockIdx*)taosArrayGet(aBlockIdx, i); - if (pBlockIdx->suid != pReader->suid) { + if (pBlockIdx->suid != pReader->info.suid) { continue; } @@ -3637,7 +3624,7 @@ static int32_t doSumSttBlockRows(STsdbReader* pReader) { // all identical if (pStart->suid == pEnd->suid) { - if (pStart->suid != pReader->suid) { + if (pStart->suid != pReader->info.suid) { // no qualified stt block existed taosArrayClear(pBlockLoadInfo->aSttBlk); continue; @@ -3650,13 +3637,13 @@ static int32_t doSumSttBlockRows(STsdbReader* pReader) { for (int32_t j = 0; j < size; ++j) { SSttBlk* p = taosArrayGet(pBlockLoadInfo->aSttBlk, j); uint64_t s = p->suid; - if (s < pReader->suid) { + if (s < pReader->info.suid) { continue; } - if (s == pReader->suid) { + if (s == pReader->info.suid) { pReader->rowsNum += p->nRow; - } else if (s > pReader->suid) { + } else if (s > pReader->info.suid) { break; } } @@ -3734,9 +3721,9 @@ static int32_t buildBlockFromBufferSequentially(STsdbReader* pReader) { } initMemDataIterator(*pBlockScanInfo, pReader); - initDelSkylineIterator(*pBlockScanInfo, pReader->order, &pReader->cost); + initDelSkylineIterator(*pBlockScanInfo, pReader->info.order, &pReader->cost); - int64_t endKey = (ASCENDING_TRAVERSE(pReader->order)) ? INT64_MAX : INT64_MIN; + int64_t endKey = (ASCENDING_TRAVERSE(pReader->info.order)) ? INT64_MAX : INT64_MIN; int32_t code = buildDataBlockFromBuf(pReader, *pBlockScanInfo, endKey); if (code != TSDB_CODE_SUCCESS) { return code; @@ -3756,7 +3743,7 @@ static int32_t buildBlockFromBufferSequentially(STsdbReader* pReader) { // set the correct start position in case of the first/last file block, according to the query time window static void initBlockDumpInfo(STsdbReader* pReader, SDataBlockIter* pBlockIter) { - int64_t lastKey = ASCENDING_TRAVERSE(pReader->order) ? INT64_MIN : INT64_MAX; + int64_t lastKey = ASCENDING_TRAVERSE(pReader->info.order) ? INT64_MIN : INT64_MAX; SFileDataBlockInfo* pBlockInfo = getCurrentBlockInfo(pBlockIter); SReaderStatus* pStatus = &pReader->status; SFileBlockDumpInfo* pDumpInfo = &pStatus->fBlockDumpInfo; @@ -3768,7 +3755,7 @@ static void initBlockDumpInfo(STsdbReader* pReader, SDataBlockIter* pBlockIter) } pDumpInfo->totalRows = pBlockInfo->record.numRow; - pDumpInfo->rowIndex = ASCENDING_TRAVERSE(pReader->order) ? 0 : pBlockInfo->record.numRow - 1; + pDumpInfo->rowIndex = ASCENDING_TRAVERSE(pReader->info.order) ? 0 : pBlockInfo->record.numRow - 1; } else { pDumpInfo->totalRows = 0; pDumpInfo->rowIndex = 0; @@ -3800,7 +3787,7 @@ static int32_t initForFirstBlockInFile(STsdbReader* pReader, SDataBlockIter* pBl code = initBlockIterator(pReader, pBlockIter, num.numOfBlocks, pTableList); } else { // no block data, only last block exists tBlockDataReset(&pReader->status.fileBlockData); - resetDataBlockIterator(pBlockIter, pReader->order); + resetDataBlockIterator(pBlockIter, pReader->info.order); resetTableListIndex(&pReader->status); } @@ -3858,7 +3845,7 @@ static ERetrieveType doReadDataFromLastFiles(STsdbReader* pReader) { static int32_t buildBlockFromFiles(STsdbReader* pReader) { int32_t code = TSDB_CODE_SUCCESS; - bool asc = ASCENDING_TRAVERSE(pReader->order); + bool asc = ASCENDING_TRAVERSE(pReader->info.order); SDataBlockIter* pBlockIter = &pReader->status.blockIter; SSDataBlock* pResBlock = pReader->resBlockInfo.pResBlock; @@ -3897,7 +3884,7 @@ static int32_t buildBlockFromFiles(STsdbReader* pReader) { } tBlockDataReset(pBlockData); - resetDataBlockIterator(pBlockIter, pReader->order); + resetDataBlockIterator(pBlockIter, pReader->info.order); resetTableListIndex(&pReader->status); ERetrieveType type = doReadDataFromLastFiles(pReader); @@ -4085,14 +4072,14 @@ TSDBROW* getValidMemRow(SIterInfo* pIter, const SArray* pDelList, STsdbReader* p TSDBROW* pRow = tsdbTbDataIterGet(pIter->iter); TSDBKEY key = TSDBROW_KEY(pRow); - if (outOfTimeWindow(key.ts, &pReader->window)) { + if (outOfTimeWindow(key.ts, &pReader->info.window)) { pIter->hasVal = false; return NULL; } // it is a valid data version - if ((key.version <= pReader->verRange.maxVer && key.version >= pReader->verRange.minVer) && - (!hasBeenDropped(pDelList, &pIter->index, key.ts, key.version, pReader->order, &pReader->verRange))) { + if ((key.version <= pReader->info.verRange.maxVer && key.version >= pReader->info.verRange.minVer) && + (!hasBeenDropped(pDelList, &pIter->index, key.ts, key.version, pReader->info.order, &pReader->info.verRange))) { return pRow; } @@ -4105,13 +4092,13 @@ TSDBROW* getValidMemRow(SIterInfo* pIter, const SArray* pDelList, STsdbReader* p pRow = tsdbTbDataIterGet(pIter->iter); key = TSDBROW_KEY(pRow); - if (outOfTimeWindow(key.ts, &pReader->window)) { + if (outOfTimeWindow(key.ts, &pReader->info.window)) { pIter->hasVal = false; return NULL; } - if (key.version <= pReader->verRange.maxVer && key.version >= pReader->verRange.minVer && - (!hasBeenDropped(pDelList, &pIter->index, key.ts, key.version, pReader->order, &pReader->verRange))) { + if (key.version <= pReader->info.verRange.maxVer && key.version >= pReader->info.verRange.minVer && + (!hasBeenDropped(pDelList, &pIter->index, key.ts, key.version, pReader->info.order, &pReader->info.verRange))) { return pRow; } } @@ -4179,17 +4166,17 @@ static int32_t checkForNeighborFileBlock(STsdbReader* pReader, STableBlockScanIn CHECK_FILEBLOCK_STATE* state) { SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo; SBlockData* pBlockData = &pReader->status.fileBlockData; - bool asc = ASCENDING_TRAVERSE(pReader->order); + bool asc = ASCENDING_TRAVERSE(pReader->info.order); *state = CHECK_FILEBLOCK_QUIT; - int32_t step = ASCENDING_TRAVERSE(pReader->order) ? 1 : -1; + int32_t step = ASCENDING_TRAVERSE(pReader->info.order) ? 1 : -1; bool loadNeighbor = true; int32_t code = loadNeighborIfOverlap(pFBlock, pScanInfo, pReader, &loadNeighbor); if (loadNeighbor && (code == TSDB_CODE_SUCCESS)) { pDumpInfo->rowIndex = - doMergeRowsInFileBlockImpl(pBlockData, pDumpInfo->rowIndex, key, pMerger, &pReader->verRange, step); + doMergeRowsInFileBlockImpl(pBlockData, pDumpInfo->rowIndex, key, pMerger, &pReader->info.verRange, step); if ((pDumpInfo->rowIndex >= pDumpInfo->totalRows && asc) || (pDumpInfo->rowIndex < 0 && !asc)) { *state = CHECK_FILEBLOCK_CONT; } @@ -4202,14 +4189,14 @@ int32_t doMergeRowsInFileBlocks(SBlockData* pBlockData, STableBlockScanInfo* pSc SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo; SRowMerger* pMerger = &pReader->status.merger; - bool asc = ASCENDING_TRAVERSE(pReader->order); + bool asc = ASCENDING_TRAVERSE(pReader->info.order); int64_t key = pBlockData->aTSKEY[pDumpInfo->rowIndex]; int32_t step = asc ? 1 : -1; pDumpInfo->rowIndex += step; if ((pDumpInfo->rowIndex <= pBlockData->nRow - 1 && asc) || (pDumpInfo->rowIndex >= 0 && !asc)) { pDumpInfo->rowIndex = - doMergeRowsInFileBlockImpl(pBlockData, pDumpInfo->rowIndex, key, pMerger, &pReader->verRange, step); + doMergeRowsInFileBlockImpl(pBlockData, pDumpInfo->rowIndex, key, pMerger, &pReader->info.verRange, step); } // all rows are consumed, let's try next file block @@ -4303,7 +4290,7 @@ int32_t doMergeMemTableMultiRows(TSDBROW* pRow, uint64_t uid, SIterInfo* pIter, tsdbRowMergerAdd(&pReader->status.merger,pNextRow, pTSchema1); } else { // let's merge rows in file block - code = tsdbRowMergerAdd(&pReader->status.merger, ¤t, pReader->pSchema); + code = tsdbRowMergerAdd(&pReader->status.merger, ¤t, pReader->info.pSchema); if (code != TSDB_CODE_SUCCESS) { return code; } @@ -4344,7 +4331,7 @@ int32_t doMergeMemIMemRows(TSDBROW* pRow, TSDBROW* piRow, STableBlockScanInfo* p return terrno; } - if (ASCENDING_TRAVERSE(pReader->order)) { // ascending order imem --> mem + if (ASCENDING_TRAVERSE(pReader->info.order)) { // ascending order imem --> mem int32_t code = tsdbRowMergerAdd(&pReader->status.merger, piRow, piSchema); if (code != TSDB_CODE_SUCCESS) { return code; @@ -4393,7 +4380,7 @@ static int32_t tsdbGetNextRowInMem(STableBlockScanInfo* pBlockScanInfo, STsdbRea uint64_t uid = pBlockScanInfo->uid; // todo refactor - bool asc = ASCENDING_TRAVERSE(pReader->order); + bool asc = ASCENDING_TRAVERSE(pReader->info.order); if (pBlockScanInfo->iter.hasVal) { TSDBKEY k = TSDBROW_KEY(pRow); if ((k.ts >= endKey && asc) || (k.ts <= endKey && !asc)) { @@ -4627,12 +4614,12 @@ int32_t tsdbSetTableList2(STsdbReader* pReader, const void* pTableList, int32_t pUidList->tableUidList[i] = pList[i].uid; // todo extract method - if (ASCENDING_TRAVERSE(pReader->order)) { - int64_t skey = pReader->window.skey; + if (ASCENDING_TRAVERSE(pReader->info.order)) { + int64_t skey = pReader->info.window.skey; pInfo->lastKey = (skey > INT64_MIN) ? (skey - 1) : skey; pInfo->lastKeyInStt = skey; } else { - int64_t ekey = pReader->window.ekey; + int64_t ekey = pReader->info.window.ekey; pInfo->lastKey = (ekey < INT64_MAX) ? (ekey + 1) : ekey; pInfo->lastKeyInStt = ekey; } @@ -4657,19 +4644,19 @@ void* tsdbGetIvtIdx2(SMeta* pMeta) { return metaGetIvtIdx(pMeta); } -uint64_t tsdbGetReaderMaxVersion2(STsdbReader* pReader) { return pReader->verRange.maxVer; } +uint64_t tsdbGetReaderMaxVersion2(STsdbReader* pReader) { return pReader->info.verRange.maxVer; } static int32_t doOpenReaderImpl(STsdbReader* pReader) { SReaderStatus* pStatus = &pReader->status; SDataBlockIter* pBlockIter = &pStatus->blockIter; - initFilesetIterator(&pStatus->fileIter, pReader->pfSetArray, pReader); - resetDataBlockIterator(&pStatus->blockIter, pReader->order); + initFilesetIterator(&pStatus->fileIter, pReader->status.pfSetArray, pReader); + resetDataBlockIterator(&pStatus->blockIter, pReader->info.order); int32_t code = TSDB_CODE_SUCCESS; if (pStatus->fileIter.numOfFiles == 0) { pStatus->loadFromFile = false; - } else if (READ_MODE_COUNT_ONLY == pReader->readMode) { + } else if (READ_MODE_COUNT_ONLY == pReader->info.readMode) { // DO NOTHING } else { code = initForFirstBlockInFile(pReader, pBlockIter); @@ -4705,7 +4692,7 @@ int32_t tsdbReaderOpen2(void* pVnode, SQueryTableDataCond* pCond, void* pTableLi // check for query time window STsdbReader* pReader = *ppReader; - if (isEmptyQueryTimeWindow(&pReader->window) && pCond->type == TIMEWINDOW_RANGE_CONTAINED) { + if (isEmptyQueryTimeWindow(&pReader->info.window) && pCond->type == TIMEWINDOW_RANGE_CONTAINED) { tsdbDebug("%p query window not overlaps with the data set, no result returned, %s", pReader, pReader->idStr); return TSDB_CODE_SUCCESS; } @@ -4748,20 +4735,20 @@ int32_t tsdbReaderOpen2(void* pVnode, SQueryTableDataCond* pCond, void* pTableLi // no valid error code set in metaGetTbTSchema, so let's set the error code here. // we should proceed in case of tmq processing. if (pCond->suid != 0) { - pReader->pSchema = metaGetTbTSchema(pReader->pTsdb->pVnode->pMeta, pReader->suid, -1, 1); - if (pReader->pSchema == NULL) { - tsdbError("failed to get table schema, suid:%" PRIu64 ", ver:-1, %s", pReader->suid, pReader->idStr); + pReader->info.pSchema = metaGetTbTSchema(pReader->pTsdb->pVnode->pMeta, pReader->info.suid, -1, 1); + if (pReader->info.pSchema == NULL) { + tsdbError("failed to get table schema, suid:%" PRIu64 ", ver:-1, %s", pReader->info.suid, pReader->idStr); } } else if (numOfTables > 0) { STableKeyInfo* pKey = pTableList; - pReader->pSchema = metaGetTbTSchema(pReader->pTsdb->pVnode->pMeta, pKey->uid, -1, 1); - if (pReader->pSchema == NULL) { + pReader->info.pSchema = metaGetTbTSchema(pReader->pTsdb->pVnode->pMeta, pKey->uid, -1, 1); + if (pReader->info.pSchema == NULL) { tsdbError("failed to get table schema, uid:%" PRIu64 ", ver:-1, %s", pKey->uid, pReader->idStr); } } - if (pReader->pSchema != NULL) { - tsdbRowMergerInit(&pReader->status.merger, pReader->pSchema); + if (pReader->info.pSchema != NULL) { + tsdbRowMergerInit(&pReader->status.merger, pReader->info.pSchema); } pReader->pSchemaMap = tSimpleHashInit(8, taosFastHash); @@ -4772,8 +4759,8 @@ int32_t tsdbReaderOpen2(void* pVnode, SQueryTableDataCond* pCond, void* pTableLi } tSimpleHashSetFreeFp(pReader->pSchemaMap, freeSchemaFunc); - if (pReader->pSchema != NULL) { - code = updateBlockSMAInfo(pReader->pSchema, &pReader->suppInfo); + if (pReader->info.pSchema != NULL) { + code = updateBlockSMAInfo(pReader->info.pSchema, &pReader->suppInfo); if (code != TSDB_CODE_SUCCESS) { goto _err; } @@ -4797,15 +4784,15 @@ int32_t tsdbReaderOpen2(void* pVnode, SQueryTableDataCond* pCond, void* pTableLi pReader->flag = READER_STATUS_SUSPEND; if (countOnly) { - pReader->readMode = READ_MODE_COUNT_ONLY; + pReader->info.readMode = READ_MODE_COUNT_ONLY; } pReader->pIgnoreTables = pIgnoreTables; tsdbDebug("%p total numOfTable:%d, window:%" PRId64 " - %" PRId64 ", verRange:%" PRId64 " - %" PRId64 " in this query %s", - pReader, numOfTables, pReader->window.skey, pReader->window.ekey, pReader->verRange.minVer, - pReader->verRange.maxVer, pReader->idStr); + pReader, numOfTables, pReader->info.window.skey, pReader->info.window.ekey, pReader->info.verRange.minVer, + pReader->info.verRange.maxVer, pReader->idStr); return code; @@ -4819,22 +4806,22 @@ int32_t tsdbReaderOpen2(void* pVnode, SQueryTableDataCond* pCond, void* pTableLi static void clearSharedPtr(STsdbReader* p) { p->status.pTableMap = NULL; p->status.uidList.tableUidList = NULL; + p->status.pfSetArray = NULL; + p->info.pSchema = NULL; p->pReadSnap = NULL; - p->pSchema = NULL; p->pSchemaMap = NULL; - p->pfSetArray = NULL; } static void setSharedPtr(STsdbReader* pDst, const STsdbReader* pSrc) { pDst->status.pTableMap = pSrc->status.pTableMap; pDst->status.uidList = pSrc->status.uidList; - pDst->pSchema = pSrc->pSchema; + pDst->status.pfSetArray = pSrc->status.pfSetArray; + pDst->info.pSchema = pSrc->info.pSchema; pDst->pSchemaMap = pSrc->pSchemaMap; pDst->pReadSnap = pSrc->pReadSnap; - pDst->pfSetArray = pSrc->pfSetArray; - if (pDst->pSchema) { - tsdbRowMergerInit(&pDst->status.merger, pDst->pSchema); + if (pDst->info.pSchema) { + tsdbRowMergerInit(&pDst->status.merger, pDst->info.pSchema); } } @@ -4885,15 +4872,6 @@ void tsdbReaderClose2(STsdbReader* pReader) { tsdbDataFileReaderClose(&pReader->pFileReader); } - if (pReader->pDelFReader != NULL) { - tsdbDelFReaderClose(&pReader->pDelFReader); - } - - if (pReader->pDelIdx != NULL) { - taosArrayDestroy(pReader->pDelIdx); - pReader->pDelIdx = NULL; - } - qTrace("tsdb/reader-close: %p, untake snapshot", pReader); tsdbUntakeReadSnap2(pReader, pReader->pReadSnap, true); pReader->pReadSnap = NULL; @@ -4926,7 +4904,7 @@ void tsdbReaderClose2(STsdbReader* pReader) { taosMemoryFree(pReader->idStr); tsdbRowMergerCleanup(&pReader->status.merger); - taosMemoryFree(pReader->pSchema); + taosMemoryFree(pReader->info.pSchema); tSimpleHashCleanup(pReader->pSchemaMap); taosMemoryFreeClear(pReader); @@ -5001,7 +4979,7 @@ int32_t tsdbReaderSuspend2(STsdbReader* pReader) { if (pBlockScanInfo) { // save lastKey to restore memory iterator STimeWindow w = pReader->resBlockInfo.pResBlock->info.window; - pBlockScanInfo->lastKey = ASCENDING_TRAVERSE(pReader->order) ? w.ekey : w.skey; + pBlockScanInfo->lastKey = ASCENDING_TRAVERSE(pReader->info.order) ? w.ekey : w.skey; // reset current current table's data block scan info, pBlockScanInfo->iterInit = false; @@ -5148,7 +5126,7 @@ static int32_t doTsdbNextDataBlock2(STsdbReader* pReader, bool* hasNext) { return code; } - if (READ_MODE_COUNT_ONLY == pReader->readMode) { + if (READ_MODE_COUNT_ONLY == pReader->info.readMode) { return tsdbReadRowsCountOnly(pReader); } @@ -5176,7 +5154,7 @@ int32_t tsdbNextDataBlock2(STsdbReader* pReader, bool* hasNext) { *hasNext = false; - if (isEmptyQueryTimeWindow(&pReader->window) || pReader->step == EXTERNAL_ROWS_NEXT || pReader->code != TSDB_CODE_SUCCESS) { + if (isEmptyQueryTimeWindow(&pReader->info.window) || pReader->step == EXTERNAL_ROWS_NEXT || pReader->code != TSDB_CODE_SUCCESS) { return (pReader->code != TSDB_CODE_SUCCESS)? pReader->code:code; } @@ -5216,7 +5194,7 @@ int32_t tsdbNextDataBlock2(STsdbReader* pReader, bool* hasNext) { // prepare for the main scan code = doOpenReaderImpl(pReader); int32_t step = 1; - resetAllDataBlockScanInfo(pReader->status.pTableMap, pReader->innerReader[0]->window.ekey, step); + resetAllDataBlockScanInfo(pReader->status.pTableMap, pReader->innerReader[0]->info.window.ekey, step); if (code != TSDB_CODE_SUCCESS) { return code; @@ -5244,7 +5222,7 @@ int32_t tsdbNextDataBlock2(STsdbReader* pReader, bool* hasNext) { // prepare for the next row scan int32_t step = -1; code = doOpenReaderImpl(pReader->innerReader[1]); - resetAllDataBlockScanInfo(pReader->innerReader[1]->status.pTableMap, pReader->window.ekey, step); + resetAllDataBlockScanInfo(pReader->innerReader[1]->status.pTableMap, pReader->info.window.ekey, step); if (code != TSDB_CODE_SUCCESS) { return code; } @@ -5476,7 +5454,7 @@ int32_t tsdbReaderReset2(STsdbReader* pReader, SQueryTableDataCond* pCond) { } } - if (isEmptyQueryTimeWindow(&pReader->window) || pReader->pReadSnap == NULL) { + if (isEmptyQueryTimeWindow(&pReader->info.window) || pReader->pReadSnap == NULL) { tsdbDebug("tsdb reader reset return %p, %s", pReader->pReadSnap, pReader->idStr); tsdbReleaseReader(pReader); return TSDB_CODE_SUCCESS; @@ -5485,11 +5463,11 @@ int32_t tsdbReaderReset2(STsdbReader* pReader, SQueryTableDataCond* pCond) { SReaderStatus* pStatus = &pReader->status; SDataBlockIter* pBlockIter = &pStatus->blockIter; - pReader->order = pCond->order; + pReader->info.order = pCond->order; pReader->type = TIMEWINDOW_RANGE_CONTAINED; pStatus->loadFromFile = true; pStatus->pTableIter = NULL; - pReader->window = updateQueryTimeWindow(pReader->pTsdb, &pCond->twindows); + pReader->info.window = updateQueryTimeWindow(pReader->pTsdb, &pCond->twindows); // allocate buffer in order to load data blocks from file memset(&pReader->suppInfo.tsColAgg, 0, sizeof(SColumnDataAgg)); @@ -5499,13 +5477,13 @@ int32_t tsdbReaderReset2(STsdbReader* pReader, SQueryTableDataCond* pCond) { int32_t numOfTables = tSimpleHashGetSize(pStatus->pTableMap); - initFilesetIterator(&pStatus->fileIter, pReader->pfSetArray, pReader); - resetDataBlockIterator(pBlockIter, pReader->order); + initFilesetIterator(&pStatus->fileIter, pReader->status.pfSetArray, pReader); + resetDataBlockIterator(pBlockIter, pReader->info.order); resetTableListIndex(&pReader->status); - bool asc = ASCENDING_TRAVERSE(pReader->order); + bool asc = ASCENDING_TRAVERSE(pReader->info.order); int32_t step = asc ? 1 : -1; - int64_t ts = asc ? pReader->window.skey - 1 : pReader->window.ekey + 1; + int64_t ts = asc ? pReader->info.window.skey - 1 : pReader->info.window.ekey + 1; resetAllDataBlockScanInfo(pStatus->pTableMap, ts, step); // no data in files, let's try buffer in memory @@ -5516,7 +5494,7 @@ int32_t tsdbReaderReset2(STsdbReader* pReader, SQueryTableDataCond* pCond) { code = initForFirstBlockInFile(pReader, pBlockIter); if (code != TSDB_CODE_SUCCESS) { tsdbError("%p reset reader failed, numOfTables:%d, query range:%" PRId64 " - %" PRId64 " in query %s", pReader, - numOfTables, pReader->window.skey, pReader->window.ekey, pReader->idStr); + numOfTables, pReader->info.window.skey, pReader->info.window.ekey, pReader->idStr); tsdbReleaseReader(pReader); return code; @@ -5525,7 +5503,7 @@ int32_t tsdbReaderReset2(STsdbReader* pReader, SQueryTableDataCond* pCond) { tsdbDebug("%p reset reader, suid:%" PRIu64 ", numOfTables:%d, skey:%" PRId64 ", query range:%" PRId64 " - %" PRId64 " in query %s", - pReader, pReader->suid, numOfTables, pCond->twindows.skey, pReader->window.skey, pReader->window.ekey, + pReader, pReader->info.suid, numOfTables, pCond->twindows.skey, pReader->info.window.skey, pReader->info.window.ekey, pReader->idStr); tsdbReleaseReader(pReader); @@ -5648,7 +5626,7 @@ int64_t tsdbGetNumOfRowsInMemTable2(STsdbReader* pReader) { STbData* d = NULL; if (pReader->pReadSnap->pMem != NULL) { - d = tsdbGetTbDataFromMemTable(pReader->pReadSnap->pMem, pReader->suid, pBlockScanInfo->uid); + d = tsdbGetTbDataFromMemTable(pReader->pReadSnap->pMem, pReader->info.suid, pBlockScanInfo->uid); if (d != NULL) { rows += tsdbGetNRowsInTbData(d); } @@ -5656,7 +5634,7 @@ int64_t tsdbGetNumOfRowsInMemTable2(STsdbReader* pReader) { STbData* di = NULL; if (pReader->pReadSnap->pIMem != NULL) { - di = tsdbGetTbDataFromMemTable(pReader->pReadSnap->pIMem, pReader->suid, pBlockScanInfo->uid); + di = tsdbGetTbDataFromMemTable(pReader->pReadSnap->pIMem, pReader->info.suid, pBlockScanInfo->uid); if (di != NULL) { rows += tsdbGetNRowsInTbData(di); } @@ -5710,7 +5688,7 @@ int32_t tsdbGetTableSchema2(void* pVnode, int64_t uid, STSchema** pSchema, int64 int32_t tsdbTakeReadSnap2(STsdbReader* pReader, _query_reseek_func_t reseek, STsdbReadSnap** ppSnap) { int32_t code = 0; STsdb* pTsdb = pReader->pTsdb; - SVersionRange* pRange = &pReader->verRange; + SVersionRange* pRange = &pReader->info.verRange; // lock taosThreadRwlockRdlock(&pTsdb->rwLock); @@ -5752,7 +5730,7 @@ int32_t tsdbTakeReadSnap2(STsdbReader* pReader, _query_reseek_func_t reseek, STs } // fs - code = tsdbFSCreateRefSnapshot(pTsdb->pFS, &pReader->pfSetArray); + code = tsdbFSCreateRefSnapshot(pTsdb->pFS, &pReader->status.pfSetArray); if (code) { taosThreadRwlockUnlock(&pTsdb->rwLock); goto _exit; @@ -5795,7 +5773,7 @@ void tsdbUntakeReadSnap2(STsdbReader* pReader, STsdbReadSnap* pSnap, bool proact if (pSnap->pINode) taosMemoryFree(pSnap->pINode); taosMemoryFree(pSnap); - tsdbFSDestroyRefSnapshot(&pReader->pfSetArray); + tsdbFSDestroyRefSnapshot(&pReader->status.pfSetArray); } tsdbTrace("vgId:%d, untake read snapshot", TD_VID(pTsdb->pVnode)); }