diff --git a/source/common/src/tdatablock.c b/source/common/src/tdatablock.c index 3c8d394b4396776b03d3557e49aa7d87c29ffccc..a75046d06d04fc229244031e80c0e28bd72e7dd2 100644 --- a/source/common/src/tdatablock.c +++ b/source/common/src/tdatablock.c @@ -1180,7 +1180,7 @@ void blockDataCleanup(SSDataBlock* pDataBlock) { void blockDataEmpty(SSDataBlock* pDataBlock) { SDataBlockInfo* pInfo = &pDataBlock->info; - if (pInfo->capacity == 0 || pInfo->rows > pDataBlock->info.capacity) { + if (pInfo->capacity == 0) { return; } diff --git a/source/dnode/vnode/src/tsdb/tsdbRead.c b/source/dnode/vnode/src/tsdb/tsdbRead.c index 96bce02b674476b760114099de9d93eb0a735d72..c6444b5ded690619919261cc228cc9bb4850d787 100644 --- a/source/dnode/vnode/src/tsdb/tsdbRead.c +++ b/source/dnode/vnode/src/tsdb/tsdbRead.c @@ -24,6 +24,11 @@ typedef enum { EXTERNAL_ROWS_NEXT = 0x3, } EContentData; +typedef enum { + READ_MODE_COUNT_ONLY = 0x1, + READ_MODE_ALL, +} EReadMode; + typedef struct { STbDataIter* iter; int32_t index; @@ -167,6 +172,8 @@ struct STsdbReader { uint64_t suid; int16_t order; bool freeBlock; + EReadMode readMode; + uint64_t rowsNum; STimeWindow window; // the primary query time window that applies to all queries SSDataBlock* pResBlock; int32_t capacity; @@ -2998,6 +3005,9 @@ static int32_t doBuildDataBlock(STsdbReader* pReader) { TSDBKEY keyInBuf = getCurrentKeyInBuf(pScanInfo, pReader); if (fileBlockShouldLoad(pReader, pBlockInfo, pBlock, pScanInfo, keyInBuf, pLastBlockReader)) { + if (READ_MODE_COUNT_ONLY == pReader->readMode && pReader->rowsNum > 0) { + return code; + } code = doLoadFileBlockData(pReader, pBlockIter, &pStatus->fileBlockData, pScanInfo->uid); if (code != TSDB_CODE_SUCCESS) { return code; @@ -3006,12 +3016,19 @@ static int32_t doBuildDataBlock(STsdbReader* pReader) { // build composed data block code = buildComposedDataBlock(pReader); } else if (bufferDataInFileBlockGap(pReader->order, keyInBuf, pBlock)) { + if (READ_MODE_COUNT_ONLY == pReader->readMode && pReader->rowsNum > 0) { + return code; + } // data in memory that are earlier than current file block // rows in buffer should be less than the file block in asc, greater than file block in desc int64_t endKey = (ASCENDING_TRAVERSE(pReader->order)) ? pBlock->minKey.ts : pBlock->maxKey.ts; code = buildDataBlockFromBuf(pReader, pScanInfo, endKey); } else { if (hasDataInLastBlock(pLastBlockReader) && !ASCENDING_TRAVERSE(pReader->order)) { + if (READ_MODE_COUNT_ONLY == pReader->readMode && pReader->rowsNum > 0) { + return code; + } + // only return the rows in last block int64_t tsLast = getCurrentKeyInLastBlock(pLastBlockReader); ASSERT(tsLast >= pBlock->maxKey.ts); @@ -3069,6 +3086,131 @@ static int32_t doBuildDataBlock(STsdbReader* pReader) { return code; } + +static int32_t doSumBlockRows(STsdbReader* pReader, SDataFReader* pFileReader) { + int64_t st = taosGetTimestampUs(); + LRUHandle* handle = NULL; + int32_t code = tsdbCacheGetBlockIdx(pFileReader->pTsdb->biCache, pFileReader, &handle); + if (code != TSDB_CODE_SUCCESS || handle == NULL) { + goto _end; + } + + int32_t numOfTables = taosHashGetSize(pReader->status.pTableMap); + + SArray* aBlockIdx = (SArray*)taosLRUCacheValue(pFileReader->pTsdb->biCache, handle); + size_t num = taosArrayGetSize(aBlockIdx); + if (num == 0) { + tsdbBICacheRelease(pFileReader->pTsdb->biCache, handle); + return TSDB_CODE_SUCCESS; + } + + SBlockIdx* pBlockIdx = NULL; + int32_t i = 0; + for (int32_t i = 0; i < num; ++i) { + pBlockIdx = (SBlockIdx*)taosArrayGet(aBlockIdx, i); + if (pBlockIdx->suid != pReader->suid) { + continue; + } + + STableBlockScanInfo** p = taosHashGet(pReader->status.pTableMap, &pBlockIdx->uid, sizeof(pBlockIdx->uid)); + if (p == NULL || *p == NULL) { + continue; + } + + STableBlockScanInfo *pScanInfo = *p; + tMapDataReset(&pScanInfo->mapData); + tsdbReadDataBlk(pReader->pFileReader, pBlockIdx, &pScanInfo->mapData); + + SDataBlk block = {0}; + for (int32_t j = 0; j < pScanInfo->mapData.nItem; ++j) { + tGetDataBlk(pScanInfo->mapData.pData + pScanInfo->mapData.aOffset[j], &block); + pReader->rowsNum += block.nRow; + } + } + +_end: + tsdbBICacheRelease(pFileReader->pTsdb->biCache, handle); + return code; +} + + +static int32_t readRowsCountFromFile(STsdbReader* pReader) { + int32_t code = TSDB_CODE_SUCCESS; + + while (1) { + bool hasNext = false; + int32_t code = filesetIteratorNext(&pReader->status.fileIter, pReader, &hasNext); + if (code) { + return code; + } + + if (!hasNext) { // no data files on disk + break; + } + + code = doSumBlockRows(pReader, pReader->pFileReader); + if (code != TSDB_CODE_SUCCESS) { + return code; + } + } + + pReader->status.loadFromFile = false; + + return code; +} + +static int32_t readRowsCountFromStt(STsdbReader* pReader) { + int32_t code = TSDB_CODE_SUCCESS; + SLastBlockReader* pLastBlockReader = pReader->status.fileIter.pLastBlockReader; + SSttBlockLoadInfo* pBlockLoadInfo = NULL; + + for (int32_t i = 0; i < pReader->pFileReader->pSet->nSttF; ++i) { // open all last file + pBlockLoadInfo = &pLastBlockReader->pInfo[i]; + + if (!pLastBlockReader->pInfo[i].sttBlockLoaded) { + pLastBlockReader->pInfo[i].sttBlockLoaded = true; + + code = tsdbReadSttBlk(pReader->pFileReader, i, pBlockLoadInfo->aSttBlk); + if (code) { + return code; + } + } + + size_t size = taosArrayGetSize(pBlockLoadInfo->aSttBlk); + + if (size >= 1) { + SSttBlk *pStart = taosArrayGet(pBlockLoadInfo->aSttBlk, 0); + SSttBlk *pEnd = taosArrayGet(pBlockLoadInfo->aSttBlk, size - 1); + + // all identical + if (pStart->suid == pEnd->suid) { + if (pStart->suid != pReader->suid) { + // no qualified stt block existed + taosArrayClear(pBlockLoadInfo->aSttBlk); + continue; + } + } else { + for (int32_t i = 0; i < size; ++i) { + SSttBlk *p = taosArrayGet(pBlockLoadInfo->aSttBlk, i); + uint64_t s = p->suid; + if (s < pReader->suid) { + continue; + } + + if (s == pReader->suid) { + pReader->rowsNum += p->nRow; + } else if (s > pReader->suid) { + break; + } + } + } + } + } + + return code; +} + + static int32_t buildBlockFromBufferSequentially(STsdbReader* pReader) { SReaderStatus* pStatus = &pReader->status; STableUidList* pUidList = &pStatus->uidList; @@ -3212,6 +3354,12 @@ static int32_t buildBlockFromFiles(STsdbReader* pReader) { initBlockDumpInfo(pReader, pBlockIter); } else { if (pReader->status.pCurrentFileset->nSttF > 0) { + if (READ_MODE_COUNT_ONLY == pReader->readMode && pReader->rowsNum > 0) { + pReader->pResBlock->info.rows = pReader->rowsNum; + pReader->rowsNum = 0; + return TSDB_CODE_SUCCESS; + } + // data blocks in current file are exhausted, let's try the next file now SBlockData* pBlockData = &pReader->status.fileBlockData; if (pBlockData->uid != 0) { @@ -3226,7 +3374,17 @@ static int32_t buildBlockFromFiles(STsdbReader* pReader) { code = initForFirstBlockInFile(pReader, pBlockIter); // error happens or all the data files are completely checked - if ((code != TSDB_CODE_SUCCESS) || (pReader->status.loadFromFile == false)) { + if (code != TSDB_CODE_SUCCESS) { + return code; + } + + if (READ_MODE_COUNT_ONLY == pReader->readMode && pReader->rowsNum > 0) { + pReader->pResBlock->info.rows = pReader->rowsNum; + pReader->rowsNum = 0; + return TSDB_CODE_SUCCESS; + } + + if (pReader->status.loadFromFile == false) { return code; } @@ -3240,6 +3398,17 @@ static int32_t buildBlockFromFiles(STsdbReader* pReader) { } code = doBuildDataBlock(pReader); + if (READ_MODE_COUNT_ONLY == pReader->readMode) { + if (false == pReader->status.composedDataBlock && pDumpInfo->allDumped) { + pReader->rowsNum += pReader->pResBlock->info.rows; + pReader->pResBlock->info.rows = 0; + continue; + } else if (pReader->pResBlock->info.rows == 0 && pReader->rowsNum > 0) { + pReader->pResBlock->info.rows = pReader->rowsNum; + pReader->rowsNum = 0; + return TSDB_CODE_SUCCESS; + } + } } if (code != TSDB_CODE_SUCCESS) { @@ -3986,6 +4155,8 @@ static int32_t doOpenReaderImpl(STsdbReader* pReader) { int32_t code = TSDB_CODE_SUCCESS; if (pStatus->fileIter.numOfFiles == 0) { pStatus->loadFromFile = false; + } else if (READ_MODE_COUNT_ONLY == pReader->readMode) { + // DO NOTHING } else { code = initForFirstBlockInFile(pReader, pBlockIter); } @@ -4091,6 +4262,9 @@ int32_t tsdbReaderOpen(SVnode* pVnode, SQueryTableDataCond* pCond, void* pTableL pReader->suspended = true; + + pReader->readMode = READ_MODE_COUNT_ONLY; + tsdbDebug("%p total numOfTable:%d in this query %s", pReader, numOfTables, pReader->idStr); return code; @@ -4394,6 +4568,35 @@ _err: return code; } +static bool tsdbReadRowsCountOnly(STsdbReader* pReader) { + int32_t code = TSDB_CODE_SUCCESS; + SSDataBlock* pBlock = pReader->pResBlock; + + while (1) { + if (pReader->status.loadFromFile == false) { + break; + } + + code = readRowsCountFromFile(pReader); + if (code != TSDB_CODE_SUCCESS) { + return false; + } + + code = readRowsCountFromStt(pReader); + if (code != TSDB_CODE_SUCCESS) { + return false; + } + } + + pBlock->info.rows = pReader->rowsNum; + pBlock->info.id.uid = 0; + pBlock->info.dataLoad = 0; + + pReader->rowsNum = 0; + + return pBlock->info.rows > 0; +} + static bool doTsdbNextDataBlock(STsdbReader* pReader) { // cleanup the data that belongs to the previous data block SSDataBlock* pBlock = pReader->pResBlock; @@ -4404,6 +4607,10 @@ static bool doTsdbNextDataBlock(STsdbReader* pReader) { return false; } + if (READ_MODE_COUNT_ONLY == pReader->readMode) { + return tsdbReadRowsCountOnly(pReader); + } + if (pStatus->loadFromFile) { int32_t code = buildBlockFromFiles(pReader); if (code != TSDB_CODE_SUCCESS) { diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index 5dff1abb9799f43eddd14847cbca26bc1936cc78..2af13f083d0b5e297b44d3bef9de7a43027281b7 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -654,9 +654,10 @@ static SSDataBlock* doTableScanImpl(SOperatorInfo* pOperator) { continue; } - ASSERT(pBlock->info.id.uid != 0); - pBlock->info.id.groupId = getTableGroupId(pTaskInfo->pTableInfoList, pBlock->info.id.uid); - + if (pBlock->info.id.uid) { + pBlock->info.id.groupId = getTableGroupId(pTaskInfo->pTableInfoList, pBlock->info.id.uid); + } + uint32_t status = 0; int32_t code = loadDataBlock(pOperator, &pTableScanInfo->base, pBlock, &status); if (code != TSDB_CODE_SUCCESS) { @@ -680,7 +681,6 @@ static SSDataBlock* doTableScanImpl(SOperatorInfo* pOperator) { pTaskInfo->streamInfo.lastStatus.uid = pBlock->info.id.uid; pTaskInfo->streamInfo.lastStatus.ts = pBlock->info.window.ekey; - ASSERT(pBlock->info.id.uid != 0); return pBlock; } return NULL; @@ -797,7 +797,6 @@ static SSDataBlock* doTableScan(SOperatorInfo* pOperator) { SSDataBlock* result = doGroupedTableScan(pOperator); if (result != NULL) { - ASSERT(result->info.id.uid != 0); return result; }