diff --git a/include/common/tdatablock.h b/include/common/tdatablock.h index 797843002751928990f1eb7046ee99a4a3baa5b2..98edd9f0a5c73ec64bba789e1bb6e633f42e39af 100644 --- a/include/common/tdatablock.h +++ b/include/common/tdatablock.h @@ -182,6 +182,7 @@ int32_t blockDataSplitRows(SSDataBlock* pBlock, bool hasVarCol, int32_t startInd int32_t pageSize); int32_t blockDataToBuf(char* buf, const SSDataBlock* pBlock); int32_t blockDataFromBuf(SSDataBlock* pBlock, const char* buf); +int32_t blockDataFromBuf1(SSDataBlock* pBlock, const char* buf, size_t capacity); SSDataBlock* blockDataExtractBlock(SSDataBlock* pBlock, int32_t startIndex, int32_t rowCount); diff --git a/source/common/src/tdatablock.c b/source/common/src/tdatablock.c index fb6640e59eaf129b701249cb87220695c60f6fd9..f8ae82ed67c7168935fc351491f51edaeef76fd9 100644 --- a/source/common/src/tdatablock.c +++ b/source/common/src/tdatablock.c @@ -415,10 +415,7 @@ int32_t blockDataSplitRows(SSDataBlock* pBlock, bool hasVarCol, int32_t startInd for (int32_t i = 0; i < numOfCols; ++i) { SColumnInfoData* pColInfoData = TARRAY_GET_ELEM(pBlock->pDataBlock, i); if (IS_VAR_DATA_TYPE(pColInfoData->info.type)) { - bool isNull = colDataIsNull(pColInfoData, numOfRows, j, NULL); - if (isNull) { - // do nothing - } else { + if (pColInfoData->varmeta.offset[j] != -1) { char* p = colDataGetData(pColInfoData, j); size += varDataTLen(p); } @@ -547,8 +544,8 @@ int32_t blockDataFromBuf(SSDataBlock* pBlock, const char* buf) { for (int32_t i = 0; i < numOfCols; ++i) { SColumnInfoData* pCol = taosArrayGet(pBlock->pDataBlock, i); - size_t metaSize = pBlock->info.rows * sizeof(int32_t); if (IS_VAR_DATA_TYPE(pCol->info.type)) { + size_t metaSize = pBlock->info.rows * sizeof(int32_t); memcpy(pCol->varmeta.offset, pStart, metaSize); pStart += metaSize; } else { @@ -581,6 +578,49 @@ int32_t blockDataFromBuf(SSDataBlock* pBlock, const char* buf) { return TSDB_CODE_SUCCESS; } +int32_t blockDataFromBuf1(SSDataBlock* pBlock, const char* buf, size_t capacity) { + pBlock->info.rows = *(int32_t*)buf; + + int32_t numOfCols = pBlock->info.numOfCols; + const char* pStart = buf + sizeof(uint32_t); + + for (int32_t i = 0; i < numOfCols; ++i) { + SColumnInfoData* pCol = taosArrayGet(pBlock->pDataBlock, i); + + if (IS_VAR_DATA_TYPE(pCol->info.type)) { + size_t metaSize = capacity * sizeof(int32_t); + memcpy(pCol->varmeta.offset, pStart, metaSize); + pStart += metaSize; + } else { + memcpy(pCol->nullbitmap, pStart, BitmapLen(capacity)); + pStart += BitmapLen(capacity); + } + + int32_t colLength = *(int32_t*)pStart; + pStart += sizeof(int32_t); + + if (IS_VAR_DATA_TYPE(pCol->info.type)) { + if (pCol->varmeta.allocLen < colLength) { + char* tmp = taosMemoryRealloc(pCol->pData, colLength); + if (tmp == NULL) { + return TSDB_CODE_OUT_OF_MEMORY; + } + + pCol->pData = tmp; + pCol->varmeta.allocLen = colLength; + } + + pCol->varmeta.length = colLength; + ASSERT(pCol->varmeta.length <= pCol->varmeta.allocLen); + } + + memcpy(pCol->pData, pStart, colLength); + pStart += colLength; + } + + return TSDB_CODE_SUCCESS; +} + size_t blockDataGetRowSize(SSDataBlock* pBlock) { ASSERT(pBlock != NULL); if (pBlock->info.rowSize == 0) { @@ -627,6 +667,10 @@ double blockDataGetSerialRowSize(const SSDataBlock* pBlock) { return rowSize; } +int32_t getAllowedRowsForPage(const SSDataBlock* pBlock, size_t pgSize) { + return (int32_t) ((pgSize - blockDataGetSerialMetaSize(pBlock))/ blockDataGetSerialRowSize(pBlock)); +} + typedef struct SSDataBlockSortHelper { SArray* orderInfo; // SArray SSDataBlock* pDataBlock; @@ -1178,6 +1222,9 @@ int32_t colInfoDataEnsureCapacity(SColumnInfoData* pColumn, uint32_t numOfRows) int32_t blockDataEnsureCapacity(SSDataBlock* pDataBlock, uint32_t numOfRows) { int32_t code = 0; + if (numOfRows == 0) { + return TSDB_CODE_SUCCESS; + } for (int32_t i = 0; i < pDataBlock->info.numOfCols; ++i) { SColumnInfoData* p = taosArrayGet(pDataBlock->pDataBlock, i); @@ -1220,7 +1267,7 @@ SSDataBlock* createOneDataBlock(const SSDataBlock* pDataBlock) { } size_t blockDataGetCapacityInRow(const SSDataBlock* pBlock, size_t pageSize) { - return pageSize / (blockDataGetSerialRowSize(pBlock) + blockDataGetSerialMetaSize(pBlock)); + return (int32_t) ((pageSize - blockDataGetSerialMetaSize(pBlock))/ blockDataGetSerialRowSize(pBlock)); } void colDataDestroy(SColumnInfoData* pColData) { diff --git a/source/libs/executor/inc/executorimpl.h b/source/libs/executor/inc/executorimpl.h index 57edc4000723e90164d67a6a7ae8500fff7472ee..908817f2b8c51049624e78513f94d2e75ad405a0 100644 --- a/source/libs/executor/inc/executorimpl.h +++ b/source/libs/executor/inc/executorimpl.h @@ -563,6 +563,27 @@ typedef struct SGroupbyOperatorInfo { SAggSupporter aggSup; } SGroupbyOperatorInfo; +typedef struct SDataGroupInfo { + uint64_t groupId; + int64_t numOfRows; + SArray *pPageList; +} SDataGroupInfo; + +// The sort in partition may be needed later. +typedef struct SPartitionOperatorInfo { + SOptrBasicInfo binfo; + SArray* pGroupCols; + SArray* pGroupColVals; // current group column values, SArray + bool isInit; // denote if current val is initialized or not + char* keyBuf; // group by keys for hash + int32_t groupKeyLen; // total group by column width + SHashObj* pGroupSet; // quick locate the window object for each result + SDiskbasedBuf* pBuf; // query result buffer based on blocked-wised disk file + + void* pGroupIter; // group iterator + int32_t pageIndex; // page index of current group +} SPartitionOperatorInfo; + typedef struct SSessionAggOperatorInfo { SOptrBasicInfo binfo; SAggSupporter aggSup; diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index c73886b5b0e37106d11efe49531415da0b3c7720..046049c50ef15892bfb4e27b2645e5306d03c00f 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -7048,7 +7048,8 @@ static SArray* createSortInfo(SNodeList* pNodeList); SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo, SReadHandle* pHandle, uint64_t queryId, uint64_t taskId, STableGroupInfo* pTableGroupInfo) { if (pPhyNode->pChildren == NULL || LIST_LENGTH(pPhyNode->pChildren) == 0) { - if (QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN == nodeType(pPhyNode)) { + int32_t type = nodeType(pPhyNode); + if (QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN == type) { SScanPhysiNode* pScanPhyNode = (SScanPhysiNode*)pPhyNode; int32_t numOfCols = 0; @@ -7056,11 +7057,11 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo SArray* pColList = extractColMatchInfo(pScanPhyNode->pScanCols, pScanPhyNode->node.pOutputDataBlockDesc, &numOfCols); return createTableScanOperatorInfo(pDataReader, pScanPhyNode->order, numOfCols, pScanPhyNode->count, pScanPhyNode->reverse, pColList, pScanPhyNode->node.pConditions, pTaskInfo); - } else if (QUERY_NODE_PHYSICAL_PLAN_EXCHANGE == nodeType(pPhyNode)) { + } else if (QUERY_NODE_PHYSICAL_PLAN_EXCHANGE == type) { SExchangePhysiNode* pExchange = (SExchangePhysiNode*)pPhyNode; SSDataBlock* pResBlock = createOutputBuf_rv1(pExchange->node.pOutputDataBlockDesc); return createExchangeOperatorInfo(pExchange->pSrcEndPoints, pResBlock, pTaskInfo); - } else if (QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN == nodeType(pPhyNode)) { + } else if (QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN == type) { SScanPhysiNode* pScanPhyNode = (SScanPhysiNode*)pPhyNode; // simple child table. int32_t code = doCreateTableGroup(pHandle->meta, pScanPhyNode->tableType, pScanPhyNode->uid, pTableGroupInfo, queryId, taskId); @@ -7073,7 +7074,7 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo SOperatorInfo* pOperator = createStreamScanOperatorInfo(pHandle->reader, pResBlock, pColList, tableIdList, pTaskInfo); taosArrayDestroy(tableIdList); return pOperator; - } else if (QUERY_NODE_PHYSICAL_PLAN_SYSTABLE_SCAN == nodeType(pPhyNode)) { + } else if (QUERY_NODE_PHYSICAL_PLAN_SYSTABLE_SCAN == type) { SSystemTableScanPhysiNode* pSysScanPhyNode = (SSystemTableScanPhysiNode*)pPhyNode; SSDataBlock* pResBlock = createOutputBuf_rv1(pSysScanPhyNode->scan.node.pOutputDataBlockDesc); @@ -7089,13 +7090,14 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo } } - if (QUERY_NODE_PHYSICAL_PLAN_PROJECT == nodeType(pPhyNode)) { - size_t size = LIST_LENGTH(pPhyNode->pChildren); - assert(size == 1); + int32_t type = nodeType(pPhyNode); + size_t size = LIST_LENGTH(pPhyNode->pChildren); + ASSERT(size == 1); - SPhysiNode* pChildNode = (SPhysiNode*)nodesListGetNode(pPhyNode->pChildren, 0); - SOperatorInfo* op = createOperatorTree(pChildNode, pTaskInfo, pHandle, queryId, taskId, pTableGroupInfo); + SPhysiNode* pChildNode = (SPhysiNode*)nodesListGetNode(pPhyNode->pChildren, 0); + SOperatorInfo* op = createOperatorTree(pChildNode, pTaskInfo, pHandle, queryId, taskId, pTableGroupInfo); + if (QUERY_NODE_PHYSICAL_PLAN_PROJECT == type) { int32_t num = 0; SProjectPhysiNode* pProjPhyNode = (SProjectPhysiNode*) pPhyNode; SExprInfo* pExprInfo = createExprInfo(pProjPhyNode->pProjections, NULL, &num); @@ -7104,78 +7106,56 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo SLimit limit = {.limit = pProjPhyNode->limit, .offset = pProjPhyNode->offset}; return createProjectOperatorInfo(op, pExprInfo, num, pResBlock, &limit, pTaskInfo); - } else if (QUERY_NODE_PHYSICAL_PLAN_AGG == nodeType(pPhyNode)) { - size_t size = LIST_LENGTH(pPhyNode->pChildren); - assert(size == 1); + } else if (QUERY_NODE_PHYSICAL_PLAN_AGG == type) { + int32_t num = 0; - for (int32_t i = 0; i < size; ++i) { - SPhysiNode* pChildNode = (SPhysiNode*)nodesListGetNode(pPhyNode->pChildren, i); - SOperatorInfo* op = createOperatorTree(pChildNode, pTaskInfo, pHandle, queryId, taskId, pTableGroupInfo); + SAggPhysiNode* pAggNode = (SAggPhysiNode*)pPhyNode; + SExprInfo* pExprInfo = createExprInfo(pAggNode->pAggFuncs, pAggNode->pGroupKeys, &num); + SSDataBlock* pResBlock = createOutputBuf_rv1(pPhyNode->pOutputDataBlockDesc); - int32_t num = 0; - - SAggPhysiNode* pAggNode = (SAggPhysiNode*)pPhyNode; - SExprInfo* pExprInfo = createExprInfo(pAggNode->pAggFuncs, pAggNode->pGroupKeys, &num); - SSDataBlock* pResBlock = createOutputBuf_rv1(pPhyNode->pOutputDataBlockDesc); - - if (pAggNode->pGroupKeys != NULL) { - SArray* pColList = extractColumnInfo(pAggNode->pGroupKeys); - return createGroupOperatorInfo(op, pExprInfo, num, pResBlock, pColList, pAggNode->node.pConditions, pTaskInfo, NULL); - } else { - return createAggregateOperatorInfo(op, pExprInfo, num, pResBlock, pTaskInfo, pTableGroupInfo); - } - } - } else if (QUERY_NODE_PHYSICAL_PLAN_INTERVAL == nodeType(pPhyNode)) { - size_t size = LIST_LENGTH(pPhyNode->pChildren); - assert(size == 1); - - for (int32_t i = 0; i < size; ++i) { - SPhysiNode* pChildNode = (SPhysiNode*)nodesListGetNode(pPhyNode->pChildren, i); - SOperatorInfo* op = createOperatorTree(pChildNode, pTaskInfo, pHandle, queryId, taskId, pTableGroupInfo); - - SIntervalPhysiNode* pIntervalPhyNode = (SIntervalPhysiNode*)pPhyNode; - - int32_t num = 0; - SExprInfo* pExprInfo = createExprInfo(pIntervalPhyNode->window.pFuncs, NULL, &num); - SSDataBlock* pResBlock = createOutputBuf_rv1(pPhyNode->pOutputDataBlockDesc); - - SInterval interval = { - .interval = pIntervalPhyNode->interval, - .sliding = pIntervalPhyNode->sliding, - .intervalUnit = pIntervalPhyNode->intervalUnit, - .slidingUnit = pIntervalPhyNode->slidingUnit, - .offset = pIntervalPhyNode->offset, - .precision = pIntervalPhyNode->precision - }; - - int32_t primaryTsSlotId = ((SColumnNode*) pIntervalPhyNode->pTspk)->slotId; - return createIntervalOperatorInfo(op, pExprInfo, num, pResBlock, &interval, primaryTsSlotId, pTableGroupInfo, pTaskInfo); + if (pAggNode->pGroupKeys != NULL) { + SArray* pColList = extractColumnInfo(pAggNode->pGroupKeys); + return createGroupOperatorInfo(op, pExprInfo, num, pResBlock, pColList, pAggNode->node.pConditions, pTaskInfo, NULL); + } else { + return createAggregateOperatorInfo(op, pExprInfo, num, pResBlock, pTaskInfo, pTableGroupInfo); } - } else if (QUERY_NODE_PHYSICAL_PLAN_SORT == nodeType(pPhyNode)) { - size_t size = LIST_LENGTH(pPhyNode->pChildren); - assert(size == 1); + } else if (QUERY_NODE_PHYSICAL_PLAN_INTERVAL == type) { + SIntervalPhysiNode* pIntervalPhyNode = (SIntervalPhysiNode*)pPhyNode; - SPhysiNode* pChildNode = (SPhysiNode*)nodesListGetNode(pPhyNode->pChildren, 0); - SOperatorInfo* op = createOperatorTree(pChildNode, pTaskInfo, pHandle, queryId, taskId, pTableGroupInfo); + int32_t num = 0; + SExprInfo* pExprInfo = createExprInfo(pIntervalPhyNode->window.pFuncs, NULL, &num); + SSDataBlock* pResBlock = createOutputBuf_rv1(pPhyNode->pOutputDataBlockDesc); + SInterval interval = { + .interval = pIntervalPhyNode->interval, + .sliding = pIntervalPhyNode->sliding, + .intervalUnit = pIntervalPhyNode->intervalUnit, + .slidingUnit = pIntervalPhyNode->slidingUnit, + .offset = pIntervalPhyNode->offset, + .precision = pIntervalPhyNode->precision + }; + + int32_t primaryTsSlotId = ((SColumnNode*) pIntervalPhyNode->pTspk)->slotId; + return createIntervalOperatorInfo(op, pExprInfo, num, pResBlock, &interval, primaryTsSlotId, pTableGroupInfo, pTaskInfo); + } else if (QUERY_NODE_PHYSICAL_PLAN_SORT == type) { SSortPhysiNode* pSortPhyNode = (SSortPhysiNode*)pPhyNode; SSDataBlock* pResBlock = createOutputBuf_rv1(pPhyNode->pOutputDataBlockDesc); - SArray* info = createSortInfo(pSortPhyNode->pSortKeys); - return createSortOperatorInfo(op, pResBlock, info, pTaskInfo); - } else if (QUERY_NODE_PHYSICAL_PLAN_SESSION_WINDOW == nodeType(pPhyNode)) { - size_t size = LIST_LENGTH(pPhyNode->pChildren); - assert(size == 1); - - SPhysiNode* pChildNode = (SPhysiNode*)nodesListGetNode(pPhyNode->pChildren, 0); - SOperatorInfo* op = createOperatorTree(pChildNode, pTaskInfo, pHandle, queryId, taskId, pTableGroupInfo); + SArray* info = createSortInfo(pSortPhyNode->pSortKeys); + return createSortOperatorInfo(op, pResBlock, info, pTaskInfo); + } else if (QUERY_NODE_PHYSICAL_PLAN_SESSION_WINDOW == type) { SSessionWinodwPhysiNode* pSessionNode = (SSessionWinodwPhysiNode*)pPhyNode; int32_t num = 0; SExprInfo* pExprInfo = createExprInfo(pSessionNode->window.pFuncs, NULL, &num); SSDataBlock* pResBlock = createOutputBuf_rv1(pPhyNode->pOutputDataBlockDesc); return createSessionAggOperatorInfo(op, pExprInfo, num, pResBlock, pSessionNode->gap, pTaskInfo); + } else if (QUERY_NODE_PHYSICAL_PLAN_PARTITION == type) { + SPartitionPhysiNode* pPartNode = (SPartitionPhysiNode*) pPhyNode; + SArray* pColList = extractColumnInfo(pPartNode->pPartitionKeys); + SSDataBlock* pResBlock = createOutputBuf_rv1(pPhyNode->pOutputDataBlockDesc); + return createPartitionOperatorInfo(op, pResBlock, pColList, pTaskInfo, NULL); } else { ASSERT(0); } /*else if (pPhyNode->info.type == OP_MultiTableAggregate) { @@ -7258,11 +7238,12 @@ SArray* extractColumnInfo(SNodeList* pNodeList) { STargetNode* pNode = (STargetNode*)nodesListGetNode(pNodeList, i); SColumnNode* pColNode = (SColumnNode*)pNode->pExpr; + // todo extract method SColumn c = {0}; c.slotId = pColNode->slotId; - c.colId = pColNode->colId; - c.type = pColNode->node.resType.type; - c.bytes = pColNode->node.resType.bytes; + c.colId = pColNode->colId; + c.type = pColNode->node.resType.type; + c.bytes = pColNode->node.resType.bytes; c.precision = pColNode->node.resType.precision; c.scale = pColNode->node.resType.scale; @@ -7289,15 +7270,6 @@ SArray* createSortInfo(SNodeList* pNodeList) { SColumnNode* pColNode = (SColumnNode*)pSortKey->pExpr; bi.slotId = pColNode->slotId; - // pColNode->order; - // SColumn c = {0}; - // c.slotId = pColNode->slotId; - // c.colId = pColNode->colId; - // c.type = pColNode->node.resType.type; - // c.bytes = pColNode->node.resType.bytes; - // c.precision = pColNode->node.resType.precision; - // c.scale = pColNode->node.resType.scale; - taosArrayPush(pList, &bi); } diff --git a/source/libs/executor/src/groupoperator.c b/source/libs/executor/src/groupoperator.c index b3a8e09f166e05a490fb865e298cdf361473faa1..72954012422a90bcdd3c648ae560e9e606c689f7 100644 --- a/source/libs/executor/src/groupoperator.c +++ b/source/libs/executor/src/groupoperator.c @@ -25,7 +25,7 @@ #include "thash.h" #include "ttypes.h" -static void destroyGroupbyOperatorInfo(void* param, int32_t numOfOutput) { +static void destroyGroupOperatorInfo(void* param, int32_t numOfOutput) { SGroupbyOperatorInfo* pInfo = (SGroupbyOperatorInfo*)param; doDestroyBasicInfo(&pInfo->binfo, numOfOutput); taosMemoryFreeClear(pInfo->keyBuf); @@ -33,44 +33,43 @@ static void destroyGroupbyOperatorInfo(void* param, int32_t numOfOutput) { taosArrayDestroy(pInfo->pGroupColVals); } -static int32_t initGroupOptrInfo(SGroupbyOperatorInfo* pInfo, SArray* pGroupColList) { - pInfo->pGroupColVals = taosArrayInit(4, sizeof(SGroupKeys)); - if (pInfo->pGroupColVals == NULL) { +static int32_t initGroupOptrInfo(SArray** pGroupColVals, int32_t* keyLen, char** keyBuf, const SArray* pGroupColList) { + *pGroupColVals = taosArrayInit(4, sizeof(SGroupKeys)); + if ((*pGroupColVals) == NULL) { return TSDB_CODE_OUT_OF_MEMORY; } int32_t numOfGroupCols = taosArrayGetSize(pGroupColList); for (int32_t i = 0; i < numOfGroupCols; ++i) { SColumn* pCol = taosArrayGet(pGroupColList, i); - pInfo->groupKeyLen += pCol->bytes; + (*keyLen) += pCol->bytes; struct SGroupKeys key = {0}; - key.bytes = pCol->bytes; - key.type = pCol->type; + key.bytes = pCol->bytes; + key.type = pCol->type; key.isNull = false; - key.pData = taosMemoryCalloc(1, pCol->bytes); + key.pData = taosMemoryCalloc(1, pCol->bytes); if (key.pData == NULL) { return TSDB_CODE_OUT_OF_MEMORY; } - taosArrayPush(pInfo->pGroupColVals, &key); + taosArrayPush((*pGroupColVals), &key); } int32_t nullFlagSize = sizeof(int8_t) * numOfGroupCols; - pInfo->keyBuf = taosMemoryCalloc(1, pInfo->groupKeyLen + nullFlagSize); - if (pInfo->keyBuf == NULL) { + (*keyBuf) = taosMemoryCalloc(1, (*keyLen) + nullFlagSize); + if ((*keyBuf) == NULL) { return TSDB_CODE_OUT_OF_MEMORY; } return TSDB_CODE_SUCCESS; } -static bool groupKeyCompare(SGroupbyOperatorInfo* pInfo, SSDataBlock* pBlock, int32_t rowIndex, - int32_t numOfGroupCols) { +static bool groupKeyCompare(SArray* pGroupCols, SArray* pGroupColVals, SSDataBlock* pBlock, int32_t rowIndex, int32_t numOfGroupCols) { SColumnDataAgg* pColAgg = NULL; for (int32_t i = 0; i < numOfGroupCols; ++i) { - SColumn* pCol = taosArrayGet(pInfo->pGroupCols, i); + SColumn* pCol = taosArrayGet(pGroupCols, i); SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, pCol->slotId); if (pBlock->pBlockAgg != NULL) { pColAgg = &pBlock->pBlockAgg[pCol->slotId]; // TODO is agg data matched? @@ -78,7 +77,7 @@ static bool groupKeyCompare(SGroupbyOperatorInfo* pInfo, SSDataBlock* pBlock, in bool isNull = colDataIsNull(pColInfoData, pBlock->info.rows, rowIndex, pColAgg); - SGroupKeys* pkey = taosArrayGet(pInfo->pGroupColVals, i); + SGroupKeys* pkey = taosArrayGet(pGroupColVals, i); if (pkey->isNull && isNull) { continue; } @@ -106,18 +105,18 @@ static bool groupKeyCompare(SGroupbyOperatorInfo* pInfo, SSDataBlock* pBlock, in return true; } -static void recordNewGroupKeys(SGroupbyOperatorInfo* pInfo, SSDataBlock* pBlock, int32_t rowIndex, int32_t numOfGroupCols) { +static void recordNewGroupKeys(SArray* pGroupCols, SArray* pGroupColVals, SSDataBlock* pBlock, int32_t rowIndex, int32_t numOfGroupCols) { SColumnDataAgg* pColAgg = NULL; for (int32_t i = 0; i < numOfGroupCols; ++i) { - SColumn* pCol = taosArrayGet(pInfo->pGroupCols, i); + SColumn* pCol = taosArrayGet(pGroupCols, i); SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, pCol->slotId); if (pBlock->pBlockAgg != NULL) { pColAgg = &pBlock->pBlockAgg[pCol->slotId]; // TODO is agg data matched? } - SGroupKeys* pkey = taosArrayGet(pInfo->pGroupColVals, i); + SGroupKeys* pkey = taosArrayGet(pGroupColVals, i); if (colDataIsNull(pColInfoData, pBlock->info.rows, rowIndex, pColAgg)) { pkey->isNull = true; } else { @@ -197,13 +196,13 @@ static void doHashGroupbyAgg(SOperatorInfo* pOperator, SSDataBlock* pBlock) { for (int32_t j = 0; j < pBlock->info.rows; ++j) { // Compare with the previous row of this column, and do not set the output buffer again if they are identical. if (!pInfo->isInit) { - recordNewGroupKeys(pInfo, pBlock, j, numOfGroupCols); + recordNewGroupKeys(pInfo->pGroupCols, pInfo->pGroupColVals, pBlock, j, numOfGroupCols); pInfo->isInit = true; num++; continue; } - bool equal = groupKeyCompare(pInfo, pBlock, j, numOfGroupCols); + bool equal = groupKeyCompare(pInfo->pGroupCols, pInfo->pGroupColVals, pBlock, j, numOfGroupCols); if (equal) { num++; continue; @@ -212,7 +211,7 @@ static void doHashGroupbyAgg(SOperatorInfo* pOperator, SSDataBlock* pBlock) { // The first row of a new block does not belongs to the previous existed group if (!equal && j == 0) { num++; - recordNewGroupKeys(pInfo, pBlock, j, numOfGroupCols); + recordNewGroupKeys(pInfo->pGroupCols, pInfo->pGroupColVals, pBlock, j, numOfGroupCols); continue; } @@ -227,7 +226,7 @@ static void doHashGroupbyAgg(SOperatorInfo* pOperator, SSDataBlock* pBlock) { // assign the group keys or user input constant values if required doAssignGroupKeys(pCtx, pOperator->numOfOutput, pBlock->info.rows, rowIndex); - recordNewGroupKeys(pInfo, pBlock, j, numOfGroupCols); + recordNewGroupKeys(pInfo->pGroupCols, pInfo->pGroupColVals, pBlock, j, numOfGroupCols); num = 1; } @@ -259,7 +258,7 @@ static SSDataBlock* hashGroupbyAggregate(SOperatorInfo* pOperator, bool* newgrou if (pRes->info.rows == 0 || !hasRemainDataInCurrentGroup(&pInfo->groupResInfo)) { pOperator->status = OP_EXEC_DONE; } - return pRes; + return (pRes->info.rows == 0)? NULL:pRes; } int32_t order = TSDB_ORDER_ASC; @@ -309,7 +308,7 @@ static SSDataBlock* hashGroupbyAggregate(SOperatorInfo* pOperator, bool* newgrou } } - return pInfo->binfo.pRes; + return (pRes->info.rows == 0)? NULL:pRes; } SOperatorInfo* createGroupOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, SSDataBlock* pResultBlock, SArray* pGroupColList, SNode* pCondition, SExecTaskInfo* pTaskInfo, @@ -325,7 +324,7 @@ SOperatorInfo* createGroupOperatorInfo(SOperatorInfo* downstream, SExprInfo* pEx initAggInfo(&pInfo->binfo, &pInfo->aggSup, pExprInfo, numOfCols, 4096, pResultBlock, pTaskInfo->id.str); initResultRowInfo(&pInfo->binfo.resultRowInfo, 8); - int32_t code = initGroupOptrInfo(pInfo, pGroupColList); + int32_t code = initGroupOptrInfo(&pInfo->pGroupColVals, &pInfo->groupKeyLen, &pInfo->keyBuf, pGroupColList); if (code != TSDB_CODE_SUCCESS) { goto _error; } @@ -339,7 +338,7 @@ SOperatorInfo* createGroupOperatorInfo(SOperatorInfo* downstream, SExprInfo* pEx pOperator->info = pInfo; pOperator->_openFn = operatorDummyOpenFn; pOperator->getNextFn = hashGroupbyAggregate; - pOperator->closeFn = destroyGroupbyOperatorInfo; + pOperator->closeFn = destroyGroupOperatorInfo; code = appendDownstream(pOperator, &downstream, 1); return pOperator; @@ -351,67 +350,202 @@ SOperatorInfo* createGroupOperatorInfo(SOperatorInfo* downstream, SExprInfo* pEx return NULL; } -static SSDataBlock* doPartitionData(SOperatorInfo* pOperator, bool* newgroup) { +static void doHashPartition(SOperatorInfo* pOperator, SSDataBlock* pBlock) { + SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; + + SPartitionOperatorInfo* pInfo = pOperator->info; + + int32_t numOfGroupCols = taosArrayGetSize(pInfo->pGroupCols); + for (int32_t j = 0; j < pBlock->info.rows; ++j) { + // Compare with the previous row of this column, and do not set the output buffer again if they are identical. + recordNewGroupKeys(pInfo->pGroupCols, pInfo->pGroupColVals, pBlock, j, numOfGroupCols); + + int32_t len = buildGroupKeys(pInfo->keyBuf, pInfo->pGroupColVals); + int32_t numOfRows = blockDataGetCapacityInRow(pInfo->binfo.pRes, 4096); + + SDataGroupInfo* p = taosHashGet(pInfo->pGroupSet, pInfo->keyBuf, len); + void* pPage = NULL; + + if (p == NULL) { // it is a new group + SDataGroupInfo gi = {0}; + gi.pPageList = taosArrayInit(100, sizeof(int32_t)); + taosHashPut(pInfo->pGroupSet, pInfo->keyBuf, len, &gi, sizeof(SDataGroupInfo)); + + p = taosHashGet(pInfo->pGroupSet, pInfo->keyBuf, len); + + int32_t pageId = 0; + pPage = getNewBufPage(pInfo->pBuf, 0, &pageId); + taosArrayPush(p->pPageList, &pageId); + + *(int32_t *) pPage = 0; + } else { + int32_t* curId = taosArrayGetLast(p->pPageList); + pPage = getBufPage(pInfo->pBuf, *curId); + + int32_t *rows = (int32_t*) pPage; + if (*rows >= numOfRows) { + // add a new page for current group + int32_t pageId = 0; + pPage = getNewBufPage(pInfo->pBuf, 0, &pageId); + taosArrayPush(p->pPageList, &pageId); + + *(int32_t*) pPage = 0; + } + } + + int32_t* rows = (int32_t*) pPage; + (*rows) += 1; + + int32_t* offset = taosMemoryCalloc(pBlock->info.numOfCols, sizeof(int32_t)); + offset[0] = sizeof(int32_t); + + int32_t numOfCols = pInfo->binfo.pRes->info.numOfCols; + for(int32_t i = 0; i < numOfCols - 1; ++i) { + SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, i); + + if (IS_VAR_DATA_TYPE(pColInfoData->info.type)) { + offset[i + 1] = pColInfoData->info.bytes * numOfRows + numOfRows * sizeof(int32_t) + sizeof(int32_t) + offset[i]; + } else { + offset[i + 1] = pColInfoData->info.bytes * numOfRows + BitmapLen(numOfRows) + sizeof(int32_t) + offset[i]; + } + } + + for(int32_t i = 0; i < numOfCols; ++i) { + SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, i); + + if (IS_VAR_DATA_TYPE(pColInfoData->info.type)) { + // todo + } else { + char* bitmap = pPage + offset[i]; + int32_t* lenx = pPage + offset[i] + BitmapLen(numOfRows); + char* data = (char*) lenx + sizeof(int32_t); + + (*lenx )+= pColInfoData->info.bytes; + bool isNull = colDataIsNull_f(pColInfoData->nullbitmap, j); + if (isNull) { + colDataSetNull_f(bitmap, (*rows) - 1); + } else { + memcpy(data + ((*rows) - 1)* pColInfoData->info.bytes, colDataGetData(pColInfoData, j), pColInfoData->info.bytes); + } + } + } + + setBufPageDirty(pPage, true); + releaseBufPage(pInfo->pBuf, pPage); + } + + // todo set the consistent group id according to the group keys +} + +static SSDataBlock* buildPartitionResult(SOperatorInfo* pOperator) { + SPartitionOperatorInfo* pInfo = pOperator->info; + + SDataGroupInfo* pGroupInfo = pInfo->pGroupIter; + if (pInfo->pGroupIter == NULL || pInfo->pageIndex >= taosArrayGetSize(pGroupInfo->pPageList)) { + // try next group data + pInfo->pGroupIter = taosHashIterate(pInfo->pGroupSet, pInfo->pGroupIter); + if (pInfo->pGroupIter == NULL) { + return NULL; + } + + pGroupInfo = pInfo->pGroupIter; + pInfo->pageIndex = 0; + } + + int32_t* pageId = taosArrayGet(pGroupInfo->pPageList, pInfo->pageIndex); + void* page = getBufPage(pInfo->pBuf, *pageId); + + int32_t numOfRows = blockDataGetCapacityInRow(pInfo->binfo.pRes, 4096); + blockDataFromBuf1(pInfo->binfo.pRes, page, numOfRows); + + pInfo->pageIndex += 1; + return pInfo->binfo.pRes; +} + +static SSDataBlock* hashPartition(SOperatorInfo* pOperator, bool* newgroup) { if (pOperator->status == OP_EXEC_DONE) { return NULL; } - SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; - SSortOperatorInfo* pInfo = pOperator->info; - bool hasVarCol = pInfo->pDataBlock->info.hasVarCol; + SGroupbyOperatorInfo* pInfo = pOperator->info; + SSDataBlock* pRes = pInfo->binfo.pRes; if (pOperator->status == OP_RES_TO_RETURN) { - return getSortedBlockData(pInfo->pSortHandle, pInfo->pDataBlock, pInfo->numOfRowsInRes); + blockDataCleanup(pRes); + return buildPartitionResult(pOperator); } - int32_t numOfBufPage = pInfo->sortBufSize / pInfo->bufPageSize; - pInfo->pSortHandle = tsortCreateSortHandle(pInfo->pSortInfo, SORT_SINGLESOURCE_SORT, pInfo->bufPageSize, numOfBufPage, - pInfo->pDataBlock, pTaskInfo->id.str); - - tsortSetFetchRawDataFp(pInfo->pSortHandle, loadNextDataBlock); + SOperatorInfo* downstream = pOperator->pDownstream[0]; - SGenericSource* ps = taosMemoryCalloc(1, sizeof(SGenericSource)); - ps->param = pOperator->pDownstream[0]; - tsortAddSource(pInfo->pSortHandle, ps); + while (1) { + publishOperatorProfEvent(downstream, QUERY_PROF_BEFORE_OPERATOR_EXEC); + SSDataBlock* pBlock = downstream->getNextFn(downstream, newgroup); + publishOperatorProfEvent(downstream, QUERY_PROF_AFTER_OPERATOR_EXEC); + if (pBlock == NULL) { + break; + } - int32_t code = tsortOpen(pInfo->pSortHandle); - if (code != TSDB_CODE_SUCCESS) { - longjmp(pTaskInfo->env, terrno); + // setTagValue(pOperator, pRuntimeEnv->current->pTable, pInfo->binfo.pCtx, pOperator->numOfOutput); + doHashPartition(pOperator, pBlock); } pOperator->status = OP_RES_TO_RETURN; - return getSortedBlockData(pInfo->pSortHandle, pInfo->pDataBlock, pInfo->numOfRowsInRes); + blockDataEnsureCapacity(pRes, 4096); + return buildPartitionResult(pOperator); +} + +static void destroyPartitionOperatorInfo(void* param, int32_t numOfOutput) { + SGroupbyOperatorInfo* pInfo = (SGroupbyOperatorInfo*)param; + doDestroyBasicInfo(&pInfo->binfo, numOfOutput); + taosMemoryFreeClear(pInfo->keyBuf); + taosArrayDestroy(pInfo->pGroupCols); + taosArrayDestroy(pInfo->pGroupColVals); } -SOperatorInfo* createPartitionOperatorInfo(SOperatorInfo* downstream, SSDataBlock* pResultBlock, SArray* pSortInfo, SExecTaskInfo* pTaskInfo, const STableGroupInfo* pTableGroupInfo) { - SSortOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SSortOperatorInfo)); - SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo)); +SOperatorInfo* createPartitionOperatorInfo(SOperatorInfo* downstream, SSDataBlock* pResultBlock, SArray* pGroupColList, + SExecTaskInfo* pTaskInfo, const STableGroupInfo* pTableGroupInfo) { + SPartitionOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SPartitionOperatorInfo)); + SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo)); if (pInfo == NULL || pOperator == NULL) { goto _error; } - pInfo->sortBufSize = 1024 * 16; // TODO dynamic set the available sort buffer - pInfo->bufPageSize = 1024; - pInfo->numOfRowsInRes = 1024; - pInfo->pDataBlock = pResultBlock; - pInfo->pSortInfo = pSortInfo; + pInfo->pGroupCols = pGroupColList; + + _hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY); + pInfo->pGroupSet = taosHashInit(100, hashFn, false, HASH_NO_LOCK); + if (pInfo->pGroupSet == NULL) { + goto _error; + } + + int32_t code = createDiskbasedBuf(&pInfo->pBuf, 4096, 4096 * 256, pTaskInfo->id.str, "/tmp/"); + if (code != TSDB_CODE_SUCCESS) { + goto _error; + } + + code = initGroupOptrInfo(&pInfo->pGroupColVals, &pInfo->groupKeyLen, &pInfo->keyBuf, pGroupColList); + if (code != TSDB_CODE_SUCCESS) { + goto _error; + } pOperator->name = "PartitionOperator"; - pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_PARTITION; pOperator->blockingOptr = true; pOperator->status = OP_NOT_OPENED; - pOperator->info = pInfo; + pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_PARTITION; - pOperator->pTaskInfo = pTaskInfo; - pOperator->getNextFn = doPartitionData; -// pOperator->closeFn = destroyOrderOperatorInfo; + pInfo->binfo.pRes = pResultBlock; + pOperator->numOfOutput = pResultBlock->info.numOfCols; + pOperator->info = pInfo; + pOperator->_openFn = operatorDummyOpenFn; + pOperator->getNextFn = hashPartition; + pOperator->closeFn = destroyPartitionOperatorInfo; - int32_t code = appendDownstream(pOperator, &downstream, 1); + code = appendDownstream(pOperator, &downstream, 1); return pOperator; _error: pTaskInfo->code = TSDB_CODE_OUT_OF_MEMORY; - taosMemoryFree(pInfo); - taosMemoryFree(pOperator); + taosMemoryFreeClear(pInfo); + taosMemoryFreeClear(pOperator); return NULL; } \ No newline at end of file diff --git a/source/libs/executor/src/tsort.c b/source/libs/executor/src/tsort.c index 7a57d6296912482d913edb61170d9d1d04780e1b..5dfda92982d9643ad3c397aefd4018722e6d4d84 100644 --- a/source/libs/executor/src/tsort.c +++ b/source/libs/executor/src/tsort.c @@ -425,9 +425,7 @@ static int32_t doInternalMergeSort(SSortHandle* pHandle) { qDebug("%s %d rounds mergesort required to complete the sort, first-round sorted data size:%"PRIzu", sort:%"PRId64", total elapsed:%"PRId64, pHandle->idStr, (int32_t) (sortPass + 1), getTotalBufSize(pHandle->pBuf), pHandle->sortElapsed, pHandle->totalElapsed); - size_t pgSize = pHandle->pageSize; - int32_t numOfRows = (pgSize - blockDataGetSerialMetaSize(pHandle->pDataBlock))/ blockDataGetSerialRowSize(pHandle->pDataBlock); - + int32_t numOfRows = blockDataGetCapacityInRow(pHandle->pDataBlock, pHandle->pageSize); blockDataEnsureCapacity(pHandle->pDataBlock, numOfRows); size_t numOfSorted = taosArrayGetSize(pHandle->pOrderedSource);