提交 b984e12e 编写于 作者: H Haojun Liao

[td-14493] support partition by

上级 945de54b
...@@ -182,6 +182,7 @@ int32_t blockDataSplitRows(SSDataBlock* pBlock, bool hasVarCol, int32_t startInd ...@@ -182,6 +182,7 @@ int32_t blockDataSplitRows(SSDataBlock* pBlock, bool hasVarCol, int32_t startInd
int32_t pageSize); int32_t pageSize);
int32_t blockDataToBuf(char* buf, const SSDataBlock* pBlock); int32_t blockDataToBuf(char* buf, const SSDataBlock* pBlock);
int32_t blockDataFromBuf(SSDataBlock* pBlock, const char* buf); int32_t blockDataFromBuf(SSDataBlock* pBlock, const char* buf);
int32_t blockDataFromBuf1(SSDataBlock* pBlock, const char* buf, size_t capacity);
SSDataBlock* blockDataExtractBlock(SSDataBlock* pBlock, int32_t startIndex, int32_t rowCount); SSDataBlock* blockDataExtractBlock(SSDataBlock* pBlock, int32_t startIndex, int32_t rowCount);
......
...@@ -415,10 +415,7 @@ int32_t blockDataSplitRows(SSDataBlock* pBlock, bool hasVarCol, int32_t startInd ...@@ -415,10 +415,7 @@ int32_t blockDataSplitRows(SSDataBlock* pBlock, bool hasVarCol, int32_t startInd
for (int32_t i = 0; i < numOfCols; ++i) { for (int32_t i = 0; i < numOfCols; ++i) {
SColumnInfoData* pColInfoData = TARRAY_GET_ELEM(pBlock->pDataBlock, i); SColumnInfoData* pColInfoData = TARRAY_GET_ELEM(pBlock->pDataBlock, i);
if (IS_VAR_DATA_TYPE(pColInfoData->info.type)) { if (IS_VAR_DATA_TYPE(pColInfoData->info.type)) {
bool isNull = colDataIsNull(pColInfoData, numOfRows, j, NULL); if (pColInfoData->varmeta.offset[j] != -1) {
if (isNull) {
// do nothing
} else {
char* p = colDataGetData(pColInfoData, j); char* p = colDataGetData(pColInfoData, j);
size += varDataTLen(p); size += varDataTLen(p);
} }
...@@ -547,8 +544,8 @@ int32_t blockDataFromBuf(SSDataBlock* pBlock, const char* buf) { ...@@ -547,8 +544,8 @@ int32_t blockDataFromBuf(SSDataBlock* pBlock, const char* buf) {
for (int32_t i = 0; i < numOfCols; ++i) { for (int32_t i = 0; i < numOfCols; ++i) {
SColumnInfoData* pCol = taosArrayGet(pBlock->pDataBlock, i); SColumnInfoData* pCol = taosArrayGet(pBlock->pDataBlock, i);
size_t metaSize = pBlock->info.rows * sizeof(int32_t);
if (IS_VAR_DATA_TYPE(pCol->info.type)) { if (IS_VAR_DATA_TYPE(pCol->info.type)) {
size_t metaSize = pBlock->info.rows * sizeof(int32_t);
memcpy(pCol->varmeta.offset, pStart, metaSize); memcpy(pCol->varmeta.offset, pStart, metaSize);
pStart += metaSize; pStart += metaSize;
} else { } else {
...@@ -581,6 +578,49 @@ int32_t blockDataFromBuf(SSDataBlock* pBlock, const char* buf) { ...@@ -581,6 +578,49 @@ int32_t blockDataFromBuf(SSDataBlock* pBlock, const char* buf) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int32_t blockDataFromBuf1(SSDataBlock* pBlock, const char* buf, size_t capacity) {
pBlock->info.rows = *(int32_t*)buf;
int32_t numOfCols = pBlock->info.numOfCols;
const char* pStart = buf + sizeof(uint32_t);
for (int32_t i = 0; i < numOfCols; ++i) {
SColumnInfoData* pCol = taosArrayGet(pBlock->pDataBlock, i);
if (IS_VAR_DATA_TYPE(pCol->info.type)) {
size_t metaSize = capacity * sizeof(int32_t);
memcpy(pCol->varmeta.offset, pStart, metaSize);
pStart += metaSize;
} else {
memcpy(pCol->nullbitmap, pStart, BitmapLen(capacity));
pStart += BitmapLen(capacity);
}
int32_t colLength = *(int32_t*)pStart;
pStart += sizeof(int32_t);
if (IS_VAR_DATA_TYPE(pCol->info.type)) {
if (pCol->varmeta.allocLen < colLength) {
char* tmp = taosMemoryRealloc(pCol->pData, colLength);
if (tmp == NULL) {
return TSDB_CODE_OUT_OF_MEMORY;
}
pCol->pData = tmp;
pCol->varmeta.allocLen = colLength;
}
pCol->varmeta.length = colLength;
ASSERT(pCol->varmeta.length <= pCol->varmeta.allocLen);
}
memcpy(pCol->pData, pStart, colLength);
pStart += colLength;
}
return TSDB_CODE_SUCCESS;
}
size_t blockDataGetRowSize(SSDataBlock* pBlock) { size_t blockDataGetRowSize(SSDataBlock* pBlock) {
ASSERT(pBlock != NULL); ASSERT(pBlock != NULL);
if (pBlock->info.rowSize == 0) { if (pBlock->info.rowSize == 0) {
...@@ -627,6 +667,10 @@ double blockDataGetSerialRowSize(const SSDataBlock* pBlock) { ...@@ -627,6 +667,10 @@ double blockDataGetSerialRowSize(const SSDataBlock* pBlock) {
return rowSize; return rowSize;
} }
int32_t getAllowedRowsForPage(const SSDataBlock* pBlock, size_t pgSize) {
return (int32_t) ((pgSize - blockDataGetSerialMetaSize(pBlock))/ blockDataGetSerialRowSize(pBlock));
}
typedef struct SSDataBlockSortHelper { typedef struct SSDataBlockSortHelper {
SArray* orderInfo; // SArray<SBlockOrderInfo> SArray* orderInfo; // SArray<SBlockOrderInfo>
SSDataBlock* pDataBlock; SSDataBlock* pDataBlock;
...@@ -1178,6 +1222,9 @@ int32_t colInfoDataEnsureCapacity(SColumnInfoData* pColumn, uint32_t numOfRows) ...@@ -1178,6 +1222,9 @@ int32_t colInfoDataEnsureCapacity(SColumnInfoData* pColumn, uint32_t numOfRows)
int32_t blockDataEnsureCapacity(SSDataBlock* pDataBlock, uint32_t numOfRows) { int32_t blockDataEnsureCapacity(SSDataBlock* pDataBlock, uint32_t numOfRows) {
int32_t code = 0; int32_t code = 0;
if (numOfRows == 0) {
return TSDB_CODE_SUCCESS;
}
for (int32_t i = 0; i < pDataBlock->info.numOfCols; ++i) { for (int32_t i = 0; i < pDataBlock->info.numOfCols; ++i) {
SColumnInfoData* p = taosArrayGet(pDataBlock->pDataBlock, i); SColumnInfoData* p = taosArrayGet(pDataBlock->pDataBlock, i);
...@@ -1220,7 +1267,7 @@ SSDataBlock* createOneDataBlock(const SSDataBlock* pDataBlock) { ...@@ -1220,7 +1267,7 @@ SSDataBlock* createOneDataBlock(const SSDataBlock* pDataBlock) {
} }
size_t blockDataGetCapacityInRow(const SSDataBlock* pBlock, size_t pageSize) { size_t blockDataGetCapacityInRow(const SSDataBlock* pBlock, size_t pageSize) {
return pageSize / (blockDataGetSerialRowSize(pBlock) + blockDataGetSerialMetaSize(pBlock)); return (int32_t) ((pageSize - blockDataGetSerialMetaSize(pBlock))/ blockDataGetSerialRowSize(pBlock));
} }
void colDataDestroy(SColumnInfoData* pColData) { void colDataDestroy(SColumnInfoData* pColData) {
......
...@@ -563,6 +563,27 @@ typedef struct SGroupbyOperatorInfo { ...@@ -563,6 +563,27 @@ typedef struct SGroupbyOperatorInfo {
SAggSupporter aggSup; SAggSupporter aggSup;
} SGroupbyOperatorInfo; } SGroupbyOperatorInfo;
typedef struct SDataGroupInfo {
uint64_t groupId;
int64_t numOfRows;
SArray *pPageList;
} SDataGroupInfo;
// The sort in partition may be needed later.
typedef struct SPartitionOperatorInfo {
SOptrBasicInfo binfo;
SArray* pGroupCols;
SArray* pGroupColVals; // current group column values, SArray<SGroupKeys>
bool isInit; // denote if current val is initialized or not
char* keyBuf; // group by keys for hash
int32_t groupKeyLen; // total group by column width
SHashObj* pGroupSet; // quick locate the window object for each result
SDiskbasedBuf* pBuf; // query result buffer based on blocked-wised disk file
void* pGroupIter; // group iterator
int32_t pageIndex; // page index of current group
} SPartitionOperatorInfo;
typedef struct SSessionAggOperatorInfo { typedef struct SSessionAggOperatorInfo {
SOptrBasicInfo binfo; SOptrBasicInfo binfo;
SAggSupporter aggSup; SAggSupporter aggSup;
......
...@@ -7048,7 +7048,8 @@ static SArray* createSortInfo(SNodeList* pNodeList); ...@@ -7048,7 +7048,8 @@ static SArray* createSortInfo(SNodeList* pNodeList);
SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo, SReadHandle* pHandle, SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo, SReadHandle* pHandle,
uint64_t queryId, uint64_t taskId, STableGroupInfo* pTableGroupInfo) { uint64_t queryId, uint64_t taskId, STableGroupInfo* pTableGroupInfo) {
if (pPhyNode->pChildren == NULL || LIST_LENGTH(pPhyNode->pChildren) == 0) { if (pPhyNode->pChildren == NULL || LIST_LENGTH(pPhyNode->pChildren) == 0) {
if (QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN == nodeType(pPhyNode)) { int32_t type = nodeType(pPhyNode);
if (QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN == type) {
SScanPhysiNode* pScanPhyNode = (SScanPhysiNode*)pPhyNode; SScanPhysiNode* pScanPhyNode = (SScanPhysiNode*)pPhyNode;
int32_t numOfCols = 0; int32_t numOfCols = 0;
...@@ -7056,11 +7057,11 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo ...@@ -7056,11 +7057,11 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
SArray* pColList = extractColMatchInfo(pScanPhyNode->pScanCols, pScanPhyNode->node.pOutputDataBlockDesc, &numOfCols); SArray* pColList = extractColMatchInfo(pScanPhyNode->pScanCols, pScanPhyNode->node.pOutputDataBlockDesc, &numOfCols);
return createTableScanOperatorInfo(pDataReader, pScanPhyNode->order, numOfCols, pScanPhyNode->count, return createTableScanOperatorInfo(pDataReader, pScanPhyNode->order, numOfCols, pScanPhyNode->count,
pScanPhyNode->reverse, pColList, pScanPhyNode->node.pConditions, pTaskInfo); pScanPhyNode->reverse, pColList, pScanPhyNode->node.pConditions, pTaskInfo);
} else if (QUERY_NODE_PHYSICAL_PLAN_EXCHANGE == nodeType(pPhyNode)) { } else if (QUERY_NODE_PHYSICAL_PLAN_EXCHANGE == type) {
SExchangePhysiNode* pExchange = (SExchangePhysiNode*)pPhyNode; SExchangePhysiNode* pExchange = (SExchangePhysiNode*)pPhyNode;
SSDataBlock* pResBlock = createOutputBuf_rv1(pExchange->node.pOutputDataBlockDesc); SSDataBlock* pResBlock = createOutputBuf_rv1(pExchange->node.pOutputDataBlockDesc);
return createExchangeOperatorInfo(pExchange->pSrcEndPoints, pResBlock, pTaskInfo); return createExchangeOperatorInfo(pExchange->pSrcEndPoints, pResBlock, pTaskInfo);
} else if (QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN == nodeType(pPhyNode)) { } else if (QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN == type) {
SScanPhysiNode* pScanPhyNode = (SScanPhysiNode*)pPhyNode; // simple child table. SScanPhysiNode* pScanPhyNode = (SScanPhysiNode*)pPhyNode; // simple child table.
int32_t code = doCreateTableGroup(pHandle->meta, pScanPhyNode->tableType, pScanPhyNode->uid, pTableGroupInfo, queryId, taskId); int32_t code = doCreateTableGroup(pHandle->meta, pScanPhyNode->tableType, pScanPhyNode->uid, pTableGroupInfo, queryId, taskId);
...@@ -7073,7 +7074,7 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo ...@@ -7073,7 +7074,7 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
SOperatorInfo* pOperator = createStreamScanOperatorInfo(pHandle->reader, pResBlock, pColList, tableIdList, pTaskInfo); SOperatorInfo* pOperator = createStreamScanOperatorInfo(pHandle->reader, pResBlock, pColList, tableIdList, pTaskInfo);
taosArrayDestroy(tableIdList); taosArrayDestroy(tableIdList);
return pOperator; return pOperator;
} else if (QUERY_NODE_PHYSICAL_PLAN_SYSTABLE_SCAN == nodeType(pPhyNode)) { } else if (QUERY_NODE_PHYSICAL_PLAN_SYSTABLE_SCAN == type) {
SSystemTableScanPhysiNode* pSysScanPhyNode = (SSystemTableScanPhysiNode*)pPhyNode; SSystemTableScanPhysiNode* pSysScanPhyNode = (SSystemTableScanPhysiNode*)pPhyNode;
SSDataBlock* pResBlock = createOutputBuf_rv1(pSysScanPhyNode->scan.node.pOutputDataBlockDesc); SSDataBlock* pResBlock = createOutputBuf_rv1(pSysScanPhyNode->scan.node.pOutputDataBlockDesc);
...@@ -7089,13 +7090,14 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo ...@@ -7089,13 +7090,14 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
} }
} }
if (QUERY_NODE_PHYSICAL_PLAN_PROJECT == nodeType(pPhyNode)) { int32_t type = nodeType(pPhyNode);
size_t size = LIST_LENGTH(pPhyNode->pChildren); size_t size = LIST_LENGTH(pPhyNode->pChildren);
assert(size == 1); ASSERT(size == 1);
SPhysiNode* pChildNode = (SPhysiNode*)nodesListGetNode(pPhyNode->pChildren, 0); SPhysiNode* pChildNode = (SPhysiNode*)nodesListGetNode(pPhyNode->pChildren, 0);
SOperatorInfo* op = createOperatorTree(pChildNode, pTaskInfo, pHandle, queryId, taskId, pTableGroupInfo); SOperatorInfo* op = createOperatorTree(pChildNode, pTaskInfo, pHandle, queryId, taskId, pTableGroupInfo);
if (QUERY_NODE_PHYSICAL_PLAN_PROJECT == type) {
int32_t num = 0; int32_t num = 0;
SProjectPhysiNode* pProjPhyNode = (SProjectPhysiNode*) pPhyNode; SProjectPhysiNode* pProjPhyNode = (SProjectPhysiNode*) pPhyNode;
SExprInfo* pExprInfo = createExprInfo(pProjPhyNode->pProjections, NULL, &num); SExprInfo* pExprInfo = createExprInfo(pProjPhyNode->pProjections, NULL, &num);
...@@ -7104,78 +7106,56 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo ...@@ -7104,78 +7106,56 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
SLimit limit = {.limit = pProjPhyNode->limit, .offset = pProjPhyNode->offset}; SLimit limit = {.limit = pProjPhyNode->limit, .offset = pProjPhyNode->offset};
return createProjectOperatorInfo(op, pExprInfo, num, pResBlock, &limit, pTaskInfo); return createProjectOperatorInfo(op, pExprInfo, num, pResBlock, &limit, pTaskInfo);
} else if (QUERY_NODE_PHYSICAL_PLAN_AGG == nodeType(pPhyNode)) { } else if (QUERY_NODE_PHYSICAL_PLAN_AGG == type) {
size_t size = LIST_LENGTH(pPhyNode->pChildren); int32_t num = 0;
assert(size == 1);
for (int32_t i = 0; i < size; ++i) { SAggPhysiNode* pAggNode = (SAggPhysiNode*)pPhyNode;
SPhysiNode* pChildNode = (SPhysiNode*)nodesListGetNode(pPhyNode->pChildren, i); SExprInfo* pExprInfo = createExprInfo(pAggNode->pAggFuncs, pAggNode->pGroupKeys, &num);
SOperatorInfo* op = createOperatorTree(pChildNode, pTaskInfo, pHandle, queryId, taskId, pTableGroupInfo); SSDataBlock* pResBlock = createOutputBuf_rv1(pPhyNode->pOutputDataBlockDesc);
int32_t num = 0; if (pAggNode->pGroupKeys != NULL) {
SArray* pColList = extractColumnInfo(pAggNode->pGroupKeys);
SAggPhysiNode* pAggNode = (SAggPhysiNode*)pPhyNode; return createGroupOperatorInfo(op, pExprInfo, num, pResBlock, pColList, pAggNode->node.pConditions, pTaskInfo, NULL);
SExprInfo* pExprInfo = createExprInfo(pAggNode->pAggFuncs, pAggNode->pGroupKeys, &num); } else {
SSDataBlock* pResBlock = createOutputBuf_rv1(pPhyNode->pOutputDataBlockDesc); return createAggregateOperatorInfo(op, pExprInfo, num, pResBlock, pTaskInfo, pTableGroupInfo);
if (pAggNode->pGroupKeys != NULL) {
SArray* pColList = extractColumnInfo(pAggNode->pGroupKeys);
return createGroupOperatorInfo(op, pExprInfo, num, pResBlock, pColList, pAggNode->node.pConditions, pTaskInfo, NULL);
} else {
return createAggregateOperatorInfo(op, pExprInfo, num, pResBlock, pTaskInfo, pTableGroupInfo);
}
}
} else if (QUERY_NODE_PHYSICAL_PLAN_INTERVAL == nodeType(pPhyNode)) {
size_t size = LIST_LENGTH(pPhyNode->pChildren);
assert(size == 1);
for (int32_t i = 0; i < size; ++i) {
SPhysiNode* pChildNode = (SPhysiNode*)nodesListGetNode(pPhyNode->pChildren, i);
SOperatorInfo* op = createOperatorTree(pChildNode, pTaskInfo, pHandle, queryId, taskId, pTableGroupInfo);
SIntervalPhysiNode* pIntervalPhyNode = (SIntervalPhysiNode*)pPhyNode;
int32_t num = 0;
SExprInfo* pExprInfo = createExprInfo(pIntervalPhyNode->window.pFuncs, NULL, &num);
SSDataBlock* pResBlock = createOutputBuf_rv1(pPhyNode->pOutputDataBlockDesc);
SInterval interval = {
.interval = pIntervalPhyNode->interval,
.sliding = pIntervalPhyNode->sliding,
.intervalUnit = pIntervalPhyNode->intervalUnit,
.slidingUnit = pIntervalPhyNode->slidingUnit,
.offset = pIntervalPhyNode->offset,
.precision = pIntervalPhyNode->precision
};
int32_t primaryTsSlotId = ((SColumnNode*) pIntervalPhyNode->pTspk)->slotId;
return createIntervalOperatorInfo(op, pExprInfo, num, pResBlock, &interval, primaryTsSlotId, pTableGroupInfo, pTaskInfo);
} }
} else if (QUERY_NODE_PHYSICAL_PLAN_SORT == nodeType(pPhyNode)) { } else if (QUERY_NODE_PHYSICAL_PLAN_INTERVAL == type) {
size_t size = LIST_LENGTH(pPhyNode->pChildren); SIntervalPhysiNode* pIntervalPhyNode = (SIntervalPhysiNode*)pPhyNode;
assert(size == 1);
SPhysiNode* pChildNode = (SPhysiNode*)nodesListGetNode(pPhyNode->pChildren, 0); int32_t num = 0;
SOperatorInfo* op = createOperatorTree(pChildNode, pTaskInfo, pHandle, queryId, taskId, pTableGroupInfo); SExprInfo* pExprInfo = createExprInfo(pIntervalPhyNode->window.pFuncs, NULL, &num);
SSDataBlock* pResBlock = createOutputBuf_rv1(pPhyNode->pOutputDataBlockDesc);
SInterval interval = {
.interval = pIntervalPhyNode->interval,
.sliding = pIntervalPhyNode->sliding,
.intervalUnit = pIntervalPhyNode->intervalUnit,
.slidingUnit = pIntervalPhyNode->slidingUnit,
.offset = pIntervalPhyNode->offset,
.precision = pIntervalPhyNode->precision
};
int32_t primaryTsSlotId = ((SColumnNode*) pIntervalPhyNode->pTspk)->slotId;
return createIntervalOperatorInfo(op, pExprInfo, num, pResBlock, &interval, primaryTsSlotId, pTableGroupInfo, pTaskInfo);
} else if (QUERY_NODE_PHYSICAL_PLAN_SORT == type) {
SSortPhysiNode* pSortPhyNode = (SSortPhysiNode*)pPhyNode; SSortPhysiNode* pSortPhyNode = (SSortPhysiNode*)pPhyNode;
SSDataBlock* pResBlock = createOutputBuf_rv1(pPhyNode->pOutputDataBlockDesc); SSDataBlock* pResBlock = createOutputBuf_rv1(pPhyNode->pOutputDataBlockDesc);
SArray* info = createSortInfo(pSortPhyNode->pSortKeys);
return createSortOperatorInfo(op, pResBlock, info, pTaskInfo);
} else if (QUERY_NODE_PHYSICAL_PLAN_SESSION_WINDOW == nodeType(pPhyNode)) {
size_t size = LIST_LENGTH(pPhyNode->pChildren);
assert(size == 1);
SPhysiNode* pChildNode = (SPhysiNode*)nodesListGetNode(pPhyNode->pChildren, 0);
SOperatorInfo* op = createOperatorTree(pChildNode, pTaskInfo, pHandle, queryId, taskId, pTableGroupInfo);
SArray* info = createSortInfo(pSortPhyNode->pSortKeys);
return createSortOperatorInfo(op, pResBlock, info, pTaskInfo);
} else if (QUERY_NODE_PHYSICAL_PLAN_SESSION_WINDOW == type) {
SSessionWinodwPhysiNode* pSessionNode = (SSessionWinodwPhysiNode*)pPhyNode; SSessionWinodwPhysiNode* pSessionNode = (SSessionWinodwPhysiNode*)pPhyNode;
int32_t num = 0; int32_t num = 0;
SExprInfo* pExprInfo = createExprInfo(pSessionNode->window.pFuncs, NULL, &num); SExprInfo* pExprInfo = createExprInfo(pSessionNode->window.pFuncs, NULL, &num);
SSDataBlock* pResBlock = createOutputBuf_rv1(pPhyNode->pOutputDataBlockDesc); SSDataBlock* pResBlock = createOutputBuf_rv1(pPhyNode->pOutputDataBlockDesc);
return createSessionAggOperatorInfo(op, pExprInfo, num, pResBlock, pSessionNode->gap, pTaskInfo); return createSessionAggOperatorInfo(op, pExprInfo, num, pResBlock, pSessionNode->gap, pTaskInfo);
} else if (QUERY_NODE_PHYSICAL_PLAN_PARTITION == type) {
SPartitionPhysiNode* pPartNode = (SPartitionPhysiNode*) pPhyNode;
SArray* pColList = extractColumnInfo(pPartNode->pPartitionKeys);
SSDataBlock* pResBlock = createOutputBuf_rv1(pPhyNode->pOutputDataBlockDesc);
return createPartitionOperatorInfo(op, pResBlock, pColList, pTaskInfo, NULL);
} else { } else {
ASSERT(0); ASSERT(0);
} /*else if (pPhyNode->info.type == OP_MultiTableAggregate) { } /*else if (pPhyNode->info.type == OP_MultiTableAggregate) {
...@@ -7258,11 +7238,12 @@ SArray* extractColumnInfo(SNodeList* pNodeList) { ...@@ -7258,11 +7238,12 @@ SArray* extractColumnInfo(SNodeList* pNodeList) {
STargetNode* pNode = (STargetNode*)nodesListGetNode(pNodeList, i); STargetNode* pNode = (STargetNode*)nodesListGetNode(pNodeList, i);
SColumnNode* pColNode = (SColumnNode*)pNode->pExpr; SColumnNode* pColNode = (SColumnNode*)pNode->pExpr;
// todo extract method
SColumn c = {0}; SColumn c = {0};
c.slotId = pColNode->slotId; c.slotId = pColNode->slotId;
c.colId = pColNode->colId; c.colId = pColNode->colId;
c.type = pColNode->node.resType.type; c.type = pColNode->node.resType.type;
c.bytes = pColNode->node.resType.bytes; c.bytes = pColNode->node.resType.bytes;
c.precision = pColNode->node.resType.precision; c.precision = pColNode->node.resType.precision;
c.scale = pColNode->node.resType.scale; c.scale = pColNode->node.resType.scale;
...@@ -7289,15 +7270,6 @@ SArray* createSortInfo(SNodeList* pNodeList) { ...@@ -7289,15 +7270,6 @@ SArray* createSortInfo(SNodeList* pNodeList) {
SColumnNode* pColNode = (SColumnNode*)pSortKey->pExpr; SColumnNode* pColNode = (SColumnNode*)pSortKey->pExpr;
bi.slotId = pColNode->slotId; bi.slotId = pColNode->slotId;
// pColNode->order;
// SColumn c = {0};
// c.slotId = pColNode->slotId;
// c.colId = pColNode->colId;
// c.type = pColNode->node.resType.type;
// c.bytes = pColNode->node.resType.bytes;
// c.precision = pColNode->node.resType.precision;
// c.scale = pColNode->node.resType.scale;
taosArrayPush(pList, &bi); taosArrayPush(pList, &bi);
} }
......
...@@ -25,7 +25,7 @@ ...@@ -25,7 +25,7 @@
#include "thash.h" #include "thash.h"
#include "ttypes.h" #include "ttypes.h"
static void destroyGroupbyOperatorInfo(void* param, int32_t numOfOutput) { static void destroyGroupOperatorInfo(void* param, int32_t numOfOutput) {
SGroupbyOperatorInfo* pInfo = (SGroupbyOperatorInfo*)param; SGroupbyOperatorInfo* pInfo = (SGroupbyOperatorInfo*)param;
doDestroyBasicInfo(&pInfo->binfo, numOfOutput); doDestroyBasicInfo(&pInfo->binfo, numOfOutput);
taosMemoryFreeClear(pInfo->keyBuf); taosMemoryFreeClear(pInfo->keyBuf);
...@@ -33,44 +33,43 @@ static void destroyGroupbyOperatorInfo(void* param, int32_t numOfOutput) { ...@@ -33,44 +33,43 @@ static void destroyGroupbyOperatorInfo(void* param, int32_t numOfOutput) {
taosArrayDestroy(pInfo->pGroupColVals); taosArrayDestroy(pInfo->pGroupColVals);
} }
static int32_t initGroupOptrInfo(SGroupbyOperatorInfo* pInfo, SArray* pGroupColList) { static int32_t initGroupOptrInfo(SArray** pGroupColVals, int32_t* keyLen, char** keyBuf, const SArray* pGroupColList) {
pInfo->pGroupColVals = taosArrayInit(4, sizeof(SGroupKeys)); *pGroupColVals = taosArrayInit(4, sizeof(SGroupKeys));
if (pInfo->pGroupColVals == NULL) { if ((*pGroupColVals) == NULL) {
return TSDB_CODE_OUT_OF_MEMORY; return TSDB_CODE_OUT_OF_MEMORY;
} }
int32_t numOfGroupCols = taosArrayGetSize(pGroupColList); int32_t numOfGroupCols = taosArrayGetSize(pGroupColList);
for (int32_t i = 0; i < numOfGroupCols; ++i) { for (int32_t i = 0; i < numOfGroupCols; ++i) {
SColumn* pCol = taosArrayGet(pGroupColList, i); SColumn* pCol = taosArrayGet(pGroupColList, i);
pInfo->groupKeyLen += pCol->bytes; (*keyLen) += pCol->bytes;
struct SGroupKeys key = {0}; struct SGroupKeys key = {0};
key.bytes = pCol->bytes; key.bytes = pCol->bytes;
key.type = pCol->type; key.type = pCol->type;
key.isNull = false; key.isNull = false;
key.pData = taosMemoryCalloc(1, pCol->bytes); key.pData = taosMemoryCalloc(1, pCol->bytes);
if (key.pData == NULL) { if (key.pData == NULL) {
return TSDB_CODE_OUT_OF_MEMORY; return TSDB_CODE_OUT_OF_MEMORY;
} }
taosArrayPush(pInfo->pGroupColVals, &key); taosArrayPush((*pGroupColVals), &key);
} }
int32_t nullFlagSize = sizeof(int8_t) * numOfGroupCols; int32_t nullFlagSize = sizeof(int8_t) * numOfGroupCols;
pInfo->keyBuf = taosMemoryCalloc(1, pInfo->groupKeyLen + nullFlagSize);
if (pInfo->keyBuf == NULL) { (*keyBuf) = taosMemoryCalloc(1, (*keyLen) + nullFlagSize);
if ((*keyBuf) == NULL) {
return TSDB_CODE_OUT_OF_MEMORY; return TSDB_CODE_OUT_OF_MEMORY;
} }
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
static bool groupKeyCompare(SGroupbyOperatorInfo* pInfo, SSDataBlock* pBlock, int32_t rowIndex, static bool groupKeyCompare(SArray* pGroupCols, SArray* pGroupColVals, SSDataBlock* pBlock, int32_t rowIndex, int32_t numOfGroupCols) {
int32_t numOfGroupCols) {
SColumnDataAgg* pColAgg = NULL; SColumnDataAgg* pColAgg = NULL;
for (int32_t i = 0; i < numOfGroupCols; ++i) { for (int32_t i = 0; i < numOfGroupCols; ++i) {
SColumn* pCol = taosArrayGet(pInfo->pGroupCols, i); SColumn* pCol = taosArrayGet(pGroupCols, i);
SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, pCol->slotId); SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, pCol->slotId);
if (pBlock->pBlockAgg != NULL) { if (pBlock->pBlockAgg != NULL) {
pColAgg = &pBlock->pBlockAgg[pCol->slotId]; // TODO is agg data matched? pColAgg = &pBlock->pBlockAgg[pCol->slotId]; // TODO is agg data matched?
...@@ -78,7 +77,7 @@ static bool groupKeyCompare(SGroupbyOperatorInfo* pInfo, SSDataBlock* pBlock, in ...@@ -78,7 +77,7 @@ static bool groupKeyCompare(SGroupbyOperatorInfo* pInfo, SSDataBlock* pBlock, in
bool isNull = colDataIsNull(pColInfoData, pBlock->info.rows, rowIndex, pColAgg); bool isNull = colDataIsNull(pColInfoData, pBlock->info.rows, rowIndex, pColAgg);
SGroupKeys* pkey = taosArrayGet(pInfo->pGroupColVals, i); SGroupKeys* pkey = taosArrayGet(pGroupColVals, i);
if (pkey->isNull && isNull) { if (pkey->isNull && isNull) {
continue; continue;
} }
...@@ -106,18 +105,18 @@ static bool groupKeyCompare(SGroupbyOperatorInfo* pInfo, SSDataBlock* pBlock, in ...@@ -106,18 +105,18 @@ static bool groupKeyCompare(SGroupbyOperatorInfo* pInfo, SSDataBlock* pBlock, in
return true; return true;
} }
static void recordNewGroupKeys(SGroupbyOperatorInfo* pInfo, SSDataBlock* pBlock, int32_t rowIndex, int32_t numOfGroupCols) { static void recordNewGroupKeys(SArray* pGroupCols, SArray* pGroupColVals, SSDataBlock* pBlock, int32_t rowIndex, int32_t numOfGroupCols) {
SColumnDataAgg* pColAgg = NULL; SColumnDataAgg* pColAgg = NULL;
for (int32_t i = 0; i < numOfGroupCols; ++i) { for (int32_t i = 0; i < numOfGroupCols; ++i) {
SColumn* pCol = taosArrayGet(pInfo->pGroupCols, i); SColumn* pCol = taosArrayGet(pGroupCols, i);
SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, pCol->slotId); SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, pCol->slotId);
if (pBlock->pBlockAgg != NULL) { if (pBlock->pBlockAgg != NULL) {
pColAgg = &pBlock->pBlockAgg[pCol->slotId]; // TODO is agg data matched? pColAgg = &pBlock->pBlockAgg[pCol->slotId]; // TODO is agg data matched?
} }
SGroupKeys* pkey = taosArrayGet(pInfo->pGroupColVals, i); SGroupKeys* pkey = taosArrayGet(pGroupColVals, i);
if (colDataIsNull(pColInfoData, pBlock->info.rows, rowIndex, pColAgg)) { if (colDataIsNull(pColInfoData, pBlock->info.rows, rowIndex, pColAgg)) {
pkey->isNull = true; pkey->isNull = true;
} else { } else {
...@@ -197,13 +196,13 @@ static void doHashGroupbyAgg(SOperatorInfo* pOperator, SSDataBlock* pBlock) { ...@@ -197,13 +196,13 @@ static void doHashGroupbyAgg(SOperatorInfo* pOperator, SSDataBlock* pBlock) {
for (int32_t j = 0; j < pBlock->info.rows; ++j) { for (int32_t j = 0; j < pBlock->info.rows; ++j) {
// Compare with the previous row of this column, and do not set the output buffer again if they are identical. // Compare with the previous row of this column, and do not set the output buffer again if they are identical.
if (!pInfo->isInit) { if (!pInfo->isInit) {
recordNewGroupKeys(pInfo, pBlock, j, numOfGroupCols); recordNewGroupKeys(pInfo->pGroupCols, pInfo->pGroupColVals, pBlock, j, numOfGroupCols);
pInfo->isInit = true; pInfo->isInit = true;
num++; num++;
continue; continue;
} }
bool equal = groupKeyCompare(pInfo, pBlock, j, numOfGroupCols); bool equal = groupKeyCompare(pInfo->pGroupCols, pInfo->pGroupColVals, pBlock, j, numOfGroupCols);
if (equal) { if (equal) {
num++; num++;
continue; continue;
...@@ -212,7 +211,7 @@ static void doHashGroupbyAgg(SOperatorInfo* pOperator, SSDataBlock* pBlock) { ...@@ -212,7 +211,7 @@ static void doHashGroupbyAgg(SOperatorInfo* pOperator, SSDataBlock* pBlock) {
// The first row of a new block does not belongs to the previous existed group // The first row of a new block does not belongs to the previous existed group
if (!equal && j == 0) { if (!equal && j == 0) {
num++; num++;
recordNewGroupKeys(pInfo, pBlock, j, numOfGroupCols); recordNewGroupKeys(pInfo->pGroupCols, pInfo->pGroupColVals, pBlock, j, numOfGroupCols);
continue; continue;
} }
...@@ -227,7 +226,7 @@ static void doHashGroupbyAgg(SOperatorInfo* pOperator, SSDataBlock* pBlock) { ...@@ -227,7 +226,7 @@ static void doHashGroupbyAgg(SOperatorInfo* pOperator, SSDataBlock* pBlock) {
// assign the group keys or user input constant values if required // assign the group keys or user input constant values if required
doAssignGroupKeys(pCtx, pOperator->numOfOutput, pBlock->info.rows, rowIndex); doAssignGroupKeys(pCtx, pOperator->numOfOutput, pBlock->info.rows, rowIndex);
recordNewGroupKeys(pInfo, pBlock, j, numOfGroupCols); recordNewGroupKeys(pInfo->pGroupCols, pInfo->pGroupColVals, pBlock, j, numOfGroupCols);
num = 1; num = 1;
} }
...@@ -259,7 +258,7 @@ static SSDataBlock* hashGroupbyAggregate(SOperatorInfo* pOperator, bool* newgrou ...@@ -259,7 +258,7 @@ static SSDataBlock* hashGroupbyAggregate(SOperatorInfo* pOperator, bool* newgrou
if (pRes->info.rows == 0 || !hasRemainDataInCurrentGroup(&pInfo->groupResInfo)) { if (pRes->info.rows == 0 || !hasRemainDataInCurrentGroup(&pInfo->groupResInfo)) {
pOperator->status = OP_EXEC_DONE; pOperator->status = OP_EXEC_DONE;
} }
return pRes; return (pRes->info.rows == 0)? NULL:pRes;
} }
int32_t order = TSDB_ORDER_ASC; int32_t order = TSDB_ORDER_ASC;
...@@ -309,7 +308,7 @@ static SSDataBlock* hashGroupbyAggregate(SOperatorInfo* pOperator, bool* newgrou ...@@ -309,7 +308,7 @@ static SSDataBlock* hashGroupbyAggregate(SOperatorInfo* pOperator, bool* newgrou
} }
} }
return pInfo->binfo.pRes; return (pRes->info.rows == 0)? NULL:pRes;
} }
SOperatorInfo* createGroupOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, SSDataBlock* pResultBlock, SArray* pGroupColList, SNode* pCondition, SExecTaskInfo* pTaskInfo, SOperatorInfo* createGroupOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, SSDataBlock* pResultBlock, SArray* pGroupColList, SNode* pCondition, SExecTaskInfo* pTaskInfo,
...@@ -325,7 +324,7 @@ SOperatorInfo* createGroupOperatorInfo(SOperatorInfo* downstream, SExprInfo* pEx ...@@ -325,7 +324,7 @@ SOperatorInfo* createGroupOperatorInfo(SOperatorInfo* downstream, SExprInfo* pEx
initAggInfo(&pInfo->binfo, &pInfo->aggSup, pExprInfo, numOfCols, 4096, pResultBlock, pTaskInfo->id.str); initAggInfo(&pInfo->binfo, &pInfo->aggSup, pExprInfo, numOfCols, 4096, pResultBlock, pTaskInfo->id.str);
initResultRowInfo(&pInfo->binfo.resultRowInfo, 8); initResultRowInfo(&pInfo->binfo.resultRowInfo, 8);
int32_t code = initGroupOptrInfo(pInfo, pGroupColList); int32_t code = initGroupOptrInfo(&pInfo->pGroupColVals, &pInfo->groupKeyLen, &pInfo->keyBuf, pGroupColList);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
goto _error; goto _error;
} }
...@@ -339,7 +338,7 @@ SOperatorInfo* createGroupOperatorInfo(SOperatorInfo* downstream, SExprInfo* pEx ...@@ -339,7 +338,7 @@ SOperatorInfo* createGroupOperatorInfo(SOperatorInfo* downstream, SExprInfo* pEx
pOperator->info = pInfo; pOperator->info = pInfo;
pOperator->_openFn = operatorDummyOpenFn; pOperator->_openFn = operatorDummyOpenFn;
pOperator->getNextFn = hashGroupbyAggregate; pOperator->getNextFn = hashGroupbyAggregate;
pOperator->closeFn = destroyGroupbyOperatorInfo; pOperator->closeFn = destroyGroupOperatorInfo;
code = appendDownstream(pOperator, &downstream, 1); code = appendDownstream(pOperator, &downstream, 1);
return pOperator; return pOperator;
...@@ -351,67 +350,202 @@ SOperatorInfo* createGroupOperatorInfo(SOperatorInfo* downstream, SExprInfo* pEx ...@@ -351,67 +350,202 @@ SOperatorInfo* createGroupOperatorInfo(SOperatorInfo* downstream, SExprInfo* pEx
return NULL; return NULL;
} }
static SSDataBlock* doPartitionData(SOperatorInfo* pOperator, bool* newgroup) { static void doHashPartition(SOperatorInfo* pOperator, SSDataBlock* pBlock) {
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
SPartitionOperatorInfo* pInfo = pOperator->info;
int32_t numOfGroupCols = taosArrayGetSize(pInfo->pGroupCols);
for (int32_t j = 0; j < pBlock->info.rows; ++j) {
// Compare with the previous row of this column, and do not set the output buffer again if they are identical.
recordNewGroupKeys(pInfo->pGroupCols, pInfo->pGroupColVals, pBlock, j, numOfGroupCols);
int32_t len = buildGroupKeys(pInfo->keyBuf, pInfo->pGroupColVals);
int32_t numOfRows = blockDataGetCapacityInRow(pInfo->binfo.pRes, 4096);
SDataGroupInfo* p = taosHashGet(pInfo->pGroupSet, pInfo->keyBuf, len);
void* pPage = NULL;
if (p == NULL) { // it is a new group
SDataGroupInfo gi = {0};
gi.pPageList = taosArrayInit(100, sizeof(int32_t));
taosHashPut(pInfo->pGroupSet, pInfo->keyBuf, len, &gi, sizeof(SDataGroupInfo));
p = taosHashGet(pInfo->pGroupSet, pInfo->keyBuf, len);
int32_t pageId = 0;
pPage = getNewBufPage(pInfo->pBuf, 0, &pageId);
taosArrayPush(p->pPageList, &pageId);
*(int32_t *) pPage = 0;
} else {
int32_t* curId = taosArrayGetLast(p->pPageList);
pPage = getBufPage(pInfo->pBuf, *curId);
int32_t *rows = (int32_t*) pPage;
if (*rows >= numOfRows) {
// add a new page for current group
int32_t pageId = 0;
pPage = getNewBufPage(pInfo->pBuf, 0, &pageId);
taosArrayPush(p->pPageList, &pageId);
*(int32_t*) pPage = 0;
}
}
int32_t* rows = (int32_t*) pPage;
(*rows) += 1;
int32_t* offset = taosMemoryCalloc(pBlock->info.numOfCols, sizeof(int32_t));
offset[0] = sizeof(int32_t);
int32_t numOfCols = pInfo->binfo.pRes->info.numOfCols;
for(int32_t i = 0; i < numOfCols - 1; ++i) {
SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, i);
if (IS_VAR_DATA_TYPE(pColInfoData->info.type)) {
offset[i + 1] = pColInfoData->info.bytes * numOfRows + numOfRows * sizeof(int32_t) + sizeof(int32_t) + offset[i];
} else {
offset[i + 1] = pColInfoData->info.bytes * numOfRows + BitmapLen(numOfRows) + sizeof(int32_t) + offset[i];
}
}
for(int32_t i = 0; i < numOfCols; ++i) {
SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, i);
if (IS_VAR_DATA_TYPE(pColInfoData->info.type)) {
// todo
} else {
char* bitmap = pPage + offset[i];
int32_t* lenx = pPage + offset[i] + BitmapLen(numOfRows);
char* data = (char*) lenx + sizeof(int32_t);
(*lenx )+= pColInfoData->info.bytes;
bool isNull = colDataIsNull_f(pColInfoData->nullbitmap, j);
if (isNull) {
colDataSetNull_f(bitmap, (*rows) - 1);
} else {
memcpy(data + ((*rows) - 1)* pColInfoData->info.bytes, colDataGetData(pColInfoData, j), pColInfoData->info.bytes);
}
}
}
setBufPageDirty(pPage, true);
releaseBufPage(pInfo->pBuf, pPage);
}
// todo set the consistent group id according to the group keys
}
static SSDataBlock* buildPartitionResult(SOperatorInfo* pOperator) {
SPartitionOperatorInfo* pInfo = pOperator->info;
SDataGroupInfo* pGroupInfo = pInfo->pGroupIter;
if (pInfo->pGroupIter == NULL || pInfo->pageIndex >= taosArrayGetSize(pGroupInfo->pPageList)) {
// try next group data
pInfo->pGroupIter = taosHashIterate(pInfo->pGroupSet, pInfo->pGroupIter);
if (pInfo->pGroupIter == NULL) {
return NULL;
}
pGroupInfo = pInfo->pGroupIter;
pInfo->pageIndex = 0;
}
int32_t* pageId = taosArrayGet(pGroupInfo->pPageList, pInfo->pageIndex);
void* page = getBufPage(pInfo->pBuf, *pageId);
int32_t numOfRows = blockDataGetCapacityInRow(pInfo->binfo.pRes, 4096);
blockDataFromBuf1(pInfo->binfo.pRes, page, numOfRows);
pInfo->pageIndex += 1;
return pInfo->binfo.pRes;
}
static SSDataBlock* hashPartition(SOperatorInfo* pOperator, bool* newgroup) {
if (pOperator->status == OP_EXEC_DONE) { if (pOperator->status == OP_EXEC_DONE) {
return NULL; return NULL;
} }
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; SGroupbyOperatorInfo* pInfo = pOperator->info;
SSortOperatorInfo* pInfo = pOperator->info; SSDataBlock* pRes = pInfo->binfo.pRes;
bool hasVarCol = pInfo->pDataBlock->info.hasVarCol;
if (pOperator->status == OP_RES_TO_RETURN) { if (pOperator->status == OP_RES_TO_RETURN) {
return getSortedBlockData(pInfo->pSortHandle, pInfo->pDataBlock, pInfo->numOfRowsInRes); blockDataCleanup(pRes);
return buildPartitionResult(pOperator);
} }
int32_t numOfBufPage = pInfo->sortBufSize / pInfo->bufPageSize; SOperatorInfo* downstream = pOperator->pDownstream[0];
pInfo->pSortHandle = tsortCreateSortHandle(pInfo->pSortInfo, SORT_SINGLESOURCE_SORT, pInfo->bufPageSize, numOfBufPage,
pInfo->pDataBlock, pTaskInfo->id.str);
tsortSetFetchRawDataFp(pInfo->pSortHandle, loadNextDataBlock);
SGenericSource* ps = taosMemoryCalloc(1, sizeof(SGenericSource)); while (1) {
ps->param = pOperator->pDownstream[0]; publishOperatorProfEvent(downstream, QUERY_PROF_BEFORE_OPERATOR_EXEC);
tsortAddSource(pInfo->pSortHandle, ps); SSDataBlock* pBlock = downstream->getNextFn(downstream, newgroup);
publishOperatorProfEvent(downstream, QUERY_PROF_AFTER_OPERATOR_EXEC);
if (pBlock == NULL) {
break;
}
int32_t code = tsortOpen(pInfo->pSortHandle); // setTagValue(pOperator, pRuntimeEnv->current->pTable, pInfo->binfo.pCtx, pOperator->numOfOutput);
if (code != TSDB_CODE_SUCCESS) { doHashPartition(pOperator, pBlock);
longjmp(pTaskInfo->env, terrno);
} }
pOperator->status = OP_RES_TO_RETURN; pOperator->status = OP_RES_TO_RETURN;
return getSortedBlockData(pInfo->pSortHandle, pInfo->pDataBlock, pInfo->numOfRowsInRes); blockDataEnsureCapacity(pRes, 4096);
return buildPartitionResult(pOperator);
}
static void destroyPartitionOperatorInfo(void* param, int32_t numOfOutput) {
SGroupbyOperatorInfo* pInfo = (SGroupbyOperatorInfo*)param;
doDestroyBasicInfo(&pInfo->binfo, numOfOutput);
taosMemoryFreeClear(pInfo->keyBuf);
taosArrayDestroy(pInfo->pGroupCols);
taosArrayDestroy(pInfo->pGroupColVals);
} }
SOperatorInfo* createPartitionOperatorInfo(SOperatorInfo* downstream, SSDataBlock* pResultBlock, SArray* pSortInfo, SExecTaskInfo* pTaskInfo, const STableGroupInfo* pTableGroupInfo) { SOperatorInfo* createPartitionOperatorInfo(SOperatorInfo* downstream, SSDataBlock* pResultBlock, SArray* pGroupColList,
SSortOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SSortOperatorInfo)); SExecTaskInfo* pTaskInfo, const STableGroupInfo* pTableGroupInfo) {
SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo)); SPartitionOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SPartitionOperatorInfo));
SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
if (pInfo == NULL || pOperator == NULL) { if (pInfo == NULL || pOperator == NULL) {
goto _error; goto _error;
} }
pInfo->sortBufSize = 1024 * 16; // TODO dynamic set the available sort buffer pInfo->pGroupCols = pGroupColList;
pInfo->bufPageSize = 1024;
pInfo->numOfRowsInRes = 1024; _hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY);
pInfo->pDataBlock = pResultBlock; pInfo->pGroupSet = taosHashInit(100, hashFn, false, HASH_NO_LOCK);
pInfo->pSortInfo = pSortInfo; if (pInfo->pGroupSet == NULL) {
goto _error;
}
int32_t code = createDiskbasedBuf(&pInfo->pBuf, 4096, 4096 * 256, pTaskInfo->id.str, "/tmp/");
if (code != TSDB_CODE_SUCCESS) {
goto _error;
}
code = initGroupOptrInfo(&pInfo->pGroupColVals, &pInfo->groupKeyLen, &pInfo->keyBuf, pGroupColList);
if (code != TSDB_CODE_SUCCESS) {
goto _error;
}
pOperator->name = "PartitionOperator"; pOperator->name = "PartitionOperator";
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_PARTITION;
pOperator->blockingOptr = true; pOperator->blockingOptr = true;
pOperator->status = OP_NOT_OPENED; pOperator->status = OP_NOT_OPENED;
pOperator->info = pInfo; pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_PARTITION;
pOperator->pTaskInfo = pTaskInfo; pInfo->binfo.pRes = pResultBlock;
pOperator->getNextFn = doPartitionData; pOperator->numOfOutput = pResultBlock->info.numOfCols;
// pOperator->closeFn = destroyOrderOperatorInfo; pOperator->info = pInfo;
pOperator->_openFn = operatorDummyOpenFn;
pOperator->getNextFn = hashPartition;
pOperator->closeFn = destroyPartitionOperatorInfo;
int32_t code = appendDownstream(pOperator, &downstream, 1); code = appendDownstream(pOperator, &downstream, 1);
return pOperator; return pOperator;
_error: _error:
pTaskInfo->code = TSDB_CODE_OUT_OF_MEMORY; pTaskInfo->code = TSDB_CODE_OUT_OF_MEMORY;
taosMemoryFree(pInfo); taosMemoryFreeClear(pInfo);
taosMemoryFree(pOperator); taosMemoryFreeClear(pOperator);
return NULL; return NULL;
} }
\ No newline at end of file
...@@ -425,9 +425,7 @@ static int32_t doInternalMergeSort(SSortHandle* pHandle) { ...@@ -425,9 +425,7 @@ static int32_t doInternalMergeSort(SSortHandle* pHandle) {
qDebug("%s %d rounds mergesort required to complete the sort, first-round sorted data size:%"PRIzu", sort:%"PRId64", total elapsed:%"PRId64, qDebug("%s %d rounds mergesort required to complete the sort, first-round sorted data size:%"PRIzu", sort:%"PRId64", total elapsed:%"PRId64,
pHandle->idStr, (int32_t) (sortPass + 1), getTotalBufSize(pHandle->pBuf), pHandle->sortElapsed, pHandle->totalElapsed); pHandle->idStr, (int32_t) (sortPass + 1), getTotalBufSize(pHandle->pBuf), pHandle->sortElapsed, pHandle->totalElapsed);
size_t pgSize = pHandle->pageSize; int32_t numOfRows = blockDataGetCapacityInRow(pHandle->pDataBlock, pHandle->pageSize);
int32_t numOfRows = (pgSize - blockDataGetSerialMetaSize(pHandle->pDataBlock))/ blockDataGetSerialRowSize(pHandle->pDataBlock);
blockDataEnsureCapacity(pHandle->pDataBlock, numOfRows); blockDataEnsureCapacity(pHandle->pDataBlock, numOfRows);
size_t numOfSorted = taosArrayGetSize(pHandle->pOrderedSource); size_t numOfSorted = taosArrayGetSize(pHandle->pOrderedSource);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册