diff --git a/source/dnode/vnode/inc/vnode.h b/source/dnode/vnode/inc/vnode.h index a797d6981ceeb587514ae67b14a03fd35275e6e9..9a43a5d6d2e92300c22f554e4a0623a3dbc13b80 100644 --- a/source/dnode/vnode/inc/vnode.h +++ b/source/dnode/vnode/inc/vnode.h @@ -118,12 +118,12 @@ typedef struct STsdbReader STsdbReader; int32_t tsdbReaderOpen(SVnode *pVnode, SQueryTableDataCond *pCond, STableListInfo *tableInfoGroup, uint64_t qId, uint64_t taskId, STsdbReader **ppReader); +void tsdbReaderClose(STsdbReader *pReader); bool tsdbNextDataBlock(STsdbReader *pReader); void tsdbRetrieveDataBlockInfo(STsdbReader *pReader, SDataBlockInfo *pDataBlockInfo); int32_t tsdbRetrieveDataBlockStatisInfo(STsdbReader *pReader, SColumnDataAgg ***pBlockStatis, bool *allHave); SArray *tsdbRetrieveDataBlock(STsdbReader *pTsdbReadHandle, SArray *pColumnIdList); void tsdbResetReadHandle(STsdbReader *pReader, SQueryTableDataCond *pCond, int32_t tWinIdx); -void tsdbCleanupReadHandle(STsdbReader *pReader); int32_t tsdbGetFileBlocksDistInfo(STsdbReader *pReader, STableBlockDistInfo *pTableBlockInfo); int64_t tsdbGetNumOfRowsInMemTable(STsdbReader *pHandle); diff --git a/source/dnode/vnode/src/tsdb/tsdbRead.c b/source/dnode/vnode/src/tsdb/tsdbRead.c index 644dd21e6ed908b5e122032acf15cbf0b7646fca..abb693c0bf3e1decfb2c1fff938f75b9370f01da 100644 --- a/source/dnode/vnode/src/tsdb/tsdbRead.c +++ b/source/dnode/vnode/src/tsdb/tsdbRead.c @@ -180,29 +180,6 @@ static SArray* getDefaultLoadColumns(STsdbReader* pTsdbReadHandle, bool loadTS) return pLocalIdList; } -int64_t tsdbGetNumOfRowsInMemTable(STsdbReader* pReader) { - int64_t rows = 0; - SMemTable* pMemTable = NULL; // pTsdbReadHandle->pMemTable; - if (pMemTable == NULL) { - return rows; - } - - size_t size = taosArrayGetSize(pReader->pTableCheckInfo); - for (int32_t i = 0; i < size; ++i) { - STableCheckInfo* pCheckInfo = taosArrayGet(pReader->pTableCheckInfo, i); - - // if (pMemT && pCheckInfo->tableId < pMemT->maxTables) { - // pMem = pMemT->tData[pCheckInfo->tableId]; - // rows += (pMem && pMem->uid == pCheckInfo->tableId) ? pMem->numOfRows : 0; - // } - // if (pIMemT && pCheckInfo->tableId < pIMemT->maxTables) { - // pIMem = pIMemT->tData[pCheckInfo->tableId]; - // rows += (pIMem && pIMem->uid == pCheckInfo->tableId) ? pIMem->numOfRows : 0; - // } - } - return rows; -} - static SArray* createCheckInfoFromTableGroup(STsdbReader* pTsdbReadHandle, STableListInfo* pTableList) { size_t tableSize = taosArrayGetSize(pTableList->pTableList); assert(tableSize >= 1); @@ -441,7 +418,7 @@ static STsdbReader* tsdbQueryTablesImpl(SVnode* pVnode, SQueryTableDataCond* pCo return (STsdbReader*)pReadHandle; _end: - tsdbCleanupReadHandle(pReadHandle); + tsdbReaderClose(pReadHandle); terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; return NULL; } @@ -479,42 +456,6 @@ static int32_t setCurrentSchema(SVnode* pVnode, STsdbReader* pTsdbReadHandle) { return TSDB_CODE_SUCCESS; } -void tsdbResetReadHandle(STsdbReader* pReader, SQueryTableDataCond* pCond, int32_t tWinIdx) { - if (emptyQueryTimewindow(pReader)) { - if (pCond->order != pReader->order) { - pReader->order = pCond->order; - TSWAP(pReader->window.skey, pReader->window.ekey); - } - - return; - } - - pReader->order = pCond->order; - setQueryTimewindow(pReader, pCond, tWinIdx); - pReader->type = TSDB_QUERY_TYPE_ALL; - pReader->cur.fid = -1; - pReader->cur.win = TSWINDOW_INITIALIZER; - pReader->checkFiles = true; - pReader->activeIndex = 0; // current active table index - pReader->locateStart = false; - pReader->loadExternalRow = pCond->loadExternalRows; - - if (ASCENDING_TRAVERSE(pCond->order)) { - assert(pReader->window.skey <= pReader->window.ekey); - } else { - assert(pReader->window.skey >= pReader->window.ekey); - } - - // allocate buffer in order to load data blocks from file - memset(pReader->suppInfo.pstatis, 0, sizeof(SColumnDataAgg)); - memset(pReader->suppInfo.plist, 0, POINTER_BYTES); - - tsdbInitDataBlockLoadInfo(&pReader->dataBlockLoadInfo); - tsdbInitCompBlockLoadInfo(&pReader->compBlockLoadInfo); - - resetCheckInfo(pReader); -} - void tsdbResetQueryHandleForNewTable(STsdbReader* queryHandle, SQueryTableDataCond* pCond, STableListInfo* tableList, int32_t tWinIdx) { STsdbReader* pTsdbReadHandle = queryHandle; @@ -550,7 +491,7 @@ void tsdbResetQueryHandleForNewTable(STsdbReader* queryHandle, SQueryTableDataCo pTsdbReadHandle->pTableCheckInfo = NULL; // createCheckInfoFromTableGroup(pTsdbReadHandle, groupList, pMeta, // &pTable); if (pTsdbReadHandle->pTableCheckInfo == NULL) { - // tsdbCleanupReadHandle(pTsdbReadHandle); + // tsdbReaderClose(pTsdbReadHandle); terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; } @@ -2477,116 +2418,6 @@ static int32_t getBucketIndex(int32_t startRow, int32_t bucketRange, int32_t num return (numOfRows - startRow) / bucketRange; } -int32_t tsdbGetFileBlocksDistInfo(STsdbReader* pReader, STableBlockDistInfo* pTableBlockInfo) { - pTableBlockInfo->totalSize = 0; - pTableBlockInfo->totalRows = 0; - - STsdbFS* pFileHandle = REPO_FS(pReader->pTsdb); - - // find the start data block in file - pReader->locateStart = true; - STsdbKeepCfg* pCfg = REPO_KEEP_CFG(pReader->pTsdb); - int32_t fid = getFileIdFromKey(pReader->window.skey, pCfg->days, pCfg->precision); - - tsdbRLockFS(pFileHandle); - tsdbFSIterInit(&pReader->fileIter, pFileHandle, pReader->order); - tsdbFSIterSeek(&pReader->fileIter, fid); - tsdbUnLockFS(pFileHandle); - - STsdbCfg* pc = REPO_CFG(pReader->pTsdb); - pTableBlockInfo->defMinRows = pc->minRows; - pTableBlockInfo->defMaxRows = pc->maxRows; - - int32_t bucketRange = ceil((pc->maxRows - pc->minRows) / 20.0); - - pTableBlockInfo->numOfFiles += 1; - - int32_t code = TSDB_CODE_SUCCESS; - int32_t numOfBlocks = 0; - int32_t numOfTables = (int32_t)taosArrayGetSize(pReader->pTableCheckInfo); - int defaultRows = 4096; - STimeWindow win = TSWINDOW_INITIALIZER; - - while (true) { - numOfBlocks = 0; - tsdbRLockFS(REPO_FS(pReader->pTsdb)); - - if ((pReader->pFileGroup = tsdbFSIterNext(&pReader->fileIter)) == NULL) { - tsdbUnLockFS(REPO_FS(pReader->pTsdb)); - break; - } - - tsdbGetFidKeyRange(pCfg->days, pCfg->precision, pReader->pFileGroup->fid, &win.skey, &win.ekey); - - // current file are not overlapped with query time window, ignore remain files - if ((win.skey > pReader->window.ekey) /* || (!ascTraverse && win.ekey < pTsdbReadHandle->window.ekey)*/) { - tsdbUnLockFS(REPO_FS(pReader->pTsdb)); - tsdbDebug("%p remain files are not qualified for qrange:%" PRId64 "-%" PRId64 ", ignore, %s", pReader, - pReader->window.skey, pReader->window.ekey, pReader->idStr); - pReader->pFileGroup = NULL; - break; - } - - pTableBlockInfo->numOfFiles += 1; - if (tsdbSetAndOpenReadFSet(&pReader->rhelper, pReader->pFileGroup) < 0) { - tsdbUnLockFS(REPO_FS(pReader->pTsdb)); - code = terrno; - break; - } - - tsdbUnLockFS(REPO_FS(pReader->pTsdb)); - - if (tsdbLoadBlockIdx(&pReader->rhelper) < 0) { - code = terrno; - break; - } - - if ((code = getFileCompInfo(pReader, &numOfBlocks)) != TSDB_CODE_SUCCESS) { - break; - } - - tsdbDebug("%p %d blocks found in file for %d table(s), fid:%d, %s", pReader, numOfBlocks, numOfTables, - pReader->pFileGroup->fid, pReader->idStr); - - if (numOfBlocks == 0) { - continue; - } - - pTableBlockInfo->numOfBlocks += numOfBlocks; - - for (int32_t i = 0; i < numOfTables; ++i) { - STableCheckInfo* pCheckInfo = taosArrayGet(pReader->pTableCheckInfo, i); - - SBlock* pBlock = pCheckInfo->pCompInfo->blocks; - - for (int32_t j = 0; j < pCheckInfo->numOfBlocks; ++j) { - pTableBlockInfo->totalSize += pBlock[j].len; - - int32_t numOfRows = pBlock[j].numOfRows; - pTableBlockInfo->totalRows += numOfRows; - - if (numOfRows > pTableBlockInfo->maxRows) { - pTableBlockInfo->maxRows = numOfRows; - } - - if (numOfRows < pTableBlockInfo->minRows) { - pTableBlockInfo->minRows = numOfRows; - } - - if (numOfRows < defaultRows) { - pTableBlockInfo->numOfSmallBlocks += 1; - } - - int32_t bucketIndex = getBucketIndex(pTableBlockInfo->defMinRows, bucketRange, numOfRows); - pTableBlockInfo->blockRowsHisto[bucketIndex]++; - } - } - } - - pTableBlockInfo->numOfTables = numOfTables; - return code; -} - static int32_t getDataBlocksInFiles(STsdbReader* pTsdbReadHandle, bool* exists) { STsdbFS* pFileHandle = REPO_FS(pTsdbReadHandle->pTsdb); SQueryFilePos* cur = &pTsdbReadHandle->cur; @@ -3072,63 +2903,6 @@ static bool loadDataBlockFromTableSeq(STsdbReader* pTsdbReadHandle) { // handle data in cache situation // bool tsdbNextDataBlock(STsdbReader * pHandle, uint64_t uid) -bool tsdbNextDataBlock(STsdbReader* pReader) { - size_t numOfCols = taosArrayGetSize(pReader->pColumns); - for (int32_t i = 0; i < numOfCols; ++i) { - SColumnInfoData* pColInfo = taosArrayGet(pReader->pColumns, i); - colInfoDataCleanup(pColInfo, pReader->outputCapacity); - } - - if (emptyQueryTimewindow(pReader)) { - tsdbDebug("%p query window not overlaps with the data set, no result returned, %s", pReader, pReader->idStr); - return false; - } - - int64_t stime = taosGetTimestampUs(); - int64_t elapsedTime = stime; - - // TODO refactor: remove "type" - if (pReader->type == TSDB_QUERY_TYPE_LAST) { - if (pReader->cachelastrow == TSDB_CACHED_TYPE_LASTROW) { - // return loadCachedLastRow(pTsdbReadHandle); - } else if (pReader->cachelastrow == TSDB_CACHED_TYPE_LAST) { - // return loadCachedLast(pTsdbReadHandle); - } - } - - if (pReader->loadType == BLOCK_LOAD_TABLE_SEQ_ORDER) { - return loadDataBlockFromTableSeq(pReader); - } else { // loadType == RR and Offset Order - if (pReader->checkFiles) { - // check if the query range overlaps with the file data block - bool exists = true; - - int32_t code = getDataBlocksInFiles(pReader, &exists); - if (code != TSDB_CODE_SUCCESS) { - pReader->activeIndex = 0; - pReader->checkFiles = false; - - return false; - } - - if (exists) { - pReader->cost.checkForNextTime += (taosGetTimestampUs() - stime); - return exists; - } - - pReader->activeIndex = 0; - pReader->checkFiles = false; - } - - // TODO: opt by consider the scan order - bool ret = doHasDataInBuffer(pReader); - terrno = TSDB_CODE_SUCCESS; - - elapsedTime = taosGetTimestampUs() - stime; - pReader->cost.checkForNextTime += elapsedTime; - return ret; - } -} // static int32_t doGetExternalRow(STsdbReader* pTsdbReadHandle, int16_t type, SMemTable* pMemRef) { // STsdbReader* pSecQueryHandle = NULL; @@ -3236,7 +3010,7 @@ bool tsdbNextDataBlock(STsdbReader* pReader) { // } // // out_of_memory: -// tsdbCleanupReadHandle(pSecQueryHandle); +// tsdbReaderClose(pSecQueryHandle); // return terrno; //} @@ -3410,26 +3184,224 @@ STimeWindow updateLastrowForEachGroup(STableListInfo* pList) { return window; } -void tsdbRetrieveDataBlockInfo(STsdbReader* pReader, SDataBlockInfo* pDataBlockInfo) { - SQueryFilePos* cur = &pReader->cur; - - uint64_t uid = 0; +/* + * return null for mixed data block, if not a complete file data block, the statistics value will always return NULL + */ - // there are data in file - if (pReader->cur.fid != INT32_MIN) { - STableBlockInfo* pBlockInfo = &pReader->pDataBlockInfo[cur->slot]; - uid = pBlockInfo->pTableCheckInfo->tableId; +static int tsdbCheckInfoCompar(const void* key1, const void* key2) { + if (((STableCheckInfo*)key1)->tableId < ((STableCheckInfo*)key2)->tableId) { + return -1; + } else if (((STableCheckInfo*)key1)->tableId > ((STableCheckInfo*)key2)->tableId) { + return 1; } else { - STableCheckInfo* pCheckInfo = taosArrayGet(pReader->pTableCheckInfo, pReader->activeIndex); - uid = pCheckInfo->tableId; + ASSERT(false); + return 0; } +} - tsdbDebug("data block generated, uid:%" PRIu64 " numOfRows:%d, tsrange:%" PRId64 " - %" PRId64 " %s", uid, cur->rows, - cur->win.skey, cur->win.ekey, pReader->idStr); - - pDataBlockInfo->uid = uid; +static void* doFreeColumnInfoData(SArray* pColumnInfoData) { + if (pColumnInfoData == NULL) { + return NULL; + } -#if 0 + size_t cols = taosArrayGetSize(pColumnInfoData); + for (int32_t i = 0; i < cols; ++i) { + SColumnInfoData* pColInfo = taosArrayGet(pColumnInfoData, i); + colDataDestroy(pColInfo); + } + + taosArrayDestroy(pColumnInfoData); + return NULL; +} + +static void* destroyTableCheckInfo(SArray* pTableCheckInfo) { + size_t size = taosArrayGetSize(pTableCheckInfo); + for (int32_t i = 0; i < size; ++i) { + STableCheckInfo* p = taosArrayGet(pTableCheckInfo, i); + destroyTableMemIterator(p); + + taosMemoryFreeClear(p->pCompInfo); + } + + taosArrayDestroy(pTableCheckInfo); + return NULL; +} + +// ====================================== EXPOSED APIs ====================================== +int32_t tsdbReaderOpen(SVnode* pVnode, SQueryTableDataCond* pCond, STableListInfo* tableList, uint64_t qId, + uint64_t taskId, STsdbReader** ppReader) { + int32_t code = 0; + + STsdbReader* pReader = tsdbQueryTablesImpl(pVnode, pCond, qId, taskId); + if (pReader == NULL) { + return NULL; + } + + if (emptyQueryTimewindow(pReader)) { + return (STsdbReader*)pReader; + } + + // todo apply the lastkey of table check to avoid to load header file + pReader->pTableCheckInfo = createCheckInfoFromTableGroup(pReader, tableList); + if (pReader->pTableCheckInfo == NULL) { + // tsdbReaderClose(pTsdbReadHandle); + terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; + return NULL; + } + + int32_t code = setCurrentSchema(pVnode, pReader); + if (code != TSDB_CODE_SUCCESS) { + terrno = code; + return NULL; + } + + int32_t numOfCols = taosArrayGetSize(pReader->suppInfo.defaultLoadColumn); + int16_t* ids = pReader->suppInfo.defaultLoadColumn->pData; + + STSchema* pSchema = pReader->pSchema; + + int32_t i = 0, j = 0; + while (i < numOfCols && j < pSchema->numOfCols) { + if (ids[i] == pSchema->columns[j].colId) { + pReader->suppInfo.slotIds[i] = j; + i++; + j++; + } else if (ids[i] > pSchema->columns[j].colId) { + j++; + } else { + // tsdbReaderClose(pTsdbReadHandle); + terrno = TSDB_CODE_INVALID_PARA; + return NULL; + } + } + + tsdbDebug("%p total numOfTable:%" PRIzu " in this query, table %" PRIzu " %s", pReader, + taosArrayGetSize(pReader->pTableCheckInfo), taosArrayGetSize(tableList->pTableList), pReader->idStr); + + return (STsdbReader*)pReader; +} + +void tsdbReaderClose(STsdbReader* pReader) { + if (pReader == NULL) { + return; + } + + pReader->pColumns = doFreeColumnInfoData(pReader->pColumns); + + taosArrayDestroy(pReader->suppInfo.defaultLoadColumn); + taosMemoryFreeClear(pReader->pDataBlockInfo); + taosMemoryFreeClear(pReader->suppInfo.pstatis); + taosMemoryFreeClear(pReader->suppInfo.plist); + taosMemoryFree(pReader->suppInfo.slotIds); + + if (!emptyQueryTimewindow(pReader)) { + // tsdbMayUnTakeMemSnapshot(pTsdbReadHandle); + } else { + assert(pReader->pTableCheckInfo == NULL); + } + + if (pReader->pTableCheckInfo != NULL) { + pReader->pTableCheckInfo = destroyTableCheckInfo(pReader->pTableCheckInfo); + } + + tsdbDestroyReadH(&pReader->rhelper); + + tdFreeDataCols(pReader->pDataCols); + pReader->pDataCols = NULL; + + pReader->prev = doFreeColumnInfoData(pReader->prev); + pReader->next = doFreeColumnInfoData(pReader->next); + + SIOCostSummary* pCost = &pReader->cost; + + tsdbDebug("%p :io-cost summary: head-file read cnt:%" PRIu64 ", head-file time:%" PRIu64 " us, statis-info:%" PRId64 + " us, datablock:%" PRId64 " us, check data:%" PRId64 " us, %s", + pReader, pCost->headFileLoad, pCost->headFileLoadTime, pCost->statisInfoLoadTime, pCost->blockLoadTime, + pCost->checkForNextTime, pReader->idStr); + + taosMemoryFree(pReader->idStr); + taosMemoryFree(pReader->pSchema); + taosMemoryFreeClear(pReader); +} + +bool tsdbNextDataBlock(STsdbReader* pReader) { + size_t numOfCols = taosArrayGetSize(pReader->pColumns); + for (int32_t i = 0; i < numOfCols; ++i) { + SColumnInfoData* pColInfo = taosArrayGet(pReader->pColumns, i); + colInfoDataCleanup(pColInfo, pReader->outputCapacity); + } + + if (emptyQueryTimewindow(pReader)) { + tsdbDebug("%p query window not overlaps with the data set, no result returned, %s", pReader, pReader->idStr); + return false; + } + + int64_t stime = taosGetTimestampUs(); + int64_t elapsedTime = stime; + + // TODO refactor: remove "type" + if (pReader->type == TSDB_QUERY_TYPE_LAST) { + if (pReader->cachelastrow == TSDB_CACHED_TYPE_LASTROW) { + // return loadCachedLastRow(pTsdbReadHandle); + } else if (pReader->cachelastrow == TSDB_CACHED_TYPE_LAST) { + // return loadCachedLast(pTsdbReadHandle); + } + } + + if (pReader->loadType == BLOCK_LOAD_TABLE_SEQ_ORDER) { + return loadDataBlockFromTableSeq(pReader); + } else { // loadType == RR and Offset Order + if (pReader->checkFiles) { + // check if the query range overlaps with the file data block + bool exists = true; + + int32_t code = getDataBlocksInFiles(pReader, &exists); + if (code != TSDB_CODE_SUCCESS) { + pReader->activeIndex = 0; + pReader->checkFiles = false; + + return false; + } + + if (exists) { + pReader->cost.checkForNextTime += (taosGetTimestampUs() - stime); + return exists; + } + + pReader->activeIndex = 0; + pReader->checkFiles = false; + } + + // TODO: opt by consider the scan order + bool ret = doHasDataInBuffer(pReader); + terrno = TSDB_CODE_SUCCESS; + + elapsedTime = taosGetTimestampUs() - stime; + pReader->cost.checkForNextTime += elapsedTime; + return ret; + } +} + +void tsdbRetrieveDataBlockInfo(STsdbReader* pReader, SDataBlockInfo* pDataBlockInfo) { + SQueryFilePos* cur = &pReader->cur; + + uint64_t uid = 0; + + // there are data in file + if (pReader->cur.fid != INT32_MIN) { + STableBlockInfo* pBlockInfo = &pReader->pDataBlockInfo[cur->slot]; + uid = pBlockInfo->pTableCheckInfo->tableId; + } else { + STableCheckInfo* pCheckInfo = taosArrayGet(pReader->pTableCheckInfo, pReader->activeIndex); + uid = pCheckInfo->tableId; + } + + tsdbDebug("data block generated, uid:%" PRIu64 " numOfRows:%d, tsrange:%" PRId64 " - %" PRId64 " %s", uid, cur->rows, + cur->win.skey, cur->win.ekey, pReader->idStr); + + pDataBlockInfo->uid = uid; + +#if 0 // for multi-group data query processing test purpose pDataBlockInfo->groupId = uid; #endif @@ -3438,9 +3410,6 @@ void tsdbRetrieveDataBlockInfo(STsdbReader* pReader, SDataBlockInfo* pDataBlockI pDataBlockInfo->window = cur->win; } -/* - * return null for mixed data block, if not a complete file data block, the statistics value will always return NULL - */ int32_t tsdbRetrieveDataBlockStatisInfo(STsdbReader* pReader, SColumnDataAgg*** pBlockStatis, bool* allHave) { *allHave = false; @@ -3551,137 +3520,171 @@ SArray* tsdbRetrieveDataBlock(STsdbReader* pReader, SArray* pIdList) { } } -static int tsdbCheckInfoCompar(const void* key1, const void* key2) { - if (((STableCheckInfo*)key1)->tableId < ((STableCheckInfo*)key2)->tableId) { - return -1; - } else if (((STableCheckInfo*)key1)->tableId > ((STableCheckInfo*)key2)->tableId) { - return 1; - } else { - ASSERT(false); - return 0; - } -} +void tsdbResetReadHandle(STsdbReader* pReader, SQueryTableDataCond* pCond, int32_t tWinIdx) { + if (emptyQueryTimewindow(pReader)) { + if (pCond->order != pReader->order) { + pReader->order = pCond->order; + TSWAP(pReader->window.skey, pReader->window.ekey); + } -static void* doFreeColumnInfoData(SArray* pColumnInfoData) { - if (pColumnInfoData == NULL) { - return NULL; + return; } - size_t cols = taosArrayGetSize(pColumnInfoData); - for (int32_t i = 0; i < cols; ++i) { - SColumnInfoData* pColInfo = taosArrayGet(pColumnInfoData, i); - colDataDestroy(pColInfo); + pReader->order = pCond->order; + setQueryTimewindow(pReader, pCond, tWinIdx); + pReader->type = TSDB_QUERY_TYPE_ALL; + pReader->cur.fid = -1; + pReader->cur.win = TSWINDOW_INITIALIZER; + pReader->checkFiles = true; + pReader->activeIndex = 0; // current active table index + pReader->locateStart = false; + pReader->loadExternalRow = pCond->loadExternalRows; + + if (ASCENDING_TRAVERSE(pCond->order)) { + assert(pReader->window.skey <= pReader->window.ekey); + } else { + assert(pReader->window.skey >= pReader->window.ekey); } - taosArrayDestroy(pColumnInfoData); - return NULL; + // allocate buffer in order to load data blocks from file + memset(pReader->suppInfo.pstatis, 0, sizeof(SColumnDataAgg)); + memset(pReader->suppInfo.plist, 0, POINTER_BYTES); + + tsdbInitDataBlockLoadInfo(&pReader->dataBlockLoadInfo); + tsdbInitCompBlockLoadInfo(&pReader->compBlockLoadInfo); + + resetCheckInfo(pReader); } -static void* destroyTableCheckInfo(SArray* pTableCheckInfo) { - size_t size = taosArrayGetSize(pTableCheckInfo); - for (int32_t i = 0; i < size; ++i) { - STableCheckInfo* p = taosArrayGet(pTableCheckInfo, i); - destroyTableMemIterator(p); +int32_t tsdbGetFileBlocksDistInfo(STsdbReader* pReader, STableBlockDistInfo* pTableBlockInfo) { + pTableBlockInfo->totalSize = 0; + pTableBlockInfo->totalRows = 0; - taosMemoryFreeClear(p->pCompInfo); - } + STsdbFS* pFileHandle = REPO_FS(pReader->pTsdb); - taosArrayDestroy(pTableCheckInfo); - return NULL; -} + // find the start data block in file + pReader->locateStart = true; + STsdbKeepCfg* pCfg = REPO_KEEP_CFG(pReader->pTsdb); + int32_t fid = getFileIdFromKey(pReader->window.skey, pCfg->days, pCfg->precision); -void tsdbCleanupReadHandle(STsdbReader* pReader) { - if (pReader == NULL) { - return; - } + tsdbRLockFS(pFileHandle); + tsdbFSIterInit(&pReader->fileIter, pFileHandle, pReader->order); + tsdbFSIterSeek(&pReader->fileIter, fid); + tsdbUnLockFS(pFileHandle); - pReader->pColumns = doFreeColumnInfoData(pReader->pColumns); + STsdbCfg* pc = REPO_CFG(pReader->pTsdb); + pTableBlockInfo->defMinRows = pc->minRows; + pTableBlockInfo->defMaxRows = pc->maxRows; - taosArrayDestroy(pReader->suppInfo.defaultLoadColumn); - taosMemoryFreeClear(pReader->pDataBlockInfo); - taosMemoryFreeClear(pReader->suppInfo.pstatis); - taosMemoryFreeClear(pReader->suppInfo.plist); - taosMemoryFree(pReader->suppInfo.slotIds); + int32_t bucketRange = ceil((pc->maxRows - pc->minRows) / 20.0); - if (!emptyQueryTimewindow(pReader)) { - // tsdbMayUnTakeMemSnapshot(pTsdbReadHandle); - } else { - assert(pReader->pTableCheckInfo == NULL); - } + pTableBlockInfo->numOfFiles += 1; - if (pReader->pTableCheckInfo != NULL) { - pReader->pTableCheckInfo = destroyTableCheckInfo(pReader->pTableCheckInfo); - } + int32_t code = TSDB_CODE_SUCCESS; + int32_t numOfBlocks = 0; + int32_t numOfTables = (int32_t)taosArrayGetSize(pReader->pTableCheckInfo); + int defaultRows = 4096; + STimeWindow win = TSWINDOW_INITIALIZER; - tsdbDestroyReadH(&pReader->rhelper); + while (true) { + numOfBlocks = 0; + tsdbRLockFS(REPO_FS(pReader->pTsdb)); - tdFreeDataCols(pReader->pDataCols); - pReader->pDataCols = NULL; + if ((pReader->pFileGroup = tsdbFSIterNext(&pReader->fileIter)) == NULL) { + tsdbUnLockFS(REPO_FS(pReader->pTsdb)); + break; + } - pReader->prev = doFreeColumnInfoData(pReader->prev); - pReader->next = doFreeColumnInfoData(pReader->next); + tsdbGetFidKeyRange(pCfg->days, pCfg->precision, pReader->pFileGroup->fid, &win.skey, &win.ekey); - SIOCostSummary* pCost = &pReader->cost; + // current file are not overlapped with query time window, ignore remain files + if ((win.skey > pReader->window.ekey) /* || (!ascTraverse && win.ekey < pTsdbReadHandle->window.ekey)*/) { + tsdbUnLockFS(REPO_FS(pReader->pTsdb)); + tsdbDebug("%p remain files are not qualified for qrange:%" PRId64 "-%" PRId64 ", ignore, %s", pReader, + pReader->window.skey, pReader->window.ekey, pReader->idStr); + pReader->pFileGroup = NULL; + break; + } - tsdbDebug("%p :io-cost summary: head-file read cnt:%" PRIu64 ", head-file time:%" PRIu64 " us, statis-info:%" PRId64 - " us, datablock:%" PRId64 " us, check data:%" PRId64 " us, %s", - pReader, pCost->headFileLoad, pCost->headFileLoadTime, pCost->statisInfoLoadTime, pCost->blockLoadTime, - pCost->checkForNextTime, pReader->idStr); + pTableBlockInfo->numOfFiles += 1; + if (tsdbSetAndOpenReadFSet(&pReader->rhelper, pReader->pFileGroup) < 0) { + tsdbUnLockFS(REPO_FS(pReader->pTsdb)); + code = terrno; + break; + } - taosMemoryFree(pReader->idStr); - taosMemoryFree(pReader->pSchema); - taosMemoryFreeClear(pReader); -} + tsdbUnLockFS(REPO_FS(pReader->pTsdb)); -int32_t tsdbReaderOpen(SVnode* pVnode, SQueryTableDataCond* pCond, STableListInfo* tableList, uint64_t qId, - uint64_t taskId, STsdbReader** ppReader) { - int32_t code = 0; + if (tsdbLoadBlockIdx(&pReader->rhelper) < 0) { + code = terrno; + break; + } - STsdbReader* pReader = tsdbQueryTablesImpl(pVnode, pCond, qId, taskId); - if (pReader == NULL) { - return NULL; - } + if ((code = getFileCompInfo(pReader, &numOfBlocks)) != TSDB_CODE_SUCCESS) { + break; + } - if (emptyQueryTimewindow(pReader)) { - return (STsdbReader*)pReader; - } + tsdbDebug("%p %d blocks found in file for %d table(s), fid:%d, %s", pReader, numOfBlocks, numOfTables, + pReader->pFileGroup->fid, pReader->idStr); - // todo apply the lastkey of table check to avoid to load header file - pReader->pTableCheckInfo = createCheckInfoFromTableGroup(pReader, tableList); - if (pReader->pTableCheckInfo == NULL) { - // tsdbCleanupReadHandle(pTsdbReadHandle); - terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; - return NULL; - } + if (numOfBlocks == 0) { + continue; + } - int32_t code = setCurrentSchema(pVnode, pReader); - if (code != TSDB_CODE_SUCCESS) { - terrno = code; - return NULL; - } + pTableBlockInfo->numOfBlocks += numOfBlocks; - int32_t numOfCols = taosArrayGetSize(pReader->suppInfo.defaultLoadColumn); - int16_t* ids = pReader->suppInfo.defaultLoadColumn->pData; + for (int32_t i = 0; i < numOfTables; ++i) { + STableCheckInfo* pCheckInfo = taosArrayGet(pReader->pTableCheckInfo, i); - STSchema* pSchema = pReader->pSchema; + SBlock* pBlock = pCheckInfo->pCompInfo->blocks; - int32_t i = 0, j = 0; - while (i < numOfCols && j < pSchema->numOfCols) { - if (ids[i] == pSchema->columns[j].colId) { - pReader->suppInfo.slotIds[i] = j; - i++; - j++; - } else if (ids[i] > pSchema->columns[j].colId) { - j++; - } else { - // tsdbCleanupReadHandle(pTsdbReadHandle); - terrno = TSDB_CODE_INVALID_PARA; - return NULL; + for (int32_t j = 0; j < pCheckInfo->numOfBlocks; ++j) { + pTableBlockInfo->totalSize += pBlock[j].len; + + int32_t numOfRows = pBlock[j].numOfRows; + pTableBlockInfo->totalRows += numOfRows; + + if (numOfRows > pTableBlockInfo->maxRows) { + pTableBlockInfo->maxRows = numOfRows; + } + + if (numOfRows < pTableBlockInfo->minRows) { + pTableBlockInfo->minRows = numOfRows; + } + + if (numOfRows < defaultRows) { + pTableBlockInfo->numOfSmallBlocks += 1; + } + + int32_t bucketIndex = getBucketIndex(pTableBlockInfo->defMinRows, bucketRange, numOfRows); + pTableBlockInfo->blockRowsHisto[bucketIndex]++; + } } } - tsdbDebug("%p total numOfTable:%" PRIzu " in this query, table %" PRIzu " %s", pReader, - taosArrayGetSize(pReader->pTableCheckInfo), taosArrayGetSize(tableList->pTableList), pReader->idStr); + pTableBlockInfo->numOfTables = numOfTables; + return code; +} - return (STsdbReader*)pReader; +int64_t tsdbGetNumOfRowsInMemTable(STsdbReader* pReader) { + int64_t rows = 0; + SMemTable* pMemTable = NULL; // pTsdbReadHandle->pMemTable; + if (pMemTable == NULL) { + return rows; + } + + size_t size = taosArrayGetSize(pReader->pTableCheckInfo); + for (int32_t i = 0; i < size; ++i) { + STableCheckInfo* pCheckInfo = taosArrayGet(pReader->pTableCheckInfo, i); + + // if (pMemT && pCheckInfo->tableId < pMemT->maxTables) { + // pMem = pMemT->tData[pCheckInfo->tableId]; + // rows += (pMem && pMem->uid == pCheckInfo->tableId) ? pMem->numOfRows : 0; + // } + // if (pIMemT && pCheckInfo->tableId < pIMemT->maxTables) { + // pIMem = pIMemT->tData[pCheckInfo->tableId]; + // rows += (pIMem && pIMem->uid == pCheckInfo->tableId) ? pIMem->numOfRows : 0; + // } + } + return rows; } \ No newline at end of file diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index 06d4f8168e0454efcfcde39d6b6400c3760f77f7..73291a11232ee062060e8c67b0a22772bd293823 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -4519,7 +4519,7 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo int32_t code = extractTableSchemaVersion(pHandle, pTableScanNode->scan.uid, pTaskInfo); if (code) { - tsdbCleanupReadHandle(pDataReader); + tsdbReaderClose(pDataReader); pTaskInfo->code = terrno; return NULL; } @@ -4528,7 +4528,7 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo code = generateGroupIdMap(pTableListInfo, pHandle, groupKeys); // todo for json taosArrayDestroy(groupKeys); if (code) { - tsdbCleanupReadHandle(pDataReader); + tsdbReaderClose(pDataReader); pTaskInfo->code = terrno; return NULL; } @@ -4575,7 +4575,7 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo int32_t code = generateGroupIdMap(pTableListInfo, pHandle, groupKeys); // todo for json taosArrayDestroy(groupKeys); if (code) { - tsdbCleanupReadHandle(pDataReader); + tsdbReaderClose(pDataReader); return NULL; } @@ -4900,8 +4900,8 @@ SArray* extractColumnInfo(SNodeList* pNodeList) { } SArray* extractPartitionColInfo(SNodeList* pNodeList) { - if(!pNodeList) { - return NULL; + if (!pNodeList) { + return NULL; } size_t numOfCols = LIST_LENGTH(pNodeList); diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index f5779f5e57af762a1dd05320ba352a6eb083ec7f..9c69b739b7608d58e07acd4a5ede19e1477c9641 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -522,7 +522,7 @@ static void destroyTableScanOperatorInfo(void* param, int32_t numOfOutput) { blockDataDestroy(pTableScanInfo->pResBlock); clearupQueryTableDataCond(&pTableScanInfo->cond); - tsdbCleanupReadHandle(pTableScanInfo->dataReader); + tsdbReaderClose(pTableScanInfo->dataReader); if (pTableScanInfo->pColMatchInfo != NULL) { taosArrayDestroy(pTableScanInfo->pColMatchInfo); @@ -2222,7 +2222,7 @@ void destroyTableMergeScanOperatorInfo(void* param, int32_t numOfOutput) { for (int32_t i = 0; i < taosArrayGetSize(pTableScanInfo->dataReaders); ++i) { STsdbReader* reader = taosArrayGetP(pTableScanInfo->dataReaders, i); - tsdbCleanupReadHandle(reader); + tsdbReaderClose(reader); } taosArrayDestroy(pTableScanInfo->dataReaders);