提交 c71c88a7 编写于 作者: H Haojun Liao

enh(query): enable check sma status for each column in query.

上级 dda7a598
......@@ -74,9 +74,9 @@ typedef struct SDataBlockInfo {
} SDataBlockInfo;
typedef struct SSDataBlock {
SColumnDataAgg* pBlockAgg;
SArray* pDataBlock; // SArray<SColumnInfoData>
SDataBlockInfo info;
SColumnDataAgg** pBlockAgg;
SArray* pDataBlock; // SArray<SColumnInfoData>
SDataBlockInfo info;
} SSDataBlock;
typedef struct SVarColAttr {
......
......@@ -490,7 +490,7 @@ SSDataBlock* blockDataExtractBlock(SSDataBlock* pBlock, int32_t startIndex, int3
SColumnInfoData* pDstCol = taosArrayGet(pDst->pDataBlock, i);
for (int32_t j = startIndex; j < (startIndex + rowCount); ++j) {
bool isNull = colDataIsNull(pColData, pBlock->info.rows, j, pBlock->pBlockAgg);
bool isNull = colDataIsNull(pColData, pBlock->info.rows, j, pBlock->pBlockAgg[i]);
char* p = colDataGetData(pColData, j);
colDataAppend(pDstCol, j - startIndex, p, isNull);
......@@ -702,8 +702,8 @@ int32_t dataBlockCompar(const void* p1, const void* p2, const void* param) {
SColumnInfoData* pColInfoData = pOrder->pColData; // TARRAY_GET_ELEM(pDataBlock->pDataBlock, pOrder->colIndex);
if (pColInfoData->hasNull) {
bool leftNull = colDataIsNull(pColInfoData, pDataBlock->info.rows, left, pDataBlock->pBlockAgg);
bool rightNull = colDataIsNull(pColInfoData, pDataBlock->info.rows, right, pDataBlock->pBlockAgg);
bool leftNull = colDataIsNull(pColInfoData, pDataBlock->info.rows, left, NULL);
bool rightNull = colDataIsNull(pColInfoData, pDataBlock->info.rows, right, NULL);
if (leftNull && rightNull) {
continue; // continue to next slot
}
......@@ -742,7 +742,7 @@ static int32_t doAssignOneTuple(SColumnInfoData* pDstCols, int32_t numOfRows, co
SColumnInfoData* pDst = &pDstCols[i];
SColumnInfoData* pSrc = taosArrayGet(pSrcBlock->pDataBlock, i);
if (pSrc->hasNull && colDataIsNull(pSrc, pSrcBlock->info.rows, tupleIndex, pSrcBlock->pBlockAgg)) {
if (pSrc->hasNull && colDataIsNull(pSrc, pSrcBlock->info.rows, tupleIndex, pSrcBlock->pBlockAgg[i])) {
code = colDataAppend(pDst, numOfRows, NULL, true);
if (code != TSDB_CODE_SUCCESS) {
return code;
......
......@@ -111,7 +111,7 @@ int32_t tsdbQuerySTableByTagCond(void *pMeta, uint64_t uid, TSKEY skey, con
int64_t tsdbGetNumOfRowsInMemTable(tsdbReaderT *pHandle);
bool tsdbNextDataBlock(tsdbReaderT pTsdbReadHandle);
void tsdbRetrieveDataBlockInfo(tsdbReaderT *pTsdbReadHandle, SDataBlockInfo *pBlockInfo);
int32_t tsdbRetrieveDataBlockStatisInfo(tsdbReaderT *pTsdbReadHandle, SColumnDataAgg **pBlockStatis);
int32_t tsdbRetrieveDataBlockStatisInfo(tsdbReaderT *pTsdbReadHandle, SColumnDataAgg ***pBlockStatis, bool* allHave);
SArray *tsdbRetrieveDataBlock(tsdbReaderT *pTsdbReadHandle, SArray *pColumnIdList);
void tsdbResetReadHandle(tsdbReaderT queryHandle, SQueryTableDataCond *pCond);
void tsdbDestroyTableGroup(STableGroupInfo *pGroupList);
......
......@@ -97,12 +97,20 @@ typedef struct SIOCostSummary {
int64_t headFileLoadTime;
} SIOCostSummary;
typedef struct SBlockLoadSuppInfo {
SColumnDataAgg *pstatis;
SColumnDataAgg **plist;
SArray *defaultLoadColumn; // default load column
int32_t *slotIds; // colId to slotId
} SBlockLoadSuppInfo;
typedef struct STsdbReadHandle {
STsdb* pTsdb;
SQueryFilePos cur; // current position
int16_t order;
STimeWindow window; // the primary query time window that applies to all queries
SColumnDataAgg* statis; // query level statistics, only one table block statistics info exists at any time
// SColumnDataAgg* statis; // query level statistics, only one table block statistics info exists at any time
// SColumnDataAgg** pstatis;// the ptr array list to return to caller
int32_t numOfBlocks;
SArray* pColumns; // column list, SColumnInfoData array list
bool locateStart;
......@@ -123,10 +131,9 @@ typedef struct STsdbReadHandle {
STableBlockInfo* pDataBlockInfo;
SDataCols* pDataCols; // in order to hold current file data block
int32_t allocSize; // allocated data block size
SArray* defaultLoadColumn; // default load column
SDataBlockLoadInfo dataBlockLoadInfo; /* record current block load information */
SLoadCompBlockInfo compBlockLoadInfo; /* record current compblock information in SQueryAttr */
SBlockLoadSuppInfo suppInfo;
SArray* prev; // previous row which is before than time window
SArray* next; // next row which is after the query time window
SIOCostSummary cost;
......@@ -378,8 +385,8 @@ static STsdbReadHandle* tsdbQueryTablesImpl(SVnode* pVnode, SQueryTableDataCond*
if (pCond->numOfCols > 0) {
// allocate buffer in order to load data blocks from file
pReadHandle->statis = taosMemoryCalloc(pCond->numOfCols, sizeof(SColumnDataAgg));
if (pReadHandle->statis == NULL) {
pReadHandle->suppInfo.pstatis = taosMemoryCalloc(pCond->numOfCols, sizeof(SColumnDataAgg));
if (pReadHandle->suppInfo.pstatis == NULL) {
goto _end;
}
......@@ -399,10 +406,11 @@ static STsdbReadHandle* tsdbQueryTablesImpl(SVnode* pVnode, SQueryTableDataCond*
}
taosArrayPush(pReadHandle->pColumns, &colInfo);
pReadHandle->statis[i].colId = colInfo.info.colId;
}
pReadHandle->defaultLoadColumn = getDefaultLoadColumns(pReadHandle, true);
pReadHandle->suppInfo.defaultLoadColumn = getDefaultLoadColumns(pReadHandle, true);
pReadHandle->suppInfo.slotIds = taosMemoryMalloc(sizeof(int32_t) * taosArrayGetSize(pReadHandle->suppInfo.defaultLoadColumn));
pReadHandle->suppInfo.plist = taosMemoryCalloc(taosArrayGetSize(pReadHandle->suppInfo.defaultLoadColumn), POINTER_BYTES);
}
pReadHandle->pDataCols = tdNewDataCols(1000, pReadHandle->pTsdb->config.maxRows);
......@@ -444,7 +452,27 @@ tsdbReaderT* tsdbQueryTables(SVnode* pVnode, SQueryTableDataCond* pCond, STableG
}
STableCheckInfo *pCheckInfo = taosArrayGet(pTsdbReadHandle->pTableCheckInfo, 0);
pTsdbReadHandle->pSchema = metaGetTbTSchema(pVnode->pMeta, pCheckInfo->tableId, 0);
int32_t numOfCols = taosArrayGetSize(pTsdbReadHandle->suppInfo.defaultLoadColumn);
int16_t* ids = pTsdbReadHandle->suppInfo.defaultLoadColumn->pData;
STSchema* pSchema = pTsdbReadHandle->pSchema;
int32_t i = 0, j = 0;
while(i < numOfCols && j < pSchema->numOfCols) {
if (ids[i] == pSchema->columns[j].colId) {
pTsdbReadHandle->suppInfo.slotIds[i] = j;
i++;
j++;
} else if (ids[i] > pSchema->columns[j].colId) {
j++;
} else {
// tsdbCleanupReadHandle(pTsdbReadHandle);
terrno = TSDB_CODE_INVALID_PARA;
return NULL;
}
}
tsdbDebug("%p total numOfTable:%" PRIzu " in this query, group %" PRIzu " %s", pTsdbReadHandle,
taosArrayGetSize(pTsdbReadHandle->pTableCheckInfo), taosArrayGetSize(groupList->pGroupList),
......@@ -482,7 +510,8 @@ void tsdbResetReadHandle(tsdbReaderT queryHandle, SQueryTableDataCond* pCond) {
}
// allocate buffer in order to load data blocks from file
memset(pTsdbReadHandle->statis, 0, sizeof(SColumnDataAgg));
memset(pTsdbReadHandle->suppInfo.pstatis, 0, sizeof(SColumnDataAgg));
memset(pTsdbReadHandle->suppInfo.plist, 0, POINTER_BYTES);
tsdbInitDataBlockLoadInfo(&pTsdbReadHandle->dataBlockLoadInfo);
tsdbInitCompBlockLoadInfo(&pTsdbReadHandle->compBlockLoadInfo);
......@@ -510,7 +539,8 @@ void tsdbResetQueryHandleForNewTable(tsdbReaderT queryHandle, SQueryTableDataCon
}
// allocate buffer in order to load data blocks from file
memset(pTsdbReadHandle->statis, 0, sizeof(SColumnDataAgg));
memset(pTsdbReadHandle->suppInfo.pstatis, 0, sizeof(SColumnDataAgg));
memset(pTsdbReadHandle->suppInfo.plist, 0, POINTER_BYTES);
tsdbInitDataBlockLoadInfo(&pTsdbReadHandle->dataBlockLoadInfo);
tsdbInitCompBlockLoadInfo(&pTsdbReadHandle->compBlockLoadInfo);
......@@ -1117,7 +1147,7 @@ static int32_t doLoadFileDataBlock(STsdbReadHandle* pTsdbReadHandle, SBlock* pBl
goto _error;
}
int16_t* colIds = pTsdbReadHandle->defaultLoadColumn->pData;
int16_t* colIds = pTsdbReadHandle->suppInfo.defaultLoadColumn->pData;
int32_t ret = tsdbLoadBlockDataCols(&(pTsdbReadHandle->rhelper), pBlock, pCheckInfo->pCompInfo, colIds,
(int)(QH_GET_NUM_OF_COLS(pTsdbReadHandle)), true);
......@@ -3243,8 +3273,9 @@ void tsdbRetrieveDataBlockInfo(tsdbReaderT* pTsdbReadHandle, SDataBlockInfo* pDa
/*
* return null for mixed data block, if not a complete file data block, the statistics value will always return NULL
*/
int32_t tsdbRetrieveDataBlockStatisInfo(tsdbReaderT* pTsdbReadHandle, SColumnDataAgg** pBlockStatis) {
int32_t tsdbRetrieveDataBlockStatisInfo(tsdbReaderT* pTsdbReadHandle, SColumnDataAgg*** pBlockStatis, bool* allHave) {
STsdbReadHandle* pHandle = (STsdbReadHandle*)pTsdbReadHandle;
*allHave = false;
SQueryFilePos* c = &pHandle->cur;
if (c->mixBlock) {
......@@ -3273,36 +3304,47 @@ int32_t tsdbRetrieveDataBlockStatisInfo(tsdbReaderT* pTsdbReadHandle, SColumnDat
tsdbDebug("vgId:%d succeed to load block statis part for uid %" PRIu64, REPO_ID(pHandle->pTsdb),
TSDB_READ_TABLE_UID(&pHandle->rhelper));
int16_t* colIds = pHandle->defaultLoadColumn->pData;
int16_t* colIds = pHandle->suppInfo.defaultLoadColumn->pData;
size_t numOfCols = QH_GET_NUM_OF_COLS(pHandle);
memset(pHandle->statis, 0, numOfCols * sizeof(SColumnDataAgg));
memset(pHandle->suppInfo.plist, 0, numOfCols * POINTER_BYTES);
memset(pHandle->suppInfo.pstatis, 0, numOfCols * sizeof(SColumnDataAgg));
for (int32_t i = 0; i < numOfCols; ++i) {
pHandle->statis[i].colId = colIds[i];
pHandle->suppInfo.pstatis[i].colId = colIds[i];
}
tsdbGetBlockStatis(&pHandle->rhelper, pHandle->statis, (int)numOfCols, pBlockInfo->compBlock);
*allHave = true;
tsdbGetBlockStatis(&pHandle->rhelper, pHandle->suppInfo.pstatis, (int)numOfCols, pBlockInfo->compBlock);
// always load the first primary timestamp column data
SColumnDataAgg* pPrimaryColStatis = &pHandle->statis[0];
SColumnDataAgg* pPrimaryColStatis = &pHandle->suppInfo.pstatis[0];
assert(pPrimaryColStatis->colId == PRIMARYKEY_TIMESTAMP_COL_ID);
pPrimaryColStatis->numOfNull = 0;
pPrimaryColStatis->min = pBlockInfo->compBlock->keyFirst;
pPrimaryColStatis->max = pBlockInfo->compBlock->keyLast;
pHandle->suppInfo.plist[0] = &pHandle->suppInfo.pstatis[0];
// update the number of NULL data rows
int32_t* slotIds = pHandle->suppInfo.slotIds;
for (int32_t i = 1; i < numOfCols; ++i) {
if (pHandle->statis[i].numOfNull == -1) { // set the column data are all NULL
pHandle->statis[i].numOfNull = pBlockInfo->compBlock->numOfRows;
ASSERT(colIds[i] == pHandle->pSchema->columns[slotIds[i]].colId);
if (pHandle->pSchema->columns[slotIds[i]].sma) {
if (pHandle->suppInfo.pstatis[i].numOfNull == -1) { // set the column data are all NULL
pHandle->suppInfo.pstatis[i].numOfNull = pBlockInfo->compBlock->numOfRows;
} else {
pHandle->suppInfo.plist[i] = &pHandle->suppInfo.pstatis[i];
}
} else {
*allHave = false;
}
}
int64_t elapsed = taosGetTimestampUs() - stime;
pHandle->cost.statisInfoLoadTime += elapsed;
*pBlockStatis = pHandle->statis;
*pBlockStatis = pHandle->suppInfo.plist;
return TSDB_CODE_SUCCESS;
}
......@@ -3804,9 +3846,10 @@ void tsdbCleanupReadHandle(tsdbReaderT queryHandle) {
pTsdbReadHandle->pColumns = doFreeColumnInfoData(pTsdbReadHandle->pColumns);
taosArrayDestroy(pTsdbReadHandle->defaultLoadColumn);
taosArrayDestroy(pTsdbReadHandle->suppInfo.defaultLoadColumn);
taosMemoryFreeClear(pTsdbReadHandle->pDataBlockInfo);
taosMemoryFreeClear(pTsdbReadHandle->statis);
taosMemoryFreeClear(pTsdbReadHandle->suppInfo.pstatis);
taosMemoryFreeClear(pTsdbReadHandle->suppInfo.plist);
if (!emptyQueryTimewindow(pTsdbReadHandle)) {
// tsdbMayUnTakeMemSnapshot(pTsdbReadHandle);
......
......@@ -1819,7 +1819,10 @@ void setBlockStatisInfo(SqlFunctionCtx* pCtx, SExprInfo* pExprInfo, SSDataBlock*
if (pFuncParam->type == FUNC_PARAM_TYPE_COLUMN) {
int32_t slotId = pFuncParam->pCol->slotId;
pInput->pColumnDataAgg[j] = &pBlock->pBlockAgg[slotId];
pInput->pColumnDataAgg[j] = pBlock->pBlockAgg[slotId];
if (pInput->pColumnDataAgg[j] == NULL) {
pInput->colDataAggIsSet = false;
}
// Here we set the column info data since the data type for each column data is required, but
// the data in the corresponding SColumnInfoData will not be used.
......@@ -5388,7 +5391,7 @@ static void doStateWindowAggImpl(SOperatorInfo* pOperator, SStateWindowOperatorI
pRowSup->numOfRows = 0;
for (int32_t j = 0; j < pBlock->info.rows; ++j) {
if (colDataIsNull(pStateColInfoData, pBlock->info.rows, j, pBlock->pBlockAgg)) {
if (colDataIsNull(pStateColInfoData, pBlock->info.rows, j, pBlock->pBlockAgg[pInfo->colIndex])) {
continue;
}
......
......@@ -77,7 +77,7 @@ static bool groupKeyCompare(SArray* pGroupCols, SArray* pGroupColVals, SSDataBlo
SColumn* pCol = taosArrayGet(pGroupCols, i);
SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, pCol->slotId);
if (pBlock->pBlockAgg != NULL) {
pColAgg = &pBlock->pBlockAgg[pCol->slotId]; // TODO is agg data matched?
pColAgg = pBlock->pBlockAgg[pCol->slotId]; // TODO is agg data matched?
}
bool isNull = colDataIsNull(pColInfoData, pBlock->info.rows, rowIndex, pColAgg);
......@@ -118,7 +118,7 @@ static void recordNewGroupKeys(SArray* pGroupCols, SArray* pGroupColVals, SSData
SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, pCol->slotId);
if (pBlock->pBlockAgg != NULL) {
pColAgg = &pBlock->pBlockAgg[pCol->slotId]; // TODO is agg data matched?
pColAgg = pBlock->pBlockAgg[pCol->slotId]; // TODO is agg data matched?
}
SGroupKeys* pkey = taosArrayGet(pGroupColVals, i);
......
......@@ -188,14 +188,18 @@ int32_t loadDataBlock(SOperatorInfo* pOperator, STableScanInfo* pTableScanInfo,
} else if (*status == FUNC_DATA_REQUIRED_STATIS_LOAD) {
pCost->loadBlockStatis += 1;
SColumnDataAgg* pColAgg = NULL;
tsdbRetrieveDataBlockStatisInfo(pTableScanInfo->dataReader, &pColAgg);
bool allHave = true;
SColumnDataAgg** pColAgg = NULL;
tsdbRetrieveDataBlockStatisInfo(pTableScanInfo->dataReader, &pColAgg, &allHave);
if (pColAgg != NULL) {
if (allHave == true) {
int32_t numOfCols = pBlock->info.numOfCols;
// todo create this buffer during creating operator
pBlock->pBlockAgg = taosMemoryCalloc(numOfCols, sizeof(SColumnDataAgg));
if (pBlock->pBlockAgg == NULL) {
pBlock->pBlockAgg = taosMemoryCalloc(numOfCols, POINTER_BYTES);
}
for (int32_t i = 0; i < numOfCols; ++i) {
SColMatchInfo* pColMatchInfo = taosArrayGet(pTableScanInfo->pColMatchInfo, i);
if (!pColMatchInfo->output) {
......
......@@ -374,18 +374,17 @@ int32_t msortComparFn(const void *pLeft, const void *pRight, void *param) {
for(int32_t i = 0; i < pInfo->size; ++i) {
SBlockOrderInfo* pOrder = TARRAY_GET_ELEM(pInfo, i);
SColumnInfoData* pLeftColInfoData = TARRAY_GET_ELEM(pLeftBlock->pDataBlock, pOrder->slotId);
bool leftNull = false;
if (pLeftColInfoData->hasNull) {
leftNull = colDataIsNull(pLeftColInfoData, pLeftBlock->info.rows, pLeftSource->src.rowIndex, pLeftBlock->pBlockAgg);
leftNull = colDataIsNull(pLeftColInfoData, pLeftBlock->info.rows, pLeftSource->src.rowIndex, pLeftBlock->pBlockAgg[pOrder->slotId]);
}
SColumnInfoData* pRightColInfoData = TARRAY_GET_ELEM(pRightBlock->pDataBlock, pOrder->slotId);
bool rightNull = false;
if (pRightColInfoData->hasNull) {
rightNull = colDataIsNull(pRightColInfoData, pRightBlock->info.rows, pRightSource->src.rowIndex, pRightBlock->pBlockAgg);
rightNull = colDataIsNull(pRightColInfoData, pRightBlock->info.rows, pRightSource->src.rowIndex, pRightBlock->pBlockAgg[pOrder->slotId]);
}
if (leftNull && rightNull) {
......
......@@ -164,13 +164,13 @@ int32_t docomp(const void* p1, const void* p2, void* param) {
bool leftNull = false;
if (pLeftColInfoData->hasNull) {
leftNull = colDataIsNull(pLeftColInfoData, pLeftBlock->info.rows, pLeftSource->src.rowIndex, pLeftBlock->pBlockAgg);
leftNull = colDataIsNull(pLeftColInfoData, pLeftBlock->info.rows, pLeftSource->src.rowIndex, pLeftBlock->pBlockAgg[pOrder->slotId]);
}
SColumnInfoData* pRightColInfoData = (SColumnInfoData*) TARRAY_GET_ELEM(pRightBlock->pDataBlock, pOrder->slotId);
bool rightNull = false;
if (pRightColInfoData->hasNull) {
rightNull = colDataIsNull(pRightColInfoData, pRightBlock->info.rows, pRightSource->src.rowIndex, pRightBlock->pBlockAgg);
rightNull = colDataIsNull(pRightColInfoData, pRightBlock->info.rows, pRightSource->src.rowIndex, pRightBlock->pBlockAgg[pOrder->slotId]);
}
if (leftNull && rightNull) {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册