diff --git a/src/inc/taosdef.h b/src/inc/taosdef.h index b6a37d85a2c6a5df1ebad940e3a8c83cca782f13..74de811c59301e1edf39639dc453789c06f85223 100644 --- a/src/inc/taosdef.h +++ b/src/inc/taosdef.h @@ -326,8 +326,7 @@ void tsDataSwap(void *pLeft, void *pRight, int32_t type, int32_t size); #define TSDB_QUERY_TYPE_TAG_FILTER_QUERY 0x400u #define TSDB_QUERY_TYPE_INSERT 0x100u // insert type -#define TSDB_QUERY_TYPE_IMPORT 0x200u // import data -#define TSDB_QUERY_TYPE_MULTITABLE_QUERY 0x800u +#define TSDB_QUERY_TYPE_MULTITABLE_QUERY 0x200u #define TSDB_QUERY_HAS_TYPE(x, _type) (((x) & (_type)) != 0) #define TSDB_QUERY_SET_TYPE(x, _type) ((x) |= (_type)) diff --git a/src/query/src/qExecutor.c b/src/query/src/qExecutor.c index 052273a90cfd539eb36b9bea55dd7943d1a6a495..a8158a386da494f084f77dd78b6af78736eca84f 100644 --- a/src/query/src/qExecutor.c +++ b/src/query/src/qExecutor.c @@ -403,23 +403,24 @@ static bool isTopBottomQuery(SQuery *pQuery) { return false; } -static SDataStatis *getStatisInfo(SQuery *pQuery, SDataStatis *pStatis, SDataBlockInfo *pDataBlockInfo, int32_t index) { +static SDataStatis *getStatisInfo(SQuery *pQuery, SDataStatis *pStatis, int32_t numOfCols, int32_t index) { // for a tag column, no corresponding field info - SColIndex *pColIndexEx = &pQuery->pSelectExpr[index].base.colInfo; - if (TSDB_COL_IS_TAG(pColIndexEx->flag)) { + SColIndex *pColIndex = &pQuery->pSelectExpr[index].base.colInfo; + if (TSDB_COL_IS_TAG(pColIndex->flag)) { return NULL; } - + /* * Choose the right column field info by field id, since the file block may be out of date, * which means the newest table schema is not equalled to the schema of this block. + * TODO: speedup by using bsearch */ - for (int32_t i = 0; i < pDataBlockInfo->numOfCols; ++i) { - if (pColIndexEx->colId == pStatis[i].colId) { + for (int32_t i = 0; i < numOfCols; ++i) { + if (pColIndex->colId == pStatis[i].colId) { return &pStatis[i]; } } - + return NULL; } @@ -431,8 +432,7 @@ static SDataStatis *getStatisInfo(SQuery *pQuery, SDataStatis *pStatis, SDataBlo * @param pColStatis * @return */ -static bool hasNullValue(SQuery *pQuery, int32_t col, SDataBlockInfo *pDataBlockInfo, SDataStatis *pStatis, - SDataStatis **pColStatis) { +static bool hasNullValue(SQuery *pQuery, int32_t col, int32_t numOfCols, SDataStatis *pStatis, SDataStatis **pColStatis) { SColIndex *pColIndex = &pQuery->pSelectExpr[col].base.colInfo; if (TSDB_COL_IS_TAG(pColIndex->flag)) { return false; @@ -444,7 +444,7 @@ static bool hasNullValue(SQuery *pQuery, int32_t col, SDataBlockInfo *pDataBlock } if (pStatis != NULL) { - *pColStatis = getStatisInfo(pQuery, pStatis, pDataBlockInfo, col); + *pColStatis = getStatisInfo(pQuery, pStatis, numOfCols, col); } else { *pColStatis = NULL; } @@ -936,7 +936,7 @@ static void blockwiseApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, SDataStatis * SDataStatis *tpField = NULL; - bool hasNull = hasNullValue(pQuery, k, pDataBlockInfo, pStatis, &tpField); + bool hasNull = hasNullValue(pQuery, k, pDataBlockInfo->numOfCols, pStatis, &tpField); char *dataBlock = getDataBlock(pRuntimeEnv, &sasArray[k], k, pDataBlockInfo->rows, pDataBlock); setExecParams(pQuery, &pCtx[k], dataBlock, primaryKeyCol, pDataBlockInfo->rows, functionId, tpField, hasNull, @@ -1157,7 +1157,7 @@ static void rowwiseApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, SDataStatis *pS SDataStatis *pColStatis = NULL; - bool hasNull = hasNullValue(pQuery, k, pDataBlockInfo, pStatis, &pColStatis); + bool hasNull = hasNullValue(pQuery, k, pDataBlockInfo->numOfCols, pStatis, &pColStatis); char *dataBlock = getDataBlock(pRuntimeEnv, &sasArray[k], k, pDataBlockInfo->rows, pDataBlock); setExecParams(pQuery, &pCtx[k], dataBlock, primaryKeyCol, pDataBlockInfo->rows, functionId, pColStatis, hasNull, @@ -2455,9 +2455,9 @@ static int64_t doScanAllDataBlocks(SQueryRuntimeEnv *pRuntimeEnv) { } SDataStatis *pStatis = NULL; - SArray * pDataBlock = loadDataBlockOnDemand(pRuntimeEnv, pQueryHandle, &blockInfo, &pStatis); - pQuery->pos = QUERY_IS_ASC_QUERY(pQuery) ? 0 : blockInfo.rows - 1; + + SArray *pDataBlock = loadDataBlockOnDemand(pRuntimeEnv, pQueryHandle, &blockInfo, &pStatis); int32_t numOfRes = tableApplyFunctionsOnBlock(pRuntimeEnv, &blockInfo, pStatis, binarySearchForKey, pDataBlock); qTrace("QInfo:%p check data block, brange:%" PRId64 "-%" PRId64 ", rows:%d, numOfRes:%d", GET_QINFO_ADDR(pRuntimeEnv), @@ -5610,18 +5610,18 @@ static void doUpdateExprColumnIndex(SQuery *pQuery) { continue; } - SColIndex *pColIndexEx = &pSqlExprMsg->colInfo; - if (!TSDB_COL_IS_TAG(pColIndexEx->flag)) { + SColIndex *pColIndex = &pSqlExprMsg->colInfo; + if (!TSDB_COL_IS_TAG(pColIndex->flag)) { for (int32_t f = 0; f < pQuery->numOfCols; ++f) { - if (pColIndexEx->colId == pQuery->colList[f].colId) { - pColIndexEx->colIndex = f; + if (pColIndex->colId == pQuery->colList[f].colId) { + pColIndex->colIndex = f; break; } } } else { for (int32_t f = 0; f < pQuery->numOfTags; ++f) { - if (pColIndexEx->colId == pQuery->tagColList[f].colId) { - pColIndexEx->colIndex = f; + if (pColIndex->colId == pQuery->tagColList[f].colId) { + pColIndex->colIndex = f; break; } } diff --git a/src/tsdb/src/tsdbRead.c b/src/tsdb/src/tsdbRead.c index b5340d4ba9455c128de6979ab21a0ff0952886d8..cc191f39001aac11d0c16a079485d5c2d7ff3683 100644 --- a/src/tsdb/src/tsdbRead.c +++ b/src/tsdb/src/tsdbRead.c @@ -40,10 +40,6 @@ enum { TSDB_QUERY_TYPE_EXTERNAL = 3, }; -typedef struct SField { - // todo need the definition -} SField; - typedef struct SQueryFilePos { int32_t fid; int32_t slot; @@ -68,66 +64,56 @@ typedef struct SLoadCompBlockInfo { } SLoadCompBlockInfo; typedef struct STableCheckInfo { - STableId tableId; - TSKEY lastKey; - STable* pTableObj; - int32_t start; - SCompInfo* pCompInfo; - int32_t compSize; - int32_t numOfBlocks; // number of qualified data blocks not the original blocks - SDataCols* pDataCols; - - SSkipListIterator* iter; // skip list iterator - SSkipListIterator* iiter; // imem iterator - - bool initBuf; // if we should initialize the in-memory skip list iterator + STableId tableId; + TSKEY lastKey; + STable* pTableObj; + SCompInfo* pCompInfo; + int32_t compSize; + int32_t numOfBlocks; // number of qualified data blocks not the original blocks + SDataCols* pDataCols; + bool initBuf; // whether to initialize the in-memory skip list iterator or not + SSkipListIterator* iter; // mem buffer skip list iterator + SSkipListIterator* iiter; // imem buffer skip list iterator } STableCheckInfo; -typedef struct { - SCompBlock* compBlock; - SField* fields; -} SCompBlockFields; - typedef struct STableBlockInfo { - SCompBlockFields pBlock; - STableCheckInfo* pTableCheckInfo; - int32_t blockIndex; - int32_t groupIdx; /* number of group is less than the total number of tables */ + SCompBlock* compBlock; + STableCheckInfo* pTableCheckInfo; +// int32_t blockIndex; +// int32_t groupIdx; /* number of group is less than the total number of tables */ } STableBlockInfo; typedef struct SBlockOrderSupporter { int32_t numOfTables; - STableBlockInfo** pDataBlockInfo; + STableBlockInfo** pDataBlockInfo; int32_t* blockIndexArray; int32_t* numOfBlocksPerTable; } SBlockOrderSupporter; typedef struct STsdbQueryHandle { - STsdbRepo* pTsdb; - SQueryFilePos cur; // current position - - SDataBlockLoadInfo dataBlockLoadInfo; /* record current block load information */ - SLoadCompBlockInfo compBlockLoadInfo; /* record current compblock information in SQuery */ - - int16_t order; - STimeWindow window; // the primary query time window that applies to all queries - SCompBlock* pBlock; - int32_t numOfBlocks; - SField** pFields; - SArray* pColumns; // column list, SColumnInfoData array list - bool locateStart; - int32_t outputCapacity; - int32_t realNumOfRows; - SArray* pTableCheckInfo; //SArray - int32_t activeIndex; - bool checkFiles; // check file stage - void* qinfo; // query info handle, for debug purpose - int32_t type; // query type: retrieve all data blocks, 2. retrieve only last row, 3. retrieve direct prev|next rows - STableBlockInfo* pDataBlockInfo; - + STsdbRepo* pTsdb; + SQueryFilePos cur; // current position + int16_t order; + STimeWindow window; // the primary query time window that applies to all queries + SCompBlock* pBlock; + SDataStatis* statis; // query level statistics, only one table block statistics info exists at any time + int32_t numOfBlocks; + SArray* pColumns; // column list, SColumnInfoData array list + bool locateStart; + int32_t outputCapacity; + int32_t realNumOfRows; + SArray* pTableCheckInfo; //SArray + int32_t activeIndex; + bool checkFiles; // check file stage + void* qinfo; // query info handle, for debug purpose + int32_t type; // query type: retrieve all data blocks, 2. retrieve only last row, 3. retrieve direct prev|next rows SFileGroup* pFileGroup; SFileGroupIter fileIter; SRWHelper rhelper; + STableBlockInfo* pDataBlockInfo; + + SDataBlockLoadInfo dataBlockLoadInfo; /* record current block load information */ + SLoadCompBlockInfo compBlockLoadInfo; /* record current compblock information in SQuery */ } STsdbQueryHandle; static void changeQueryHandleForLastrowQuery(TsdbQueryHandleT pqHandle); @@ -148,23 +134,43 @@ TsdbQueryHandleT* tsdbQueryTables(TsdbRepoT* tsdb, STsdbQueryCond* pCond, STable // todo 2. add the reference count for each table that is involved in query STsdbQueryHandle* pQueryHandle = calloc(1, sizeof(STsdbQueryHandle)); - pQueryHandle->order = pCond->order; - pQueryHandle->window = pCond->twindow; - pQueryHandle->pTsdb = tsdb; - pQueryHandle->type = TSDB_QUERY_TYPE_ALL; + pQueryHandle->order = pCond->order; + pQueryHandle->window = pCond->twindow; + pQueryHandle->pTsdb = tsdb; + pQueryHandle->type = TSDB_QUERY_TYPE_ALL; + pQueryHandle->cur.fid = -1; + pQueryHandle->cur.win = TSWINDOW_INITIALIZER; + pQueryHandle->checkFiles = true;//ASCENDING_TRAVERSE(pQueryHandle->order); + pQueryHandle->activeIndex = 0; // current active table index + pQueryHandle->outputCapacity = ((STsdbRepo*)tsdb)->config.maxRowsPerFileBlock; + tsdbInitReadHelper(&pQueryHandle->rhelper, (STsdbRepo*) tsdb); - pQueryHandle->cur.fid = -1; - pQueryHandle->cur.win = TSWINDOW_INITIALIZER; - size_t sizeOfGroup = taosArrayGetSize(groupList->pGroupList); assert(sizeOfGroup >= 1 && pCond != NULL && pCond->numOfCols > 0); + // allocate buffer in order to load data blocks from file + int32_t numOfCols = pCond->numOfCols; + + pQueryHandle->statis = calloc(numOfCols, sizeof(SDataStatis)); + pQueryHandle->pColumns = taosArrayInit(numOfCols, sizeof(SColumnInfoData)); // todo: use list instead of array? + + for (int32_t i = 0; i < numOfCols; ++i) { + SColumnInfoData colInfo = {{0}, 0}; + + colInfo.info = pCond->colList[i]; + colInfo.pData = calloc(1, EXTRA_BYTES + pQueryHandle->outputCapacity * pCond->colList[i].bytes); + taosArrayPush(pQueryHandle->pColumns, &colInfo); + pQueryHandle->statis[i].colId = colInfo.info.colId; + } + pQueryHandle->pTableCheckInfo = taosArrayInit(groupList->numOfTables, sizeof(STableCheckInfo)); + STsdbMeta* pMeta = tsdbGetMeta(tsdb); + assert(pMeta != NULL); for (int32_t i = 0; i < sizeOfGroup; ++i) { SArray* group = *(SArray**) taosArrayGet(groupList->pGroupList, i); - + size_t gsize = taosArrayGetSize(group); assert(gsize > 0); @@ -174,35 +180,18 @@ TsdbQueryHandleT* tsdbQueryTables(TsdbRepoT* tsdb, STsdbQueryCond* pCond, STable STableCheckInfo info = { .lastKey = pQueryHandle->window.skey, .tableId = *id, - .pTableObj = tsdbGetTableByUid(tsdbGetMeta(tsdb), id->uid), + .pTableObj = tsdbGetTableByUid(pMeta, id->uid), }; - + assert(info.pTableObj != NULL && info.pTableObj->tableId.tid == id->tid); taosArrayPush(pQueryHandle->pTableCheckInfo, &info); } } - - uTrace("%p total numOfTable:%d in query", pQueryHandle, taosArrayGetSize(pQueryHandle->pTableCheckInfo)); - - /* - * For ascending timestamp order query, query starts from data files. In contrast, buffer will be checked in the first place - * in case of descending timestamp order query. - */ - pQueryHandle->checkFiles = true;//ASCENDING_TRAVERSE(pQueryHandle->order); - pQueryHandle->activeIndex = 0; - - // allocate buffer in order to load data blocks from file - int32_t numOfCols = pCond->numOfCols; - pQueryHandle->outputCapacity = 4096; - - pQueryHandle->pColumns = taosArrayInit(numOfCols, sizeof(SColumnInfoData)); - for (int32_t i = 0; i < pCond->numOfCols; ++i) { - SColumnInfoData colInfo = {{0}, 0}; - colInfo.info = pCond->colList[i]; - colInfo.pData = calloc(1, EXTRA_BYTES + pQueryHandle->outputCapacity * pCond->colList[i].bytes); - taosArrayPush(pQueryHandle->pColumns, &colInfo); + for(int32_t i = 0; i < numOfCols; ++i) { } + + uTrace("%p total numOfTable:%d in query", pQueryHandle, taosArrayGetSize(pQueryHandle->pTableCheckInfo)); tsdbInitDataBlockLoadInfo(&pQueryHandle->dataBlockLoadInfo); tsdbInitCompBlockLoadInfo(&pQueryHandle->compBlockLoadInfo); @@ -499,8 +488,9 @@ static bool doLoadFileDataBlock(STsdbQueryHandle* pQueryHandle, SCompBlock* pBlo bool blockLoaded = false; SArray* sa = getDefaultLoadColumns(pQueryHandle, true); - if (pCheckInfo->pDataCols == NULL) { // todo: why not the real data? - pCheckInfo->pDataCols = tdNewDataCols(pRepo->tsdbMeta->maxRowBytes, pRepo->tsdbMeta->maxCols, pRepo->config.maxRowsPerFileBlock); + if (pCheckInfo->pDataCols == NULL) { + STsdbMeta* pMeta = tsdbGetMeta(pRepo); + pCheckInfo->pDataCols = tdNewDataCols(pMeta->maxRowBytes, pMeta->maxCols, pRepo->config.maxRowsPerFileBlock); } tdInitDataCols(pCheckInfo->pDataCols, tsdbGetTableSchema(tsdbGetMeta(pQueryHandle->pTsdb), pCheckInfo->pTableObj)); @@ -522,8 +512,6 @@ static bool doLoadFileDataBlock(STsdbQueryHandle* pQueryHandle, SCompBlock* pBlo } static void handleDataMergeIfNeeded(STsdbQueryHandle* pQueryHandle, SCompBlock* pBlock, STableCheckInfo* pCheckInfo){ - SArray* sa = getDefaultLoadColumns(pQueryHandle, true); - SQueryFilePos* cur = &pQueryHandle->cur; SDataBlockInfo binfo = getTrueDataBlockInfo(pCheckInfo, pBlock); /*bool hasData = */ initTableMemIterator(pQueryHandle, pCheckInfo); @@ -591,9 +579,12 @@ static void handleDataMergeIfNeeded(STsdbQueryHandle* pQueryHandle, SCompBlock* cur->blockCompleted = false; return; } - + + SArray* sa = getDefaultLoadColumns(pQueryHandle, true); doLoadFileDataBlock(pQueryHandle, pBlock, pCheckInfo); doMergeTwoLevelData(pQueryHandle, pCheckInfo, pBlock, sa); + taosArrayDestroy(sa); + } else { pQueryHandle->realNumOfRows = binfo.rows; @@ -1074,7 +1065,9 @@ static void cleanBlockOrderSupporter(SBlockOrderSupporter* pSupporter, int32_t n tfree(pSupporter->blockIndexArray); for (int32_t i = 0; i < numOfTables; ++i) { - tfree(pSupporter->pDataBlockInfo[i]); + STableBlockInfo* pBlockInfo = pSupporter->pDataBlockInfo[i]; +// tfree(pBlockInfo->statis); + tfree(pBlockInfo); } tfree(pSupporter->pDataBlockInfo); @@ -1100,14 +1093,14 @@ static int32_t dataBlockOrderCompar(const void* pLeft, const void* pRight, void* STableBlockInfo* pLeftBlockInfoEx = &pSupporter->pDataBlockInfo[leftTableIndex][leftTableBlockIndex]; STableBlockInfo* pRightBlockInfoEx = &pSupporter->pDataBlockInfo[rightTableIndex][rightTableBlockIndex]; - // assert(pLeftBlockInfoEx->pBlock.compBlock->offset != pRightBlockInfoEx->pBlock.compBlock->offset); - if (pLeftBlockInfoEx->pBlock.compBlock->offset == pRightBlockInfoEx->pBlock.compBlock->offset && - pLeftBlockInfoEx->pBlock.compBlock->last == pRightBlockInfoEx->pBlock.compBlock->last) { + // assert(pLeftBlockInfoEx->compBlock->offset != pRightBlockInfoEx->compBlock->offset); + if (pLeftBlockInfoEx->compBlock->offset == pRightBlockInfoEx->compBlock->offset && + pLeftBlockInfoEx->compBlock->last == pRightBlockInfoEx->compBlock->last) { // todo add more information - uError("error in header file, two block with same offset:%p", pLeftBlockInfoEx->pBlock.compBlock->offset); + uError("error in header file, two block with same offset:%p", pLeftBlockInfoEx->compBlock->offset); } - return pLeftBlockInfoEx->pBlock.compBlock->offset > pRightBlockInfoEx->pBlock.compBlock->offset ? 1 : -1; + return pLeftBlockInfoEx->compBlock->offset > pRightBlockInfoEx->compBlock->offset ? 1 : -1; } static int32_t createDataBlocksInfo(STsdbQueryHandle* pQueryHandle, int32_t numOfBlocks, int32_t* numOfAllocBlocks) { @@ -1116,7 +1109,7 @@ static int32_t createDataBlocksInfo(STsdbQueryHandle* pQueryHandle, int32_t numO return TSDB_CODE_SERV_OUT_OF_MEMORY; } - pQueryHandle->pDataBlockInfo = (STableBlockInfo*)tmp; + pQueryHandle->pDataBlockInfo = (STableBlockInfo*) tmp; memset(pQueryHandle->pDataBlockInfo, 0, sizeof(STableBlockInfo) * numOfBlocks); *numOfAllocBlocks = numOfBlocks; @@ -1132,9 +1125,10 @@ static int32_t createDataBlocksInfo(STsdbQueryHandle* pQueryHandle, int32_t numO cleanBlockOrderSupporter(&sup, 0); return TSDB_CODE_SERV_OUT_OF_MEMORY; } - + int32_t cnt = 0; int32_t numOfQualTables = 0; + for (int32_t j = 0; j < numOfTables; ++j) { STableCheckInfo* pTableCheck = (STableCheckInfo*)taosArrayGet(pQueryHandle->pTableCheckInfo, j); if (pTableCheck->numOfBlocks <= 0) { @@ -1153,14 +1147,12 @@ static int32_t createDataBlocksInfo(STsdbQueryHandle* pQueryHandle, int32_t numO sup.pDataBlockInfo[numOfQualTables] = (STableBlockInfo*)buf; for (int32_t k = 0; k < pTableCheck->numOfBlocks; ++k) { - STableBlockInfo* pBlockInfoEx = &sup.pDataBlockInfo[numOfQualTables][k]; - - pBlockInfoEx->pBlock.compBlock = &pBlock[k]; - pBlockInfoEx->pBlock.fields = NULL; + STableBlockInfo* pBlockInfo = &sup.pDataBlockInfo[numOfQualTables][k]; - pBlockInfoEx->pTableCheckInfo = pTableCheck; - // pBlockInfoEx->groupIdx = pTableCheckInfo[j]->groupIdx; // set the group index - // pBlockInfoEx->blockIndex = pTableCheckInfo[j]->start + k; // set the block index in original table + pBlockInfo->compBlock = &pBlock[k]; + pBlockInfo->pTableCheckInfo = pTableCheck; + // pBlockInfo->groupIdx = pTableCheckInfo[j]->groupIdx; // set the group index + // pBlockInfo->blockIndex = pTableCheckInfo[j]->start + k; // set the block index in original table cnt++; } @@ -1185,8 +1177,8 @@ static int32_t createDataBlocksInfo(STsdbQueryHandle* pQueryHandle, int32_t numO int32_t pos = pTree->pNode[0].index; int32_t index = sup.blockIndexArray[pos]++; - STableBlockInfo* pBlocksInfoEx = sup.pDataBlockInfo[pos]; - pQueryHandle->pDataBlockInfo[numOfTotal++] = pBlocksInfoEx[index]; + STableBlockInfo* pBlocksInfo = sup.pDataBlockInfo[pos]; + pQueryHandle->pDataBlockInfo[numOfTotal++] = pBlocksInfo[index]; // set data block index overflow, in order to disable the offset comparator if (sup.blockIndexArray[pos] >= sup.numOfBlocksPerTable[pos]) { @@ -1199,7 +1191,7 @@ static int32_t createDataBlocksInfo(STsdbQueryHandle* pQueryHandle, int32_t numO /* * available when no import exists * for(int32_t i = 0; i < cnt - 1; ++i) { - * assert((*pDataBlockInfo)[i].pBlock.compBlock->offset < (*pDataBlockInfo)[i+1].pBlock.compBlock->offset); + * assert((*pDataBlockInfo)[i].compBlock->offset < (*pDataBlockInfo)[i+1].compBlock->offset); * } */ @@ -1255,7 +1247,7 @@ static bool getDataBlocksInFilesImpl(STsdbQueryHandle* pQueryHandle) { cur->fid = pQueryHandle->pFileGroup->fileId; STableBlockInfo* pBlockInfo = &pQueryHandle->pDataBlockInfo[cur->slot]; - return loadFileDataBlock(pQueryHandle, pBlockInfo->pBlock.compBlock, pBlockInfo->pTableCheckInfo); + return loadFileDataBlock(pQueryHandle, pBlockInfo->compBlock, pBlockInfo->pTableCheckInfo); } static bool getDataBlocksInFiles(STsdbQueryHandle* pQueryHandle) { @@ -1291,10 +1283,10 @@ static bool getDataBlocksInFiles(STsdbQueryHandle* pQueryHandle) { cur->blockCompleted = false; STableBlockInfo* pNext = &pQueryHandle->pDataBlockInfo[cur->slot]; - return loadFileDataBlock(pQueryHandle, pNext->pBlock.compBlock, pNext->pTableCheckInfo); + return loadFileDataBlock(pQueryHandle, pNext->compBlock, pNext->pTableCheckInfo); } } else { - handleDataMergeIfNeeded(pQueryHandle, pBlockInfo->pBlock.compBlock, pCheckInfo); + handleDataMergeIfNeeded(pQueryHandle, pBlockInfo->compBlock, pCheckInfo); return pQueryHandle->realNumOfRows > 0; } } @@ -1484,35 +1476,33 @@ SDataBlockInfo tsdbRetrieveDataBlockInfo(TsdbQueryHandleT* pQueryHandle) { // there are data in file if (pHandle->cur.fid >= 0) { STableBlockInfo* pBlockInfo = &pHandle->pDataBlockInfo[pHandle->cur.slot]; - STableCheckInfo* pCheckInfo = pBlockInfo->pTableCheckInfo; - - STable* pTable = pCheckInfo->pTableObj; + STable* pTable = pBlockInfo->pTableCheckInfo->pTableObj; - if (pHandle->cur.mixBlock) { - SDataBlockInfo blockInfo = { - .uid = pTable->tableId.uid, - .tid = pTable->tableId.tid, - .rows = pHandle->cur.rows, - .window = pHandle->cur.win, - }; - - return blockInfo; - } else { - return getTrueDataBlockInfo(pCheckInfo, pBlockInfo->pBlock.compBlock); - } + SDataBlockInfo blockInfo = { + .uid = pTable->tableId.uid, + .tid = pTable->tableId.tid, + .rows = pHandle->cur.rows, + .window = pHandle->cur.win, + .numOfCols = QH_GET_NUM_OF_COLS(pHandle), + }; + + return blockInfo; } else { STableCheckInfo* pCheckInfo = taosArrayGet(pHandle->pTableCheckInfo, pHandle->activeIndex); + SQueryFilePos* cur = &pHandle->cur; STable* pTable = pCheckInfo->pTableObj; if (pTable->mem != NULL) { // create mem table iterator if it is not created yet assert(pCheckInfo->iter != NULL); - STimeWindow* win = &pHandle->cur.win; + STimeWindow* win = &cur->win; pHandle->cur.rows = tsdbReadRowsFromCache(pCheckInfo->iter, pCheckInfo->pTableObj, pHandle->window.ekey, pHandle->outputCapacity, &win->skey, &win->ekey, pHandle); // todo refactor API // update the last key value pCheckInfo->lastKey = win->ekey + step; + cur->lastKey = win->ekey + step; + cur->mixBlock = true; } if (!ASCENDING_TRAVERSE(pHandle->order)) { @@ -1524,15 +1514,34 @@ SDataBlockInfo tsdbRetrieveDataBlockInfo(TsdbQueryHandleT* pQueryHandle) { .tid = pTable->tableId.tid, .rows = pHandle->cur.rows, .window = pHandle->cur.win, + .numOfCols = QH_GET_NUM_OF_COLS(pHandle), }; return blockInfo; } } -// return null for data block in cache +/* + * return null for mixed data block, if not a complete file data block, the statistics value will always return NULL + */ int32_t tsdbRetrieveDataBlockStatisInfo(TsdbQueryHandleT* pQueryHandle, SDataStatis** pBlockStatis) { - *pBlockStatis = NULL; + STsdbQueryHandle* pHandle = (STsdbQueryHandle*) pQueryHandle; + + SQueryFilePos* cur = &pHandle->cur; + if (cur->mixBlock) { + *pBlockStatis = NULL; + return TSDB_CODE_SUCCESS; + } + + assert((cur->slot >= 0 && cur->slot < pHandle->numOfBlocks) || + ((cur->slot == pHandle->numOfBlocks) && (cur->slot == 0))); + + STableBlockInfo* pBlockInfo = &pHandle->pDataBlockInfo[cur->slot]; + tsdbLoadCompData(&pHandle->rhelper, pBlockInfo->compBlock, NULL); + + tsdbGetDataStatis(&pHandle->rhelper, pHandle->statis, QH_GET_NUM_OF_COLS(pHandle)); + *pBlockStatis = pHandle->statis; + return TSDB_CODE_SUCCESS; } @@ -1546,13 +1555,13 @@ SArray* tsdbRetrieveDataBlock(TsdbQueryHandleT* pQueryHandle, SArray* pIdList) { if (pHandle->cur.fid < 0) { return pHandle->pColumns; } else { - STableBlockInfo* pBlockInfoEx = &pHandle->pDataBlockInfo[pHandle->cur.slot]; - STableCheckInfo* pCheckInfo = pBlockInfoEx->pTableCheckInfo; + STableBlockInfo* pBlockInfo = &pHandle->pDataBlockInfo[pHandle->cur.slot]; + STableCheckInfo* pCheckInfo = pBlockInfo->pTableCheckInfo; if (pHandle->cur.mixBlock) { return pHandle->pColumns; } else { - SDataBlockInfo binfo = getTrueDataBlockInfo(pCheckInfo, pBlockInfoEx->pBlock.compBlock); + SDataBlockInfo binfo = getTrueDataBlockInfo(pCheckInfo, pBlockInfo->compBlock); assert(pHandle->realNumOfRows <= binfo.rows); // data block has been loaded, todo extract method @@ -1562,7 +1571,7 @@ SArray* tsdbRetrieveDataBlock(TsdbQueryHandleT* pQueryHandle, SArray* pIdList) { pBlockLoadInfo->tid == pCheckInfo->pTableObj->tableId.tid) { return pHandle->pColumns; } else { // only load the file block - SCompBlock* pBlock = pBlockInfoEx->pBlock.compBlock; + SCompBlock* pBlock = pBlockInfo->compBlock; doLoadFileDataBlock(pHandle, pBlock, pCheckInfo); // todo refactor @@ -2006,8 +2015,9 @@ void tsdbCleanupQueryHandle(TsdbQueryHandleT queryHandle) { } taosArrayDestroy(pQueryHandle->pColumns); - tfree(pQueryHandle->pDataBlockInfo); + tfree(pQueryHandle->statis); + tsdbDestroyHelper(&pQueryHandle->rhelper); tfree(pQueryHandle);