diff --git a/source/dnode/vnode/src/tsdb/tsdbRead.c b/source/dnode/vnode/src/tsdb/tsdbRead.c index 67a88d0e9f53dbe5010523f33091d15b285ee5c0..062e91a6a63412392f10037fa2a984b69d4b2028 100644 --- a/source/dnode/vnode/src/tsdb/tsdbRead.c +++ b/source/dnode/vnode/src/tsdb/tsdbRead.c @@ -363,12 +363,11 @@ static void resetDataBlockIterator(SDataBlockIter* pIter) { } static bool filesetIteratorNext(SFileSetIter* pIter, int32_t order, STsdbReader* pReader) { + pIter->index += 1; if (pIter->index >= pIter->numOfFiles) { return false; } - pIter->index += 1; - // check file the time range of coverage STimeWindow win = {0}; pReader->status.pCurrentFileSet = (SDFileSet *)taosArrayGet(pIter->pFileList, pIter->index); @@ -920,7 +919,8 @@ static int32_t doLoadFileBlock(STsdbReader* pReader, SArray* pIndexList, uint32_ tsdbReadBlock(pReader->pFileReader, pBlockIdx, &mapData, NULL); STableBlockScanInfo* pScanInfo = taosHashGet(pReader->status.pTableMap, &pBlockIdx->uid, sizeof(int64_t)); - ASSERT(pScanInfo->pBlockList == NULL || taosArrayGetSize(pScanInfo->pBlockList) == 0); + taosArrayClear(pScanInfo->pBlockList); + for (int32_t j = 0; j < mapData.nItem; ++j) { SBlock block = {0}; @@ -971,18 +971,25 @@ static int32_t doLoadFileBlockData(STsdbReader* pReader, SDataBlockIter* pBlockI SBlock* pBlock = taosArrayGet(pBlockScanInfo->pBlockList, pFBlock->tbBlockIdx); uint8_t *pb = NULL, *pb1 = NULL; - int32_t code = tsdbReadBlockData(pReader->pFileReader, &pBlockScanInfo->blockIdx, pBlock, pBlockData, /*pReader->suppInfo.colIdList, numOfCols, */&pb, &pb1); + int32_t code = tsdbReadBlockData(pReader->pFileReader, &pBlockScanInfo->blockIdx, pBlock, pBlockData, &pb, &pb1); if (code != TSDB_CODE_SUCCESS) { goto _error; } - SColVal cv = {0}; for(int32_t i = 0; i < taosArrayGetSize(pReader->pResBlock->pDataBlock); ++i) { - SColData* pData = (SColData*)taosArrayGet(pBlockData->aColDataP, pReader->suppInfo.slotIds[i]); SColumnInfoData* pColData = taosArrayGet(pReader->pResBlock->pDataBlock, i); - for(int32_t j = 0; j < pBlockData->nRow; ++j) { - tColDataGetValue(pData, j, &cv); - colDataAppend(pColData, j, (const char*)&cv.value, cv.isNull); + if (pColData->info.colId == PRIMARYKEY_TIMESTAMP_COL_ID) { + for (int32_t j = 0; j < pBlockData->nRow; ++j) { + colDataAppend(pColData, j, (const char*)&pBlockData->aTSKEY[j], false); + } + } else { + SColVal cv = {0}; + + SColData* pData = (SColData*)taosArrayGetP(pBlockData->aColDataP, pReader->suppInfo.slotIds[i]); + for (int32_t j = 0; j < pBlockData->nRow; ++j) { + tColDataGetValue(pData, j, &cv); + colDataAppend(pColData, j, (const char*)&cv.value, cv.isNull); + } } } @@ -2276,7 +2283,7 @@ static bool bufferDataInFileBlockGap(int32_t order, TSDBKEY key, SBlock* pBlock) } static bool keyOverlapFileBlock(TSDBKEY key, SBlock* pBlock, SVersionRange* pVerRange) { - return (key.ts >= pBlock->minKey.ts || key.ts <= pBlock->maxKey.ts) /*&& (pBlock->maxVersion >= pVerRange->minVer) && (pBlock->minVersion <= pVerRange->maxVer)*/; + return (key.ts >= pBlock->minKey.ts && key.ts <= pBlock->maxKey.ts) /*&& (pBlock->maxVersion >= pVerRange->minVer) && (pBlock->minVersion <= pVerRange->maxVer)*/; } static bool fileBlockShouldLoad(STsdbReader* pReader, SFileDataBlockInfo *pFBlock, SBlock* pBlock, STableBlockScanInfo *pScanInfo, TSDBKEY key) { @@ -2511,7 +2518,6 @@ static TSDBKEY getCurrentKeyInBuf(SDataBlockIter* pBlockIter, STsdbReader* pRead SFileDataBlockInfo* pFBlock = getCurrentBlockInfo(pBlockIter); STableBlockScanInfo* pScanInfo = taosHashGet(pReader->status.pTableMap, &pFBlock->uid, sizeof(pFBlock->uid)); - SBlock* pBlock = taosArrayGet(pScanInfo->pBlockList, pFBlock->tbBlockIdx); initMemIterator(pScanInfo, pReader); if (pScanInfo->memHasVal) { @@ -2563,7 +2569,82 @@ static int32_t moveToNextFile(STsdbReader* pReader, int32_t* numOfBlocks) { return TSDB_CODE_SUCCESS; } -static int32_t loadDataInFiles(STsdbReader* pReader, bool* exists) { +static int32_t doBuildDataBlock(STsdbReader* pReader) { + int32_t code = TSDB_CODE_SUCCESS; + + SReaderStatus* pStatus = &pReader->status; + SDataBlockIter* pBlockIter = &pStatus->blockIter; + + SFileDataBlockInfo* pFBlock = getCurrentBlockInfo(pBlockIter); + STableBlockScanInfo* pScanInfo = taosHashGet(pReader->status.pTableMap, &pFBlock->uid, sizeof(pFBlock->uid)); + + SBlock* pBlock = taosArrayGet(pScanInfo->pBlockList, pFBlock->tbBlockIdx); + + TSDBKEY key = getCurrentKeyInBuf(pBlockIter, pReader); + if (fileBlockShouldLoad(pReader, pFBlock, pBlock, pScanInfo, key)) { + SBlockData data = {0}; + tBlockDataInit(&data); + + code = doLoadFileBlockData(pReader, pBlockIter, pScanInfo, &data); + if (code != TSDB_CODE_SUCCESS) { + return code; + } + + // build composed data block + code = buildComposedDataBlock(pReader, pFBlock, pBlock, pScanInfo); + if (code != TSDB_CODE_SUCCESS) { + return code; + } + } else if (bufferDataInFileBlockGap(pReader->order, key, pBlock)) { + // data in memory that are earlier than current file block + TSDBKEY maxKey = {.ts = pReader->window.ekey, .version = pReader->verRange.maxVer}; + code = buildInmemDataBlock(pReader, pScanInfo, &maxKey); + // build data block from in-memory buffer data completed. + } else { // whole block is required, return it directly + // todo + // 1. the version of all rows should be less than the endVersion + // 2. current block should not overlap with next neighbor block + // 3. current timestamp should not be overlap with each other + SDataBlockInfo* pInfo = &pReader->pResBlock->info; + pInfo->rows = pBlock->nRow; + pInfo->uid = pScanInfo->uid; + pInfo->window.skey = pBlock->minKey.ts; + pInfo->window.ekey = pBlock->maxKey.ts; + setComposedBlockFlag(pReader, false); + } + + return code; +} + +static int32_t buildInmemBlockSeqentially(STsdbReader* pReader) { + SReaderStatus* pStatus = &pReader->status; + + while(1) { + if (pStatus->pTableIter == NULL) { + pStatus->pTableIter = taosHashIterate(pStatus->pTableMap, NULL); + if (pStatus->pTableIter == NULL) { + return false; + } + } + + STableBlockScanInfo* pBlockScanInfo = pStatus->pTableIter; + initMemIterator(pBlockScanInfo, pReader); + + TSDBKEY maxKey = {.ts = pReader->window.ekey, .version = pReader->verRange.maxVer}; + buildInmemDataBlock(pReader, pBlockScanInfo, &maxKey); + if (pReader->pResBlock->info.rows > 0) { + return true; + } + + // current table is exhausted, let's try the next table + pStatus->pTableIter = taosHashIterate(pStatus->pTableMap, pStatus->pTableIter); + if (pStatus->pTableIter == NULL) { + return false; + } + } +} + +static int32_t loadDataInFiles(STsdbReader* pReader) { SReaderStatus* pStatus = &pReader->status; SFileSetIter* pFIter = &pStatus->fileIter; @@ -2583,36 +2664,8 @@ static int32_t loadDataInFiles(STsdbReader* pReader, bool* exists) { return code; } - SFileDataBlockInfo* pFBlock = getCurrentBlockInfo(pBlockIter); - STableBlockScanInfo* pScanInfo = taosHashGet(pReader->status.pTableMap, &pFBlock->uid, sizeof(pFBlock->uid)); - SBlock* pBlock = taosArrayGet(pScanInfo->pBlockList, pFBlock->tbBlockIdx); - - TSDBKEY key = getCurrentKeyInBuf(pBlockIter, pReader); - - if (fileBlockShouldLoad(pReader, pFBlock, pBlock, pScanInfo, key)) { - SBlockData data = {0}; - doLoadFileBlockData(pReader, pBlockIter, pScanInfo, &data); + code = doBuildDataBlock(pReader); - // build composed data block - buildComposedDataBlock(pReader, pFBlock, pBlock, pScanInfo); - } else if (bufferDataInFileBlockGap(pReader->order, key, pBlock)) { - // data in memory that are earlier than current file block - TSDBKEY maxKey = {.ts = pReader->window.ekey, .version = pReader->verRange.maxVer}; - buildInmemDataBlock(pReader, pScanInfo, &maxKey); - // build data block from in-memory buffer data completed. - } else { // whole block is required, return it directly - // todo - // 1. the version of all rows should be less than the endVersion - // 2. current block should not overlap with next neighbor block - // 3. current timestamp should not be overlap with each other - SDataBlockInfo* pInfo = &pReader->pResBlock->info; - pInfo->rows = pBlock->nRow; - pInfo->uid = pScanInfo->uid; - pInfo->window.skey = pBlock->minKey.ts; - pInfo->window.ekey = pBlock->maxKey.ts; - setComposedBlockFlag(pReader, false); - } - *exists = true; } else { SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo; SFileDataBlockInfo* pFBlock = getCurrentBlockInfo(&pReader->status.blockIter); @@ -2629,46 +2682,22 @@ static int32_t loadDataInFiles(STsdbReader* pReader, bool* exists) { return code; } - // initialize the block iterator for a new fileset - code = initBlockIterator(pReader, pBlockIter, numOfBlocks); - if (code != TSDB_CODE_SUCCESS) { + // all data files are consumed, try data in buffer + if (numOfBlocks == 0) { + pReader->status.loadFromFile = false; return code; + } else { + // initialize the block iterator for a new fileset + code = initBlockIterator(pReader, pBlockIter, numOfBlocks); + if (code != TSDB_CODE_SUCCESS) { + return code; + } + + doBuildDataBlock(pReader); } - - pFBlock = getCurrentBlockInfo(pBlockIter); - pScanInfo = taosHashGet(pReader->status.pTableMap, &pFBlock->uid, sizeof(pFBlock->uid)); - pBlock = taosArrayGet(pScanInfo->pBlockList, pFBlock->tbBlockIdx); - - TSDBKEY key = getCurrentKeyInBuf(pBlockIter, pReader); - if (fileBlockShouldLoad(pReader, pFBlock, pBlock, pScanInfo, key)) { - SBlockData data = {0}; - doLoadFileBlockData(pReader, pBlockIter, pScanInfo, &data); - // build composed data block - buildComposedDataBlock(pReader, pFBlock, pBlock, pScanInfo); - } else if (bufferDataInFileBlockGap(pReader->order, key, pBlock)) { - // data in memory that are earlier than current file block - TSDBKEY maxKey = {.ts = pReader->window.ekey, .version = pReader->verRange.maxVer}; - buildInmemDataBlock(pReader, pScanInfo, &maxKey); - // build data block from in-memory buffer data completed. - } else { // whole block is required, return it directly - // todo - // 1. the version of all rows should be less than the endVersion - // 2. current block should not overlap with next neighbor block - // 3. current timestamp should not be overlap with each other - SDataBlockInfo* pInfo = &pReader->pResBlock->info; - pInfo->rows = pBlock->nRow; - pInfo->uid = pScanInfo->uid; - pInfo->window.skey = pBlock->minKey.ts; - pInfo->window.ekey = pBlock->maxKey.ts; - setComposedBlockFlag(pReader, false); - } - *exists = true; - } else { // try next data block in current file - // 1. check if ts in buffer is overlap with current file data block blockIteratorNext(pBlockIter); - - + doBuildDataBlock(pReader); } } else { buildComposedDataBlock(pReader, pFBlock, pBlock, pScanInfo); @@ -3263,13 +3292,13 @@ void tsdbReaderClose(STsdbReader* pReader) { } bool tsdbNextDataBlock(STsdbReader* pReader) { - bool ret = false; if (isEmptyQueryTimeWindow(pReader)) { return false; } // cleanup the data that belongs to the previous data block - blockDataCleanup(pReader->pResBlock); + SSDataBlock* pBlock = pReader->pResBlock; + blockDataCleanup(pBlock); int64_t stime = taosGetTimestampUs(); int64_t elapsedTime = stime; @@ -3277,41 +3306,20 @@ bool tsdbNextDataBlock(STsdbReader* pReader) { if (pReader->type == BLOCK_LOAD_OFFSET_ORDER) { if (pStatus->loadFromFile) { - bool exists = true; - int32_t code = loadDataInFiles(pReader, &exists); + int32_t code = loadDataInFiles(pReader); if (code != TSDB_CODE_SUCCESS) { return false; } - if (exists) { + if (pBlock->info.rows > 0) { return true; - } else { // let's try the in-memory buffer - + } else { + buildInmemBlockSeqentially(pReader); + return pBlock->info.rows > 0; } } else { // no data in files, let's try the buffer - while(1) { - if (pStatus->pTableIter == NULL) { - pStatus->pTableIter = taosHashIterate(pStatus->pTableMap, NULL); - if (pStatus->pTableIter == NULL) { - return false; - } - } - - STableBlockScanInfo* pBlockScanInfo = pStatus->pTableIter; - initMemIterator(pBlockScanInfo, pReader); - - TSDBKEY maxKey = {.ts = pReader->window.ekey, .version = pReader->verRange.maxVer}; - buildInmemDataBlock(pReader, pBlockScanInfo, &maxKey); - if (pReader->pResBlock->info.rows > 0) { - return true; - } - - // current table is exhausted, let's try the next table - pStatus->pTableIter = taosHashIterate(pStatus->pTableMap, pStatus->pTableIter); - if (pStatus->pTableIter == NULL) { - return false; - } - } + buildInmemBlockSeqentially(pReader); + return pBlock->info.rows > 0; } } else if (pReader->type == BLOCK_LOAD_TABLESEQ_ORDER) { @@ -3351,7 +3359,7 @@ bool tsdbNextDataBlock(STsdbReader* pReader) { // pReader->cost.checkForNextTime += elapsedTime; // return ret; // } - return ret; + return false; } void tsdbRetrieveDataBlockInfo(STsdbReader* pReader, SDataBlockInfo* pDataBlockInfo) { @@ -3450,40 +3458,6 @@ SArray* tsdbRetrieveDataBlock(STsdbReader* pReader, SArray* pIdList) { return pReader->pResBlock->pDataBlock; } - // /** - // * In the following two cases, the data has been loaded to SColumnInfoData. - // * 1. data is from cache, 2. data block is not completed qualified to query time range - // */ - // if (pReader->cur.fid == INT32_MIN) { - // return pReader->pColumns; - // } else { - // SFileBlockInfo* pBlockInfo = &pReader->pDataBlockInfo[pReader->cur.slot]; - // STableBlockScanInfo* pCheckInfo = pBlockInfo->pTableCheckInfo; - - // if (pReader->cur.mixBlock) { - // return pReader->pColumns; - // } else { - // SDataBlockInfo binfo = GET_FILE_DATA_BLOCK_INFO(pCheckInfo, pBlockInfo->compBlock); - // assert(pReader->realNumOfRows <= binfo.rows); - - // // data block has been loaded, todo extract method - // SDataBlockLoadInfo* pBlockLoadInfo = &pReader->dataBlockLoadInfo; - - // if (pBlockLoadInfo->slot == pReader->cur.slot && pBlockLoadInfo->fileGroup->fid == pReader->cur.fid && - // pBlockLoadInfo->uid == pCheckInfo->tableId) { - // return pReader->pColumns; - // } else { // only load the file block - // SBlock* pBlock = pBlockInfo->compBlock; - // if (doLoadFileBlockData(pReader, pBlock, pCheckInfo, pReader->cur.slot) != TSDB_CODE_SUCCESS) { - // return NULL; - // } - - // int32_t numOfRows = doCopyRowsFromFileBlock(pReader, pReader->outputCapacity, 0, 0, pBlock->numOfRows - 1); - // return pReader->pColumns; - // } - // } - // } - return NULL; } void tsdbResetReadHandle(STsdbReader* pReader, SQueryTableDataCond* pCond, int32_t tWinIdx) {