diff --git a/include/common/tcommon.h b/include/common/tcommon.h index f2fec58d8fca43336c6ca41e69bd708aa2c76670..bf6a804e7f0f3dae1569c80e064446c2d17d7fae 100644 --- a/include/common/tcommon.h +++ b/include/common/tcommon.h @@ -71,7 +71,7 @@ typedef struct SDataBlockInfo { int64_t uid; int64_t blockId; }; - int64_t groupId; // no need to serialize + uint64_t groupId; // no need to serialize } SDataBlockInfo; typedef struct SSDataBlock { diff --git a/include/common/tdatablock.h b/include/common/tdatablock.h index 797843002751928990f1eb7046ee99a4a3baa5b2..98edd9f0a5c73ec64bba789e1bb6e633f42e39af 100644 --- a/include/common/tdatablock.h +++ b/include/common/tdatablock.h @@ -182,6 +182,7 @@ int32_t blockDataSplitRows(SSDataBlock* pBlock, bool hasVarCol, int32_t startInd int32_t pageSize); int32_t blockDataToBuf(char* buf, const SSDataBlock* pBlock); int32_t blockDataFromBuf(SSDataBlock* pBlock, const char* buf); +int32_t blockDataFromBuf1(SSDataBlock* pBlock, const char* buf, size_t capacity); SSDataBlock* blockDataExtractBlock(SSDataBlock* pBlock, int32_t startIndex, int32_t rowCount); diff --git a/source/common/src/tdatablock.c b/source/common/src/tdatablock.c index fb6640e59eaf129b701249cb87220695c60f6fd9..8f55488e2c75de796c690626e16a25ad3c42b179 100644 --- a/source/common/src/tdatablock.c +++ b/source/common/src/tdatablock.c @@ -415,10 +415,7 @@ int32_t blockDataSplitRows(SSDataBlock* pBlock, bool hasVarCol, int32_t startInd for (int32_t i = 0; i < numOfCols; ++i) { SColumnInfoData* pColInfoData = TARRAY_GET_ELEM(pBlock->pDataBlock, i); if (IS_VAR_DATA_TYPE(pColInfoData->info.type)) { - bool isNull = colDataIsNull(pColInfoData, numOfRows, j, NULL); - if (isNull) { - // do nothing - } else { + if (pColInfoData->varmeta.offset[j] != -1) { char* p = colDataGetData(pColInfoData, j); size += varDataTLen(p); } @@ -547,8 +544,8 @@ int32_t blockDataFromBuf(SSDataBlock* pBlock, const char* buf) { for (int32_t i = 0; i < numOfCols; ++i) { SColumnInfoData* pCol = taosArrayGet(pBlock->pDataBlock, i); - size_t metaSize = pBlock->info.rows * sizeof(int32_t); if (IS_VAR_DATA_TYPE(pCol->info.type)) { + size_t metaSize = pBlock->info.rows * sizeof(int32_t); memcpy(pCol->varmeta.offset, pStart, metaSize); pStart += metaSize; } else { @@ -581,6 +578,49 @@ int32_t blockDataFromBuf(SSDataBlock* pBlock, const char* buf) { return TSDB_CODE_SUCCESS; } +int32_t blockDataFromBuf1(SSDataBlock* pBlock, const char* buf, size_t capacity) { + pBlock->info.rows = *(int32_t*)buf; + + int32_t numOfCols = pBlock->info.numOfCols; + const char* pStart = buf + sizeof(uint32_t); + + for (int32_t i = 0; i < numOfCols; ++i) { + SColumnInfoData* pCol = taosArrayGet(pBlock->pDataBlock, i); + + if (IS_VAR_DATA_TYPE(pCol->info.type)) { + size_t metaSize = capacity * sizeof(int32_t); + memcpy(pCol->varmeta.offset, pStart, metaSize); + pStart += metaSize; + } else { + memcpy(pCol->nullbitmap, pStart, BitmapLen(capacity)); + pStart += BitmapLen(capacity); + } + + int32_t colLength = *(int32_t*)pStart; + pStart += sizeof(int32_t); + + if (IS_VAR_DATA_TYPE(pCol->info.type)) { + if (pCol->varmeta.allocLen < colLength) { + char* tmp = taosMemoryRealloc(pCol->pData, colLength); + if (tmp == NULL) { + return TSDB_CODE_OUT_OF_MEMORY; + } + + pCol->pData = tmp; + pCol->varmeta.allocLen = colLength; + } + + pCol->varmeta.length = colLength; + ASSERT(pCol->varmeta.length <= pCol->varmeta.allocLen); + } + + memcpy(pCol->pData, pStart, colLength); + pStart += pCol->info.bytes * capacity; + } + + return TSDB_CODE_SUCCESS; +} + size_t blockDataGetRowSize(SSDataBlock* pBlock) { ASSERT(pBlock != NULL); if (pBlock->info.rowSize == 0) { @@ -627,6 +667,10 @@ double blockDataGetSerialRowSize(const SSDataBlock* pBlock) { return rowSize; } +int32_t getAllowedRowsForPage(const SSDataBlock* pBlock, size_t pgSize) { + return (int32_t) ((pgSize - blockDataGetSerialMetaSize(pBlock))/ blockDataGetSerialRowSize(pBlock)); +} + typedef struct SSDataBlockSortHelper { SArray* orderInfo; // SArray SSDataBlock* pDataBlock; @@ -1178,6 +1222,9 @@ int32_t colInfoDataEnsureCapacity(SColumnInfoData* pColumn, uint32_t numOfRows) int32_t blockDataEnsureCapacity(SSDataBlock* pDataBlock, uint32_t numOfRows) { int32_t code = 0; + if (numOfRows == 0) { + return TSDB_CODE_SUCCESS; + } for (int32_t i = 0; i < pDataBlock->info.numOfCols; ++i) { SColumnInfoData* p = taosArrayGet(pDataBlock->pDataBlock, i); @@ -1220,7 +1267,7 @@ SSDataBlock* createOneDataBlock(const SSDataBlock* pDataBlock) { } size_t blockDataGetCapacityInRow(const SSDataBlock* pBlock, size_t pageSize) { - return pageSize / (blockDataGetSerialRowSize(pBlock) + blockDataGetSerialMetaSize(pBlock)); + return (int32_t) ((pageSize - blockDataGetSerialMetaSize(pBlock))/ blockDataGetSerialRowSize(pBlock)); } void colDataDestroy(SColumnInfoData* pColData) { diff --git a/source/libs/executor/inc/executorimpl.h b/source/libs/executor/inc/executorimpl.h index 57edc4000723e90164d67a6a7ae8500fff7472ee..35a4b93a880dcef92d1aac31a3be06ba3b57367e 100644 --- a/source/libs/executor/inc/executorimpl.h +++ b/source/libs/executor/inc/executorimpl.h @@ -511,6 +511,12 @@ typedef struct SProjectOperatorInfo { SSDataBlock *existDataBlock; SArray *pPseudoColInfo; SLimit limit; + SLimit slimit; + + uint64_t groupId; + int64_t curSOffset; + int64_t curGroupOutput; + int64_t curOffset; int64_t curOutput; } SProjectOperatorInfo; @@ -563,6 +569,29 @@ typedef struct SGroupbyOperatorInfo { SAggSupporter aggSup; } SGroupbyOperatorInfo; +typedef struct SDataGroupInfo { + uint64_t groupId; + int64_t numOfRows; + SArray *pPageList; +} SDataGroupInfo; + +// The sort in partition may be needed later. +typedef struct SPartitionOperatorInfo { + SOptrBasicInfo binfo; + SArray* pGroupCols; + SArray* pGroupColVals; // current group column values, SArray + char* keyBuf; // group by keys for hash + int32_t groupKeyLen; // total group by column width + SHashObj* pGroupSet; // quick locate the window object for each result + + SDiskbasedBuf* pBuf; // query result buffer based on blocked-wised disk file + int32_t rowCapacity; // maximum number of rows for each buffer page + int32_t* columnOffset; // start position for each column data + + void* pGroupIter; // group iterator + int32_t pageIndex; // page index of current group +} SPartitionOperatorInfo; + typedef struct SSessionAggOperatorInfo { SOptrBasicInfo binfo; SAggSupporter aggSup; @@ -650,7 +679,7 @@ SOperatorInfo* createTableScanOperatorInfo(void* pTsdbReadHandle, int32_t order, SOperatorInfo* createAggregateOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, SSDataBlock* pResultBlock, SExecTaskInfo* pTaskInfo, const STableGroupInfo* pTableGroupInfo); SOperatorInfo* createMultiTableAggOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, SSDataBlock* pResBlock, SExecTaskInfo* pTaskInfo, const STableGroupInfo* pTableGroupInfo); -SOperatorInfo* createProjectOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t num, SSDataBlock* pResBlock, SLimit* pLimit, SExecTaskInfo* pTaskInfo); +SOperatorInfo* createProjectOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t num, SSDataBlock* pResBlock, SLimit* pLimit, SLimit* pSlimit, SExecTaskInfo* pTaskInfo); SOperatorInfo *createSortOperatorInfo(SOperatorInfo* downstream, SSDataBlock* pResBlock, SArray* pSortInfo, SExecTaskInfo* pTaskInfo); SOperatorInfo* createSortedMergeOperatorInfo(SOperatorInfo** downstream, int32_t numOfDownstream, SExprInfo* pExprInfo, int32_t num, SArray* pSortInfo, SArray* pGroupInfo, SExecTaskInfo* pTaskInfo); SOperatorInfo* createSysTableScanOperatorInfo(void* pSysTableReadHandle, SSDataBlock* pResBlock, const SName* pName, @@ -667,8 +696,8 @@ SOperatorInfo* createFillOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExp int32_t fillType, char* fillVal, bool multigroupResult, SExecTaskInfo* pTaskInfo); SOperatorInfo* createStatewindowOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExpr, int32_t numOfCols, SSDataBlock* pResBlock, SExecTaskInfo* pTaskInfo); -SOperatorInfo* createPartitionOperatorInfo(SOperatorInfo* downstream, SSDataBlock* pResultBlock, SArray* pSortInfo, SExecTaskInfo* pTaskInfo, const STableGroupInfo* pTableGroupInfo); - +SOperatorInfo* createPartitionOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, SSDataBlock* pResultBlock, SArray* pGroupColList, + SExecTaskInfo* pTaskInfo, const STableGroupInfo* pTableGroupInfo); #if 0 SOperatorInfo* createTableSeqScanOperatorInfo(void* pTsdbReadHandle, STaskRuntimeEnv* pRuntimeEnv); SOperatorInfo* createAllTimeIntervalOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* downstream, diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index 9f9aa730874a406237932ca50ff56033c0885621..e85cfb76fd981311c2b723a3a25cea4c255f7f18 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -302,6 +302,10 @@ SSDataBlock* createOutputBuf_rv1(SDataBlockDescNode* pNode) { } taosArrayPush(pBlock->pDataBlock, &idata); + + if (IS_VAR_DATA_TYPE(idata.info.type)) { + pBlock->info.hasVarCol = true; + } } return pBlock; @@ -1259,6 +1263,7 @@ static void setPseudoOutputColInfo(SSDataBlock* pResult, SqlFunctionCtx* pCtx, S static void projectApplyFunctions(SExprInfo* pExpr, SSDataBlock* pResult, SSDataBlock* pSrcBlock, SqlFunctionCtx* pCtx, int32_t numOfOutput, SArray* pPseudoList) { setPseudoOutputColInfo(pResult, pCtx, pPseudoList); + pResult->info.groupId = pSrcBlock->info.groupId; for (int32_t k = 0; k < numOfOutput; ++k) { if (pExpr[k].pExpr->nodeType == QUERY_NODE_COLUMN) { // it is a project query @@ -5422,7 +5427,6 @@ static SSDataBlock* doProjectOperation(SOperatorInfo* pOperator, bool* newgroup) publishOperatorProfEvent(downstream, QUERY_PROF_AFTER_OPERATOR_EXEC); if (pBlock == NULL) { - assert(*newgroup == false); *newgroup = prevVal; setTaskStatus(pOperator->pTaskInfo, TASK_COMPLETED); break; @@ -5450,6 +5454,38 @@ static SSDataBlock* doProjectOperation(SOperatorInfo* pOperator, bool* newgroup) projectApplyFunctions(pOperator->pExpr, pInfo->pRes, pBlock, pInfo->pCtx, pOperator->numOfOutput, pProjectInfo->pPseudoColInfo); + if (pProjectInfo->curSOffset > 0) { + if (pProjectInfo->groupId == 0) { // it is the first group + pProjectInfo->groupId = pBlock->info.groupId; + blockDataCleanup(pInfo->pRes); + continue; + } else if (pProjectInfo->groupId != pBlock->info.groupId) { + pProjectInfo->curSOffset -= 1; + + // ignore data block in current group + if (pProjectInfo->curSOffset > 0) { + blockDataCleanup(pInfo->pRes); + continue; + } + } + + pProjectInfo->groupId = pBlock->info.groupId; + } + + if (pProjectInfo->groupId != 0 && pProjectInfo->groupId != pBlock->info.groupId) { + pProjectInfo->curGroupOutput += 1; + if ((pProjectInfo->slimit.limit > 0) && (pProjectInfo->slimit.limit <= pProjectInfo->curGroupOutput)) { + pOperator->status = OP_EXEC_DONE; + return NULL; + } + + // reset the value for a new group data + pProjectInfo->curOffset = 0; + pProjectInfo->curOutput = 0; + } + + pProjectInfo->groupId = pBlock->info.groupId; + // todo extract method if (pProjectInfo->curOffset < pInfo->pRes->info.rows && pProjectInfo->curOffset > 0) { blockDataTrimFirstNRows(pInfo->pRes, pProjectInfo->curOffset); @@ -6317,7 +6353,7 @@ static SArray* setRowTsColumnOutputInfo(SqlFunctionCtx* pCtx, int32_t numOfCols) } SOperatorInfo* createProjectOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t num, - SSDataBlock* pResBlock, SLimit* pLimit, SExecTaskInfo* pTaskInfo) { + SSDataBlock* pResBlock, SLimit* pLimit, SLimit* pSlimit, SExecTaskInfo* pTaskInfo) { SProjectOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SProjectOperatorInfo)); SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo)); if (pInfo == NULL || pOperator == NULL) { @@ -6325,7 +6361,10 @@ SOperatorInfo* createProjectOperatorInfo(SOperatorInfo* downstream, SExprInfo* p } pInfo->limit = *pLimit; + pInfo->slimit = *pSlimit; pInfo->curOffset = pLimit->offset; + pInfo->curSOffset = pSlimit->offset; + pInfo->binfo.pRes = pResBlock; int32_t numOfCols = num; @@ -7052,11 +7091,13 @@ static SArray* extractScanColumnId(SNodeList* pNodeList); static SArray* extractColumnInfo(SNodeList* pNodeList); static SArray* extractColMatchInfo(SNodeList* pNodeList, SDataBlockDescNode* pOutputNodeList, int32_t* numOfOutputCols); static SArray* createSortInfo(SNodeList* pNodeList); +static SArray* extractPartitionColInfo(SNodeList* pNodeList); SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo, SReadHandle* pHandle, uint64_t queryId, uint64_t taskId, STableGroupInfo* pTableGroupInfo) { if (pPhyNode->pChildren == NULL || LIST_LENGTH(pPhyNode->pChildren) == 0) { - if (QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN == nodeType(pPhyNode)) { + int32_t type = nodeType(pPhyNode); + if (QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN == type) { SScanPhysiNode* pScanPhyNode = (SScanPhysiNode*)pPhyNode; int32_t numOfCols = 0; @@ -7064,11 +7105,11 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo SArray* pColList = extractColMatchInfo(pScanPhyNode->pScanCols, pScanPhyNode->node.pOutputDataBlockDesc, &numOfCols); return createTableScanOperatorInfo(pDataReader, pScanPhyNode->order, numOfCols, pScanPhyNode->count, pScanPhyNode->reverse, pColList, pScanPhyNode->node.pConditions, pTaskInfo); - } else if (QUERY_NODE_PHYSICAL_PLAN_EXCHANGE == nodeType(pPhyNode)) { + } else if (QUERY_NODE_PHYSICAL_PLAN_EXCHANGE == type) { SExchangePhysiNode* pExchange = (SExchangePhysiNode*)pPhyNode; SSDataBlock* pResBlock = createOutputBuf_rv1(pExchange->node.pOutputDataBlockDesc); return createExchangeOperatorInfo(pExchange->pSrcEndPoints, pResBlock, pTaskInfo); - } else if (QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN == nodeType(pPhyNode)) { + } else if (QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN == type) { SScanPhysiNode* pScanPhyNode = (SScanPhysiNode*)pPhyNode; // simple child table. int32_t code = doCreateTableGroup(pHandle->meta, pScanPhyNode->tableType, pScanPhyNode->uid, pTableGroupInfo, queryId, taskId); @@ -7081,7 +7122,7 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo SOperatorInfo* pOperator = createStreamScanOperatorInfo(pHandle->reader, pResBlock, pColList, tableIdList, pTaskInfo); taosArrayDestroy(tableIdList); return pOperator; - } else if (QUERY_NODE_PHYSICAL_PLAN_SYSTABLE_SCAN == nodeType(pPhyNode)) { + } else if (QUERY_NODE_PHYSICAL_PLAN_SYSTABLE_SCAN == type) { SSystemTableScanPhysiNode* pSysScanPhyNode = (SSystemTableScanPhysiNode*)pPhyNode; SSDataBlock* pResBlock = createOutputBuf_rv1(pSysScanPhyNode->scan.node.pOutputDataBlockDesc); @@ -7097,93 +7138,76 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo } } - if (QUERY_NODE_PHYSICAL_PLAN_PROJECT == nodeType(pPhyNode)) { - size_t size = LIST_LENGTH(pPhyNode->pChildren); - assert(size == 1); + int32_t type = nodeType(pPhyNode); + size_t size = LIST_LENGTH(pPhyNode->pChildren); + ASSERT(size == 1); - SPhysiNode* pChildNode = (SPhysiNode*)nodesListGetNode(pPhyNode->pChildren, 0); - SOperatorInfo* op = createOperatorTree(pChildNode, pTaskInfo, pHandle, queryId, taskId, pTableGroupInfo); + SPhysiNode* pChildNode = (SPhysiNode*)nodesListGetNode(pPhyNode->pChildren, 0); + SOperatorInfo* op = createOperatorTree(pChildNode, pTaskInfo, pHandle, queryId, taskId, pTableGroupInfo); + if (QUERY_NODE_PHYSICAL_PLAN_PROJECT == type) { int32_t num = 0; SProjectPhysiNode* pProjPhyNode = (SProjectPhysiNode*) pPhyNode; SExprInfo* pExprInfo = createExprInfo(pProjPhyNode->pProjections, NULL, &num); SSDataBlock* pResBlock = createOutputBuf_rv1(pPhyNode->pOutputDataBlockDesc); SLimit limit = {.limit = pProjPhyNode->limit, .offset = pProjPhyNode->offset}; - - return createProjectOperatorInfo(op, pExprInfo, num, pResBlock, &limit, pTaskInfo); - } else if (QUERY_NODE_PHYSICAL_PLAN_AGG == nodeType(pPhyNode)) { - size_t size = LIST_LENGTH(pPhyNode->pChildren); - assert(size == 1); - - for (int32_t i = 0; i < size; ++i) { - SPhysiNode* pChildNode = (SPhysiNode*)nodesListGetNode(pPhyNode->pChildren, i); - SOperatorInfo* op = createOperatorTree(pChildNode, pTaskInfo, pHandle, queryId, taskId, pTableGroupInfo); - - int32_t num = 0; - - SAggPhysiNode* pAggNode = (SAggPhysiNode*)pPhyNode; - SExprInfo* pExprInfo = createExprInfo(pAggNode->pAggFuncs, pAggNode->pGroupKeys, &num); - SSDataBlock* pResBlock = createOutputBuf_rv1(pPhyNode->pOutputDataBlockDesc); - - if (pAggNode->pGroupKeys != NULL) { - SArray* pColList = extractColumnInfo(pAggNode->pGroupKeys); - return createGroupOperatorInfo(op, pExprInfo, num, pResBlock, pColList, pAggNode->node.pConditions, pTaskInfo, NULL); - } else { - return createAggregateOperatorInfo(op, pExprInfo, num, pResBlock, pTaskInfo, pTableGroupInfo); - } - } - } else if (QUERY_NODE_PHYSICAL_PLAN_INTERVAL == nodeType(pPhyNode)) { - size_t size = LIST_LENGTH(pPhyNode->pChildren); - assert(size == 1); - - for (int32_t i = 0; i < size; ++i) { - SPhysiNode* pChildNode = (SPhysiNode*)nodesListGetNode(pPhyNode->pChildren, i); - SOperatorInfo* op = createOperatorTree(pChildNode, pTaskInfo, pHandle, queryId, taskId, pTableGroupInfo); - - SIntervalPhysiNode* pIntervalPhyNode = (SIntervalPhysiNode*)pPhyNode; - - int32_t num = 0; - SExprInfo* pExprInfo = createExprInfo(pIntervalPhyNode->window.pFuncs, NULL, &num); - SSDataBlock* pResBlock = createOutputBuf_rv1(pPhyNode->pOutputDataBlockDesc); - - SInterval interval = { - .interval = pIntervalPhyNode->interval, - .sliding = pIntervalPhyNode->sliding, - .intervalUnit = pIntervalPhyNode->intervalUnit, - .slidingUnit = pIntervalPhyNode->slidingUnit, - .offset = pIntervalPhyNode->offset, - .precision = pIntervalPhyNode->precision - }; - - int32_t primaryTsSlotId = ((SColumnNode*) pIntervalPhyNode->pTspk)->slotId; - return createIntervalOperatorInfo(op, pExprInfo, num, pResBlock, &interval, primaryTsSlotId, pTableGroupInfo, pTaskInfo); + SLimit slimit = {.limit = pProjPhyNode->slimit, .offset = pProjPhyNode->soffset}; + return createProjectOperatorInfo(op, pExprInfo, num, pResBlock, &limit, &slimit, pTaskInfo); + } else if (QUERY_NODE_PHYSICAL_PLAN_AGG == type) { + int32_t num = 0; + + SAggPhysiNode* pAggNode = (SAggPhysiNode*)pPhyNode; + SExprInfo* pExprInfo = createExprInfo(pAggNode->pAggFuncs, pAggNode->pGroupKeys, &num); + SSDataBlock* pResBlock = createOutputBuf_rv1(pPhyNode->pOutputDataBlockDesc); + + if (pAggNode->pGroupKeys != NULL) { + SArray* pColList = extractColumnInfo(pAggNode->pGroupKeys); + return createGroupOperatorInfo(op, pExprInfo, num, pResBlock, pColList, pAggNode->node.pConditions, pTaskInfo, NULL); + } else { + return createAggregateOperatorInfo(op, pExprInfo, num, pResBlock, pTaskInfo, pTableGroupInfo); } - } else if (QUERY_NODE_PHYSICAL_PLAN_SORT == nodeType(pPhyNode)) { - size_t size = LIST_LENGTH(pPhyNode->pChildren); - assert(size == 1); + } else if (QUERY_NODE_PHYSICAL_PLAN_INTERVAL == type) { + SIntervalPhysiNode* pIntervalPhyNode = (SIntervalPhysiNode*)pPhyNode; - SPhysiNode* pChildNode = (SPhysiNode*)nodesListGetNode(pPhyNode->pChildren, 0); - SOperatorInfo* op = createOperatorTree(pChildNode, pTaskInfo, pHandle, queryId, taskId, pTableGroupInfo); + int32_t num = 0; + SExprInfo* pExprInfo = createExprInfo(pIntervalPhyNode->window.pFuncs, NULL, &num); + SSDataBlock* pResBlock = createOutputBuf_rv1(pPhyNode->pOutputDataBlockDesc); + SInterval interval = { + .interval = pIntervalPhyNode->interval, + .sliding = pIntervalPhyNode->sliding, + .intervalUnit = pIntervalPhyNode->intervalUnit, + .slidingUnit = pIntervalPhyNode->slidingUnit, + .offset = pIntervalPhyNode->offset, + .precision = pIntervalPhyNode->precision + }; + + int32_t primaryTsSlotId = ((SColumnNode*) pIntervalPhyNode->pTspk)->slotId; + return createIntervalOperatorInfo(op, pExprInfo, num, pResBlock, &interval, primaryTsSlotId, pTableGroupInfo, pTaskInfo); + } else if (QUERY_NODE_PHYSICAL_PLAN_SORT == type) { SSortPhysiNode* pSortPhyNode = (SSortPhysiNode*)pPhyNode; SSDataBlock* pResBlock = createOutputBuf_rv1(pPhyNode->pOutputDataBlockDesc); - SArray* info = createSortInfo(pSortPhyNode->pSortKeys); - return createSortOperatorInfo(op, pResBlock, info, pTaskInfo); - } else if (QUERY_NODE_PHYSICAL_PLAN_SESSION_WINDOW == nodeType(pPhyNode)) { - size_t size = LIST_LENGTH(pPhyNode->pChildren); - assert(size == 1); - - SPhysiNode* pChildNode = (SPhysiNode*)nodesListGetNode(pPhyNode->pChildren, 0); - SOperatorInfo* op = createOperatorTree(pChildNode, pTaskInfo, pHandle, queryId, taskId, pTableGroupInfo); + SArray* info = createSortInfo(pSortPhyNode->pSortKeys); + return createSortOperatorInfo(op, pResBlock, info, pTaskInfo); + } else if (QUERY_NODE_PHYSICAL_PLAN_SESSION_WINDOW == type) { SSessionWinodwPhysiNode* pSessionNode = (SSessionWinodwPhysiNode*)pPhyNode; int32_t num = 0; SExprInfo* pExprInfo = createExprInfo(pSessionNode->window.pFuncs, NULL, &num); SSDataBlock* pResBlock = createOutputBuf_rv1(pPhyNode->pOutputDataBlockDesc); return createSessionAggOperatorInfo(op, pExprInfo, num, pResBlock, pSessionNode->gap, pTaskInfo); + } else if (QUERY_NODE_PHYSICAL_PLAN_PARTITION == type) { + SPartitionPhysiNode* pPartNode = (SPartitionPhysiNode*) pPhyNode; + SArray* pColList = extractPartitionColInfo(pPartNode->pPartitionKeys); + SSDataBlock* pResBlock = createOutputBuf_rv1(pPhyNode->pOutputDataBlockDesc); + + int32_t num = 0; + SExprInfo* pExprInfo = createExprInfo(pPartNode->pTargets, NULL, &num); + + return createPartitionOperatorInfo(op, pExprInfo, num, pResBlock, pColList, pTaskInfo, NULL); } else { ASSERT(0); } /*else if (pPhyNode->info.type == OP_MultiTableAggregate) { @@ -7266,11 +7290,38 @@ SArray* extractColumnInfo(SNodeList* pNodeList) { STargetNode* pNode = (STargetNode*)nodesListGetNode(pNodeList, i); SColumnNode* pColNode = (SColumnNode*)pNode->pExpr; + // todo extract method SColumn c = {0}; c.slotId = pColNode->slotId; - c.colId = pColNode->colId; - c.type = pColNode->node.resType.type; - c.bytes = pColNode->node.resType.bytes; + c.colId = pColNode->colId; + c.type = pColNode->node.resType.type; + c.bytes = pColNode->node.resType.bytes; + c.precision = pColNode->node.resType.precision; + c.scale = pColNode->node.resType.scale; + + taosArrayPush(pList, &c); + } + + return pList; +} + +SArray* extractPartitionColInfo(SNodeList* pNodeList) { + size_t numOfCols = LIST_LENGTH(pNodeList); + SArray* pList = taosArrayInit(numOfCols, sizeof(SColumn)); + if (pList == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + return NULL; + } + + for (int32_t i = 0; i < numOfCols; ++i) { + SColumnNode* pColNode = (SColumnNode*)nodesListGetNode(pNodeList, i); + + // todo extract method + SColumn c = {0}; + c.slotId = pColNode->slotId; + c.colId = pColNode->colId; + c.type = pColNode->node.resType.type; + c.bytes = pColNode->node.resType.bytes; c.precision = pColNode->node.resType.precision; c.scale = pColNode->node.resType.scale; @@ -7296,15 +7347,6 @@ SArray* createSortInfo(SNodeList* pNodeList) { SColumnNode* pColNode = (SColumnNode*)pSortKey->pExpr; bi.slotId = pColNode->slotId; - // pColNode->order; - // SColumn c = {0}; - // c.slotId = pColNode->slotId; - // c.colId = pColNode->colId; - // c.type = pColNode->node.resType.type; - // c.bytes = pColNode->node.resType.bytes; - // c.precision = pColNode->node.resType.precision; - // c.scale = pColNode->node.resType.scale; - taosArrayPush(pList, &bi); } diff --git a/source/libs/executor/src/groupoperator.c b/source/libs/executor/src/groupoperator.c index b3a8e09f166e05a490fb865e298cdf361473faa1..86c2ad4f21819367ff5ae2d25603db722a0e9d35 100644 --- a/source/libs/executor/src/groupoperator.c +++ b/source/libs/executor/src/groupoperator.c @@ -25,7 +25,11 @@ #include "thash.h" #include "ttypes.h" -static void destroyGroupbyOperatorInfo(void* param, int32_t numOfOutput) { +static int32_t* setupColumnOffset(const SSDataBlock* pBlock, int32_t rowCapacity); +static void* getCurrentDataGroupInfo(const SPartitionOperatorInfo* pInfo, SDataGroupInfo** pGroupInfo, int32_t len); +static uint64_t calcGroupId(char* pData, int32_t len); + +static void destroyGroupOperatorInfo(void* param, int32_t numOfOutput) { SGroupbyOperatorInfo* pInfo = (SGroupbyOperatorInfo*)param; doDestroyBasicInfo(&pInfo->binfo, numOfOutput); taosMemoryFreeClear(pInfo->keyBuf); @@ -33,44 +37,43 @@ static void destroyGroupbyOperatorInfo(void* param, int32_t numOfOutput) { taosArrayDestroy(pInfo->pGroupColVals); } -static int32_t initGroupOptrInfo(SGroupbyOperatorInfo* pInfo, SArray* pGroupColList) { - pInfo->pGroupColVals = taosArrayInit(4, sizeof(SGroupKeys)); - if (pInfo->pGroupColVals == NULL) { +static int32_t initGroupOptrInfo(SArray** pGroupColVals, int32_t* keyLen, char** keyBuf, const SArray* pGroupColList) { + *pGroupColVals = taosArrayInit(4, sizeof(SGroupKeys)); + if ((*pGroupColVals) == NULL) { return TSDB_CODE_OUT_OF_MEMORY; } int32_t numOfGroupCols = taosArrayGetSize(pGroupColList); for (int32_t i = 0; i < numOfGroupCols; ++i) { SColumn* pCol = taosArrayGet(pGroupColList, i); - pInfo->groupKeyLen += pCol->bytes; + (*keyLen) += pCol->bytes; struct SGroupKeys key = {0}; - key.bytes = pCol->bytes; - key.type = pCol->type; + key.bytes = pCol->bytes; + key.type = pCol->type; key.isNull = false; - key.pData = taosMemoryCalloc(1, pCol->bytes); + key.pData = taosMemoryCalloc(1, pCol->bytes); if (key.pData == NULL) { return TSDB_CODE_OUT_OF_MEMORY; } - taosArrayPush(pInfo->pGroupColVals, &key); + taosArrayPush((*pGroupColVals), &key); } int32_t nullFlagSize = sizeof(int8_t) * numOfGroupCols; - pInfo->keyBuf = taosMemoryCalloc(1, pInfo->groupKeyLen + nullFlagSize); - if (pInfo->keyBuf == NULL) { + (*keyBuf) = taosMemoryCalloc(1, (*keyLen) + nullFlagSize); + if ((*keyBuf) == NULL) { return TSDB_CODE_OUT_OF_MEMORY; } return TSDB_CODE_SUCCESS; } -static bool groupKeyCompare(SGroupbyOperatorInfo* pInfo, SSDataBlock* pBlock, int32_t rowIndex, - int32_t numOfGroupCols) { +static bool groupKeyCompare(SArray* pGroupCols, SArray* pGroupColVals, SSDataBlock* pBlock, int32_t rowIndex, int32_t numOfGroupCols) { SColumnDataAgg* pColAgg = NULL; for (int32_t i = 0; i < numOfGroupCols; ++i) { - SColumn* pCol = taosArrayGet(pInfo->pGroupCols, i); + SColumn* pCol = taosArrayGet(pGroupCols, i); SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, pCol->slotId); if (pBlock->pBlockAgg != NULL) { pColAgg = &pBlock->pBlockAgg[pCol->slotId]; // TODO is agg data matched? @@ -78,7 +81,7 @@ static bool groupKeyCompare(SGroupbyOperatorInfo* pInfo, SSDataBlock* pBlock, in bool isNull = colDataIsNull(pColInfoData, pBlock->info.rows, rowIndex, pColAgg); - SGroupKeys* pkey = taosArrayGet(pInfo->pGroupColVals, i); + SGroupKeys* pkey = taosArrayGet(pGroupColVals, i); if (pkey->isNull && isNull) { continue; } @@ -106,18 +109,18 @@ static bool groupKeyCompare(SGroupbyOperatorInfo* pInfo, SSDataBlock* pBlock, in return true; } -static void recordNewGroupKeys(SGroupbyOperatorInfo* pInfo, SSDataBlock* pBlock, int32_t rowIndex, int32_t numOfGroupCols) { +static void recordNewGroupKeys(SArray* pGroupCols, SArray* pGroupColVals, SSDataBlock* pBlock, int32_t rowIndex, int32_t numOfGroupCols) { SColumnDataAgg* pColAgg = NULL; for (int32_t i = 0; i < numOfGroupCols; ++i) { - SColumn* pCol = taosArrayGet(pInfo->pGroupCols, i); + SColumn* pCol = taosArrayGet(pGroupCols, i); SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, pCol->slotId); if (pBlock->pBlockAgg != NULL) { pColAgg = &pBlock->pBlockAgg[pCol->slotId]; // TODO is agg data matched? } - SGroupKeys* pkey = taosArrayGet(pInfo->pGroupColVals, i); + SGroupKeys* pkey = taosArrayGet(pGroupColVals, i); if (colDataIsNull(pColInfoData, pBlock->info.rows, rowIndex, pColAgg)) { pkey->isNull = true; } else { @@ -197,13 +200,13 @@ static void doHashGroupbyAgg(SOperatorInfo* pOperator, SSDataBlock* pBlock) { for (int32_t j = 0; j < pBlock->info.rows; ++j) { // Compare with the previous row of this column, and do not set the output buffer again if they are identical. if (!pInfo->isInit) { - recordNewGroupKeys(pInfo, pBlock, j, numOfGroupCols); + recordNewGroupKeys(pInfo->pGroupCols, pInfo->pGroupColVals, pBlock, j, numOfGroupCols); pInfo->isInit = true; num++; continue; } - bool equal = groupKeyCompare(pInfo, pBlock, j, numOfGroupCols); + bool equal = groupKeyCompare(pInfo->pGroupCols, pInfo->pGroupColVals, pBlock, j, numOfGroupCols); if (equal) { num++; continue; @@ -212,7 +215,7 @@ static void doHashGroupbyAgg(SOperatorInfo* pOperator, SSDataBlock* pBlock) { // The first row of a new block does not belongs to the previous existed group if (!equal && j == 0) { num++; - recordNewGroupKeys(pInfo, pBlock, j, numOfGroupCols); + recordNewGroupKeys(pInfo->pGroupCols, pInfo->pGroupColVals, pBlock, j, numOfGroupCols); continue; } @@ -227,7 +230,7 @@ static void doHashGroupbyAgg(SOperatorInfo* pOperator, SSDataBlock* pBlock) { // assign the group keys or user input constant values if required doAssignGroupKeys(pCtx, pOperator->numOfOutput, pBlock->info.rows, rowIndex); - recordNewGroupKeys(pInfo, pBlock, j, numOfGroupCols); + recordNewGroupKeys(pInfo->pGroupCols, pInfo->pGroupColVals, pBlock, j, numOfGroupCols); num = 1; } @@ -259,7 +262,7 @@ static SSDataBlock* hashGroupbyAggregate(SOperatorInfo* pOperator, bool* newgrou if (pRes->info.rows == 0 || !hasRemainDataInCurrentGroup(&pInfo->groupResInfo)) { pOperator->status = OP_EXEC_DONE; } - return pRes; + return (pRes->info.rows == 0)? NULL:pRes; } int32_t order = TSDB_ORDER_ASC; @@ -309,7 +312,7 @@ static SSDataBlock* hashGroupbyAggregate(SOperatorInfo* pOperator, bool* newgrou } } - return pInfo->binfo.pRes; + return (pRes->info.rows == 0)? NULL:pRes; } SOperatorInfo* createGroupOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, SSDataBlock* pResultBlock, SArray* pGroupColList, SNode* pCondition, SExecTaskInfo* pTaskInfo, @@ -325,7 +328,7 @@ SOperatorInfo* createGroupOperatorInfo(SOperatorInfo* downstream, SExprInfo* pEx initAggInfo(&pInfo->binfo, &pInfo->aggSup, pExprInfo, numOfCols, 4096, pResultBlock, pTaskInfo->id.str); initResultRowInfo(&pInfo->binfo.resultRowInfo, 8); - int32_t code = initGroupOptrInfo(pInfo, pGroupColList); + int32_t code = initGroupOptrInfo(&pInfo->pGroupColVals, &pInfo->groupKeyLen, &pInfo->keyBuf, pGroupColList); if (code != TSDB_CODE_SUCCESS) { goto _error; } @@ -339,7 +342,7 @@ SOperatorInfo* createGroupOperatorInfo(SOperatorInfo* downstream, SExprInfo* pEx pOperator->info = pInfo; pOperator->_openFn = operatorDummyOpenFn; pOperator->getNextFn = hashGroupbyAggregate; - pOperator->closeFn = destroyGroupbyOperatorInfo; + pOperator->closeFn = destroyGroupOperatorInfo; code = appendDownstream(pOperator, &downstream, 1); return pOperator; @@ -351,67 +354,263 @@ SOperatorInfo* createGroupOperatorInfo(SOperatorInfo* downstream, SExprInfo* pEx return NULL; } -static SSDataBlock* doPartitionData(SOperatorInfo* pOperator, bool* newgroup) { +static void doHashPartition(SOperatorInfo* pOperator, SSDataBlock* pBlock) { +// SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; + + SPartitionOperatorInfo* pInfo = pOperator->info; + + int32_t numOfGroupCols = taosArrayGetSize(pInfo->pGroupCols); + for (int32_t j = 0; j < pBlock->info.rows; ++j) { + recordNewGroupKeys(pInfo->pGroupCols, pInfo->pGroupColVals, pBlock, j, numOfGroupCols); + int32_t len = buildGroupKeys(pInfo->keyBuf, pInfo->pGroupColVals); + + SDataGroupInfo* pGInfo = NULL; + void *pPage = getCurrentDataGroupInfo(pInfo, &pGInfo, len); + + pGInfo->numOfRows += 1; + if (pGInfo->groupId == 0) { + pGInfo->groupId = calcGroupId(pInfo->keyBuf, len); + } + + int32_t* rows = (int32_t*) pPage; + + size_t numOfCols = pOperator->numOfOutput; + for(int32_t i = 0; i < numOfCols; ++i) { + SExprInfo* pExpr = &pOperator->pExpr[i]; + int32_t slotId = pExpr->base.pParam[0].pCol->slotId; + + SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, slotId); + + int32_t bytes = pColInfoData->info.bytes; + int32_t startOffset = pInfo->columnOffset[i]; + + char* columnLen = NULL; + int32_t contentLen = 0; + + if (IS_VAR_DATA_TYPE(pColInfoData->info.type)) { + int32_t* offset = pPage + startOffset; + columnLen = pPage + startOffset + sizeof(int32_t) * pInfo->rowCapacity; + char* data = (char*)(columnLen + sizeof(int32_t)); + + if (colDataIsNull_s(pColInfoData, j)) { + offset[(*rows)] = -1; + contentLen = 0; + } else { + offset[*rows] = (*columnLen); + char* src = colDataGetData(pColInfoData, j); + memcpy(data + (*columnLen), src, varDataTLen(src)); + contentLen = varDataTLen(src); + } + } else { + char* bitmap = pPage + startOffset; + columnLen = pPage + startOffset + BitmapLen(pInfo->rowCapacity); + char* data = (char*) columnLen + sizeof(int32_t); + + bool isNull = colDataIsNull_f(pColInfoData->nullbitmap, j); + if (isNull) { + colDataSetNull_f(bitmap, (*rows)); + } else { + memcpy(data + (*columnLen), colDataGetData(pColInfoData, j), bytes); + } + contentLen = bytes; + } + + (*columnLen) += contentLen; + } + + (*rows) += 1; + + setBufPageDirty(pPage, true); + releaseBufPage(pInfo->pBuf, pPage); + } +} + +void* getCurrentDataGroupInfo(const SPartitionOperatorInfo* pInfo, SDataGroupInfo** pGroupInfo, int32_t len) { + SDataGroupInfo* p = taosHashGet(pInfo->pGroupSet, pInfo->keyBuf, len); + + void* pPage = NULL; + if (p == NULL) { // it is a new group + SDataGroupInfo gi = {0}; + gi.pPageList = taosArrayInit(100, sizeof(int32_t)); + taosHashPut(pInfo->pGroupSet, pInfo->keyBuf, len, &gi, sizeof(SDataGroupInfo)); + + p = taosHashGet(pInfo->pGroupSet, pInfo->keyBuf, len); + + int32_t pageId = 0; + pPage = getNewBufPage(pInfo->pBuf, 0, &pageId); + taosArrayPush(p->pPageList, &pageId); + + *(int32_t *) pPage = 0; + } else { + int32_t* curId = taosArrayGetLast(p->pPageList); + pPage = getBufPage(pInfo->pBuf, *curId); + + int32_t *rows = (int32_t*) pPage; + if (*rows >= pInfo->rowCapacity) { + // add a new page for current group + int32_t pageId = 0; + pPage = getNewBufPage(pInfo->pBuf, 0, &pageId); + taosArrayPush(p->pPageList, &pageId); + + *(int32_t*) pPage = 0; + } + } + + *pGroupInfo = p; + return pPage; +} + +uint64_t calcGroupId(char* pData, int32_t len) { + T_MD5_CTX context; + tMD5Init(&context); + tMD5Update(&context, (uint8_t*)pData, len); + tMD5Final(&context); + + // NOTE: only extract the initial 8 bytes of the final MD5 digest + uint64_t id = 0; + memcpy(&id, context.digest, sizeof(uint64_t)); + return id; +} + +int32_t* setupColumnOffset(const SSDataBlock* pBlock, int32_t rowCapacity) { + size_t numOfCols = pBlock->info.numOfCols; + int32_t* offset = taosMemoryCalloc(pBlock->info.numOfCols, sizeof(int32_t)); + + offset[0] = sizeof(int32_t); // the number of rows in current page, ref to SSDataBlock paged serialization format + + for(int32_t i = 0; i < numOfCols - 1; ++i) { + SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, i); + + int32_t bytes = pColInfoData->info.bytes; + int32_t payloadLen = bytes * rowCapacity; + + if (IS_VAR_DATA_TYPE(pColInfoData->info.type)) { + // offset segment + content length + payload + offset[i + 1] = rowCapacity * sizeof(int32_t) + sizeof(int32_t) + payloadLen + offset[i]; + } else { + // bitmap + content length + payload + offset[i + 1] = BitmapLen(rowCapacity) + sizeof(int32_t) + payloadLen + offset[i]; + } + } + + return offset; +} + +static SSDataBlock* buildPartitionResult(SOperatorInfo* pOperator) { + SPartitionOperatorInfo* pInfo = pOperator->info; + + SDataGroupInfo* pGroupInfo = pInfo->pGroupIter; + if (pInfo->pGroupIter == NULL || pInfo->pageIndex >= taosArrayGetSize(pGroupInfo->pPageList)) { + // try next group data + pInfo->pGroupIter = taosHashIterate(pInfo->pGroupSet, pInfo->pGroupIter); + if (pInfo->pGroupIter == NULL) { + pOperator->status = OP_EXEC_DONE; + return NULL; + } + + pGroupInfo = pInfo->pGroupIter; + pInfo->pageIndex = 0; + } + + int32_t* pageId = taosArrayGet(pGroupInfo->pPageList, pInfo->pageIndex); + void* page = getBufPage(pInfo->pBuf, *pageId); + + blockDataFromBuf1(pInfo->binfo.pRes, page, pInfo->rowCapacity); + + pInfo->pageIndex += 1; + + pInfo->binfo.pRes->info.groupId = pGroupInfo->groupId; + return pInfo->binfo.pRes; +} + +static SSDataBlock* hashPartition(SOperatorInfo* pOperator, bool* newgroup) { if (pOperator->status == OP_EXEC_DONE) { return NULL; } - SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; - SSortOperatorInfo* pInfo = pOperator->info; - bool hasVarCol = pInfo->pDataBlock->info.hasVarCol; + SGroupbyOperatorInfo* pInfo = pOperator->info; + SSDataBlock* pRes = pInfo->binfo.pRes; if (pOperator->status == OP_RES_TO_RETURN) { - return getSortedBlockData(pInfo->pSortHandle, pInfo->pDataBlock, pInfo->numOfRowsInRes); + blockDataCleanup(pRes); + return buildPartitionResult(pOperator); } - int32_t numOfBufPage = pInfo->sortBufSize / pInfo->bufPageSize; - pInfo->pSortHandle = tsortCreateSortHandle(pInfo->pSortInfo, SORT_SINGLESOURCE_SORT, pInfo->bufPageSize, numOfBufPage, - pInfo->pDataBlock, pTaskInfo->id.str); - - tsortSetFetchRawDataFp(pInfo->pSortHandle, loadNextDataBlock); + SOperatorInfo* downstream = pOperator->pDownstream[0]; - SGenericSource* ps = taosMemoryCalloc(1, sizeof(SGenericSource)); - ps->param = pOperator->pDownstream[0]; - tsortAddSource(pInfo->pSortHandle, ps); + while (1) { + publishOperatorProfEvent(downstream, QUERY_PROF_BEFORE_OPERATOR_EXEC); + SSDataBlock* pBlock = downstream->getNextFn(downstream, newgroup); + publishOperatorProfEvent(downstream, QUERY_PROF_AFTER_OPERATOR_EXEC); + if (pBlock == NULL) { + break; + } - int32_t code = tsortOpen(pInfo->pSortHandle); - if (code != TSDB_CODE_SUCCESS) { - longjmp(pTaskInfo->env, terrno); + // setTagValue(pOperator, pRuntimeEnv->current->pTable, pInfo->binfo.pCtx, pOperator->numOfOutput); + doHashPartition(pOperator, pBlock); } pOperator->status = OP_RES_TO_RETURN; - return getSortedBlockData(pInfo->pSortHandle, pInfo->pDataBlock, pInfo->numOfRowsInRes); + blockDataEnsureCapacity(pRes, 4096); + return buildPartitionResult(pOperator); } -SOperatorInfo* createPartitionOperatorInfo(SOperatorInfo* downstream, SSDataBlock* pResultBlock, SArray* pSortInfo, SExecTaskInfo* pTaskInfo, const STableGroupInfo* pTableGroupInfo) { - SSortOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SSortOperatorInfo)); - SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo)); +static void destroyPartitionOperatorInfo(void* param, int32_t numOfOutput) { + SPartitionOperatorInfo* pInfo = (SPartitionOperatorInfo*)param; + doDestroyBasicInfo(&pInfo->binfo, numOfOutput); + taosArrayDestroy(pInfo->pGroupCols); + taosArrayDestroy(pInfo->pGroupColVals); + taosMemoryFree(pInfo->keyBuf); + taosMemoryFree(pInfo->columnOffset); +} + +SOperatorInfo* createPartitionOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, SSDataBlock* pResultBlock, SArray* pGroupColList, + SExecTaskInfo* pTaskInfo, const STableGroupInfo* pTableGroupInfo) { + SPartitionOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SPartitionOperatorInfo)); + SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo)); if (pInfo == NULL || pOperator == NULL) { goto _error; } - pInfo->sortBufSize = 1024 * 16; // TODO dynamic set the available sort buffer - pInfo->bufPageSize = 1024; - pInfo->numOfRowsInRes = 1024; - pInfo->pDataBlock = pResultBlock; - pInfo->pSortInfo = pSortInfo; + pInfo->pGroupCols = pGroupColList; + + _hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY); + pInfo->pGroupSet = taosHashInit(100, hashFn, false, HASH_NO_LOCK); + if (pInfo->pGroupSet == NULL) { + goto _error; + } + + int32_t code = createDiskbasedBuf(&pInfo->pBuf, 4096, 4096 * 256, pTaskInfo->id.str, "/tmp/"); + if (code != TSDB_CODE_SUCCESS) { + goto _error; + } + + pInfo->rowCapacity = blockDataGetCapacityInRow(pResultBlock, getBufPageSize(pInfo->pBuf)); + pInfo->columnOffset = setupColumnOffset(pResultBlock, pInfo->rowCapacity); + code = initGroupOptrInfo(&pInfo->pGroupColVals, &pInfo->groupKeyLen, &pInfo->keyBuf, pGroupColList); + if (code != TSDB_CODE_SUCCESS) { + goto _error; + } pOperator->name = "PartitionOperator"; - pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_PARTITION; pOperator->blockingOptr = true; pOperator->status = OP_NOT_OPENED; - pOperator->info = pInfo; + pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_PARTITION; - pOperator->pTaskInfo = pTaskInfo; - pOperator->getNextFn = doPartitionData; -// pOperator->closeFn = destroyOrderOperatorInfo; + pInfo->binfo.pRes = pResultBlock; + pOperator->numOfOutput = numOfCols; + pOperator->pExpr = pExprInfo; + pOperator->info = pInfo; + pOperator->_openFn = operatorDummyOpenFn; + pOperator->getNextFn = hashPartition; + pOperator->closeFn = destroyPartitionOperatorInfo; - int32_t code = appendDownstream(pOperator, &downstream, 1); + code = appendDownstream(pOperator, &downstream, 1); return pOperator; _error: pTaskInfo->code = TSDB_CODE_OUT_OF_MEMORY; - taosMemoryFree(pInfo); - taosMemoryFree(pOperator); + taosMemoryFreeClear(pInfo); + taosMemoryFreeClear(pOperator); return NULL; } \ No newline at end of file diff --git a/source/libs/executor/src/tsort.c b/source/libs/executor/src/tsort.c index 7a57d6296912482d913edb61170d9d1d04780e1b..5dfda92982d9643ad3c397aefd4018722e6d4d84 100644 --- a/source/libs/executor/src/tsort.c +++ b/source/libs/executor/src/tsort.c @@ -425,9 +425,7 @@ static int32_t doInternalMergeSort(SSortHandle* pHandle) { qDebug("%s %d rounds mergesort required to complete the sort, first-round sorted data size:%"PRIzu", sort:%"PRId64", total elapsed:%"PRId64, pHandle->idStr, (int32_t) (sortPass + 1), getTotalBufSize(pHandle->pBuf), pHandle->sortElapsed, pHandle->totalElapsed); - size_t pgSize = pHandle->pageSize; - int32_t numOfRows = (pgSize - blockDataGetSerialMetaSize(pHandle->pDataBlock))/ blockDataGetSerialRowSize(pHandle->pDataBlock); - + int32_t numOfRows = blockDataGetCapacityInRow(pHandle->pDataBlock, pHandle->pageSize); blockDataEnsureCapacity(pHandle->pDataBlock, numOfRows); size_t numOfSorted = taosArrayGetSize(pHandle->pOrderedSource); diff --git a/source/util/src/tpagedbuf.c b/source/util/src/tpagedbuf.c index d834263b940ae6932bcd121b5a18791a491012d5..84a2efb46c203e6dab2283093779ce4179fd736e 100644 --- a/source/util/src/tpagedbuf.c +++ b/source/util/src/tpagedbuf.c @@ -48,10 +48,8 @@ struct SDiskbasedBuf { }; static int32_t createDiskFile(SDiskbasedBuf* pBuf) { - // pBuf->file = fopen(pBuf->path, "wb+"); - pBuf->pFile = taosOpenFile(pBuf->path, TD_FILE_CTEATE | TD_FILE_WRITE | TD_FILE_READ | TD_FILE_TRUNC); + pBuf->pFile = taosOpenFile(pBuf->path, TD_FILE_CTEATE | TD_FILE_WRITE | TD_FILE_READ | TD_FILE_TRUNC | TD_FILE_AUTO_DEL); if (pBuf->pFile == NULL) { - // qError("failed to create tmp file: %s on disk. %s", pBuf->path, strerror(errno)); return TAOS_SYSTEM_ERROR(errno); }