未验证 提交 861fb936 编写于 作者: S shenglian-zhou 提交者: GitHub

Merge pull request #14095 from taosdata/szhou/feature/sort-group-2

feat: add sort group operator
......@@ -50,6 +50,7 @@ typedef struct SMsortComparParam {
void **pSources;
int32_t numOfSources;
SArray *orderInfo; // SArray<SBlockOrderInfo>
bool cmpGroupId;
} SMsortComparParam;
typedef struct SSortHandle SSortHandle;
......@@ -99,6 +100,11 @@ int32_t tsortSetFetchRawDataFp(SSortHandle* pHandle, _sort_fetch_block_fn_t fetc
*/
int32_t tsortSetComparFp(SSortHandle* pHandle, _sort_merge_compar_fn_t fp);
/**
*
*/
int32_t tsortSetCompareGroupId(SSortHandle* pHandle, bool compareGroupId);
/**
*
* @param pHandle
......
......@@ -22,10 +22,11 @@ static int32_t getExplainExecInfo(SOperatorInfo* pOptr, void** pOptrExplain
static void destroyOrderOperatorInfo(void* param, int32_t numOfOutput);
SOperatorInfo* createSortOperatorInfo(SOperatorInfo* downstream, SSortPhysiNode* pSortPhyNode, SExecTaskInfo* pTaskInfo) {
SOperatorInfo* createSortOperatorInfo(SOperatorInfo* downstream, SSortPhysiNode* pSortPhyNode,
SExecTaskInfo* pTaskInfo) {
SSortOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SSortOperatorInfo));
SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
if (pInfo == NULL || pOperator == NULL/* || rowSize > 100 * 1024 * 1024*/) {
if (pInfo == NULL || pOperator == NULL /* || rowSize > 100 * 1024 * 1024*/) {
goto _error;
}
......@@ -44,16 +45,17 @@ SOperatorInfo* createSortOperatorInfo(SOperatorInfo* downstream, SSortPhysiNode*
initResultSizeInfo(pOperator, 1024);
pInfo->pSortInfo = createSortInfo(pSortPhyNode->pSortKeys);;
pInfo->pColMatchInfo = pColMatchColInfo;
pOperator->name = "SortOperator";
pInfo->pSortInfo = createSortInfo(pSortPhyNode->pSortKeys);
;
pInfo->pColMatchInfo = pColMatchColInfo;
pOperator->name = "SortOperator";
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_SORT;
pOperator->blocking = true;
pOperator->status = OP_NOT_OPENED;
pOperator->info = pInfo;
pOperator->exprSupp.pExprInfo = pExprInfo;
pOperator->exprSupp.numOfExprs = numOfCols;
pOperator->pTaskInfo = pTaskInfo;
pOperator->blocking = true;
pOperator->status = OP_NOT_OPENED;
pOperator->info = pInfo;
pOperator->exprSupp.pExprInfo = pExprInfo;
pOperator->exprSupp.numOfExprs = numOfCols;
pOperator->pTaskInfo = pTaskInfo;
// lazy evaluation for the following parameter since the input datablock is not known till now.
// pInfo->bufPageSize = rowSize < 1024 ? 1024 * 2 : rowSize * 2;
......@@ -146,8 +148,8 @@ void applyScalarFunction(SSDataBlock* pBlock, void* param) {
SOperatorInfo* pOperator = param;
SSortOperatorInfo* pSort = pOperator->info;
if (pOperator->exprSupp.pExprInfo != NULL) {
int32_t code =
projectApplyFunctions(pOperator->exprSupp.pExprInfo, pBlock, pBlock, pOperator->exprSupp.pCtx, pOperator->exprSupp.numOfExprs, NULL);
int32_t code = projectApplyFunctions(pOperator->exprSupp.pExprInfo, pBlock, pBlock, pOperator->exprSupp.pCtx,
pOperator->exprSupp.numOfExprs, NULL);
if (code != TSDB_CODE_SUCCESS) {
longjmp(pOperator->pTaskInfo->env, code);
}
......@@ -165,8 +167,7 @@ int32_t doOpenSortOperator(SOperatorInfo* pOperator) {
pInfo->startTs = taosGetTimestampUs();
// pInfo->binfo.pRes is not equalled to the input datablock.
pInfo->pSortHandle = tsortCreateSortHandle(pInfo->pSortInfo, SORT_SINGLESOURCE_SORT, -1, -1,
NULL, pTaskInfo->id.str);
pInfo->pSortHandle = tsortCreateSortHandle(pInfo->pSortInfo, SORT_SINGLESOURCE_SORT, -1, -1, NULL, pTaskInfo->id.str);
tsortSetFetchRawDataFp(pInfo->pSortHandle, loadNextDataBlock, applyScalarFunction, pOperator);
......@@ -232,6 +233,265 @@ int32_t getExplainExecInfo(SOperatorInfo* pOptr, void** pOptrExplain, uint32_t*
return TSDB_CODE_SUCCESS;
}
//=====================================================================================
// Group Sort Operator
typedef enum EChildOperatorStatus { CHILD_OP_NEW_GROUP, CHILD_OP_SAME_GROUP, CHILD_OP_FINISHED } EChildOperatorStatus;
typedef struct SGroupSortOperatorInfo {
SOptrBasicInfo binfo;
SArray* pSortInfo;
SArray* pColMatchInfo;
int64_t startTs;
uint64_t sortElapsed;
bool hasGroupId;
uint64_t currGroupId;
SSDataBlock* prefetchedSortInput;
SSortHandle* pCurrSortHandle;
EChildOperatorStatus childOpStatus;
SSortExecInfo sortExecInfo;
} SGroupSortOperatorInfo;
SSDataBlock* getGroupSortedBlockData(SSortHandle* pHandle, SSDataBlock* pDataBlock, int32_t capacity,
SArray* pColMatchInfo, SGroupSortOperatorInfo* pInfo) {
blockDataCleanup(pDataBlock);
SSDataBlock* p = tsortGetSortedDataBlock(pHandle);
if (p == NULL) {
return NULL;
}
blockDataEnsureCapacity(p, capacity);
while (1) {
STupleHandle* pTupleHandle = tsortNextTuple(pHandle);
if (pTupleHandle == NULL) {
break;
}
appendOneRowToDataBlock(p, pTupleHandle);
if (p->info.rows >= capacity) {
break;
}
}
if (p->info.rows > 0) {
int32_t numOfCols = taosArrayGetSize(pColMatchInfo);
for (int32_t i = 0; i < numOfCols; ++i) {
SColMatchInfo* pmInfo = taosArrayGet(pColMatchInfo, i);
ASSERT(pmInfo->matchType == COL_MATCH_FROM_SLOT_ID);
SColumnInfoData* pSrc = taosArrayGet(p->pDataBlock, pmInfo->srcSlotId);
SColumnInfoData* pDst = taosArrayGet(pDataBlock->pDataBlock, pmInfo->targetSlotId);
colDataAssign(pDst, pSrc, p->info.rows);
}
pDataBlock->info.rows = p->info.rows;
pDataBlock->info.capacity = p->info.rows;
}
blockDataDestroy(p);
return (pDataBlock->info.rows > 0) ? pDataBlock : NULL;
}
typedef struct SGroupSortSourceParam {
SOperatorInfo* childOpInfo;
SGroupSortOperatorInfo* grpSortOpInfo;
} SGroupSortSourceParam;
SSDataBlock* fetchNextGroupSortDataBlock(void* param) {
SGroupSortSourceParam* source = param;
SGroupSortOperatorInfo* grpSortOpInfo = source->grpSortOpInfo;
if (grpSortOpInfo->prefetchedSortInput) {
SSDataBlock* block = grpSortOpInfo->prefetchedSortInput;
grpSortOpInfo->prefetchedSortInput = NULL;
return block;
} else {
SOperatorInfo* childOp = source->childOpInfo;
SSDataBlock* block = childOp->fpSet.getNextFn(childOp);
if (block != NULL) {
if (block->info.groupId == grpSortOpInfo->currGroupId) {
grpSortOpInfo->childOpStatus = CHILD_OP_SAME_GROUP;
return block;
} else {
grpSortOpInfo->childOpStatus = CHILD_OP_NEW_GROUP;
grpSortOpInfo->prefetchedSortInput = block;
return NULL;
}
} else {
grpSortOpInfo->childOpStatus = CHILD_OP_FINISHED;
return NULL;
}
}
}
int32_t beginSortGroup(SOperatorInfo* pOperator) {
SGroupSortOperatorInfo* pInfo = pOperator->info;
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
// pInfo->binfo.pRes is not equalled to the input datablock.
pInfo->pCurrSortHandle =
tsortCreateSortHandle(pInfo->pSortInfo, SORT_SINGLESOURCE_SORT, -1, -1, NULL, pTaskInfo->id.str);
tsortSetFetchRawDataFp(pInfo->pCurrSortHandle, fetchNextGroupSortDataBlock, applyScalarFunction, pOperator);
SSortSource* ps = taosMemoryCalloc(1, sizeof(SSortSource));
SGroupSortSourceParam* param = taosMemoryCalloc(1, sizeof(SGroupSortSourceParam));
param->childOpInfo = pOperator->pDownstream[0];
param->grpSortOpInfo = pInfo;
ps->param = param;
tsortAddSource(pInfo->pCurrSortHandle, ps);
int32_t code = tsortOpen(pInfo->pCurrSortHandle);
taosMemoryFreeClear(ps);
if (code != TSDB_CODE_SUCCESS) {
longjmp(pTaskInfo->env, terrno);
}
return TSDB_CODE_SUCCESS;
}
int32_t finishSortGroup(SOperatorInfo* pOperator) {
SGroupSortOperatorInfo* pInfo = pOperator->info;
SSortExecInfo sortExecInfo = tsortGetSortExecInfo(pInfo->pCurrSortHandle);
pInfo->sortExecInfo.sortMethod = sortExecInfo.sortMethod;
pInfo->sortExecInfo.sortBuffer = sortExecInfo.sortBuffer;
pInfo->sortExecInfo.loops += sortExecInfo.loops;
pInfo->sortExecInfo.readBytes += sortExecInfo.readBytes;
pInfo->sortExecInfo.writeBytes += sortExecInfo.writeBytes;
if (pInfo->pCurrSortHandle != NULL) {
tsortDestroySortHandle(pInfo->pCurrSortHandle);
}
pInfo->pCurrSortHandle = NULL;
return TSDB_CODE_SUCCESS;
}
SSDataBlock* doGroupSort(SOperatorInfo* pOperator) {
if (pOperator->status == OP_EXEC_DONE) {
return NULL;
}
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
SGroupSortOperatorInfo* pInfo = pOperator->info;
int32_t code = pOperator->fpSet._openFn(pOperator);
if (code != TSDB_CODE_SUCCESS) {
longjmp(pTaskInfo->env, code);
}
if (!pInfo->hasGroupId) {
pInfo->hasGroupId = true;
pInfo->prefetchedSortInput = pOperator->pDownstream[0]->fpSet.getNextFn(pOperator->pDownstream[0]);
pInfo->currGroupId = pInfo->prefetchedSortInput->info.groupId;
pInfo->childOpStatus = CHILD_OP_NEW_GROUP;
beginSortGroup(pOperator);
}
SSDataBlock* pBlock = NULL;
while (pInfo->pCurrSortHandle != NULL) {
// beginSortGroup would fetch all child blocks of pInfo->currGroupId;
ASSERT(pInfo->childOpStatus != CHILD_OP_SAME_GROUP);
pBlock = getGroupSortedBlockData(pInfo->pCurrSortHandle, pInfo->binfo.pRes, pOperator->resultInfo.capacity,
pInfo->pColMatchInfo, pInfo);
if (pBlock != NULL) {
pBlock->info.groupId = pInfo->currGroupId;
pOperator->resultInfo.totalRows += pBlock->info.rows;
return pBlock;
} else {
if (pInfo->childOpStatus == CHILD_OP_NEW_GROUP) {
finishSortGroup(pOperator);
pInfo->currGroupId = pInfo->prefetchedSortInput->info.groupId;
beginSortGroup(pOperator);
} else if (pInfo->childOpStatus == CHILD_OP_FINISHED) {
finishSortGroup(pOperator);
doSetOperatorCompleted(pOperator);
return NULL;
}
}
}
return NULL;
}
int32_t getGroupSortExplainExecInfo(SOperatorInfo* pOptr, void** pOptrExplain, uint32_t* len) {
SGroupSortOperatorInfo* pInfo = (SGroupSortOperatorInfo*)pOptr->info;
*pOptrExplain = &pInfo->sortExecInfo;
*len = sizeof(SSortExecInfo);
return TSDB_CODE_SUCCESS;
}
// TODO:
SOperatorInfo* createGroupSortOperatorInfo(SOperatorInfo* downstream, SSortPhysiNode* pSortPhyNode,
SExecTaskInfo* pTaskInfo) {
SSortOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SSortOperatorInfo));
SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
if (pInfo == NULL || pOperator == NULL /* || rowSize > 100 * 1024 * 1024*/) {
goto _error;
}
SDataBlockDescNode* pDescNode = pSortPhyNode->node.pOutputDataBlockDesc;
int32_t numOfCols = 0;
SSDataBlock* pResBlock = createResDataBlock(pDescNode);
SExprInfo* pExprInfo = createExprInfo(pSortPhyNode->pExprs, NULL, &numOfCols);
int32_t numOfOutputCols = 0;
SArray* pColMatchColInfo =
extractColMatchInfo(pSortPhyNode->pTargets, pDescNode, &numOfOutputCols, COL_MATCH_FROM_SLOT_ID);
pOperator->exprSupp.pCtx = createSqlFunctionCtx(pExprInfo, numOfCols, &pOperator->exprSupp.rowEntryInfoOffset);
pInfo->binfo.pRes = pResBlock;
initResultSizeInfo(pOperator, 1024);
pInfo->pSortInfo = createSortInfo(pSortPhyNode->pSortKeys);
;
pInfo->pColMatchInfo = pColMatchColInfo;
pOperator->name = "GroupSortOperator";
// TODO
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_SORT;
pOperator->blocking = true;
pOperator->status = OP_NOT_OPENED;
pOperator->info = pInfo;
pOperator->exprSupp.pExprInfo = pExprInfo;
pOperator->exprSupp.numOfExprs = numOfCols;
pOperator->pTaskInfo = pTaskInfo;
pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doGroupSort, NULL, NULL, destroyOrderOperatorInfo, NULL,
NULL, getGroupSortExplainExecInfo);
int32_t code = appendDownstream(pOperator, &downstream, 1);
if (code != TSDB_CODE_SUCCESS) {
goto _error;
}
return pOperator;
_error:
pTaskInfo->code = TSDB_CODE_OUT_OF_MEMORY;
taosMemoryFree(pInfo);
taosMemoryFree(pOperator);
return NULL;
}
void destroyGroupSortOperatorInfo(void* param, int32_t numOfOutput) {
SGroupSortOperatorInfo* pInfo = (SGroupSortOperatorInfo*)param;
pInfo->binfo.pRes = blockDataDestroy(pInfo->binfo.pRes);
taosArrayDestroy(pInfo->pSortInfo);
taosArrayDestroy(pInfo->pColMatchInfo);
}
// TODO: sort group
// TODO: msortCompare compare group id in multiway merge sort.
// TODO: table merge scan, group first, then for each group, multiple readers
//=====================================================================================
// Multiway Sort Merge operator
typedef struct SMultiwaySortMergeOperatorInfo {
SOptrBasicInfo binfo;
......@@ -259,11 +519,12 @@ int32_t doOpenMultiwaySortMergeOperator(SOperatorInfo* pOperator) {
int32_t numOfBufPage = pInfo->sortBufSize / pInfo->bufPageSize;
pInfo->pSortHandle = tsortCreateSortHandle(pInfo->pSortInfo, SORT_MULTISOURCE_MERGE,
pInfo->bufPageSize, numOfBufPage, pInfo->pInputBlock, pTaskInfo->id.str);
pInfo->pSortHandle = tsortCreateSortHandle(pInfo->pSortInfo, SORT_MULTISOURCE_MERGE, pInfo->bufPageSize, numOfBufPage,
pInfo->pInputBlock, pTaskInfo->id.str);
tsortSetFetchRawDataFp(pInfo->pSortHandle, loadNextDataBlock, NULL, NULL);
tsortSetCompareGroupId(pInfo->pSortHandle, true);
for (int32_t i = 0; i < pOperator->numOfDownstream; ++i) {
SSortSource* ps = taosMemoryCalloc(1, sizeof(SSortSource));
ps->param = pOperator->pDownstream[i];
......@@ -286,7 +547,7 @@ int32_t doOpenMultiwaySortMergeOperator(SOperatorInfo* pOperator) {
SSDataBlock* getMultiwaySortedBlockData(SSortHandle* pHandle, SSDataBlock* pDataBlock, int32_t capacity,
SArray* pColMatchInfo, SOperatorInfo* pOperator) {
SMultiwaySortMergeOperatorInfo* pInfo = pOperator->info;
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
blockDataCleanup(pDataBlock);
......@@ -387,24 +648,23 @@ SOperatorInfo* createMultiwaySortMergeOperatorInfo(SOperatorInfo** downStreams,
goto _error;
}
initResultSizeInfo(pOperator, 1024);
pInfo->binfo.pRes = pResBlock;
pInfo->pSortInfo = pSortInfo;
pInfo->binfo.pRes = pResBlock;
pInfo->pSortInfo = pSortInfo;
pInfo->pColMatchInfo = pColMatchColInfo;
pInfo->pInputBlock = pInputBlock;
pOperator->name = "MultiwaySortMerge";
pInfo->pInputBlock = pInputBlock;
pOperator->name = "MultiwaySortMerge";
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_MERGE;
pOperator->blocking = false;
pOperator->status = OP_NOT_OPENED;
pOperator->info = pInfo;
pOperator->blocking = false;
pOperator->status = OP_NOT_OPENED;
pOperator->info = pInfo;
pOperator->pTaskInfo = pTaskInfo;
pInfo->bufPageSize = getProperSortPageSize(rowSize);
pInfo->bufPageSize = getProperSortPageSize(rowSize);
// one additional is reserved for merged result.
pInfo->sortBufSize = pInfo->bufPageSize * (numStreams + 1);
pInfo->sortBufSize = pInfo->bufPageSize * (numStreams + 1);
pOperator->fpSet =
createOperatorFpSet(doOpenMultiwaySortMergeOperator, doMultiwaySortMerge, NULL, NULL,
......
......@@ -86,6 +86,7 @@ SSortHandle* tsortCreateSortHandle(SArray* pSortInfo, int32_t type, int32_t page
pSortHandle->pOrderedSource = taosArrayInit(4, POINTER_BYTES);
pSortHandle->cmpParam.orderInfo = pSortInfo;
pSortHandle->cmpParam.cmpGroupId = false;
tsortSetComparFp(pSortHandle, msortComparFn);
......@@ -374,6 +375,12 @@ int32_t msortComparFn(const void *pLeft, const void *pRight, void *param) {
SSDataBlock* pLeftBlock = pLeftSource->src.pBlock;
SSDataBlock* pRightBlock = pRightSource->src.pBlock;
if (pParam->cmpGroupId) {
if (pLeftBlock->info.groupId != pRightBlock->info.groupId) {
return pLeftBlock->info.groupId < pRightBlock->info.groupId ? -1 : 1;
}
}
for(int32_t i = 0; i < pInfo->size; ++i) {
SBlockOrderInfo* pOrder = TARRAY_GET_ELEM(pInfo, i);
SColumnInfoData* pLeftColInfoData = TARRAY_GET_ELEM(pLeftBlock->pDataBlock, pOrder->slotId);
......@@ -680,6 +687,11 @@ int32_t tsortSetComparFp(SSortHandle* pHandle, _sort_merge_compar_fn_t fp) {
return TSDB_CODE_SUCCESS;
}
int32_t tsortSetCompareGroupId(SSortHandle* pHandle, bool compareGroupId) {
pHandle->cmpParam.cmpGroupId = compareGroupId;
return TSDB_CODE_SUCCESS;
}
STupleHandle* tsortNextTuple(SSortHandle* pHandle) {
if (pHandle->cmpParam.numOfSources == pHandle->numOfCompletedSources) {
return NULL;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册