diff --git a/include/common/tcommon.h b/include/common/tcommon.h index 09a821ef060781fd262251323b5deb5dd44df8bd..c0eb53d8cab2def78719c938d99c4c293fd4f178 100644 --- a/include/common/tcommon.h +++ b/include/common/tcommon.h @@ -74,9 +74,9 @@ typedef struct SDataBlockInfo { } SDataBlockInfo; typedef struct SSDataBlock { - SColumnDataAgg* pBlockAgg; - SArray* pDataBlock; // SArray - SDataBlockInfo info; + SColumnDataAgg** pBlockAgg; + SArray* pDataBlock; // SArray + SDataBlockInfo info; } SSDataBlock; typedef struct SVarColAttr { diff --git a/source/common/src/tdatablock.c b/source/common/src/tdatablock.c index f30a74bf11a56048294366a3b6ee1e96d11c0666..4a6fef4626b3be0751c87b7a68a48a9aa2eaf063 100644 --- a/source/common/src/tdatablock.c +++ b/source/common/src/tdatablock.c @@ -490,7 +490,7 @@ SSDataBlock* blockDataExtractBlock(SSDataBlock* pBlock, int32_t startIndex, int3 SColumnInfoData* pDstCol = taosArrayGet(pDst->pDataBlock, i); for (int32_t j = startIndex; j < (startIndex + rowCount); ++j) { - bool isNull = colDataIsNull(pColData, pBlock->info.rows, j, pBlock->pBlockAgg); + bool isNull = colDataIsNull(pColData, pBlock->info.rows, j, pBlock->pBlockAgg[i]); char* p = colDataGetData(pColData, j); colDataAppend(pDstCol, j - startIndex, p, isNull); @@ -702,8 +702,8 @@ int32_t dataBlockCompar(const void* p1, const void* p2, const void* param) { SColumnInfoData* pColInfoData = pOrder->pColData; // TARRAY_GET_ELEM(pDataBlock->pDataBlock, pOrder->colIndex); if (pColInfoData->hasNull) { - bool leftNull = colDataIsNull(pColInfoData, pDataBlock->info.rows, left, pDataBlock->pBlockAgg); - bool rightNull = colDataIsNull(pColInfoData, pDataBlock->info.rows, right, pDataBlock->pBlockAgg); + bool leftNull = colDataIsNull(pColInfoData, pDataBlock->info.rows, left, NULL); + bool rightNull = colDataIsNull(pColInfoData, pDataBlock->info.rows, right, NULL); if (leftNull && rightNull) { continue; // continue to next slot } @@ -742,7 +742,7 @@ static int32_t doAssignOneTuple(SColumnInfoData* pDstCols, int32_t numOfRows, co SColumnInfoData* pDst = &pDstCols[i]; SColumnInfoData* pSrc = taosArrayGet(pSrcBlock->pDataBlock, i); - if (pSrc->hasNull && colDataIsNull(pSrc, pSrcBlock->info.rows, tupleIndex, pSrcBlock->pBlockAgg)) { + if (pSrc->hasNull && colDataIsNull(pSrc, pSrcBlock->info.rows, tupleIndex, pSrcBlock->pBlockAgg[i])) { code = colDataAppend(pDst, numOfRows, NULL, true); if (code != TSDB_CODE_SUCCESS) { return code; diff --git a/source/dnode/vnode/inc/vnode.h b/source/dnode/vnode/inc/vnode.h index 31b1c17482c25a51fd7fe0107f15c760a39bd121..10dc1d97d24be4625c44f1a372c0986ae4965f9c 100644 --- a/source/dnode/vnode/inc/vnode.h +++ b/source/dnode/vnode/inc/vnode.h @@ -111,7 +111,7 @@ int32_t tsdbQuerySTableByTagCond(void *pMeta, uint64_t uid, TSKEY skey, con int64_t tsdbGetNumOfRowsInMemTable(tsdbReaderT *pHandle); bool tsdbNextDataBlock(tsdbReaderT pTsdbReadHandle); void tsdbRetrieveDataBlockInfo(tsdbReaderT *pTsdbReadHandle, SDataBlockInfo *pBlockInfo); -int32_t tsdbRetrieveDataBlockStatisInfo(tsdbReaderT *pTsdbReadHandle, SColumnDataAgg **pBlockStatis); +int32_t tsdbRetrieveDataBlockStatisInfo(tsdbReaderT *pTsdbReadHandle, SColumnDataAgg ***pBlockStatis, bool* allHave); SArray *tsdbRetrieveDataBlock(tsdbReaderT *pTsdbReadHandle, SArray *pColumnIdList); void tsdbResetReadHandle(tsdbReaderT queryHandle, SQueryTableDataCond *pCond); void tsdbDestroyTableGroup(STableGroupInfo *pGroupList); diff --git a/source/dnode/vnode/src/tsdb/tsdbRead.c b/source/dnode/vnode/src/tsdb/tsdbRead.c index cef408f6d79f05fa5ae55fef4352892c974aa97c..3f7e79b148b2c754ed78eb93822b38a5195df35d 100644 --- a/source/dnode/vnode/src/tsdb/tsdbRead.c +++ b/source/dnode/vnode/src/tsdb/tsdbRead.c @@ -97,12 +97,20 @@ typedef struct SIOCostSummary { int64_t headFileLoadTime; } SIOCostSummary; +typedef struct SBlockLoadSuppInfo { + SColumnDataAgg *pstatis; + SColumnDataAgg **plist; + SArray *defaultLoadColumn; // default load column + int32_t *slotIds; // colId to slotId +} SBlockLoadSuppInfo; + typedef struct STsdbReadHandle { STsdb* pTsdb; SQueryFilePos cur; // current position int16_t order; STimeWindow window; // the primary query time window that applies to all queries - SColumnDataAgg* statis; // query level statistics, only one table block statistics info exists at any time +// SColumnDataAgg* statis; // query level statistics, only one table block statistics info exists at any time +// SColumnDataAgg** pstatis;// the ptr array list to return to caller int32_t numOfBlocks; SArray* pColumns; // column list, SColumnInfoData array list bool locateStart; @@ -123,10 +131,9 @@ typedef struct STsdbReadHandle { STableBlockInfo* pDataBlockInfo; SDataCols* pDataCols; // in order to hold current file data block int32_t allocSize; // allocated data block size - SArray* defaultLoadColumn; // default load column SDataBlockLoadInfo dataBlockLoadInfo; /* record current block load information */ SLoadCompBlockInfo compBlockLoadInfo; /* record current compblock information in SQueryAttr */ - + SBlockLoadSuppInfo suppInfo; SArray* prev; // previous row which is before than time window SArray* next; // next row which is after the query time window SIOCostSummary cost; @@ -378,8 +385,8 @@ static STsdbReadHandle* tsdbQueryTablesImpl(SVnode* pVnode, SQueryTableDataCond* if (pCond->numOfCols > 0) { // allocate buffer in order to load data blocks from file - pReadHandle->statis = taosMemoryCalloc(pCond->numOfCols, sizeof(SColumnDataAgg)); - if (pReadHandle->statis == NULL) { + pReadHandle->suppInfo.pstatis = taosMemoryCalloc(pCond->numOfCols, sizeof(SColumnDataAgg)); + if (pReadHandle->suppInfo.pstatis == NULL) { goto _end; } @@ -399,10 +406,11 @@ static STsdbReadHandle* tsdbQueryTablesImpl(SVnode* pVnode, SQueryTableDataCond* } taosArrayPush(pReadHandle->pColumns, &colInfo); - pReadHandle->statis[i].colId = colInfo.info.colId; } - pReadHandle->defaultLoadColumn = getDefaultLoadColumns(pReadHandle, true); + pReadHandle->suppInfo.defaultLoadColumn = getDefaultLoadColumns(pReadHandle, true); + pReadHandle->suppInfo.slotIds = taosMemoryMalloc(sizeof(int32_t) * taosArrayGetSize(pReadHandle->suppInfo.defaultLoadColumn)); + pReadHandle->suppInfo.plist = taosMemoryCalloc(taosArrayGetSize(pReadHandle->suppInfo.defaultLoadColumn), POINTER_BYTES); } pReadHandle->pDataCols = tdNewDataCols(1000, pReadHandle->pTsdb->config.maxRows); @@ -444,7 +452,27 @@ tsdbReaderT* tsdbQueryTables(SVnode* pVnode, SQueryTableDataCond* pCond, STableG } STableCheckInfo *pCheckInfo = taosArrayGet(pTsdbReadHandle->pTableCheckInfo, 0); + pTsdbReadHandle->pSchema = metaGetTbTSchema(pVnode->pMeta, pCheckInfo->tableId, 0); + int32_t numOfCols = taosArrayGetSize(pTsdbReadHandle->suppInfo.defaultLoadColumn); + int16_t* ids = pTsdbReadHandle->suppInfo.defaultLoadColumn->pData; + + STSchema* pSchema = pTsdbReadHandle->pSchema; + + int32_t i = 0, j = 0; + while(i < numOfCols && j < pSchema->numOfCols) { + if (ids[i] == pSchema->columns[j].colId) { + pTsdbReadHandle->suppInfo.slotIds[i] = j; + i++; + j++; + } else if (ids[i] > pSchema->columns[j].colId) { + j++; + } else { + // tsdbCleanupReadHandle(pTsdbReadHandle); + terrno = TSDB_CODE_INVALID_PARA; + return NULL; + } + } tsdbDebug("%p total numOfTable:%" PRIzu " in this query, group %" PRIzu " %s", pTsdbReadHandle, taosArrayGetSize(pTsdbReadHandle->pTableCheckInfo), taosArrayGetSize(groupList->pGroupList), @@ -482,7 +510,8 @@ void tsdbResetReadHandle(tsdbReaderT queryHandle, SQueryTableDataCond* pCond) { } // allocate buffer in order to load data blocks from file - memset(pTsdbReadHandle->statis, 0, sizeof(SColumnDataAgg)); + memset(pTsdbReadHandle->suppInfo.pstatis, 0, sizeof(SColumnDataAgg)); + memset(pTsdbReadHandle->suppInfo.plist, 0, POINTER_BYTES); tsdbInitDataBlockLoadInfo(&pTsdbReadHandle->dataBlockLoadInfo); tsdbInitCompBlockLoadInfo(&pTsdbReadHandle->compBlockLoadInfo); @@ -510,7 +539,8 @@ void tsdbResetQueryHandleForNewTable(tsdbReaderT queryHandle, SQueryTableDataCon } // allocate buffer in order to load data blocks from file - memset(pTsdbReadHandle->statis, 0, sizeof(SColumnDataAgg)); + memset(pTsdbReadHandle->suppInfo.pstatis, 0, sizeof(SColumnDataAgg)); + memset(pTsdbReadHandle->suppInfo.plist, 0, POINTER_BYTES); tsdbInitDataBlockLoadInfo(&pTsdbReadHandle->dataBlockLoadInfo); tsdbInitCompBlockLoadInfo(&pTsdbReadHandle->compBlockLoadInfo); @@ -1117,7 +1147,7 @@ static int32_t doLoadFileDataBlock(STsdbReadHandle* pTsdbReadHandle, SBlock* pBl goto _error; } - int16_t* colIds = pTsdbReadHandle->defaultLoadColumn->pData; + int16_t* colIds = pTsdbReadHandle->suppInfo.defaultLoadColumn->pData; int32_t ret = tsdbLoadBlockDataCols(&(pTsdbReadHandle->rhelper), pBlock, pCheckInfo->pCompInfo, colIds, (int)(QH_GET_NUM_OF_COLS(pTsdbReadHandle)), true); @@ -3243,8 +3273,9 @@ void tsdbRetrieveDataBlockInfo(tsdbReaderT* pTsdbReadHandle, SDataBlockInfo* pDa /* * return null for mixed data block, if not a complete file data block, the statistics value will always return NULL */ -int32_t tsdbRetrieveDataBlockStatisInfo(tsdbReaderT* pTsdbReadHandle, SColumnDataAgg** pBlockStatis) { +int32_t tsdbRetrieveDataBlockStatisInfo(tsdbReaderT* pTsdbReadHandle, SColumnDataAgg*** pBlockStatis, bool* allHave) { STsdbReadHandle* pHandle = (STsdbReadHandle*)pTsdbReadHandle; + *allHave = false; SQueryFilePos* c = &pHandle->cur; if (c->mixBlock) { @@ -3273,36 +3304,47 @@ int32_t tsdbRetrieveDataBlockStatisInfo(tsdbReaderT* pTsdbReadHandle, SColumnDat tsdbDebug("vgId:%d succeed to load block statis part for uid %" PRIu64, REPO_ID(pHandle->pTsdb), TSDB_READ_TABLE_UID(&pHandle->rhelper)); - int16_t* colIds = pHandle->defaultLoadColumn->pData; + int16_t* colIds = pHandle->suppInfo.defaultLoadColumn->pData; size_t numOfCols = QH_GET_NUM_OF_COLS(pHandle); - memset(pHandle->statis, 0, numOfCols * sizeof(SColumnDataAgg)); + memset(pHandle->suppInfo.plist, 0, numOfCols * POINTER_BYTES); + memset(pHandle->suppInfo.pstatis, 0, numOfCols * sizeof(SColumnDataAgg)); + for (int32_t i = 0; i < numOfCols; ++i) { - pHandle->statis[i].colId = colIds[i]; + pHandle->suppInfo.pstatis[i].colId = colIds[i]; } - tsdbGetBlockStatis(&pHandle->rhelper, pHandle->statis, (int)numOfCols, pBlockInfo->compBlock); + *allHave = true; + tsdbGetBlockStatis(&pHandle->rhelper, pHandle->suppInfo.pstatis, (int)numOfCols, pBlockInfo->compBlock); // always load the first primary timestamp column data - SColumnDataAgg* pPrimaryColStatis = &pHandle->statis[0]; + SColumnDataAgg* pPrimaryColStatis = &pHandle->suppInfo.pstatis[0]; assert(pPrimaryColStatis->colId == PRIMARYKEY_TIMESTAMP_COL_ID); pPrimaryColStatis->numOfNull = 0; pPrimaryColStatis->min = pBlockInfo->compBlock->keyFirst; pPrimaryColStatis->max = pBlockInfo->compBlock->keyLast; + pHandle->suppInfo.plist[0] = &pHandle->suppInfo.pstatis[0]; // update the number of NULL data rows + int32_t* slotIds = pHandle->suppInfo.slotIds; for (int32_t i = 1; i < numOfCols; ++i) { - if (pHandle->statis[i].numOfNull == -1) { // set the column data are all NULL - pHandle->statis[i].numOfNull = pBlockInfo->compBlock->numOfRows; + ASSERT(colIds[i] == pHandle->pSchema->columns[slotIds[i]].colId); + if (pHandle->pSchema->columns[slotIds[i]].sma) { + if (pHandle->suppInfo.pstatis[i].numOfNull == -1) { // set the column data are all NULL + pHandle->suppInfo.pstatis[i].numOfNull = pBlockInfo->compBlock->numOfRows; + } else { + pHandle->suppInfo.plist[i] = &pHandle->suppInfo.pstatis[i]; + } + } else { + *allHave = false; } - } int64_t elapsed = taosGetTimestampUs() - stime; pHandle->cost.statisInfoLoadTime += elapsed; - *pBlockStatis = pHandle->statis; + *pBlockStatis = pHandle->suppInfo.plist; return TSDB_CODE_SUCCESS; } @@ -3804,9 +3846,10 @@ void tsdbCleanupReadHandle(tsdbReaderT queryHandle) { pTsdbReadHandle->pColumns = doFreeColumnInfoData(pTsdbReadHandle->pColumns); - taosArrayDestroy(pTsdbReadHandle->defaultLoadColumn); + taosArrayDestroy(pTsdbReadHandle->suppInfo.defaultLoadColumn); taosMemoryFreeClear(pTsdbReadHandle->pDataBlockInfo); - taosMemoryFreeClear(pTsdbReadHandle->statis); + taosMemoryFreeClear(pTsdbReadHandle->suppInfo.pstatis); + taosMemoryFreeClear(pTsdbReadHandle->suppInfo.plist); if (!emptyQueryTimewindow(pTsdbReadHandle)) { // tsdbMayUnTakeMemSnapshot(pTsdbReadHandle); diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index 05358f43003bbebe2af583b987844fdfa0d9d469..60649ce6513e4835a93a1a2b826f5da8065a2cbe 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -1819,7 +1819,10 @@ void setBlockStatisInfo(SqlFunctionCtx* pCtx, SExprInfo* pExprInfo, SSDataBlock* if (pFuncParam->type == FUNC_PARAM_TYPE_COLUMN) { int32_t slotId = pFuncParam->pCol->slotId; - pInput->pColumnDataAgg[j] = &pBlock->pBlockAgg[slotId]; + pInput->pColumnDataAgg[j] = pBlock->pBlockAgg[slotId]; + if (pInput->pColumnDataAgg[j] == NULL) { + pInput->colDataAggIsSet = false; + } // Here we set the column info data since the data type for each column data is required, but // the data in the corresponding SColumnInfoData will not be used. @@ -5388,7 +5391,7 @@ static void doStateWindowAggImpl(SOperatorInfo* pOperator, SStateWindowOperatorI pRowSup->numOfRows = 0; for (int32_t j = 0; j < pBlock->info.rows; ++j) { - if (colDataIsNull(pStateColInfoData, pBlock->info.rows, j, pBlock->pBlockAgg)) { + if (colDataIsNull(pStateColInfoData, pBlock->info.rows, j, pBlock->pBlockAgg[pInfo->colIndex])) { continue; } diff --git a/source/libs/executor/src/groupoperator.c b/source/libs/executor/src/groupoperator.c index beee11ec189680adaf5388d44e56bc523e5ca91a..dd9cd52870c1cc4ad7beb322e8998c6047367ce1 100644 --- a/source/libs/executor/src/groupoperator.c +++ b/source/libs/executor/src/groupoperator.c @@ -77,7 +77,7 @@ static bool groupKeyCompare(SArray* pGroupCols, SArray* pGroupColVals, SSDataBlo SColumn* pCol = taosArrayGet(pGroupCols, i); SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, pCol->slotId); if (pBlock->pBlockAgg != NULL) { - pColAgg = &pBlock->pBlockAgg[pCol->slotId]; // TODO is agg data matched? + pColAgg = pBlock->pBlockAgg[pCol->slotId]; // TODO is agg data matched? } bool isNull = colDataIsNull(pColInfoData, pBlock->info.rows, rowIndex, pColAgg); @@ -118,7 +118,7 @@ static void recordNewGroupKeys(SArray* pGroupCols, SArray* pGroupColVals, SSData SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, pCol->slotId); if (pBlock->pBlockAgg != NULL) { - pColAgg = &pBlock->pBlockAgg[pCol->slotId]; // TODO is agg data matched? + pColAgg = pBlock->pBlockAgg[pCol->slotId]; // TODO is agg data matched? } SGroupKeys* pkey = taosArrayGet(pGroupColVals, i); diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index dc87b864f17e0102ac4f13d736b7915fbc488445..4948f64d84ed991473929f24dcdaf625acfe07b0 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -188,14 +188,18 @@ int32_t loadDataBlock(SOperatorInfo* pOperator, STableScanInfo* pTableScanInfo, } else if (*status == FUNC_DATA_REQUIRED_STATIS_LOAD) { pCost->loadBlockStatis += 1; - SColumnDataAgg* pColAgg = NULL; - tsdbRetrieveDataBlockStatisInfo(pTableScanInfo->dataReader, &pColAgg); + bool allHave = true; + SColumnDataAgg** pColAgg = NULL; + tsdbRetrieveDataBlockStatisInfo(pTableScanInfo->dataReader, &pColAgg, &allHave); - if (pColAgg != NULL) { + if (allHave == true) { int32_t numOfCols = pBlock->info.numOfCols; // todo create this buffer during creating operator - pBlock->pBlockAgg = taosMemoryCalloc(numOfCols, sizeof(SColumnDataAgg)); + if (pBlock->pBlockAgg == NULL) { + pBlock->pBlockAgg = taosMemoryCalloc(numOfCols, POINTER_BYTES); + } + for (int32_t i = 0; i < numOfCols; ++i) { SColMatchInfo* pColMatchInfo = taosArrayGet(pTableScanInfo->pColMatchInfo, i); if (!pColMatchInfo->output) { diff --git a/source/libs/executor/src/tsort.c b/source/libs/executor/src/tsort.c index 2ac28200e79d3b365fc75d055f785e0a99963355..46fc8da00caf938ed6ad287b74b1120e87ce3b11 100644 --- a/source/libs/executor/src/tsort.c +++ b/source/libs/executor/src/tsort.c @@ -374,18 +374,17 @@ int32_t msortComparFn(const void *pLeft, const void *pRight, void *param) { for(int32_t i = 0; i < pInfo->size; ++i) { SBlockOrderInfo* pOrder = TARRAY_GET_ELEM(pInfo, i); - SColumnInfoData* pLeftColInfoData = TARRAY_GET_ELEM(pLeftBlock->pDataBlock, pOrder->slotId); bool leftNull = false; if (pLeftColInfoData->hasNull) { - leftNull = colDataIsNull(pLeftColInfoData, pLeftBlock->info.rows, pLeftSource->src.rowIndex, pLeftBlock->pBlockAgg); + leftNull = colDataIsNull(pLeftColInfoData, pLeftBlock->info.rows, pLeftSource->src.rowIndex, pLeftBlock->pBlockAgg[pOrder->slotId]); } SColumnInfoData* pRightColInfoData = TARRAY_GET_ELEM(pRightBlock->pDataBlock, pOrder->slotId); bool rightNull = false; if (pRightColInfoData->hasNull) { - rightNull = colDataIsNull(pRightColInfoData, pRightBlock->info.rows, pRightSource->src.rowIndex, pRightBlock->pBlockAgg); + rightNull = colDataIsNull(pRightColInfoData, pRightBlock->info.rows, pRightSource->src.rowIndex, pRightBlock->pBlockAgg[pOrder->slotId]); } if (leftNull && rightNull) { diff --git a/source/libs/executor/test/sortTests.cpp b/source/libs/executor/test/sortTests.cpp index efa0581fd0b09893054c09a9429cb589411d3e91..e646387856e66b147b5fb369b8d522b155f5d04c 100644 --- a/source/libs/executor/test/sortTests.cpp +++ b/source/libs/executor/test/sortTests.cpp @@ -164,13 +164,13 @@ int32_t docomp(const void* p1, const void* p2, void* param) { bool leftNull = false; if (pLeftColInfoData->hasNull) { - leftNull = colDataIsNull(pLeftColInfoData, pLeftBlock->info.rows, pLeftSource->src.rowIndex, pLeftBlock->pBlockAgg); + leftNull = colDataIsNull(pLeftColInfoData, pLeftBlock->info.rows, pLeftSource->src.rowIndex, pLeftBlock->pBlockAgg[pOrder->slotId]); } SColumnInfoData* pRightColInfoData = (SColumnInfoData*) TARRAY_GET_ELEM(pRightBlock->pDataBlock, pOrder->slotId); bool rightNull = false; if (pRightColInfoData->hasNull) { - rightNull = colDataIsNull(pRightColInfoData, pRightBlock->info.rows, pRightSource->src.rowIndex, pRightBlock->pBlockAgg); + rightNull = colDataIsNull(pRightColInfoData, pRightBlock->info.rows, pRightSource->src.rowIndex, pRightBlock->pBlockAgg[pOrder->slotId]); } if (leftNull && rightNull) {