提交 e7df9b8f 编写于 作者: D dapan1121

enh: optimize copy group result

上级 198be9a7
......@@ -120,6 +120,11 @@ int32_t mmPutMsgToReadQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg) {
}
int32_t mmPutMsgToQueryQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg) {
if (NULL == pMgmt->pMnode) {
const STraceId *trace = &pMsg->info.traceId;
dGError("msg:%p, stop to pre-process in mnode since mnode is NULL, type:%s", pMsg, TMSG_INFO(pMsg->msgType));
return -1;
}
pMsg->info.node = pMgmt->pMnode;
if (mndPreProcessQueryMsg(pMsg) != 0) {
const STraceId *trace = &pMsg->info.traceId;
......
......@@ -82,7 +82,7 @@ static void extractQualifiedTupleByFilterResult(SSDataBlock* pBlock, const SC
static int32_t doSetInputDataBlock(SExprSupp* pExprSup, SSDataBlock* pBlock, int32_t order, int32_t scanFlag,
bool createDummyCol);
static int32_t doCopyToSDataBlock(SExecTaskInfo* pTaskInfo, SSDataBlock* pBlock, SExprSupp* pSup, SDiskbasedBuf* pBuf,
SGroupResInfo* pGroupResInfo, int32_t threshold);
SGroupResInfo* pGroupResInfo, int32_t threshold, bool ignoreGroup);
SResultRow* getNewResultRow(SDiskbasedBuf* pResultBuf, int32_t* currentPageId, int32_t interBufSize) {
SFilePage* pData = NULL;
......@@ -776,7 +776,7 @@ int32_t finalizeResultRows(SDiskbasedBuf* pBuf, SResultRowPosition* resultRowPos
}
int32_t doCopyToSDataBlock(SExecTaskInfo* pTaskInfo, SSDataBlock* pBlock, SExprSupp* pSup, SDiskbasedBuf* pBuf,
SGroupResInfo* pGroupResInfo, int32_t threshold) {
SGroupResInfo* pGroupResInfo, int32_t threshold, bool ignoreGroup) {
SExprInfo* pExprInfo = pSup->pExprInfo;
int32_t numOfExprs = pSup->numOfExprs;
int32_t* rowEntryOffset = pSup->rowEntryInfoOffset;
......@@ -803,20 +803,23 @@ int32_t doCopyToSDataBlock(SExecTaskInfo* pTaskInfo, SSDataBlock* pBlock, SExprS
continue;
}
if (pBlock->info.id.groupId == 0) {
pBlock->info.id.groupId = pPos->groupId;
} else {
// current value belongs to different group, it can't be packed into one datablock
if (pBlock->info.id.groupId != pPos->groupId) {
releaseBufPage(pBuf, page);
break;
if (!ignoreGroup) {
if (pBlock->info.id.groupId == 0) {
pBlock->info.id.groupId = pPos->groupId;
} else {
// current value belongs to different group, it can't be packed into one datablock
if (pBlock->info.id.groupId != pPos->groupId) {
releaseBufPage(pBuf, page);
break;
}
}
}
if (pBlock->info.rows + pRow->numOfRows > pBlock->info.capacity) {
blockDataEnsureCapacity(pBlock, pBlock->info.rows + pRow->numOfRows);
uint32_t newSize = pBlock->info.rows + pRow->numOfRows + (numOfRows - i) > 1 ? 1 : 0;
blockDataEnsureCapacity(pBlock, newSize);
qDebug("datablock capacity not sufficient, expand to required:%" PRId64 ", current capacity:%d, %s",
(pRow->numOfRows + pBlock->info.rows), pBlock->info.capacity, GET_TASKID(pTaskInfo));
newSize, pBlock->info.capacity, GET_TASKID(pTaskInfo));
// todo set the pOperator->resultInfo size
}
......@@ -853,7 +856,7 @@ void doBuildStreamResBlock(SOperatorInfo* pOperator, SOptrBasicInfo* pbInfo, SGr
// clear the existed group id
pBlock->info.id.groupId = 0;
ASSERT(!pbInfo->mergeResultBlock);
doCopyToSDataBlock(pTaskInfo, pBlock, &pOperator->exprSupp, pBuf, pGroupResInfo, pOperator->resultInfo.threshold);
doCopyToSDataBlock(pTaskInfo, pBlock, &pOperator->exprSupp, pBuf, pGroupResInfo, pOperator->resultInfo.threshold, false);
void* tbname = NULL;
if (streamStateGetParName(pTaskInfo->streamInfo.pState, pBlock->info.id.groupId, &tbname) < 0) {
......@@ -880,10 +883,10 @@ void doBuildResultDatablock(SOperatorInfo* pOperator, SOptrBasicInfo* pbInfo, SG
// clear the existed group id
pBlock->info.id.groupId = 0;
if (!pbInfo->mergeResultBlock) {
doCopyToSDataBlock(pTaskInfo, pBlock, &pOperator->exprSupp, pBuf, pGroupResInfo, pOperator->resultInfo.threshold);
doCopyToSDataBlock(pTaskInfo, pBlock, &pOperator->exprSupp, pBuf, pGroupResInfo, pOperator->resultInfo.threshold, false);
} else {
while (hasRemainResults(pGroupResInfo)) {
doCopyToSDataBlock(pTaskInfo, pBlock, &pOperator->exprSupp, pBuf, pGroupResInfo, pOperator->resultInfo.threshold);
doCopyToSDataBlock(pTaskInfo, pBlock, &pOperator->exprSupp, pBuf, pGroupResInfo, pOperator->resultInfo.threshold, true);
if (pBlock->info.rows >= pOperator->resultInfo.threshold) {
break;
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册