diff --git a/src/tsdb/inc/tsdbMain.h b/src/tsdb/inc/tsdbMain.h index 9d4b8b17f7f50447f9e047cd3001e83f21e0e2d5..756a206af0f4303207522120fe788c84bd598f8f 100644 --- a/src/tsdb/inc/tsdbMain.h +++ b/src/tsdb/inc/tsdbMain.h @@ -460,7 +460,7 @@ int tsdbLoadCompIdx(SRWHelper *pHelper, void *target); int tsdbLoadCompInfo(SRWHelper *pHelper, void *target); int tsdbLoadCompData(SRWHelper *pHelper, SCompBlock *pCompBlock, void *target); int tsdbLoadBlockDataCols(SRWHelper *pHelper, SDataCols *pDataCols, int blkIdx, int16_t *colIds, int numOfColIds); -int tsdbLoadBlockData(SRWHelper *pHelper, int blkIdx, SDataCols *target); +int tsdbLoadBlockData(SRWHelper *pHelper, SCompBlock *pCompBlock, SDataCols *target); // --------- For write operations int tsdbWriteDataBlock(SRWHelper *pHelper, SDataCols *pDataCols); diff --git a/src/tsdb/src/tsdbRWHelper.c b/src/tsdb/src/tsdbRWHelper.c index 9a422419000d8094fc466a1d70896b1ee51fffca..da3c55bbf9e68b20bb18b1379dc3905deff8c6f6 100644 --- a/src/tsdb/src/tsdbRWHelper.c +++ b/src/tsdb/src/tsdbRWHelper.c @@ -363,7 +363,7 @@ int tsdbMoveLastBlockIfNeccessary(SRWHelper *pHelper) { ASSERT(pCompBlock->last); if (pCompBlock->numOfSubBlocks > 1) { - if (tsdbLoadBlockData(pHelper, pIdx->numOfSuperBlocks - 1, NULL) < 0) return -1; + if (tsdbLoadBlockData(pHelper, blockAtIdx(pHelper, pIdx->numOfSuperBlocks - 1), NULL) < 0) return -1; ASSERT(pHelper->pDataCols[0]->numOfPoints > 0 && pHelper->pDataCols[0]->numOfPoints < pHelper->config.minRowsPerFileBlock); if (tsdbWriteBlockToFile(pHelper, &(pHelper->files.nLastF), pHelper->pDataCols[0], @@ -607,8 +607,8 @@ _err: } // Load the whole block data -int tsdbLoadBlockData(SRWHelper *pHelper, int blkIdx, SDataCols *target) { - SCompBlock *pCompBlock = pHelper->pCompInfo->blocks + blkIdx; +int tsdbLoadBlockData(SRWHelper *pHelper, SCompBlock *pCompBlock, SDataCols *target) { + // SCompBlock *pCompBlock = pHelper->pCompInfo->blocks + blkIdx; int numOfSubBlock = pCompBlock->numOfSubBlocks; if (numOfSubBlock > 1) pCompBlock = (SCompBlock *)((char *)pHelper->pCompInfo + pCompBlock->offset); @@ -797,7 +797,7 @@ static int tsdbMergeDataWithBlock(SRWHelper *pHelper, int blkIdx, SDataCols *pDa if (tsdbAddSubBlock(pHelper, &compBlock, blkIdx, rowsWritten) < 0) goto _err; } else { // Load - if (tsdbLoadBlockData(pHelper, blkIdx, NULL) < 0) goto _err; + if (tsdbLoadBlockData(pHelper, blockAtIdx(pHelper, blkIdx), NULL) < 0) goto _err; ASSERT(pHelper->pDataCols[0]->numOfPoints == blockAtIdx(pHelper, blkIdx)->numOfPoints); // Merge if (tdMergeDataCols(pHelper->pDataCols[0], pDataCols, rowsWritten) < 0) goto _err; @@ -852,7 +852,7 @@ static int tsdbMergeDataWithBlock(SRWHelper *pHelper, int blkIdx, SDataCols *pDa if (tsdbAddSubBlock(pHelper, &compBlock, blkIdx, rowsWritten) < 0) goto _err; } else { // Load-Merge-Write // Load - if (tsdbLoadBlockData(pHelper, blkIdx, NULL) < 0) goto _err; + if (tsdbLoadBlockData(pHelper, blockAtIdx(pHelper, blkIdx), NULL) < 0) goto _err; if (blockAtIdx(pHelper, blkIdx)->last) pHelper->hasOldLastBlock = false; rowsWritten = rows3; diff --git a/src/tsdb/src/tsdbRead.c b/src/tsdb/src/tsdbRead.c index 6b3b7e1e4e3d62b5e6807cc012c3c0c73ea14fec..5ede59973759b9ecdf2b46299ed33085c0049675 100644 --- a/src/tsdb/src/tsdbRead.c +++ b/src/tsdb/src/tsdbRead.c @@ -128,6 +128,7 @@ typedef struct STsdbQueryHandle { SFileGroup* pFileGroup; SFileGroupIter fileIter; SCompIdx* compIndex; + SRWHelper rhelper; } STsdbQueryHandle; static void tsdbInitDataBlockLoadInfo(SDataBlockLoadInfo* pBlockLoadInfo) { @@ -150,7 +151,8 @@ tsdb_query_handle_t* tsdbQueryTables(tsdb_repo_t* tsdb, STsdbQueryCond* pCond, S pQueryHandle->order = pCond->order; pQueryHandle->window = pCond->twindow; pQueryHandle->pTsdb = tsdb; - pQueryHandle->compIndex = calloc(10000, sizeof(SCompIdx)), + pQueryHandle->compIndex = calloc(10000, sizeof(SCompIdx)); + tsdbInitReadHelper(&pQueryHandle->rhelper, (STsdbRepo*) tsdb); pQueryHandle->loadDataAfterSeek = false; pQueryHandle->isFirstSlot = true; @@ -299,11 +301,7 @@ static int32_t getFileCompInfo(STsdbQueryHandle* pQueryHandle, int32_t* numOfBlo SFileGroup* fileGroup = pQueryHandle->pFileGroup; assert(fileGroup->files[TSDB_FILE_TYPE_HEAD].fname > 0); - if (fileGroup->files[TSDB_FILE_TYPE_HEAD].fd == FD_INITIALIZER) { - fileGroup->files[TSDB_FILE_TYPE_HEAD].fd = open(fileGroup->files[TSDB_FILE_TYPE_HEAD].fname, O_RDONLY); - } else { - assert(FD_VALID(fileGroup->files[TSDB_FILE_TYPE_HEAD].fd)); - } + tsdbSetAndOpenHelperFile(&pQueryHandle->rhelper, fileGroup); // load all the comp offset value for all tables in this file // tsdbLoadCompIdx(fileGroup, pQueryHandle->compIndex, 10000); // todo set dynamic max tables @@ -314,7 +312,7 @@ static int32_t getFileCompInfo(STsdbQueryHandle* pQueryHandle, int32_t* numOfBlo for (int32_t i = 0; i < numOfTables; ++i) { STableCheckInfo* pCheckInfo = taosArrayGet(pQueryHandle->pTableCheckInfo, i); - SCompIdx* compIndex = &pQueryHandle->compIndex[pCheckInfo->tableId.tid]; + SCompIdx* compIndex = &pQueryHandle->rhelper.pCompIdx[pCheckInfo->tableId.tid]; if (compIndex->len == 0 || compIndex->numOfSuperBlocks == 0) { // no data block in this file, try next file continue;//no data blocks in the file belongs to pCheckInfo->pTable } else { @@ -329,7 +327,12 @@ static int32_t getFileCompInfo(STsdbQueryHandle* pQueryHandle, int32_t* numOfBlo } // tsdbLoadCompBlocks(fileGroup, compIndex, pCheckInfo->pCompInfo); - + STable* pTable = tsdbGetTableByUid(tsdbGetMeta(pQueryHandle->pTsdb), pCheckInfo->tableId.uid); + assert(pTable != NULL); + + tsdbSetHelperTable(&pQueryHandle->rhelper, pTable, pQueryHandle->pTsdb); + + tsdbLoadCompInfo(&(pQueryHandle->rhelper), (void *)(pCheckInfo->pCompInfo)); SCompInfo* pCompInfo = pCheckInfo->pCompInfo; TSKEY s = MIN(pCheckInfo->lastKey, pQueryHandle->window.ekey); @@ -420,26 +423,26 @@ static bool doLoadFileDataBlock(STsdbQueryHandle* pQueryHandle, SCompBlock* pBlo tdInitDataCols(pCheckInfo->pDataCols, tsdbGetTableSchema(tsdbGetMeta(pQueryHandle->pTsdb), pCheckInfo->pTableObj)); - SFile* pFile = &pQueryHandle->pFileGroup->files[TSDB_FILE_TYPE_DATA]; - if (pFile->fd == FD_INITIALIZER) { - pFile->fd = open(pFile->fname, O_RDONLY); - } + // SFile* pFile = &pQueryHandle->pFileGroup->files[TSDB_FILE_TYPE_DATA]; + // if (pFile->fd == FD_INITIALIZER) { + // pFile->fd = open(pFile->fname, O_RDONLY); + // } - // if (tsdbLoadDataBlock(pFile, pBlock, 1, pCheckInfo->pDataCols, data) == 0) { - // SDataBlockLoadInfo* pBlockLoadInfo = &pQueryHandle->dataBlockLoadInfo; + if (tsdbLoadBlockData(&(pQueryHandle->rhelper), pBlock, NULL) == 0) { + SDataBlockLoadInfo* pBlockLoadInfo = &pQueryHandle->dataBlockLoadInfo; - // pBlockLoadInfo->fileGroup = pQueryHandle->pFileGroup; - // pBlockLoadInfo->slot = pQueryHandle->cur.slot; - // pBlockLoadInfo->sid = pCheckInfo->pTableObj->tableId.tid; + pBlockLoadInfo->fileGroup = pQueryHandle->pFileGroup; + pBlockLoadInfo->slot = pQueryHandle->cur.slot; + pBlockLoadInfo->sid = pCheckInfo->pTableObj->tableId.tid; - // blockLoaded = true; - // } + blockLoaded = true; + } taosArrayDestroy(sa); tfree(data); - TSKEY* d = (TSKEY*)pCheckInfo->pDataCols->cols[PRIMARYKEY_TIMESTAMP_COL_INDEX].pData; - assert(d[0] == pBlock->keyFirst && d[pBlock->numOfPoints - 1] == pBlock->keyLast); + // TSKEY* d = (TSKEY*)pCheckInfo->pDataCols->cols[PRIMARYKEY_TIMESTAMP_COL_INDEX].pData; + // assert(d[0] == pBlock->keyFirst && d[pBlock->numOfPoints - 1] == pBlock->keyLast); return blockLoaded; } @@ -595,7 +598,7 @@ static void filterDataInDataBlock(STsdbQueryHandle* pQueryHandle, STableCheckInf } } - int32_t start = MIN(cur->pos, endPos); + // int32_t start = MIN(cur->pos, endPos); // move the data block in the front to data block if needed int32_t numOfCols = QH_GET_NUM_OF_COLS(pQueryHandle); @@ -607,9 +610,10 @@ static void filterDataInDataBlock(STsdbQueryHandle* pQueryHandle, STableCheckInf SColumnInfoData* pCol = taosArrayGet(pQueryHandle->pColumns, j); if (pCol->info.colId == colId) { - SDataCol* pDataCol = &pCols->cols[i]; - memmove(pCol->pData, pDataCol->pData + pCol->info.bytes * start, - pQueryHandle->realNumOfRows * pCol->info.bytes); + // SDataCol* pDataCol = &pCols->cols[i]; + pCol->pData = pQueryHandle->rhelper.pDataCols[0]->cols[i].pData; + // memmove(pCol->pData, pDataCol->pData + pCol->info.bytes * start, + // pQueryHandle->realNumOfRows * pCol->info.bytes); break; } } @@ -1530,7 +1534,7 @@ void tsdbCleanupQueryHandle(tsdb_query_handle_t queryHandle) { size_t cols = taosArrayGetSize(pQueryHandle->pColumns); for (int32_t i = 0; i < cols; ++i) { SColumnInfoData* pColInfo = taosArrayGet(pQueryHandle->pColumns, i); - tfree(pColInfo->pData); + // tfree(pColInfo->pData); } taosArrayDestroy(pQueryHandle->pColumns); diff --git a/src/tsdb/tests/tsdbTests.cpp b/src/tsdb/tests/tsdbTests.cpp index a7d94f236249a8cef2b396bbaae0251c8cf9f649..85fca7d94f3422a8eb57321f9e7379c0f397f86c 100644 --- a/src/tsdb/tests/tsdbTests.cpp +++ b/src/tsdb/tests/tsdbTests.cpp @@ -202,7 +202,7 @@ TEST(TsdbTest, createRepo) { tsdbSetHelperTable(&rhelper, pTable, repo); ASSERT_EQ(tsdbLoadCompInfo(&rhelper, NULL), 0); - ASSERT_EQ(tsdbLoadBlockData(&rhelper, 0, NULL), 0); + ASSERT_EQ(tsdbLoadBlockData(&rhelper, blockAtIdx(&rhelper, 0), NULL), 0); int k = 0; }