提交 83b44e2f 编写于 作者: H Haojun Liao

fix(query):support multigroup in groupby operator.

上级 46c6302a
......@@ -115,9 +115,6 @@ void initGroupedResultInfo(SGroupResInfo* pGroupResInfo, SHashObj* pHashmap, int
p->groupId = *(uint64_t*)key;
p->pos = *(SResultRowPosition*)pData;
memcpy(p->key, (char*)key + sizeof(uint64_t), keyLen - sizeof(uint64_t));
#ifdef BUF_PAGE_DEBUG
qDebug("page_groupRes, groupId:%" PRIu64 ",pageId:%d,offset:%d\n", p->groupId, p->pos.pageId, p->pos.offset);
#endif
taosArrayPush(pGroupResInfo->pRows, &p);
}
......
......@@ -1506,15 +1506,11 @@ int32_t doCopyToSDataBlock(SExecTaskInfo* pTaskInfo, SSDataBlock* pBlock, SExprI
int32_t numOfExprs) {
int32_t numOfRows = getNumOfTotalRes(pGroupResInfo);
int32_t start = pGroupResInfo->index;
#ifdef BUF_PAGE_DEBUG
qDebug("\npage_copytoblock rows:%d", numOfRows);
#endif
for (int32_t i = start; i < numOfRows; i += 1) {
SResKeyPos* pPos = taosArrayGetP(pGroupResInfo->pRows, i);
SFilePage* page = getBufPage(pBuf, pPos->pos.pageId);
#ifdef BUF_PAGE_DEBUG
qDebug("page_copytoblock pos pageId:%d, offset:%d", pPos->pos.pageId, pPos->pos.offset);
#endif
SResultRow* pRow = (SResultRow*)((char*)page + pPos->pos.offset);
doUpdateNumOfRows(pRow, numOfExprs, rowCellOffset);
......
......@@ -29,7 +29,7 @@
static void* getCurrentDataGroupInfo(const SPartitionOperatorInfo* pInfo, SDataGroupInfo** pGroupInfo, int32_t len);
static int32_t* setupColumnOffset(const SSDataBlock* pBlock, int32_t rowCapacity);
static int32_t setGroupResultOutputBuf(SOperatorInfo* pOperator, SOptrBasicInfo* binfo, int32_t numOfCols, char* pData, int16_t bytes,
int32_t groupId, SDiskbasedBuf* pBuf, SAggSupporter* pAggSup);
uint64_t groupId, SDiskbasedBuf* pBuf, SAggSupporter* pAggSup);
static void destroyGroupOperatorInfo(void* param, int32_t numOfOutput) {
SGroupbyOperatorInfo* pInfo = (SGroupbyOperatorInfo*)param;
......@@ -264,7 +264,7 @@ static void doHashGroupbyAgg(SOperatorInfo* pOperator, SSDataBlock* pBlock) {
}
len = buildGroupKeys(pInfo->keyBuf, pInfo->pGroupColVals);
int32_t ret = setGroupResultOutputBuf(pOperator, &(pInfo->binfo), pOperator->exprSupp.numOfExprs, pInfo->keyBuf, len, 0, pInfo->aggSup.pResultBuf, &pInfo->aggSup);
int32_t ret = setGroupResultOutputBuf(pOperator, &(pInfo->binfo), pOperator->exprSupp.numOfExprs, pInfo->keyBuf, len, pBlock->info.groupId, pInfo->aggSup.pResultBuf, &pInfo->aggSup);
if (ret != TSDB_CODE_SUCCESS) { // null data, too many state code
longjmp(pTaskInfo->env, TSDB_CODE_QRY_APP_ERROR);
}
......@@ -282,7 +282,7 @@ static void doHashGroupbyAgg(SOperatorInfo* pOperator, SSDataBlock* pBlock) {
len = buildGroupKeys(pInfo->keyBuf, pInfo->pGroupColVals);
int32_t ret =
setGroupResultOutputBuf(pOperator, &(pInfo->binfo), pOperator->exprSupp.numOfExprs, pInfo->keyBuf, len,
0, pInfo->aggSup.pResultBuf, &pInfo->aggSup);
pBlock->info.groupId, pInfo->aggSup.pResultBuf, &pInfo->aggSup);
if (ret != TSDB_CODE_SUCCESS) {
longjmp(pTaskInfo->env, TSDB_CODE_QRY_APP_ERROR);
}
......@@ -293,6 +293,29 @@ static void doHashGroupbyAgg(SOperatorInfo* pOperator, SSDataBlock* pBlock) {
}
}
static SSDataBlock* buildGroupResultDataBlock(SOperatorInfo* pOperator) {
SGroupbyOperatorInfo* pInfo = pOperator->info;
SSDataBlock* pRes = pInfo->binfo.pRes;
while(1) {
doBuildResultDatablock(pOperator, &pInfo->binfo, &pInfo->groupResInfo, pInfo->aggSup.pResultBuf);
doFilter(pInfo->pCondition, pRes);
bool hasRemain = hasDataInGroupInfo(&pInfo->groupResInfo);
if (!hasRemain) {
doSetOperatorCompleted(pOperator);
break;
}
if (pRes->info.rows > 0) {
break;
}
}
pOperator->resultInfo.totalRows += pRes->info.rows;
return (pRes->info.rows == 0)? NULL:pRes;
}
static SSDataBlock* hashGroupbyAggregate(SOperatorInfo* pOperator) {
if (pOperator->status == OP_EXEC_DONE) {
return NULL;
......@@ -304,22 +327,7 @@ static SSDataBlock* hashGroupbyAggregate(SOperatorInfo* pOperator) {
SSDataBlock* pRes = pInfo->binfo.pRes;
if (pOperator->status == OP_RES_TO_RETURN) {
while(1) {
doBuildResultDatablock(pOperator, &pInfo->binfo, &pInfo->groupResInfo, pInfo->aggSup.pResultBuf);
doFilter(pInfo->pCondition, pRes);
bool hasRemain = hasDataInGroupInfo(&pInfo->groupResInfo);
if (!hasRemain) {
doSetOperatorCompleted(pOperator);
break;
}
if (pRes->info.rows > 0) {
break;
}
}
pOperator->resultInfo.totalRows += pRes->info.rows;
return (pRes->info.rows == 0)? NULL:pRes;
return buildGroupResultDataBlock(pOperator);
}
int32_t order = TSDB_ORDER_ASC;
......@@ -373,26 +381,7 @@ static SSDataBlock* hashGroupbyAggregate(SOperatorInfo* pOperator) {
initGroupedResultInfo(&pInfo->groupResInfo, pInfo->aggSup.pResultRowHashTable, 0);
pOperator->cost.openCost = (taosGetTimestampUs() - st) / 1000.0;
while(1) {
doBuildResultDatablock(pOperator, &pInfo->binfo, &pInfo->groupResInfo, pInfo->aggSup.pResultBuf);
doFilter(pInfo->pCondition, pRes);
bool hasRemain = hasDataInGroupInfo(&pInfo->groupResInfo);
if (!hasRemain) {
doSetOperatorCompleted(pOperator);
break;
}
if (pRes->info.rows > 0) {
break;
}
}
size_t rows = pRes->info.rows;
pOperator->resultInfo.totalRows += rows;
return (rows == 0)? NULL:pRes;
return buildGroupResultDataBlock(pOperator);
}
SOperatorInfo* createGroupOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols,
......@@ -800,7 +789,7 @@ SOperatorInfo* createPartitionOperatorInfo(SOperatorInfo* downstream, SPartition
}
int32_t setGroupResultOutputBuf(SOperatorInfo* pOperator, SOptrBasicInfo* binfo, int32_t numOfCols, char* pData, int16_t bytes,
int32_t groupId, SDiskbasedBuf* pBuf, SAggSupporter* pAggSup) {
uint64_t groupId, SDiskbasedBuf* pBuf, SAggSupporter* pAggSup) {
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
SResultRowInfo* pResultRowInfo = &binfo->resultRowInfo;
SqlFunctionCtx* pCtx = pOperator->exprSupp.pCtx;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册